  1. If I run this code, it hangs without throwing ZeroDivisionError.
  2. If I move await asyncio.gather(*tasks, return_exceptions=True) above await queue.join(), it finally throws ZeroDivisionError and stops.
  3. If I then comment out 1 / 0 and run, it executes everything, but hangs at the end.

Now the question is, how can I achieve both:

  1. Being able to see unexpected exceptions as in case 2 above, and...
  2. Actually stopping when all tasks in the queue are done?

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        print('Get a "work item" out of the queue.')
        sleep_for = await queue.get()

        print('Sleep for the "sleep_for" seconds.')
        await asyncio.sleep(sleep_for)

        # Error on purpose
        1 / 0

        print('Notify the queue that the "work item" has been processed.')
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    print('Create a queue that we will use to store our "workload".')
    queue = asyncio.Queue()

    print('Generate random timings and put them into the queue.')
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    print('Create three worker tasks to process the queue concurrently.')
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    print('Wait until the queue is fully processed.')
    started_at = time.monotonic()

    print('Joining queue')
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    print('Cancel our worker tasks.')
    for task in tasks:
        task.cancel()

    print('Wait until all worker tasks are cancelled.')
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())

2 Answers


There are several ways to approach this, but the central idea is that in asyncio, unlike in classic threading, it is straightforward to await multiple things at once.

For example, you can await both queue.join() and the worker tasks, and react to whichever completes first. Since the workers never complete normally (you cancel them later), a worker completing means that it has raised.

# convert queue.join() to a full-fledged task, so we can test
# whether it's done
queue_complete = asyncio.create_task(queue.join())

# wait for the queue to complete or one of the workers to exit
await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)

if not queue_complete.done():
    # If the queue hasn't completed, it means one of the workers has
    # raised - find it and propagate the exception.  You can also
    # use t.exception() to get the exception object. Canceling other
    # tasks is another possibility.
    for t in tasks:
        if t.done():
            t.result()  # this will raise
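For context, here is a sketch of how that snippet could slot into the question's main(). The surrounding code is copied from the question (trimmed of its print statements); only the waiting logic is new:

import asyncio
import random


async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        1 / 0  # error on purpose, as in the question
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    for _ in range(20):
        queue.put_nowait(random.uniform(0.05, 1.0))

    tasks = [asyncio.create_task(worker(f'worker-{i}', queue))
             for i in range(3)]

    # Wrap queue.join() in a task so it can be awaited alongside the workers.
    queue_complete = asyncio.create_task(queue.join())
    await asyncio.wait([queue_complete, *tasks],
                       return_when=asyncio.FIRST_COMPLETED)

    if not queue_complete.done():
        # A worker exited first, which here can only mean it raised.
        for t in tasks:
            if t.done():
                t.result()  # re-raises the worker's exception

    # Normal path: the queue was drained, so cancel the now-idle workers.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


asyncio.run(main())

With the 1 / 0 line removed, the queue drains, queue_complete finishes, and the workers are cancelled cleanly; with it in place, the exception propagates out of main().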

3 Comments

This only works with a fully pre-populated queue. How do you use it with an async generator that feeds the queue with await queue.put()? Maybe a separate question.
@YannickEinsweiler I've only now noticed this comment. With a queue that is not pre-populated, you could create a coroutine that does await producer(); await queue.join() and use that as queue_complete, where producer is the coroutine that loops over the async generator and enqueues the elements it produces; see the sketch after these comments.
It is important to note, if you do not want results returned until all items in the queue are completed, then you will need to remove return_when=asyncio.FIRST_COMPLETED. This will then return when all items in the queue are completed.
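A minimal sketch of that producer suggestion, meant to slot into the answer's code above; producer, feed_and_join, and some_async_gen are hypothetical names, not part of the question:

async def producer(queue, items):
    # items is any async iterable; enqueue everything it yields
    async for item in items:
        await queue.put(item)

async def feed_and_join(queue, items):
    await producer(queue, items)  # finish feeding the queue...
    await queue.join()            # ...then wait for the workers to drain it

# used in place of the bare queue.join() task:
queue_complete = asyncio.create_task(feed_and_join(queue, some_async_gen()))
await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)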

A workaround (but ugly) solution: add a try/except block inside async def worker(...); this will catch any exception raised in the worker code and prevent the program from hanging forever.

Following the same code as in the question:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        try:
            ...
            1 / 0  # Error on purpose
            ...
        except Exception as e:
            print(e)  # Show the error
        finally:
            queue.task_done()  # Make sure the item is always marked as done

async def main():
    ...

asyncio.run(main())

1 Comment

About the finally part: I tried it and it shows strange behavior on Python 3.9, as some tasks are left inside the queue unattended... However, everything works if that finally is just not used and the queue.task_done() call is simply placed just beneath the except, as sketched below. stackoverflow.com/questions/65603767/…
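A sketch of the variant this comment describes, with queue.task_done() moved out of finally and placed after the try/except; the worker body is filled in from the question's code:

async def worker(name, queue):
    while True:
        try:
            sleep_for = await queue.get()
            await asyncio.sleep(sleep_for)
            1 / 0  # error on purpose
        except Exception as e:
            print(e)  # show the error, but keep the worker alive
        # Runs after both the success path and the handled-error path,
        # but not if the worker is cancelled (CancelledError is not caught).
        queue.task_done()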
