I have the following example of error handling with asyncio:
import asyncio
import logging
def handle_exception(loop, context):
    """Log an unhandled asyncio error and schedule a graceful shutdown.

    Meant to be installed via ``loop.set_exception_handler``.

    Args:
        loop: The event loop that reported the error.
        context: The error-context dict passed by asyncio. ``"message"`` is
            read unconditionally; ``"exception"`` is used when present.
    """
    # Prefer the exception object; fall back to the plain message text.
    msg = context.get("exception", context["message"])
    logging.error("Caught exception: %s", msg)
    logging.warning("Shutting down...")
    # Schedule on the loop we were handed rather than on the implicit
    # "running" loop: asyncio.create_task() raises RuntimeError when the
    # handler is invoked from a context where no loop is currently running.
    loop.create_task(shutdown(loop))
async def shutdown(loop, signal=None):
    """Cleanup tasks tied to the service's shutdown.

    Cancels every task on the running loop except the one executing this
    coroutine, waits for them to finish unwinding, then stops ``loop``.

    Args:
        loop: Event loop to stop once the outstanding tasks have finished.
        signal: Optional signal that triggered the shutdown; only its
            ``name`` attribute is read, for logging.
    """
    if signal:
        logging.warning("Received exit signal %s...", signal.name)

    # Exclude ourselves: cancelling the current task would abort the cleanup.
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current]

    # Plain loop: cancel() is called for its side effect, not its result.
    for task in tasks:
        task.cancel()
    logging.warning("Cancelling %d outstanding tasks", len(tasks))

    # return_exceptions=True gathers CancelledError (and any other error)
    # as results instead of letting the first one propagate out of here.
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()
async def bug():
    """Simulate a background job that fails after a ten-second delay.

    Raises:
        ValueError: Always, once the delay has elapsed.
    """
    delay_seconds = 10
    await asyncio.sleep(delay_seconds)
    failure = ValueError("some error")
    raise failure
def main():
    """Run ``bug()`` on a dedicated event loop until a shutdown stops it.

    Installs ``handle_exception`` as the loop's exception handler, schedules
    ``bug()`` and runs the loop forever; the loop is always closed on exit.
    """
    # Create the loop explicitly: relying on asyncio.get_event_loop() to
    # produce one when no loop is running is deprecated and is an error on
    # recent Python versions.
    loop = asyncio.new_event_loop()
    loop.set_exception_handler(handle_exception)
    try:
        # The task is intentionally neither stored nor awaited, so its
        # exception goes unretrieved and is reported to the loop's handler.
        loop.create_task(bug())
        loop.run_forever()
    finally:
        loop.close()
        logging.warning("shutdown completed")
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
This works for me with these "lower-level functions". But I don't understand how to do the same thing without them. If I try this (the rest of the code is the same as above):
async def main():
    """Install the exception handler, start ``bug()`` and wait 20 seconds.

    NOTE: ``handle_exception`` schedules ``shutdown``, which cancels every
    task except itself -- including the task running this coroutine -- so
    the ``CancelledError`` raised here propagates out of ``asyncio.run``.
    """
    # Inside a coroutine, get_running_loop() is the documented way to reach
    # the loop that is executing us; get_event_loop() is discouraged here.
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_exception)
    # The task is intentionally not stored, so its unretrieved exception is
    # reported to the handler installed above.
    asyncio.create_task(bug())
    await asyncio.sleep(20)
# Script entry point: asyncio.run() drives the main() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())
I always get an asyncio.exceptions.CancelledError.
How can I do this with asyncio.run?