2

I am trying to execute multiple AWS Lambda invocations with python 3.7.2 and aiobotocore package. Here is my code.

import asyncio
import aiobotocore


async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)


def generate_invocations(payloads, session):
    for payload in payloads:
        yield invoke(payload, session)


def invoke_all(payloads):
    loop = asyncio.get_event_loop()

    async def wrapped():
        session = aiobotocore.get_session(loop=loop)
        invocations = generate_invocations(payloads, session)
        return await asyncio.gather(*invocations)

    return loop.run_until_complete(wrapped())


def main():
    payloads_list = []  # MY PAYLOADS LIST 
    lambda_responses = invoke_all(payloads_list)
    print(lambda_responses)


if __name__ == '__main__':
    main()

The code runs really fast (for 10 payloads around 1 sec instead of 15 by using boto3 lambda client invocations) but I have two issues:

1) The elements in lambda_responses include 'Payload' key which value is of type aiobotocore.response.StreamingBody. When I try value.read() I receive "coroutine object StreamingBody.read" and I think there some problem in my code. I can receive desired response by "json.loads(json.loads(r['Payload']._buffer.pop())['body'])" but what is the proper way to get it.

2) In rare cases the "Payload" in one of responses has empty buffer. How can I ensure that invoke_all function returns non empty responses? Is it correct usage of aiobotocore?

I am new to python 3 and async functionality. Inspired by examples of aiobotocore documentation and Mathew Marcus blog.

Thanks!

1 Answer 1

7
  1. The elements in lambda_responses include 'Payload' key which value is of type aiobotocore.response.StreamingBody. When I try value.read() I receive "coroutine object StreamingBody.read"

This means the read() coroutine is meant to be awaited, which you should do while still in the event loop. For example, you could change the invoke coroutine to also read the response:

async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        resp = await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
        payload = await resp['Payload'].read()
        return payload  # or assemble a dict with relevant parts
  1. In rare cases the "Payload" in one of responses has empty buffer.

This is likely because you are accessing the buffer before actually reading the contents. In some cases the info arrives soon enough that you find it in the internal buffer anyway, but sometimes you have to wait for it. Using a public method such as read() ensures that you're correctly using the API. The _buffer property, on the other hand, begins with an underscore, which denotes that it is an implementation detail not meant to be accessed directly.

Sign up to request clarification or add additional context in comments.

1 Comment

Thanks! Works great.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.