I have these two custom operators:
class FetchDataOperator(BaseOperator):
    """Run a SQL statement and publish every fetched row to XCom.

    Args:
        connection: Object exposing ``cursor()``; the cursor it returns is
            used to execute the statement and fetch the rows.
        sql_commands: SQL text passed to ``cursor.execute``.
        key: XCom key under which the fetched rows are pushed.
    """

    @apply_defaults
    def __init__(self, connection, sql_commands, key, *args, **kwargs):
        super(FetchDataOperator, self).__init__(*args, **kwargs)
        self.connection = connection
        self.sql_commands = sql_commands
        self.key = key

    def execute(self, context):
        """Execute ``sql_commands`` and push the result of ``fetchall()`` to XCom.

        Args:
            context: Task context; ``context['ti']`` is used to push the rows.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(self.sql_commands)
            records = cursor.fetchall()
        finally:
            # Release the cursor even when the query or the fetch fails.
            cursor.close()
        context['ti'].xcom_push(key=self.key, value=records)
class InsertDataOperator(BaseOperator):
    """Insert a batch of rows with ``cursor.executemany`` and commit.

    ``data`` is a templated field, so it may be given either as the rows
    themselves or as a Jinja expression that resolves to them at run time,
    for example::

        data="{{ ti.xcom_pull(key='data', task_ids='fetch_data') }}"

    Args:
        connection: Object exposing ``cursor()`` and ``commit()``.
        sql_commands: Parameterised SQL text passed to ``cursor.executemany``.
        data: Sequence of parameter rows, or a template string that renders
            to the Python-literal text of such a sequence.
    """

    # Ask Airflow to render Jinja in ``data`` before ``execute`` runs; without
    # this the template string would reach ``executemany`` unrendered.
    template_fields = ("data",)

    @apply_defaults
    def __init__(self, connection, sql_commands, data, *args, **kwargs):
        super(InsertDataOperator, self).__init__(*args, **kwargs)
        self.connection = connection
        self.sql_commands = sql_commands
        self.data = data

    def execute(self, context):
        """Run ``sql_commands`` once per row in ``data`` and commit.

        Args:
            context: Task context (not read by this method).

        Raises:
            ValueError, SyntaxError: Propagated from ``ast.literal_eval`` when
                ``data`` is a string that is not a valid Python literal.
        """
        import ast

        rows = self.data
        if isinstance(rows, str):
            # A rendered template arrives as text (e.g. "[[1, 'a'], [2, 'b']]")
            # unless the DAG renders templates as native objects; turn it back
            # into a list. literal_eval only accepts literals, never code.
            rows = ast.literal_eval(rows)

        cursor = self.connection.cursor()
        try:
            cursor.executemany(self.sql_commands, rows)
            self.connection.commit()
        finally:
            # Release the cursor whether or not the insert succeeded.
            cursor.close()
I'm confused about how to get the data that I pushed to XCom in FetchDataOperator into the 'data' parameter of InsertDataOperator. I tried this:
# Attempted wiring of the XCom value into InsertDataOperator.
# InsertDataOperator.__init__ declares `data` as its own required parameter,
# but this call never passes `data=`: the value is nested inside `op_kwargs`,
# which only lands in **kwargs. That matches the reported
# "Argument ['data'] is required" error.
insert_data = InsertDataOperator(
task_id="insert_data",
connection=conn,
sql_commands=insert_query,
# NOTE(review): `op_kwargs` is not a parameter of InsertDataOperator.__init__;
# presumably the intent was to pass `data="{{ ... }}"` directly — confirm.
op_kwargs={
# The key pulled here must be the same `key` the fetch_data task pushed with
# (that task's definition is not shown here — verify).
'data' : "{{ti.xcom_pull(key='data', task_ids='fetch_data')}}"
},
# NOTE(review): `provide_context` is likewise not declared by this operator — confirm it is needed.
provide_context=True
)
but it gives me this error:
airflow.exceptions.AirflowException: Argument ['data'] is required
Is there any way to do it correctly? Thank you!