
I am using Cloud Composer 2.0.23 (Airflow 2.2.5). My job is a gen2 Cloud Function that reads/writes data in BigQuery, and I call it with HTTP requests from the operator.

I notice that for jobs with execution times over 10 to 15 minutes, the task status shown in Airflow never updates, even when the Cloud Function finishes successfully.

[update1] Cloud Function info:

[screenshot: Cloud Function execution details and logs showing a successful run]

Airflow job status:

[screenshot: Airflow task status stuck in a non-final state; it should be "success"]

[update2] code snippet in my DAG.py

import json
import urllib.request

import google.auth.transport.requests
import google.oauth2.id_token
import pendulum
from airflow.exceptions import AirflowFailException


def call_cloud_function_endpoint(gcf_url, **kwargs):
    """Call Cloud Function's endpoint and record time used in BigQuery"""

    # Set timer for recording function execution time
    start = pendulum.now(tz='Asia/Taipei')

    # This part calls the Cloud Function, which has 'Require Authentication' enabled
    auth_req = google.auth.transport.requests.Request()
    id_token = google.oauth2.id_token.fetch_id_token(auth_req, gcf_url)

    if ('params' in kwargs) and kwargs['params']:
        p_list=[]
        for k in kwargs['params']:
            p_list.append(f"{k}={kwargs['params'][k]}") 
        gcf_url = f"{gcf_url}?{'&'.join(p_list)}"

    print(f'send request: {gcf_url}')

    req = urllib.request.Request(gcf_url)
    req.add_header('Authorization', f'Bearer {id_token}')
    response = urllib.request.urlopen(req).read()

    # Cloud Function returns a response by json.dump
    # Use json.loads to turn response into dict type
    result = json.loads(response)

    # Timer for finished Execution 
    end = pendulum.now(tz='Asia/Taipei')

    # Create execution record and write to BigQuery
    gcf_function_name = gcf_url.split('/')[-1].split('?')[0]  # strip any query string appended above
    execution_result = [{ # depends on your response body
                        'date': start.format('YYYY-MM-DD'),
                        'time': start.format('HH:mm:ss'),
                        'task_name': gcf_function_name,
                        'runtime': end.diff(start).in_seconds(), 
                        'status': result['status'], 
                        'message': result['message'],
                        'output_num': result['output_num']
                        }]

    write_to_bq(TABLE_ID, execution_result)

    # The Cloud Function response contains
    # 'status': 'success' or 'failed', indicating the execution result
    # 'message': the records the function output, or an error message
    if result['status'] == 'failed':
        raise AirflowFailException(result['message'])
    return result['message']


with DAG(
    dag_id='DAG_Demo',
    start_date= TODAY,
    schedule_interval=None,  # schedule_interval=None if the DAG is manually triggered
    catchup=False,
    default_args=default_args,
    tags=['Demo']
) as dag:

    # All Tasks
    my_task = PythonOperator(
        task_id='my_task', 
        python_callable=call_cloud_function_endpoint, 
        op_kwargs={
            'gcf_url': config['gcf_endpoint']['my_task']['url'],
            'params': config['gcf_endpoint']['my_task']['params'],
            }
    )

And here is the logic of my Cloud Function (gen2):

import json
import time
import functions_framework

@functions_framework.http
def sleep_900_sec(request):
    
    time.sleep(900)
    data = {"status": "success", "message": "Successfully Run", "output_num": 3}
    return json.dumps(data)

Maybe I should not use PythonOperator? (Should I use an HTTP operator, BashOperator, or something else?) Or did I miss some setting in the Airflow/Cloud Function configuration?


Any suggestion is appreciated.

Thanks in advance!

  • Can you provide the screenshot of the status? Commented Sep 8, 2022 at 5:13
  • @PoalaAstrid please check my updates, thanks! Commented Sep 8, 2022 at 6:15
  • All I want to do is send a request to a Cloud Function API, then process the response Commented Sep 12, 2022 at 1:04
  • Hi, I was replicating your case and it worked but I want to know how big is the data you're reading/writing and how many rows it has? Commented Sep 14, 2022 at 0:10
  • This case is a 'sleeping' job that does nothing. In my tests, if the execution time exceeds a certain amount of time, Airflow never updates the task status. Maybe it is because of the HTTP operator? Different regions between the Cloud Function and Composer? Or something else? Commented Sep 14, 2022 at 4:24

2 Answers


TL;DR: Cloud Composer likely has a firewall rule that kills idle TCP connections (e.g. long-running HTTP requests). Set TCP keep-alive flags to prevent this.

I solved a similar problem by setting TCP keep-alive flags in Python, via urllib3's HTTPConnection.default_socket_options, as suggested in this blog post comment. There is an in-depth explanation of this issue focusing on Airflow's built-in SimpleHttpOperator, but it is entirely transferable to a PythonOperator that issues any HTTP request.

I would recommend changing your code as follows:

def call_cloud_function_endpoint(gcf_url, **kwargs):
    """Call Cloud Function's endpoint and record time used in BigQuery"""
    import socket
    from urllib3.connection import HTTPConnection

    # Enable TCP keep-alive probes so the connection is not treated as
    # idle (and dropped) while the long-running Cloud Function is working
    HTTPConnection.default_socket_options += [
        (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
        (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60),
        (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60),
        (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 100),
    ]

    # Set timer for recording function execution time
    ...
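One caveat: urllib.request (used in the question's code) is built on http.client, not urllib3, so the default_socket_options override above only takes effect if the request actually goes through urllib3, e.g. via the requests library. A minimal sketch of that variant, assuming the same URL/token inputs as the question's code (call_gcf and its parameters are illustrative names, not from the original):

```python
import socket

import requests
from urllib3.connection import HTTPConnection

# Enable TCP keep-alive probes on all urllib3-backed connections, so an
# intermediate firewall does not drop the connection while the Cloud
# Function is still running.
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60),   # start probing after 60s idle
    (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60),  # probe every 60s
    (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 100),   # give up after 100 failed probes
]


def call_gcf(gcf_url, id_token, params=None):
    """Call the Cloud Function through requests (urllib3) so the
    keep-alive socket options above actually apply."""
    resp = requests.get(
        gcf_url,
        params=params or {},
        headers={"Authorization": f"Bearer {id_token}"},
        timeout=3600,  # client-side cap; adjust to your function's max runtime
    )
    resp.raise_for_status()
    return resp.json()
```

Because requests also handles query-string encoding via params, this would replace the manual URL building in the original snippet as well.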

1 Comment

Thanks for your reply. I'll do some tests and tell you the result

I have replicated your case with a large public dataset, and the task was stuck in "up_for_retry" or "running" in Airflow. So I changed the allocated Memory and the Timeout in the Cloud Functions configuration. Try increasing both; I set Timeout to its maximum.

[screenshot: Cloud Functions configuration with increased Memory and Timeout]

It worked for me.

These are the Airflow logs:

[screenshot: Airflow task logs showing the task succeeding]

And this is the Cloud Function:

[screenshot: Cloud Function logs]
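For reference, both settings can also be changed by redeploying with gcloud; a sketch, where FUNCTION_NAME, REGION, and the memory value are placeholders for your own values (gen2 HTTP functions allow timeouts up to 60 minutes):

```shell
# Redeploy the gen2 function with a larger timeout and more memory.
gcloud functions deploy FUNCTION_NAME \
    --gen2 \
    --region=REGION \
    --timeout=3600s \
    --memory=512Mi
```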

Let me know if this can help or if not, we can still figure out the solution to your problem.

1 Comment

Hi, please check my updates. In my test, it is not a matter of CPU/memory; it is about time. Thank you!
