I just started using Airflow. Can anyone explain how to pass parameters into a PythonOperator like the one below?

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs=None,
    #op_kwargs=(key1='value1', key2='value2'),
    dag=dag,
)

def SendEmail(**kwargs):
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()

I would like to pass some parameters into the callable of t5_send_notification, which is SendEmail. Ideally, I want to attach the full log, or part of it (which essentially comes from the kwargs), to the email being sent out. I am guessing that t5_send_notification is the place to gather that information.

Thank you very much.

2 Answers

71
  1. Pass a dict object to op_kwargs.
  2. In your Python callable, use the keys to read their values from the kwargs dict.

    def SendEmail(**kwargs):
        print(kwargs['key1'])
        print(kwargs['key2'])
        msg = MIMEText("The pipeline for client1 is completed, please check.")
        msg['Subject'] = "xxxx"
        msg['From'] = "xxxx"
        ......
        s = smtplib.SMTP('localhost')
        s.send_message(msg)
        s.quit()
    
    
    t5_send_notification = PythonOperator(
        task_id='t5_send_notification',
        provide_context=True,
        python_callable=SendEmail,
        op_kwargs={'key1': 'value1', 'key2': 'value2'},
        dag=dag,
    )
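
As a rough illustration of why this works: as far as I understand, the operator calls python_callable with the op_kwargs dict unpacked as keyword arguments, and with provide_context=True the task context is passed in the same way. The sketch below mimics that in plain Python, without Airflow. The names run_callable and send_email and the sample context value are hypothetical stand-ins, not Airflow's actual code.

```python
def send_email(**kwargs):
    # Values supplied through op_kwargs arrive as ordinary keyword arguments.
    return "{} / {}".format(kwargs["key1"], kwargs["key2"])


def run_callable(python_callable, op_kwargs=None, context=None):
    """Hypothetical stand-in for the operator's call step (not Airflow code)."""
    merged = dict(context or {})      # start from the task context, if any
    merged.update(op_kwargs or {})    # user-supplied values are added on top
    return python_callable(**merged)  # unpack the dict into keyword arguments


result = run_callable(
    send_email,
    op_kwargs={"key1": "value1", "key2": "value2"},
    context={"ds": "2019-01-01"},  # assumed sample context entry
)
print(result)
```

The callable only reads the keys it cares about; anything else in the merged dict is simply left unused in kwargs.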
    

3 Comments

That didn't work. It should be like: SendEmail(key1, key2, **kwargs):
@Amin, which version of Airflow are you using? I ask because you might be on Python 3: recent versions of Airflow support Python 3 much better than they did a year ago, but many people still use Python 2 for Airflow development.
If it has something to do with the Python version, please mention it in the answer.
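
Both signatures discussed in these comments can receive the same op_kwargs, because Python binds unpacked keyword arguments to named parameters first and collects whatever is left in **kwargs. A small plain-Python check, with hypothetical function names and no Airflow involved:

```python
def send_email_generic(**kwargs):
    # Everything arrives in the kwargs dict and is looked up by key.
    return kwargs["key1"], kwargs["key2"]


def send_email_named(key1, key2, **kwargs):
    # key1 and key2 are bound by name; any other keys stay in kwargs.
    return key1, key2, sorted(kwargs)


# "extra" is an assumed additional key, standing in for context entries.
op_kwargs = {"key1": "value1", "key2": "value2", "extra": "x"}

generic_result = send_email_generic(**op_kwargs)
named_result = send_email_named(**op_kwargs)

print(generic_result)
print(named_result)
```

If one form fails where the other works, the cause is more likely in how the callable is invoked by the installed Airflow version than in the signature itself.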
24

PythonOperator has a named parameter, op_kwargs, that accepts a dict object. For example:

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs={"my_param":'value1'},
    dag=dag,
)

def SendEmail(my_param, **kwargs):
    print(my_param)  # prints 'value1'
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()
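
To get closer to what the question asks for (putting information from kwargs into the email), the message can be built from both the passed parameter and the context. A minimal sketch, assuming the context contains a ds entry (the execution date string in Airflow's task context). The name build_notification and the sample values are hypothetical, and nothing is actually sent here:

```python
from email.mime.text import MIMEText


def build_notification(my_param, **context):
    # Fall back to a placeholder if the assumed "ds" key is absent.
    run_date = context.get("ds", "unknown date")
    body = "The pipeline for {} is completed (run date: {}), please check.".format(
        my_param, run_date
    )
    msg = MIMEText(body)
    msg["Subject"] = "Pipeline completed for {}".format(my_param)
    return msg


# Simulates the call the operator would make with op_kwargs plus context.
msg = build_notification("client1", ds="2019-01-01")
print(msg["Subject"])
print(msg.get_payload())
```

Keeping message construction separate from the SMTP call makes it possible to check the content without a mail server.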

3 Comments

Thank you, Ethan. Your code, op_kwargs={my_param='value1'}, reports an error.
{key : value}
Do you have the entire error? Is the key in quotes as well?
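
The error reported above is consistent with Python's syntax rules: key=value is only valid inside a call such as dict(...), while a dict literal needs key: value with a quoted string key. A quick check in plain Python (is_valid_python is a hypothetical helper written for this example):

```python
import ast


def is_valid_python(source):
    # Returns True if the source parses, False on a SyntaxError.
    try:
        ast.parse(source)
        return True
    except SyntaxError:
        return False


broken = "op_kwargs = {my_param='value1'}"           # form from the comment
literal = "op_kwargs = {'my_param': 'value1'}"       # dict literal
constructor = "op_kwargs = dict(my_param='value1')"  # dict() call

for source in (broken, literal, constructor):
    print(source, "->", is_valid_python(source))
```

The literal and the dict() call build the same dict, so either can be passed to op_kwargs.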
