I do not understand how the parameter list of a callable (the function passed to PythonOperator via python_callable) should be defined in Airflow. I have seen callables with no parameters, with named parameters, or with **kwargs. It seems I can always add "ti" or **allargs as parameters; ti seems to be used for task instance info, and ds for the execution date. But my callables apparently do not NEED parameters. They can simply be "def function():".

If I wrote a regular Python function func() instead of func(**kwargs), it would fail at runtime when called unless no arguments were passed. Airflow seems to pass ti all the time, so how can the callable's signature not require it? The example below is from a training site, where _process_data() gets ti but _extract_bitcoin_price() does not. I was thinking that is because of the XCom push, but ti is ALWAYS available it seems, so how can "def somefunc()" ever work? I tried looking at the PythonOperator source code, but I am unclear on how this works or what the best practices are for including parameters in a callable. Thanks!!
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import json
from typing import Dict
import requests
import logging
API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"
def _extract_bitcoin_price():
    return requests.get(API).json()['bitcoin']

def _process_data(ti):
    response = ti.xcom_pull(task_ids='extract_bitcoin_price')
    logging.info(response)
    processed_data = {'usd': response['usd'], 'change': response['usd_24h_change']}
    ti.xcom_push(key='processed_data', value=processed_data)

def _store_data(ti):
    data = ti.xcom_pull(task_ids='process_data', key='processed_data')
    logging.info(f"Store: {data['usd']} with change {data['change']}")

with DAG('classic_dag', schedule_interval='@daily', start_date=datetime(2021, 12, 1), catchup=False) as dag:
    extract_bitcoin_price = PythonOperator(
        task_id='extract_bitcoin_price',
        python_callable=_extract_bitcoin_price
    )
    process_data = PythonOperator(
        task_id='process_data',
        python_callable=_process_data
    )
    store_data = PythonOperator(
        task_id='store_data',
        python_callable=_store_data
    )
    extract_bitcoin_price >> process_data >> store_data

I tried callables with no params, somefunc(), expecting an error saying too many arguments were passed, but it succeeded. Adding a parameter, somefunc(ti), also works! How can both work?
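
To make the puzzle concrete, here is a minimal standalone sketch that needs no Airflow install. It shows the plain-Python failure I expected, plus my guess at the kind of signature inspection that could explain why both forms work. The `context` dict and the `call_with_matching_kwargs` helper are hypothetical names made up for illustration; this is an assumption about the mechanism, not Airflow's actual code.

```python
import inspect

def no_params():
    return "no params"

def wants_ti(ti):
    return f"got ti={ti}"

def wants_everything(**kwargs):
    return sorted(kwargs)

# Stand-in for the values Airflow makes available (keys are illustrative only).
context = {"ti": "<task instance>", "ds": "2021-12-01"}

def call_with_matching_kwargs(func, context):
    """Hypothetical helper: pass only the context keys the function names."""
    params = inspect.signature(func).parameters
    # A function that accepts **kwargs can take the whole context.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return func(**context)
    # Otherwise keep only the keys that match a declared parameter name.
    wanted = {k: v for k, v in context.items() if k in params}
    return func(**wanted)

# Plain Python: passing everything to a zero-argument function raises TypeError.
try:
    no_params(**context)
except TypeError as exc:
    print("plain call failed:", exc)

# With signature-based filtering, all three shapes of callable succeed.
print(call_with_matching_kwargs(no_params, context))
print(call_with_matching_kwargs(wants_ti, context))
print(call_with_matching_kwargs(wants_everything, context))
```

If something like this is what happens, then the parameter names in the callable would matter (ti, ds, and so on), and a callable would only need to declare the values it actually uses.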