
Bear with me, since I've just started using Airflow. What I'm trying to do is collect the return code from a BashOperator task, save it to a local variable, and then branch to another task based on that return code. The issue I have is figuring out how to get the BashOperator to return something. Here is my code segment:

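from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

# default_args and user_def_macros (presumably defining schema and script_path) are defined elsewhere in the file
dag = DAG(dag_id='dag_1',
          default_args=default_args,
          schedule_interval='0 2 * * *',
          user_defined_macros=user_def_macros,
          dagrun_timeout=timedelta(minutes=60))
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)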

I'm trying xcom_push, but honestly I have no idea how it works. Is this the right way to collect the result? In the logs, the last line is: Command exited with return code 0.

3 Answers


As per the BashOperator docs:

If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes

Knowing that, you just need your bash command to print the exit code last, so append the following to your bash_command:

<your code> ; echo $?
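To see why the trailing `echo $?` works, here is a rough plain-Python model of the documented behavior. It is an assumption for illustration, not Airflow's actual code: it runs the command with bash, merges stderr into stdout (which, as far as I know, BashOperator also does), and keeps only the last line, which is what would end up in the XCom.

```python
import subprocess


def last_stdout_line(bash_command: str) -> str:
    """Rough model (not Airflow's implementation) of what BashOperator
    pushes to XCom: run the command with bash and keep the last output line."""
    result = subprocess.run(
        ["bash", "-c", bash_command],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # assumption: BashOperator likewise merges stderr into stdout
        text=True,
    )
    lines = result.stdout.splitlines()
    return lines[-1].rstrip() if lines else ""


if __name__ == "__main__":
    # `false` exits with status 1; the trailing `echo $?` makes that status the last line.
    print(last_stdout_line("echo some output; false; echo $?"))  # prints 1
```

Because `$?` expands to the exit status of the command immediately before it, the last line is always the return code of your real command, regardless of what that command printed earlier.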

In your case:

oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}; echo $?", dag=dag)
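One consequence of appending `; echo $?` is that the whole bash command now exits with `echo`'s status (0), so Airflow marks `oodas` as successful even if hive fails. The failure is only visible in the pushed value, so the downstream decision has to read it. Below is a minimal sketch of that decision as a plain function. The task ids `hive_succeeded` and `hive_failed` are hypothetical placeholders; in a DAG, a function like this could serve as the `python_callable` of a `BranchPythonOperator`, fed with `ti.xcom_pull(task_ids='oodas')`.

```python
from typing import Optional


def choose_branch(pushed_value: Optional[str]) -> str:
    """Pick the next task id from the value 'oodas' pushed to XCom.

    'hive_succeeded' and 'hive_failed' are hypothetical task ids.
    """
    if pushed_value is None:
        # Nothing was pushed (e.g. XCom push disabled); treat it as a failure.
        return "hive_failed"
    # XCom holds the echoed text, so compare against the string "0".
    return "hive_succeeded" if pushed_value.strip() == "0" else "hive_failed"


if __name__ == "__main__":
    for value in ("0", "1", None):
        print(value, "->", choose_branch(value))
```

Treating a missing value as a failure is a design choice; you may prefer to raise an error instead so the problem is not silently routed down the failure branch.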

1 Comment

From version 2.0 and above (I'm not sure of the specific commit), the last line is pushed automatically and you don't need to set xcom_push=True; see forum.astronomer.io/t/…

Can you post the entire DAG? I think you are having trouble understanding how Airflow works.

From Task1 (if it is a BashOperator) you can do:

t1 = BashOperator(task_id='t1', bash_command='echo "{{ ti.xcom_push(key="t1", value="hello") }}"', dag=dag)

And in Task2:

t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull(task_ids="t1", key="t1") }}"', dag=dag)

where ti is the task_instance template variable, and the {{ }} (Jinja) notation gives access to the default variables listed in the Variables section of the templates reference.
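The key question is how XComs are addressed. A value is identified by the id of the task that pushed it plus a key, and when no key is given the default key `return_value` is used. That default key is where a BashOperator's last stdout line is stored. `ti` is the task instance currently being rendered (t2's, in the question), while `"oodas"` is the `task_ids` argument naming the task to pull from. The in-memory class below is a toy stand-in for illustration only; real XComs are stored in Airflow's metadata database and are additionally scoped to the DAG and run.

```python
from typing import Any, Dict, Optional, Tuple

# Airflow's default XCom key; a BashOperator's pushed last stdout line is stored under it.
RETURN_VALUE_KEY = "return_value"


class ToyXComStore:
    """In-memory stand-in (not Airflow's implementation) showing how XComs
    are addressed: by the pushing task's id plus a key."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._values[(task_id, key)] = value

    def pull(self, task_id: str, key: str = RETURN_VALUE_KEY) -> Optional[Any]:
        return self._values.get((task_id, key))


store = ToyXComStore()
# Roughly what 'oodas' does when its bash_command ends with `; echo $?` and hive succeeds:
store.push("oodas", RETURN_VALUE_KEY, "0")
# Roughly what {{ ti.xcom_pull("oodas") }} in t2 resolves to: task id "oodas", default key.
print(store.pull("oodas"))  # prints 0
```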

1 Comment

I just added more of my code. I believe you are right that I'm not using it correctly. In my case, is the task instance "ti" just "oodas"? I'm not sure what key BashOperator uses for XCom.

For those using Airflow 2+, BashOperator now returns the entire output (source), not just the last line. This way you can not only return the error code as K-Yo suggested, but also perform some additional logging for a non-zero return code.

1 Comment

I don't think that's correct. The operator uses the subprocess hook, and that returns only the last line; see: github.com/apache/airflow/blob/…
