2

I can't find any working documentation on how to use the JSON variables passed with "-c" for e.g. a backfill job.

I've been printing my python tasks **kwargs to find out, but I still can't determine it. provide_context=True

Can anyone point me in the right direction?

So, what I want to do:

airflow backfill mydag -c '{"override":"yes"}' -s 2018-12-01 -e 2018-12-12

I have a PythonOperator:

PythonOperator(
    task_id = 'task_identifier',
    python_callable = 'run_task',
    op_kwargs = {
        'task_conf': task_config 
    },
    provide_context=True,
    dag = this_dag
)

Within run_task, I would like to access the override variable:

def run_task(*args, **kwargs): 

    dag_run = kwargs.get('dag_run')
    logging.info(kwargs['dag_run'].conf.get('override'))

But I can't find a method to access this override variable

[2018-12-17 10:07:24,649] {models.py:1760} ERROR - 'NoneType' object has no attribute 'get'
Traceback (most recent call last):
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/daimonie/airflow/dags/dag_scheduled_queries.py", line 65, in run_query
    logging.info(kwargs['dag_run'].conf.get('override'))

Edit: I did find a config setting and description that seems to indicate that these parameters need to already been set in the code

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params=True

The donot_pickle parameter was set to False.

5
  • It is unclear what you are asking, please post some of your code. Commented Dec 14, 2018 at 13:47
  • @James, hope this clears it up Commented Dec 14, 2018 at 13:55
  • See github.com/trbs/airflow-examples/blob/master/dags/… for examples, especially for usage of kwargs. You provide task_conf, but there is no task_conf parameter. Commented Dec 14, 2018 at 14:18
  • @tobi6 As you can see above, these are code snippets. I've some variable called task_conf on the stack. Commented Dec 14, 2018 at 14:27
  • As I understand it, the DAGs run in the bucket. I'm creating a temporary workaround with a .json file, but it is very confusing that the CLI parameters are not propagated to my task Commented Dec 20, 2018 at 9:08

4 Answers 4

4

For anyone wondering, the reason for the error

AttributeError: 'NoneType' object has no attribute 'get

when trying to retrieve a key/value from the conf attribute of dag_run is because the conf arg passed to airflow backfill mydag -c needs to be configured as:

'{"conf": {"key1": "value", ...}}'

not

'{"key1": "value", ...}'

then you can reference it from the context e.g.

def do_something(**context):
    value = context['dag_run'].conf['key']
    ...
Sign up to request clarification or add additional context in comments.

1 Comment

0

You need to do two things.

  1. Replace kwargs['run_dag'].conf.get('override') by kwargs['dag_run'].conf.get('override').
  2. Also change the signature from def run_task(*args, **kwargs): to def run_task(**kwargs):

1 Comment

Hey @sdvd, I still get an error - kwargs[dag_run].conf is Nonetype.
0

You can pass parameters from the CLI using -c '{"key":"value"}' and then use it in the Python callable function as "dag_run.conf["key"]"

In your case,

Operator:

PythonOperator(
    task_id = 'task_identifier',
    python_callable = 'run_task',
    provide_context=True,
    dag = this_dag
)

Callable Function:

def run_task(**kwargs):
    print(kwargs["dag_run"].conf["override"]

1 Comment

Hey @kaxil, I still get an error - kwargs[dag_run].conf is Nonetype.
0

For those who, like me, have wandered here in searching for resolving the same error (ERROR - 'NoneType' object has no attribute 'get'), when tries to read kwargs['dag_run'].conf.get('something').

By default, the dictionary dag_run.conf is not setted when dag runs by scheduler. That's why the error appears. So, when you need to run scheduled dag manually with custom parameters from time to time, just something like this will work:

In callable function:

def run_task(**kwargs):
    if kwargs['dag_run'].conf:
        my_parameter = kwargs['dag_run'].conf.get('my_parameter', None)
    else:
        my_parameter = "some_default_value"

1 Comment

See my answer. I provide the reason you're getting the error you're noticing

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.