
I do not understand how callables (the functions passed to PythonOperator via python_callable) in Airflow should have their parameter lists set. I have seen them with no parameters, with named parameters, or with **kwargs. It seems I can always add "ti" or **allargs as parameters, and ti seems to be used for task instance info, or ds for the execution date. But my callables apparently do not NEED parameters. They can simply be "def function():". If I wrote a regular Python function func() instead of func(**kwargs), it would fail at runtime when called unless no arguments were passed. Airflow seems to pass ti all the time, so how can the callable's signature not require it? In the example below, from a training site, _process_data gets ti but _extract_bitcoin_price() does not. I was thinking that is because of the XCom push, but ti is ALWAYS available it seems, so how can "def somefunc()" ever work? I tried looking at the PythonOperator source code, but I am unclear on how this works or on best practices for including parameters in a callable. Thanks!!

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime
import json
from typing import Dict
import requests
import logging

API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"

def _extract_bitcoin_price():
    return requests.get(API).json()['bitcoin']

def _process_data(ti):
    response = ti.xcom_pull(task_ids='extract_bitcoin_price')
    logging.info(response)
    processed_data = {'usd': response['usd'], 'change': response['usd_24h_change']}
    ti.xcom_push(key='processed_data', value=processed_data)

def _store_data(ti):
    data = ti.xcom_pull(task_ids='process_data', key='processed_data')
    logging.info(f"Store: {data['usd']} with change {data['change']}")

with DAG('classic_dag', schedule_interval='@daily', start_date=datetime(2021, 12, 1), catchup=False) as dag:

    extract_bitcoin_price = PythonOperator(
        task_id='extract_bitcoin_price',
        python_callable=_extract_bitcoin_price
    )

    process_data = PythonOperator(
        task_id='process_data',
        python_callable=_process_data
    )

    store_data = PythonOperator(
        task_id='store_data',
        python_callable=_store_data
    )

    extract_bitcoin_price >> process_data >> store_data

I tried callables with no parameters, somefunc(), expecting an error about too many arguments being passed, but the task succeeded. Adding a parameter, somefunc(ti), also works! How can both work?

1 Answer


I think what you are missing is that Airflow lets you pass the task's context to the Python callable (ti is one of its entries). These are additional useful parameters that Airflow provides, and you can use them in your task.

In older Airflow versions, the user had to set provide_context=True for that to work:

process_data = PythonOperator(
    ...,
    provide_context=True
)

Since Airflow>=2.0 there is no need to use provide_context. Airflow handles it under the hood.

When you see a Python callable signature like:

def func(ti, **kwargs):
    ...

This means that the ti is "unpacked" from the kwargs. You can also do:

def func(**kwargs):
    ti = kwargs['ti']
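
To make the "unpacking" concrete, here is a minimal plain-Python sketch with no Airflow involved. The fake_context dict is a hypothetical stand-in for the context Airflow builds, and both function names are made up for illustration. It only shows that the two signatures see the same data when called with the same keyword arguments.

```python
def with_named_ti(ti, **kwargs):
    # 'ti' is bound by name; every other keyword argument lands in kwargs.
    return ti, sorted(kwargs)


def with_kwargs_only(**kwargs):
    # Everything lands in kwargs, so 'ti' has to be looked up by key.
    return kwargs["ti"], sorted(k for k in kwargs if k != "ti")


# Hypothetical stand-in for the context dict (the real one has many more keys).
fake_context = {"ti": "<task instance>", "ds": "2021-12-01", "params": {}}

named_result = with_named_ti(**fake_context)
kwargs_result = with_kwargs_only(**fake_context)

print(named_result)   # ('<task instance>', ['ds', 'params'])
print(kwargs_result)  # ('<task instance>', ['ds', 'params'])
```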

EDIT: I think what you are missing is that while you write:

def func():
    ...

store_data = PythonOperator(
    task_id='task',
    python_callable=func
)

Airflow does more than just call func. What actually runs is the execute() method of PythonOperator, and that method calls the Python callable you provided with args and kwargs, passing only the context entries that the callable's signature can accept.
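
As far as I can tell, this is why def func() and def func(ti) can both succeed in Airflow 2: the operator inspects the callable's signature before calling it and drops the context keys the callable does not declare. The sketch below is a simplified illustration of that idea using the standard inspect module, not Airflow's actual code. filter_kwargs_for, call_with_context, and the ctx dict are hypothetical names.

```python
import inspect


def filter_kwargs_for(func, context):
    """Return only the context entries that func's signature can accept."""
    params = list(inspect.signature(func).parameters.values())
    # A **kwargs parameter accepts anything, so pass the whole context.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return dict(context)
    # Otherwise keep only the keys that match a parameter usable by keyword.
    by_keyword = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    names = {p.name for p in params if p.kind in by_keyword}
    return {key: value for key, value in context.items() if key in names}


def call_with_context(func, context):
    # Roughly what an operator could do instead of a bare func(**context).
    return func(**filter_kwargs_for(func, context))


def no_params():
    return "ok"


def only_ti(ti):
    return ti


def everything(**kwargs):
    return sorted(kwargs)


# Hypothetical stand-in for the task context.
ctx = {"ti": "<task instance>", "ds": "2021-12-01"}

print(call_with_context(no_params, ctx))   # ok
print(call_with_context(only_ti, ctx))     # <task instance>
print(call_with_context(everything, ctx))  # ['ds', 'ti']
```

Calling no_params(**ctx) directly would raise a TypeError, which is the failure the question expected. The filtering step is what avoids it.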


1 Comment

"Since Airflow>=2.0 there is no need to use provide_context. Airflow handles it under the hood." Thanks, that answers a question I did not even ask but was wondering about: why ti is passed in the first place without provide_context. I still don't get, though, how a signature like def func() with no parameters can succeed if the context is always passed. Does Airflow check the function signature as some kind of Python metadata and adjust? Or maybe a no-params signature always fails... I will have to retry that and comment back.
