I am trying to execute multiple AWS Lambda invocations with python 3.7.2 and aiobotocore package. Here is my code.
import asyncio
import aiobotocore
async def invoke(payload, session):
async with session.create_client('lambda', region_name='us-east-1') as client:
return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
def generate_invocations(payloads, session):
for payload in payloads:
yield invoke(payload, session)
def invoke_all(payloads):
loop = asyncio.get_event_loop()
async def wrapped():
session = aiobotocore.get_session(loop=loop)
invocations = generate_invocations(payloads, session)
return await asyncio.gather(*invocations)
return loop.run_until_complete(wrapped())
def main():
payloads_list = [] # MY PAYLOADS LIST
lambda_responses = invoke_all(payloads_list)
print(lambda_responses)
if __name__ == '__main__':
main()
The code runs really fast (for 10 payloads around 1 sec instead of 15 by using boto3 lambda client invocations) but I have two issues:
1) The elements in lambda_responses include 'Payload' key which value is of type aiobotocore.response.StreamingBody. When I try value.read() I receive "coroutine object StreamingBody.read" and I think there some problem in my code. I can receive desired response by "json.loads(json.loads(r['Payload']._buffer.pop())['body'])" but what is the proper way to get it.
2) In rare cases the "Payload" in one of responses has empty buffer. How can I ensure that invoke_all function returns non empty responses? Is it correct usage of aiobotocore?
I am new to python 3 and async functionality. Inspired by examples of aiobotocore documentation and Mathew Marcus blog.
Thanks!