
I am new to the world of Airflow and I have done a number of tutorials to convert one of my Python scripts into a DAG.

Basically there are four PythonOperator tasks calling the following functions:

def taskOne():
    # Fetch data from API and create mapping dicts
    ...

def taskTwo():
    # Fetch data from another API
    ...

def taskThree():
    # Combine data from task one and two into one dataframe
    ...

def taskFour():
    # Push the dataset to Apache Kafka
    ...

The following operators are declared to call the code:

taskOne_operator    = PythonOperator(task_id='taskOne',     python_callable=taskOne, dag=dag)
taskTwo_operator    = PythonOperator(task_id='taskTwo',     python_callable=taskTwo, dag=dag)
taskThree_operator  = PythonOperator(task_id='taskThree',   python_callable=taskThree, dag=dag)
taskFour_operator   = PythonOperator(task_id='taskFour',    python_callable=taskFour, dag=dag)

I have declared the following dependencies between the tasks:

[taskOne_operator, taskTwo_operator] >> taskThree_operator
taskThree_operator >> taskFour_operator

The first two tasks run successfully, which is a good thing. But, apparently (or obviously, yet against my expectations), taskThree cannot find either the mapping dicts or the dataset created in taskOne and taskTwo. So simply cutting the original Python script up into multiple parts does not work:

NameError: name 'data' is not defined

I have tried working with return values for the tasks, but have not managed to get it working. Can anyone share some insight into what I am doing wrong and maybe point me in the direction of some tutorials on how to fix this?

Thanks!

1 Answer


Tasks in Airflow do not share Python objects as if they were running in the same script; each Airflow task runs in a separate process. On top of that, tasks can be distributed over multiple machines when running the CeleryExecutor or KubernetesExecutor, so you also cannot always assume data can be shared via the local disk.

You have several options for sharing data between tasks in Airflow:

  • Push & pull to & from XCom
  • Use the Airflow TaskFlow API
  • Handle sharing data outside Airflow

Push & pull to & from XCom

XCom (cross-communication) is Airflow's mechanism for sharing data between tasks. It works by "pushing" and "pulling" XCom values:

from airflow.operators.python import PythonOperator

def _train_model(**context):
    model_id = do_magic()  # placeholder for your actual logic
    # Explicitly push a value to XCom under a custom key
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    # Pull the value pushed by the upstream task
    model_id = context["task_instance"].xcom_pull(task_ids="train_model", key="model_id")
    print(f"Deploying model {model_id}")

train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
train_model >> deploy_model

The methods xcom_push and xcom_pull are defined on the TaskInstance class, so you have to fetch the task instance from the context via context["task_instance"] (there is a shorter alias, ti, if you prefer that). The given value is then stored in the "xcom" table in the Airflow metastore and can also be viewed in the Airflow webserver under Admin -> XComs.
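For example, the pull in the snippet above could also be written with the ti shorthand (same behaviour, just a shorter key into the context):

def _deploy_model(**context):
    # "ti" is a shorter alias for "task_instance" in the context
    model_id = context["ti"].xcom_pull(task_ids="train_model", key="model_id")
    print(f"Deploying model {model_id}")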

Note that (by default; this is configurable) returning a value from a Python callable automatically pushes the returned value to XCom under the key "return_value". There is, however, no equivalent shortcut for pulling an XCom value; you still call xcom_pull explicitly.
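As a minimal sketch of that shortcut (reusing the hypothetical train/deploy tasks from above): the return value is pushed automatically, and xcom_pull without an explicit key defaults to "return_value".

def _train_model(**context):
    model_id = do_magic()  # placeholder for your actual logic
    return model_id        # automatically pushed to XCom under the key "return_value"

def _deploy_model(**context):
    # Without an explicit key, xcom_pull fetches the "return_value" of the upstream task
    model_id = context["ti"].xcom_pull(task_ids="train_model")
    print(f"Deploying model {model_id}")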

Historically, pushing an XCom worked by pickling the object and storing it in the Airflow metastore. Since Airflow 2.0, values are serialized to JSON by default (pickling can still be enabled via configuration). You can also configure a different location to store the data instead of the Airflow metastore, for example AWS S3. Since you mention dataframes and Kafka, I can imagine a lot of data, so it might be worth reading the custom XCom backend documentation: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-backends.
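As a rough sketch of what such a custom backend looks like (assuming Airflow 2.x; LargeValueXComBackend, upload_somewhere and download_from_somewhere are hypothetical names, and the exact serialize_value signature differs slightly between minor versions):

from airflow.models.xcom import BaseXCom

class LargeValueXComBackend(BaseXCom):  # hypothetical example backend
    @staticmethod
    def serialize_value(value, **kwargs):
        # Write the real payload to external storage and keep only a reference
        reference = upload_somewhere(value)              # hypothetical helper
        return BaseXCom.serialize_value(reference, **kwargs)

    @staticmethod
    def deserialize_value(result):
        reference = BaseXCom.deserialize_value(result)
        return download_from_somewhere(reference)        # hypothetical helper

The backend is then activated by pointing the xcom_backend option in the [core] section of airflow.cfg (or the AIRFLOW__CORE__XCOM_BACKEND environment variable) at that class.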

Use the Airflow TaskFlow API

The Airflow TaskFlow API was added in Airflow 2.0. It provides a more functional way of writing Airflow tasks and removes much of the XCom boilerplate. Here's your DAG written with the TaskFlow API:

import datetime

from airflow import DAG
from airflow.decorators import task

with DAG("example_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:

    @task
    def task_one():
        return "one"

    @task
    def task_two():
        return "two"

    @task
    def task_three(one, two):
        return ",".join([one, two])

    @task
    def task_four(result):
        print(result)

    task_four(task_three(task_one(), task_two()))

The @task decorator converts a regular Python function into an Airflow task. Return values are automatically pushed to XCom. And passing the output of one function as an argument to the next automatically pulls the XCom value.

For completeness, there's also a @dag decorator:

import datetime

from airflow.decorators import dag

@dag(schedule_interval=None, start_date=datetime.datetime(2021, 1, 1))
def example_dag():
    ... your tasks ...

example_dag_ = example_dag()  # calling the decorated function instantiates the DAG

Handle sharing data outside Airflow

In some cases, the Airflow XCom model doesn't fit, for example when a task updates a record in a database and the next task reads the updated record again. In that case you implement your own logic for writing and reading the data inside the tasks themselves, which gives you the most flexibility but is obviously also the most work.
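Applied to your dataframe case, a minimal sketch of that approach, assuming all workers can reach a shared location ("/data/shared" is a hypothetical path; it could equally be an S3 prefix or a database table): the large dataframe is written there, and only the small path string travels through XCom.

import pandas as pd

def _task_one(**context):
    df = pd.DataFrame({"value": [1, 2, 3]})                       # stand-in for your API call
    path = f"/data/shared/{context['run_id']}_task_one.parquet"   # unique per DAG run
    df.to_parquet(path)
    return path                                                   # only the path goes through XCom

def _task_three(**context):
    path = context["ti"].xcom_pull(task_ids="taskOne")
    df = pd.read_parquet(path)                                    # reload the full dataframe
    print(f"Loaded {len(df)} rows")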
