I am a new Python programmer. I have written a script that loads API data from Pipedrive into a Snowflake database. Here are the steps in my code.
- Delete the csv file if it exists.
- Make an API call and put all the paginated data in a list.
- Create a csv file from the list.
- Truncate the table in Snowflake.
- Remove the data from the Snowflake stage table.
- Put the data in Snowflake stage table.
- Copy data from stage table to a normal table.
I would love to get some feedback on it as I will create more scripts based on this code.
Here is my code.
import requests
from module import usr, pwd, acct, db, schem, api_token
import snowflake.connector
import datetime
import time
from datetime import datetime
import csv
import os
import contextlib
# Pipedrive endpoint name; interpolated into the request URL by get_persons().
end_point = 'persons'
# Page size sent as the `limit` query parameter on every request.
limit = 500
# Pagination offset handed to get_persons() by the __main__ block.
start = 0
# Wall-clock reference for the elapsed-time report printed at the end of the run.
start_time = time.time()
# Local CSV path: appended to by write_csv(), removed by delete_existing_csv(),
# and uploaded to the stage by snowflake_insert().
csvfile = r'C:/Users/User1/PycharmProjects/Pipedrive/persons.csv'
def snowflake_connect():
    """Open a Snowflake session and return a cursor on it.

    The credentials and the default database/schema are the names imported
    from ``module``. Only the cursor is returned; the connection object is
    not handed to the caller.

    Returns:
        A cursor created from the newly opened connection.
    """
    connection_settings = {
        'user': usr,
        'password': pwd,
        'account': acct,
        'database': db,
        'schema': schem,
    }
    session = snowflake.connector.connect(**connection_settings)
    return session.cursor()
def snowflake_truncate(cursor):
    """Empty the PERSONS_NEW table, printing a timestamp before and after.

    Args:
        cursor: Open cursor used to run the TRUNCATE statement.

    Returns:
        The same cursor that was passed in.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"

    started_at = datetime.now().strftime(stamp_format)
    print("Truncating table PERSONS_NEW: {}".format(started_at))

    cursor.execute('TRUNCATE TABLE PERSONS_NEW')

    finished_at = datetime.now().strftime(stamp_format)
    print("PERSONS_NEW truncated: {}".format(finished_at))
    return cursor
def snowflake_insert(cursor):
    """Reload the stage from the local CSV and copy it into the target table.

    Runs three statements in order: clear the old compressed CSV files from
    the ``@persons_test`` stage, upload ``csvfile`` to it, then COPY the
    staged file into ``persons_new``. Every result row the cursor yields
    after each statement is printed with a timestamp.

    Args:
        cursor: Open cursor used to run the statements.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    statements = (
        "remove @persons_test pattern='.*.csv.gz'",
        'put file://{} @persons_test auto_compress=true'.format(csvfile),
        """COPY INTO MARKETING.PIPEDRIVE_MASTER.persons_new FROM @persons_test/persons.csv.gz file_format=(TYPE=csv field_delimiter=',' skip_header=0 FIELD_OPTIONALLY_ENCLOSED_BY = '"') on_error = 'abort_statement'""",
    )
    for statement in statements:
        cursor.execute(statement)
        # Iterating the cursor yields the result rows of the statement just run.
        for result_row in cursor:
            print(result_row, datetime.now().strftime(stamp_format))
def get_persons(start):
    """Fetch every page of the Pipedrive collection and pass it to read_persons.

    Each page is requested exactly once and processed before the pagination
    flag is inspected, so a collection that fits in a single page is still
    read (previously the loop body never ran in that case and nothing was
    written).

    Args:
        start: Pagination offset of the first page to request.

    Raises:
        requests.HTTPError: If Pipedrive answers with a 4xx/5xx status.
    """
    url = 'https://company.pipedrive.com/v1/{}'.format(end_point)
    while True:
        # Let requests build and URL-encode the query string instead of
        # pasting the values (including the API token) into the URL by hand.
        query = {
            'user_id': 0,
            'start': start,
            'limit': limit,
            'api_token': api_token,
        }
        # A timeout keeps an unattended run from hanging on a stalled socket.
        reply = requests.get(url, params=query, timeout=60)
        reply.raise_for_status()
        response = reply.json()

        # Skip pages without records (presumably `data` is null for an empty
        # collection -- confirm against the API) rather than crash on them.
        if response.get('data'):
            read_persons(response)

        pagination = (response.get('additional_data') or {}).get('pagination') or {}
        if not pagination.get('more_items_in_collection'):
            break
        # Prefer the offset the API reports; fall back to advancing by the
        # page size actually requested rather than a hard-coded 500.
        start = pagination.get('next_start', start + limit)
def read_persons(response):
    """Flatten every person in one response page into a CSV row.

    For each record under ``response['data']`` the fields are collected in
    the column order expected by the target table and handed to
    ``write_csv``. Two fields are normalised on the way:

    * ``add_time``: an empty string becomes ``None``.
    * ``org_id``: the nested object is reduced to its ``'value'`` entry, or
      ``None`` when the person has no organisation.

    Args:
        response: Decoded JSON body of one persons page.

    Raises:
        KeyError: If a record lacks one of the expected fields.
    """
    # A missing or null `data` payload is treated as "no records".
    for data in response.get('data') or []:
        add_time = None if data['add_time'] == '' else data['add_time']
        org_id = None if data['org_id'] is None else data['org_id']['value']

        row = [
            data['id'],
            data['activities_count'],
            add_time,
            # Previously referenced but never assigned, which raised NameError
            # on the first record.
            data['cc_email'],
            data['closed_deals_count'],
            data['company_id'],
            data['done_activities_count'],
            data['followers_count'],
            data['label'],
            data['last_activity_date'],
            data['last_activity_id'],
            data['last_incoming_mail_time'],
            data['last_name'],
            data['last_outgoing_mail_time'],
            data['lost_deals_count'],
            data['name'],
            data['next_activity_date'],
            data['next_activity_id'],
            data['next_activity_time'],
            data['notes_count'],
            data['open_deals_count'],
            org_id,
            data['org_name'],
        ]
        write_csv(row)
def delete_existing_csv():
    """Remove the CSV at ``csvfile``; a file that does not exist is ignored."""
    try:
        os.remove(csvfile)
    except FileNotFoundError:
        pass
def write_csv(fieldnames):
    """Append a single row to the CSV at ``csvfile``.

    Args:
        fieldnames: Sequence of cell values written as one comma-separated
            row.
    """
    with open(csvfile, "a", encoding="utf-8", newline='') as out_file:
        csv.writer(out_file, delimiter=',').writerow(fieldnames)
if __name__ == "__main__":
    # Script entry point: rebuild the CSV from the API, then reload the
    # Snowflake table from it.
    delete_existing_csv()
    print("Creating CSV file: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    get_persons(start)
    # Stop before touching Snowflake when the download produced no file;
    # otherwise the table would be truncated and then left empty when the
    # upload fails.
    if not os.path.exists(csvfile):
        raise SystemExit("No CSV file was produced; PERSONS_NEW was left untouched.")
    print("CSV file succesfully created: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

    cursor = snowflake_connect()
    # Keep a handle on the connection now: snowflake_connect() only returns
    # the cursor, and the connection must be closed as well.
    connection = cursor.connection
    try:
        snowflake_truncate(cursor)
        snowflake_insert(cursor)
    finally:
        # Release the cursor and the session even when a statement fails.
        cursor.close()
        connection.close()

    end_time = time.time()
    elapsed_time = round(end_time - start_time, 2)
    print("Job sucessfully completed in: {} seconds".format(elapsed_time))