
Basically I'm working with Airflow and I developed a task that downloads a file from an external source.

t1 = PythonOperator(
        task_id='download',
        python_callable=download,
        provide_context=True,
        dag=dag)

and this Airflow instance is running in a virtual environment (pipenv).

The download function is:

def download(**kwargs):
    folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
    file_name = download_file(folder_id)
    return file_name

So basically I'm using XComs to pass data from one task to another... and with this configuration it's impossible to manage all of the dependencies of each DAG...
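For reference, the downstream task reads that value back with an XCom pull, roughly like this (the process task here is just illustrative):

def process(**kwargs):
    # The return value of a PythonOperator callable is pushed to XCom
    # automatically, so a downstream task can pull it by task_id.
    file_name = kwargs['ti'].xcom_pull(task_ids='download')
    print(file_name)

t2 = PythonOperator(
        task_id='process',
        python_callable=process,
        provide_context=True,
        dag=dag)

t1 >> t2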

In the documentation I found a class called "PythonVirtualenvOperator", so to implement that I wrote:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        requirements=['requests'],
        python_version='3.8',
        provide_context=True,
        dag=dag
    )

and it's giving me the following error:

TypeError: can't pickle module objects

The download_file function is an API connection that lives in another file.

Any suggestions on how I can manage the environment and still have a connection between the tasks?

  • What is the download_file function? Can you post your entire DAG so we can help you? Commented Nov 11, 2019 at 16:13
  • The download_file function is an API connection that is in another file. Commented Nov 11, 2019 at 17:41

2 Answers


The problem is

provide_context=True,

Airflow cannot pickle the context because of all the unserializable stuff in it. You can use templating and op_kwargs to work around this if you only need simple stuff like execution_ts:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        provide_context=False,
        op_kwargs={
            'execution_date_str': '{{ execution_date }}',
        },
        dag=dag)

Of course, you will need to update the arguments to your callable. I didn't go any deeper than that because it worked for my use case.
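For example, the callable then takes the templated value as a named argument matching the op_kwargs key (a minimal sketch; the request logic is hypothetical):

def download(execution_date_str):
    # Imports must happen inside the function when it runs in the virtualenv
    import requests

    # Hypothetical endpoint; substitute the real download logic
    response = requests.get('https://example.com/download',
                            params={'date': execution_date_str})
    response.raise_for_status()
    return response.text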

3 Comments

op_kwargs is not a valid dictionary in the example you provided. Please provide a minimal working solution.
I changed the = to a :.
What about the python_callable method? He should not use **kwargs but named arguments matching the op_kwargs dict keys.

From the definition of the PythonVirtualenvOperator:

The function must be defined using def, and not be
part of a class. All imports must happen inside the function
and no variables outside of the scope may be referenced.

I'm guessing that somewhere in the chain of code called by your download function, there's a method imported from another file via a top-level import. Perhaps moving that import into your download function is enough?
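In other words, something like this, where my_api is a placeholder for whichever module actually defines download_file:

def download(**kwargs):
    # Moved inside the function: PythonVirtualenvOperator only serializes the
    # function body, so top-level imports from your project aren't available.
    from my_api import download_file

    folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
    return download_file(folder_id)

Note that my_api would also have to be installable inside the virtualenv (e.g. listed in requirements), otherwise the import will still fail there.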

4 Comments

Thanks for the suggestion; the best approach is to package my "download" code as a module to be installed in the operator's virtual environment.
Hi @YaroslavKolodiy, I am facing an issue with PythonVirtualenvOperator where the task is not using the specified packages and Python version. It looks like the operator is just skipping those options and using the default Python version and libraries present on the server where Airflow is installed. Can you provide some suggestions on this?
Hi @Pelab, I resolved my problem using containerization: I migrated all of my tasks to DockerOperator, using images that already have the scripts and dependencies installed.
@Pelab if you want to use the PythonVirtualenvOperator, verify that you are creating the task correctly per the docs.
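
For reference, a minimal sketch of that DockerOperator approach (the image name and command here are hypothetical, assuming an image with the script and its dependencies baked in):

from airflow.operators.docker_operator import DockerOperator

t1 = DockerOperator(
        task_id='download',
        image='my-download-image:latest',   # hypothetical image with deps preinstalled
        command='python /app/download.py',  # hypothetical entrypoint script
        auto_remove=True,
        dag=dag)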
