Airflow tasks do not share Python objects as if they were running in the same script; each task runs in a separate process. On top of that, tasks can be distributed over multiple machines when running the CeleryExecutor or KubernetesExecutor, so you also cannot always assume data can be shared via local disk.
You have several options for sharing data between tasks in Airflow:
- Push & pull to & from XCom
- Use the Airflow TaskFlow API
- Handle sharing data outside Airflow
Push & pull to & from XCom
XCom (short for "cross-communication") is the mechanism in Airflow for sharing data between tasks. It works by "pushing" and "pulling" XCom values:
from airflow.operators.python import PythonOperator

def _train_model(**context):
    model_id = do_magic()  # placeholder for your actual training logic
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(task_ids="train_model", key="model_id")
    print(f"Deploying model {model_id}")

train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)

train_model >> deploy_model
The methods xcom_push and xcom_pull are defined on the TaskInstance class, so you fetch the task instance from the context via context["task_instance"] (or the shorter alias context["ti"] if you prefer). The pushed value is stored in the "xcom" table in the Airflow metastore and can also be viewed in the Airflow webserver under Admin -> XComs.
Note that, by default (this is configurable), returning a value from a Python callable automatically pushes the returned value to XCom under the key "return_value". There is no equivalent shortcut for pulling an XCom value, although xcom_pull defaults to the key "return_value", so you can omit the key when pulling a returned value.
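For example, here's a sketch of the same two callables using the return-value shortcut and the shorter ti alias (do_magic is the same placeholder as above):

def _train_model(**context):
    return do_magic()  # pushed to XCom automatically under key "return_value"

def _deploy_model(**context):
    # key defaults to "return_value", so only task_ids is needed here
    model_id = context["ti"].xcom_pull(task_ids="train_model")
    print(f"Deploying model {model_id}")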
Historically, pushing an XCom worked by pickling the object and storing it in the Airflow metastore. Since Airflow 2.0, XCom data is serialized to JSON by default (pickling can still be enabled via configuration). You can also configure a different location to store the data instead of the Airflow metastore, for example AWS S3. Since you mention dataframes and Kafka, I can imagine lots of data, so it might be worth reading the custom XCom backend documentation: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-backends.
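To give an idea, here's a minimal sketch of a custom XCom backend that offloads large values to S3. The bucket name is an assumption, and the exact serialize_value/deserialize_value signatures differ a bit between Airflow versions, so check the linked docs for your version:

import json
import uuid

from airflow.models.xcom import BaseXCom

S3_BUCKET = "my-xcom-bucket"  # assumption: an S3 bucket you own
PREFIX = "s3://"

class S3XComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value, **kwargs):
        import boto3
        # Offload dicts/lists to S3 and store only a reference string in the
        # metastore (a real backend would pick its own criteria, e.g. size).
        if isinstance(value, (dict, list)):
            key = f"xcom/{uuid.uuid4()}.json"
            boto3.client("s3").put_object(
                Bucket=S3_BUCKET, Key=key, Body=json.dumps(value).encode("utf-8")
            )
            value = f"{PREFIX}{S3_BUCKET}/{key}"
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        # Fetch the real payload back from S3 when the stored value is a reference.
        if isinstance(value, str) and value.startswith(PREFIX):
            import boto3
            _, _, bucket, key = value.split("/", 3)
            body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
            value = json.loads(body)
        return value

You then point Airflow at this class via the xcom_backend setting in the [core] section of airflow.cfg (or the AIRFLOW__CORE__XCOM_BACKEND environment variable).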
Use the Airflow TaskFlow API
The Airflow TaskFlow API was added in Airflow 2.0 and provides a more functional way of writing Airflow tasks, reducing the XCom boilerplate. Here's your DAG using the TaskFlow API:
import datetime

from airflow import DAG
from airflow.decorators import task

with DAG("example_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:

    @task
    def task_one():
        return "one"

    @task
    def task_two():
        return "two"

    @task
    def task_three(one, two):
        return ",".join([one, two])

    @task
    def task_four(result):
        print(result)

    task_four(task_three(task_one(), task_two()))
The @task decorator converts a regular Python function into an Airflow task. Return values are automatically pushed to XCom, and passing the output of one function as an argument to another automatically pulls the corresponding XCom value.
For completeness, there's also a @dag decorator:
import datetime

from airflow.decorators import dag

@dag(schedule_interval=None, start_date=datetime.datetime(2021, 1, 1))
def example_dag():
    ...  # your tasks go here

example_dag_ = example_dag()
Handle sharing data outside Airflow
In some cases, the Airflow XCom model doesn't fit, for example when one task updates a record in a database and the next task needs to read that updated record again. In that case you implement your own logic for writing and reading the data inside the tasks themselves, which gives the most flexibility but is obviously also the most work.
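As a rough sketch of that pattern (assuming the Postgres provider is installed and a connection my_db exists; the table and SQL are made up for illustration), only the record id goes through XCom while the actual data lives in the database:

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task
def update_record():
    hook = PostgresHook(postgres_conn_id="my_db")  # assumed connection
    hook.run("UPDATE models SET status = 'trained' WHERE id = %s", parameters=(123,))
    return 123  # only the record id is passed via XCom

@task
def read_record(record_id):
    hook = PostgresHook(postgres_conn_id="my_db")
    row = hook.get_first("SELECT status FROM models WHERE id = %s", parameters=(record_id,))
    print(f"Record {record_id} has status {row[0]}")

read_record(update_record())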