
In my old DAG, I created tasks like so:

    start_task = DummyOperator(task_id = "start_task")

    t1 = PythonOperator(task_id = "t1", python_callable = get_t1)
    t2 = PythonOperator(task_id = "t2", python_callable = get_t2)
    t3 = PythonOperator(task_id = "t3", python_callable = get_t3)
    t4 = PythonOperator(task_id = "t4", python_callable = get_t4)
    t5 = PythonOperator(task_id = "t5", python_callable = get_t5)
    t6 = PythonOperator(task_id = "t6", python_callable = get_t6)
    t7 = PythonOperator(task_id = "t7", python_callable = get_t7)
    t8 = PythonOperator(task_id = "t8", python_callable = get_t8)
    t9 = PythonOperator(task_id = "t9", python_callable = get_t9)
    t10 = PythonOperator(task_id = "t10", python_callable = get_t10)
    t11 = PythonOperator(task_id = "t11", python_callable = get_t11)

    end_task = DummyOperator(task_id = "end_task")

    start_task >> [t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11] >> end_task

Each of these tasks runs a different query, and all of them run concurrently. I revised my code because much of it was redundant and could be moved into functions. In the new code, I also attempted to create the tasks dynamically by reading the queries and metadata for each task from a .json file.

New Code:

    loaded_info = load_info()  # function call to load .json data into a list
    start_task = DummyOperator(task_id = "start_task")
    end_task = DummyOperator(task_id = "end_task")
    tasks = []  # empty list to append tasks to in for loop
    for x in loaded_info:
        qce = QCError(**x)
        id = qce.column
        task = PythonOperator(task_id = id, python_callable = create_task(qce))
        tasks.append(task)
    start_task >> tasks >> end_task
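For reference, a minimal sketch of what the loading side might look like, since the question only shows the call site. The filename, the "column"/"query" field names, and the QCError class body are assumptions inferred from the snippet above, not the author's actual code:

```python
import json
from dataclasses import dataclass

@dataclass
class QCError:
    # Hypothetical fields, inferred from `qce.column` above and from
    # the fact that each task runs its own query.
    column: str
    query: str

def load_info(path="queries.json"):
    # Read the per-task metadata from a .json file into a list of dicts,
    # one dict per task, e.g. [{"column": "t1", "query": "select ..."}, ...]
    with open(path) as f:
        return json.load(f)
```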

This new code appears fine, however it prevents me from running airflow initdb. After running the command, the terminal hangs and never finishes until I finally CTRL+C to kill it, and then eventually prints an error after the kill:

raise AirflowTaskTimeout(self.error_message)
pandas.io.sql.DatabaseError: Execution failed on sql 'select ..., count(*) as frequency from ... where ... <> all (array['...', '...', etc.]) or ... is null group by ... order by ... asc': Timeout, PID: 315

(Note: the query in the error statement above is just the first query in the .json.) Since I never had this error with the old DAG, I assume it is caused by the dynamic task creation, but I need help identifying exactly what is causing it.

What I have tried:

  • Running each query individually in Airflow Webserver Admin Ad-Hoc (they all work fine)
  • Creating a test script to run locally and output the contents of the .json to ensure everything is correctly formatted, etc.
  • In load_info, instead of calling a query, can you just return a hard-coded output and check what happens? Just trying to narrow down the problem. (Commented Jul 3, 2019 at 4:02)
  • I ended up finding a solution @nightgaunt, see my answer below for more info. (Commented Jul 3, 2019 at 16:52)

1 Answer


I finally managed to get airflow initdb to run (I have not yet tested the job itself, and will update on its status later).

It turns out that when defining a PythonOperator, you cannot call the callable with an argument like I was doing:

 task = PythonOperator(task_id = id, python_callable = create_task(qce))

Passing qce into create_task is what was causing the error: python_callable expects a reference to a function, but create_task(qce) calls the function immediately, while the DAG file is being parsed. That is why the first query in the .json was executing (and timing out) during airflow initdb. To pass arguments into your tasks, see here.
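The distinction can be reproduced outside Airflow entirely. A plain-Python sketch of the difference (create_task here is a stand-in for the real callable, not the author's function):

```python
calls = []

def create_task(qce):
    # Stand-in for a callable that would run an expensive query.
    calls.append(qce)
    return "result for " + qce

# Broken pattern: create_task runs immediately, at "DAG-parse time",
# and its *return value* (a string, not a callable) is what gets stored.
bad = create_task("qce")

# Working pattern: store the function object itself; Airflow invokes it
# later with op_kwargs only when the task actually executes.
good = create_task
```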

For those of you who want to see the fix for my exact case, I have this:

    with DAG("dva_event_analysis_dag", default_args = DEFAULT_ARGS, schedule_interval = None, catchup = False) as dag:
        loaded_info = load_info()
        start_task = DummyOperator(task_id = "start_task")
        end_task = DummyOperator(task_id = "end_task")
        tasks = []
        for x in loaded_info:
            id = x["column"]
            task = PythonOperator(task_id = id, provide_context = True, python_callable = create_task, op_kwargs = x)
            tasks.append(task)
        start_task >> tasks >> end_task
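As an aside, functools.partial is another way to defer the call while binding an argument, if op_kwargs does not fit your case (a sketch; create_task and qce are the names from the question, but the function body is invented). Note that a lambda written inside the for loop would close over the loop variable, so every task would end up with the last value; partial, like op_kwargs, binds the current value at each iteration:

```python
from functools import partial

def create_task(qce):
    # Stand-in for the question's callable, which would run one QC query.
    return "ran check for " + qce

qce = "t1"
deferred = partial(create_task, qce)  # binds the argument now, runs nothing yet
# task = PythonOperator(task_id = qce, python_callable = deferred)
```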

Update (7/03/2019): Job status is successful. This was indeed the fix to my error. Hopefully this helps out others with similar issues.
