
I created a table by inserting data fetched from an API into MySQL via SQLAlchemy, using a pandas DataFrame as the intermediate step. I will need to query the API every 4 hours to get new data. The problem is that the API gives me back not only the new data but also the old data that has already been imported into MySQL. How can I import just the new data into the MySQL table?

I retrieved the data from the API, stored it in a pandas DataFrame, created the connection to the MySQL database, and created a fresh new table.

import requests
import json
import pandas as pd
from pandas.io.json import json_normalize

myToken = 'xxx'
myUrl = 'somewebsite'
head = {'Authorization': 'token {}'.format(myToken)}
response = requests.get(myUrl, headers=head)
data = response.json()
#print(json.dumps(data, indent=4, sort_keys=True))
results = json_normalize(data['results'])
results.rename(columns={'datastream.name': 'datastream_name',                    
                        'datastream.url':'datastream_url',
                        'datastream.datastream_type_id':'datastream_id',
                        'start':'error_date'}, inplace=True)

results_final=pd.DataFrame([results.datastream_name,
                            results.datastream_url, 
                            results.error_date, 
                            results.datastream_id,
                            results.message,
                            results.type_label]).transpose()

from sqlalchemy import create_engine
from sqlalchemy import exc
engine = create_engine('mysql://usr:psw@ip/schema')
con = engine.connect()
results_final.to_sql(name='error',con=con,if_exists='replace')
con.close()

The end goal is to insert into the table only the data from the API that does not already exist there.

2 Answers


You could pull the results already in the database into a new dataframe and then compare the two dataframes. After that, you would insert only the rows not already in the table. Not knowing the format of your table or data, I'm just using a generic SELECT statement here.

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import exc

engine = create_engine('mysql://usr:psw@ip/schema')
con = engine.connect()
sql = "SELECT * FROM table_name"
old_results = pd.read_sql(sql, con)
# Outer merge with indicator=True adds a '_merge' column marking
# whether each row came from the left, right, or both dataframes.
df = pd.merge(old_results, results_final, how='outer', indicator=True)
new_results = df[df['_merge'] == 'right_only'][results_final.columns]
new_results.to_sql(name='error', con=con, if_exists='append')
con.close()

You also need to change if_exists to append, because when it is set to replace it drops all rows in the table and replaces them with the values in the pandas dataframe.
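A minimal, self-contained sketch of this approach, using an in-memory SQLite database in place of MySQL (the table and column names here are invented for the demo):

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite stands in for the MySQL database.
engine = create_engine('sqlite://')

# Rows already imported on a previous run.
old = pd.DataFrame({'id': [1, 2], 'message': ['a', 'b']})
old.to_sql('error', engine, index=False, if_exists='replace')

# The API hands back the old rows plus one new one.
api = pd.DataFrame({'id': [1, 2, 3], 'message': ['a', 'b', 'c']})

# Merge what's in the database against the API payload; rows marked
# 'right_only' exist only in the API result and are safe to append.
db = pd.read_sql('SELECT * FROM error', engine)
merged = pd.merge(db, api, how='outer', indicator=True)
new = merged[merged['_merge'] == 'right_only'][api.columns]
new.to_sql('error', engine, index=False, if_exists='append')

print(len(pd.read_sql('SELECT * FROM error', engine)))  # 3
```

Running this twice in a row appends nothing the second time, since every API row then matches an existing database row.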


4 Comments

Hi, what is the "_merge" inside the square brackets? I did not find mentions of it in the Pandas docs
@JayronSoares it's a column added to the dataframe after merging that tells you which dataset each row came from. You can read more about it in the docs under the indicator keyword
Thanks, @Mathew Barlowe, I'm modifying my ETL code to insert only new rows from the OLTP database to the DW database.

I developed this function to handle both cases: new values, and columns that differ between the source table and the target table.

import pandas as pd
from sqlalchemy import create_engine

def load_data(df):
    engine = create_engine('mysql+pymysql://root:pass@localhost/dw',
                           echo_pool=True, pool_size=10, max_overflow=20)
    with engine.connect() as conn, conn.begin():
        try:
            df_old = pd.read_sql('SELECT * FROM table', conn)

            # Check if there are new rows to be inserted
            if len(df) > len(df_old) or df.disconnected_time.max() > df_old.disconnected_time.max():
                print("There are new rows to be inserted.")

                df_merged = pd.merge(df_old, df, how='outer', indicator=True)
                df_final = df_merged[df_merged['_merge'] == 'right_only'][df.columns]
                df_final.to_sql(name='table', con=conn, index=False, if_exists='append')

        except Exception as err:
            print(str(err))

        else:
            # Handle the case where the source has more columns than the target
            if df.shape[1] > df_old.shape[1]:
                data = pd.read_sql('SELECT * FROM table', conn)
                df2 = pd.concat([df, data])
                df2.to_sql('table', conn, index=False, if_exists='replace')

        outcome = conn.execute("select count(1) from table")
        countRow = outcome.first()[0]

    return print(f"Total of {countRow} rows loaded.")
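To illustrate the unequal-columns branch: pd.concat aligns frames on column names and fills the gaps with NaN, so the rebuilt table keeps every column. The frames below are made up for the demo:

```python
import pandas as pd

# Hypothetical frames: the saved table lacks the new 'severity' column.
saved = pd.DataFrame({'id': [1, 2], 'message': ['a', 'b']})
fresh = pd.DataFrame({'id': [3], 'message': ['c'], 'severity': ['high']})

# Concat takes the union of columns; rows from `saved` get NaN
# in 'severity', which to_sql would store as NULL.
combined = pd.concat([saved, fresh], ignore_index=True)
print(combined.shape)  # (3, 3)
```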

