I have these two custom operators:

class FetchDataOperator(BaseOperator):

    @apply_defaults
    def __init__(self, connection, sql_commands, key, *args, **kwargs):
        super(FetchDataOperator, self).__init__(*args, **kwargs)
        self.connection = connection
        self.sql_commands = sql_commands
        self.key = key

    def execute(self, context):
        cursor = self.connection.cursor()
        cursor.execute(self.sql_commands)
        records = cursor.fetchall()
        context['ti'].xcom_push(key=self.key, value=records)


class InsertDataOperator(BaseOperator):

    @apply_defaults
    def __init__(self, connection, sql_commands, data, *args, **kwargs):
        super(InsertDataOperator, self).__init__(*args, **kwargs)
        self.connection = connection
        self.sql_commands = sql_commands
        self.data = data

    def execute(self, context):
        cursor = self.connection.cursor()
        cursor.executemany(self.sql_commands, self.data)
        self.connection.commit()

I'm confused about how to get the data that I pushed to XCom in FetchDataOperator into the 'data' parameter of InsertDataOperator. I tried this:

insert_data = InsertDataOperator(
    task_id="insert_data",
    connection=conn,
    sql_commands=insert_query,
    op_kwargs={
        'data' : "{{ti.xcom_pull(key='data', task_ids='fetch_data')}}"
    },
    provide_context=True
)

but it gives me this error:

airflow.exceptions.AirflowException: Argument ['data'] is required

Is there a way to do this correctly? Thank you!

1 Answer

The constructor of InsertDataOperator has this signature:

__init__(self, connection, sql_commands, data, *args, **kwargs)

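data is a required argument, so it must be passed directly to the constructor. Wrapping it in op_kwargs does not work: op_kwargs is a PythonOperator parameter, and your custom operator does not unpack it into its own arguments.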

Change your constructor call to:

insert_data = InsertDataOperator(
    task_id="insert_data",
    connection=conn,
    sql_commands=insert_query,
    data="{{ ti.xcom_pull(key='data', task_ids='fetch_data') }}"
)
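
One caveat with the template approach: Airflow renders Jinja only for attributes listed in the operator's template_fields, and by default the rendered value is a string, not a list of rows. An alternative is to pull the XCom inside execute, so the value arrives as a deserialized Python object. Below is a minimal sketch of that pattern. To keep it runnable without Airflow, BaseOperator and FakeTaskInstance are hypothetical stand-ins, the source_task_id and key parameters are names I made up, and an in-memory sqlite3 database plays the role of your connection.

```python
import sqlite3


class BaseOperator:
    """Stand-in for airflow.models.BaseOperator (assumption: lets the sketch run without Airflow)."""

    def __init__(self, task_id, **kwargs):
        self.task_id = task_id


class FakeTaskInstance:
    """Hypothetical stand-in for the TaskInstance that Airflow puts in context['ti']."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, task_ids=None, key=None):
        # The real method also filters by task_ids; this stub looks up by key only.
        return self._store.get(key)


class InsertDataOperator(BaseOperator):
    def __init__(self, connection, sql_commands, source_task_id, key, **kwargs):
        super().__init__(**kwargs)
        self.connection = connection
        self.sql_commands = sql_commands
        self.source_task_id = source_task_id  # hypothetical parameter name
        self.key = key

    def execute(self, context):
        # Pull at run time instead of templating a constructor argument.
        rows = context["ti"].xcom_pull(task_ids=self.source_task_id, key=self.key)
        cursor = self.connection.cursor()
        cursor.executemany(self.sql_commands, rows)
        self.connection.commit()
        return len(rows)


# Simulate what FetchDataOperator would have pushed, then run the insert task.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER, name TEXT)")
ti = FakeTaskInstance()
ti.xcom_push(key="data", value=[(1, "a"), (2, "b")])

insert_data = InsertDataOperator(
    task_id="insert_data",
    connection=conn,
    sql_commands="INSERT INTO items VALUES (?, ?)",
    source_task_id="fetch_data",
    key="data",
)
inserted = insert_data.execute({"ti": ti})
stored = conn.execute("SELECT id, name FROM items ORDER BY id").fetchall()
```

In a real DAG you would import BaseOperator from airflow.models and let Airflow supply the context. If you prefer to keep the template string, you would likely also need template_fields = ("data",) on the operator and, on Airflow 2.1 or later, render_template_as_native_obj=True on the DAG so the rendered value is not a plain string.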