I have an asynchronous generator and expected to be able to iterate over it asynchronously. However, I am missing something, messing something up, or both, because I end up with what behaves like a regular synchronous for loop:

import asyncio


async def time_consuming(t):
    print(f"Going to sleep for {t} seconds")
    await asyncio.sleep(t)
    print(f"Slept {t} seconds")
    return t


async def generator():
    for i in range(4, 0, -1):
        yield await time_consuming(i)


async def consumer():
    async for t in generator():
        print(f"Doing something with {t}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(consumer())
    loop.close()

This takes about 10 seconds to run (4 + 3 + 2 + 1) and prints this:

Going to sleep for 4 seconds
Slept 4 seconds
Doing something with 4
Going to sleep for 3 seconds
Slept 3 seconds
Doing something with 3
Going to sleep for 2 seconds
Slept 2 seconds
Doing something with 2
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1

However, I expected it to take about 4 seconds to run and print something like this:

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 4 seconds
Doing something with 4
Slept 3 seconds
Doing something with 3
Slept 2 seconds
Doing something with 2
Slept 1 seconds
Doing something with 1

1 Answer

An asynchronous generator does not mean that you execute iteration concurrently! All that you gain is more places for the coroutine to yield to other tasks. The iteration steps still run in series.

Put differently: an asynchronous iterator is useful when each iteration step needs I/O to obtain its value. Think of looping over messages from a web socket, or lines in a file. If each __anext__() step over the iterator requires waiting for a slow I/O source to provide data, that's a good point to yield control to something else that has been set to run concurrently.
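
To illustrate the point above, here is a minimal sketch (not taken from the question's code) in which a background task keeps running while an async for loop waits on each step. The names slow_source and heartbeat are hypothetical, and asyncio.sleep() stands in for real I/O. The items still arrive strictly one after another; the only gain is that the event loop is free to run the other task during each wait.

```python
import asyncio


async def slow_source(n, delay=0.05):
    # Hypothetical stand-in for an I/O-bound source: each step waits before yielding.
    for i in range(n):
        await asyncio.sleep(delay)
        yield i


async def heartbeat(log, interval=0.01):
    # Hypothetical background task; it only gets to run while other code is awaiting.
    while True:
        await asyncio.sleep(interval)
        log.append("tick")


async def main():
    log = []
    background = asyncio.create_task(heartbeat(log))
    # The steps of this loop run in series, but each wait lets heartbeat() run.
    async for item in slow_source(3):
        log.append(f"item {item}")
    background.cancel()
    try:
        await background
    except asyncio.CancelledError:
        pass
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The resulting log has "tick" entries interleaved with the items, while the items themselves stay in order.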

If you expected each individual step of your generator to be run concurrently, then you still would have to schedule additional tasks, explicitly, with the event loop.

You can then return from the generator when all those extra tasks have completed. If you schedule your 4 time_consuming() coroutines as tasks up front, use asyncio.wait() to wait for one or more of them to complete, and yield the results of the tasks that are done, then the whole process takes only about 4 seconds in total, because all four sleeps overlap:

async def generator():
    pending = []
    for i in range(4, 0, -1):
        pending.append(asyncio.create_task(time_consuming(i)))

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()

at which point the output becomes

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1
Slept 2 seconds
Doing something with 2
Slept 3 seconds
Doing something with 3
Slept 4 seconds
Doing something with 4

Note that this is the reverse of the order in your expected output, because this yields task results as they complete rather than waiting for the first-created task to finish first. Usually this is what you want. Why wait 4 seconds when you already have a result ready after 1?
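
If completion order is all you need, a roughly equivalent sketch uses asyncio.as_completed() instead of the asyncio.wait() loop. This is a different standard-library helper, not what the code above does. The delays parameter and the shortened sleep times are assumptions made here so the example runs quickly.

```python
import asyncio


async def time_consuming(t):
    await asyncio.sleep(t)
    return t


async def generator(delays):
    # Schedule everything up front so the sleeps overlap.
    tasks = [asyncio.create_task(time_consuming(t)) for t in delays]
    # as_completed() hands back awaitables in the order the tasks finish.
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


async def consumer(delays):
    return [t async for t in generator(delays)]


if __name__ == "__main__":
    # Shortest delay comes out first, regardless of creation order.
    print(asyncio.run(consumer([0.4, 0.3, 0.2, 0.1])))
```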

You can have your ordering too, of a sort, but you'd code it differently: use asyncio.gather() on the 4 coroutines, which schedules them to run as concurrent tasks and returns their results as a list, in the order the coroutines were passed in, after which you can yield those results:

async def generator():
    tasks = []
    for i in range(4, 0, -1):
        tasks.append(time_consuming(i))

    for res in await asyncio.gather(*tasks):
        yield res

but now the output becomes

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Slept 2 seconds
Slept 3 seconds
Slept 4 seconds
Doing something with 4
Doing something with 3
Doing something with 2
Doing something with 1

because we can't do anything further until the longest task, time_consuming(4), has completed, yet the shorter-running tasks complete before that point and already output their Slept ... seconds message.
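
A middle ground, sketched here as an assumption about what you might want rather than something from the question: create all the tasks up front, then await them in creation order. Results still come out in the original order and everything still runs concurrently, but each result is yielded as soon as it and all earlier ones are done, instead of only after the slowest task finishes. The delays parameter and the elapsed-time measurement are illustrative additions.

```python
import asyncio
import time


async def time_consuming(t):
    await asyncio.sleep(t)
    return t


async def generator(delays):
    # All tasks start running as soon as the event loop gets control.
    tasks = [asyncio.create_task(time_consuming(t)) for t in delays]
    # Awaiting in creation order preserves the input order of the results.
    for task in tasks:
        yield await task


async def consumer(delays):
    start = time.monotonic()
    results = [t async for t in generator(delays)]
    return results, time.monotonic() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(consumer([0.4, 0.3, 0.2, 0.1]))
    # Elapsed time is close to the longest delay, not the sum of all delays.
    print(results, round(elapsed, 2))
```

With delays of 4, 3, 2, 1 this behaves the same as the gather() version, since the first task is also the slowest; the difference only shows when earlier tasks finish earlier.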

12 Comments

Got it. I was avoiding the explicit scheduling of tasks on the event loop since I don't know properly how to do it inside the running loop. Maybe I need to exit it and then schedule the tasks in a new loop. I'll do my research. And yes, the time_consuming function awaits some requests in my real code.
@RodrigoMartins: that's as simple as asyncio.create_task().
Thank you very much! The order I expected was just a wrong guess. Your solution scheduling tasks and enqueueing 'em is perfect.
@RodrigoMartins: sorry, I had overcomplicated things with a queue. All you need here is to wait for the tasks to complete, so done has all the information we need.
@user4815162342 thanks, yes, absolutely. I’ll go get my brown paper bag now.