I am using Cloud Composer 2.0.23 (Airflow 2.2.5). My job is a gen2 Cloud Function that reads/writes data in BigQuery, and I call it with an HTTP request from inside the operator.
I've noticed that whenever a job's execution time exceeds roughly 10~15 minutes, the task status shown in Airflow never updates, even though the Cloud Function itself finishes successfully.
[update1]
Cloud Function info:

Airflow task status:
It should be "success".
[update2] Code snippet from my DAG.py:
import json
import urllib.request

import google.auth.transport.requests
import google.oauth2.id_token
import pendulum
from airflow.exceptions import AirflowFailException

# (write_to_bq, TABLE_ID, config, TODAY and default_args are defined elsewhere in DAG.py)

def call_cloud_function_endpoint(gcf_url, **kwargs):
    """Call the Cloud Function's endpoint and record the time used in BigQuery."""
    # Start a timer for recording the function's execution time
    start = pendulum.now(tz='Asia/Taipei')
    # This part calls the Cloud Function, which has 'Require authentication' enabled
    auth_req = google.auth.transport.requests.Request()
    id_token = google.oauth2.id_token.fetch_id_token(auth_req, gcf_url)
    if ('params' in kwargs) and kwargs['params']:
        p_list = []
        for k in kwargs['params']:
            p_list.append(f"{k}={kwargs['params'][k]}")
        gcf_url = f"{gcf_url}?{'&'.join(p_list)}"
    print(f'send request: {gcf_url}')
    req = urllib.request.Request(gcf_url)
    req.add_header('Authorization', f'Bearer {id_token}')
    response = urllib.request.urlopen(req).read()
    # The Cloud Function returns its response via json.dumps,
    # so use json.loads to turn the response back into a dict
    result = json.loads(response)
    # Stop the timer once execution has finished
    end = pendulum.now(tz='Asia/Taipei')
    # Create an execution record and write it to BigQuery
    gcf_function_name = gcf_url.split('/')[-1]
    execution_result = [{  # depends on your response body
        'date': start.format('YYYY-MM-DD'),
        'time': start.format('HH:mm:ss'),
        'task_name': gcf_function_name,
        'runtime': end.diff(start).in_seconds(),
        'status': result['status'],
        'message': result['message'],
        'output_num': result['output_num']
    }]
    write_to_bq(TABLE_ID, execution_result)
    # The Cloud Function response contains:
    #   'status': indicates the execution result, 'success' or 'failed'
    #   'message': either the records the Cloud Function has output, or an error message
    if result['status'] == 'failed':
        raise AirflowFailException(result['message'])
    return result['message']
with DAG(
    dag_id='DAG_Demo',
    start_date=TODAY,
    schedule_interval=None,  # schedule_interval=None if the DAG is manually triggered
    catchup=False,
    default_args=default_args,
    tags=['Demo']
) as dag:
    # All tasks
    my_task = PythonOperator(
        task_id='my_task',
        python_callable=call_cloud_function_endpoint,
        op_kwargs={
            'gcf_url': config['gcf_endpoint']['my_task']['url'],
            'params': config['gcf_endpoint']['my_task']['params'],
        }
    )
And here is the logic of my Cloud Function (gen2):
import json
import time

import functions_framework

@functions_framework.http
def sleep_900_sec(request):
    time.sleep(900)
    data = {"status": "success", "message": "Successfully Run", "output_num": 3}
    return json.dumps(data)
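For completeness, the only timeout-related settings I am aware of on my side are a client-side timeout on urlopen and the task's execution_timeout, and I currently set neither of them. A minimal sketch of what setting them would look like (both values are arbitrary, just for illustration):

from datetime import timedelta

# (a) client-side timeout on the HTTP request, inside call_cloud_function_endpoint;
#     1800 is an arbitrary number for illustration, not a recommended value
response = urllib.request.urlopen(req, timeout=1800).read()

# (b) per-task timeout on the Airflow side, passed when creating the operator
#     (again, an arbitrary value)
my_task = PythonOperator(
    task_id='my_task',
    python_callable=call_cloud_function_endpoint,
    execution_timeout=timedelta(minutes=30),
    op_kwargs={
        'gcf_url': config['gcf_endpoint']['my_task']['url'],
        'params': config['gcf_endpoint']['my_task']['params'],
    },
)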
Maybe I should not use PythonOperator? (Should I switch to an HTTP operator such as SimpleHttpOperator, or BashOperator, or something else? A rough sketch of what I mean is below.) Or did I miss some setting in the Airflow / Cloud Function configuration?
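To make the first question concrete, this is roughly what I imagine the HTTP-operator variant would look like. It is only a sketch: 'gcf_http' is a placeholder connection id, the endpoint path is assumed, and the Authorization header would still need a real ID token from somewhere.

from airflow.providers.http.operators.http import SimpleHttpOperator

# Rough sketch only: 'gcf_http' would be an HTTP connection whose host points
# at the Cloud Function, and the Bearer token below is just a placeholder.
my_http_task = SimpleHttpOperator(
    task_id='my_http_task',
    http_conn_id='gcf_http',                          # placeholder connection name
    endpoint='sleep_900_sec',                         # placeholder path
    method='GET',
    headers={'Authorization': 'Bearer <id-token>'},   # placeholder token
    response_check=lambda response: response.json()['status'] == 'success',
)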
Any suggestion is appreciated.
Thanks in advance.



