2

I currently have a for loop as follows

async def process(tasks, num):
    count = 0
    results = []
    for task in tasks:
        if count >= num:
            break
        result = await some_async_task(task)
        if result == 'foo':
            continue
        results.append(result)
        count += 1
    return results

I was wondering whether I can use the gather or wait_for primitives here, but I am not sure how to implement this if logic with them. In particular, I don't want to unnecessarily await a task once count >= num. If there are 20 tasks and num = 4, then I don't want to run all 20 tasks.
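To make the concern concrete, here is a minimal sketch of what a naive asyncio.gather() over everything would do. The some_async_task stub (every third task returns 'foo') and the naive_process name are assumptions made up for illustration, not part of the real code:

```python
import asyncio


async def naive_process(task_args, num, worker):
    # gather() schedules every coroutine up front, so all of them run
    # even though only the first `num` accepted results are kept.
    all_results = await asyncio.gather(*(worker(arg) for arg in task_args))
    kept = [r for r in all_results if r != "foo"]
    return kept[:num]


async def demo():
    started = []  # records which task arguments were actually run

    async def some_async_task(i):
        # Hypothetical stand-in: every third task yields the rejected "foo".
        started.append(i)
        await asyncio.sleep(0)
        return "foo" if i % 3 == 0 else f"r{i}"

    results = await naive_process(range(20), 4, some_async_task)
    return results, len(started)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With 20 tasks and num = 4, this version still starts all 20 tasks, which is exactly what I want to avoid.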

1
  • @user4815162342: I accepted the answer that worked. I didn't actually try the other response. Commented Oct 26, 2019 at 7:32

3 Answers

3

This can easily be achieved using the aiostream library. Here is a working example:

import asyncio
from random import random
from aiostream import stream, pipe


async def some_async_task(i):
    await asyncio.sleep(random())
    return i if random() < 0.2 else None


async def process(task_args, n):
    return await (
        stream.iterate(task_args)
        | pipe.map(some_async_task, task_limit=n)
        | pipe.filter(bool)
        | pipe.take(n)
        | pipe.list()
    )


async def main():
    print(await process(task_args=range(100), n=10))


if __name__ == "__main__":
    asyncio.run(main())

The program prints the list of the first 10 tasks that succeeded:

[1, 8, 16, 18, 19, 37, 42, 43, 45, 47]

Also notice that you can tune how many some_async_task calls run at the same time using the task_limit argument.

Disclaimer: I am the project maintainer.


8 Comments

Nice demonstration of aiostream, but I'm not sure it does what the question actually asks for. Specifically, what would the implementation of the OP's process() function look like using aiostream? The constraint is not to run more tasks in parallel than necessary given num and the current count of results.
@user4815162342 Oh I see: in the aiostream version, there are always n tasks running until the criterion is met, and then the extra tasks get cancelled. In your version there are at most n tasks running (and fewer than that most of the time), but it guarantees that no task will produce a result that goes unused. Interesting, there is some kind of trade-off here, right? Also see my edit.
Yes, not starting excess jobs is what the OP stated as a goal. On the other hand, your aiostream version is more general and perhaps easier to adapt to different situations. Mine better matches the question (as I understood it), doesn't have the external dependency, and might be faster. (If I were using this in production, I'd measure the performance of this chaining; Python abstractions are far from "zero-cost", after all.)
It seems to me that the OP was asking for not running all the tasks since he only needs n tasks succeeding (in my example, there is no point in running all 100 tasks since about 50 tasks are necessary to get 10 results). However, I agree that your solution is elegant and simple enough to discard the need for a third-party library :)
About the performance, it really depends on how you look at it: the use of async generators is definitely more costly for the CPU, but it seems safe to assume that most of the time is spent waiting for the tasks to finish. With your solution, those tasks might run one after the other when only a few results are missing, which increases the run time. That's the trade-off I was talking about earlier: no excess jobs vs. run time.
2

You could process the tasks in batches whose size equals the number of results you still need. If you give such a batch to asyncio.gather(), it will both run the tasks concurrently and preserve the order of results. For example:

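import asyncio
import itertools

async def process(tasks, num):
    results = []
    task_iter = iter(tasks)
    while len(results) < num:
        # Take only as many awaitables as there are results still missing.
        next_batch = tuple(itertools.islice(task_iter, num - len(results)))
        if len(next_batch) == 0:
            break
        # gather() runs the batch concurrently and preserves result order.
        batch_results = await asyncio.gather(*next_batch)
        # As in the question's loop, 'foo' results are skipped.
        results.extend(r for r in batch_results if r != 'foo')
    return results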
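To check how many tasks this batching actually starts, here is a self-contained sketch. It assumes, as in the question's loop, that 'foo' results are skipped; the some_async_task stub and the demo helper are hypothetical and exist only for illustration. The tasks are passed as a generator so that coroutine objects are only created when a batch pulls them:

```python
import asyncio
import itertools


async def process(tasks, num):
    results = []
    task_iter = iter(tasks)
    while len(results) < num:
        # Batch size shrinks to the number of results still missing.
        batch = tuple(itertools.islice(task_iter, num - len(results)))
        if not batch:
            break
        batch_results = await asyncio.gather(*batch)
        results.extend(r for r in batch_results if r != "foo")
    return results


async def demo():
    started = []  # arguments of the tasks that really ran

    async def some_async_task(i):
        # Hypothetical stand-in: every third task yields the rejected "foo".
        started.append(i)
        await asyncio.sleep(0)
        return "foo" if i % 3 == 0 else f"r{i}"

    # Lazy generator: unused coroutines are never created, so nothing
    # is left un-awaited when process() stops early.
    lazy_tasks = (some_async_task(i) for i in range(20))
    results = await process(lazy_tasks, 4)
    return results, started


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With this stub, the first batch has four tasks and yields two usable results, so the second batch has only two tasks; the remaining 14 are never started.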

1 Comment

This indeed does what the OP is asking for, but it is not the best use of asyncio. It becomes "synchronous" when num - len(results) == 1. I wouldn't recommend this code for production.
0

Well, I really think this question and the selected answer are worth an analysis.

First off, you have to be aware that whether you can skip the remaining tasks depends on the results of the ones already awaited. That means you may have to await all of your tasks in order to get the first N results you are interested in.

Having said that, the selected answer has the following characteristics:

  1. For smaller values of num the use of asyncio makes less sense.
  2. For bigger values of num the solution would be better written using asyncio.as_completed, so you can get the first N results matching your condition.

Think about this case: you want the first result (only one) matching some condition out of 1000 requests! In this case, the selected answer is no better than a synchronous implementation; in fact, it is worse, since it adds unnecessary complications.

With the selected solution this will always be the case once len(results) == num - 1: the next batch size, num - len(results), is then 1, so next_batch always holds a single element and the whole thing becomes conceptually synchronous.

Another thing: using the selected answer does not prevent "unnecessary awaits". For instance, if you want 100 results out of 300 coroutines, you might easily end up running a third batch after having already run every "unwanted" coroutine/task in the previous ones!

Conclusion

If not running any extra awaits is a must (maybe to save money on a very expensive DB query or an AWS Lambda request), you might want to sacrifice speed and run the tasks one at a time (no concurrency).

On the other hand, if you are only interested in the first N results, asyncio.as_completed is the way to go.

Here is an example:

import asyncio
import random
import time


# Maximum delay assigned to any coroutine.
max_time_waited = 0
# Counter for the number of coroutines called.
call_counter = 0

# Aux variable for avoiding race conditions when editing global variables.
# No need for this in your code; it is just for illustrative purposes.
lock = asyncio.Lock()


# Some task.
async def coro(task_number):

    global max_time_waited 
    global call_counter

    delay = random.randint(1, 10)

    async with lock:
        max_time_waited = max(delay, max_time_waited)

    call_counter += 1
    await asyncio.sleep(delay)
    
    return "foo" if task_number % 2 else "bar"


async def process(tasks, num):

    interesting_results = []

    for task in asyncio.as_completed(tasks):
        result = await task
        if result == "foo":
            interesting_results.append(result)

        if len(interesting_results) >= num:
            break

    return interesting_results


async def main(number):

    tasks = [
        coro(task_number)
        for task_number in range(100)
    ]

    return await process(tasks, number)

init = time.perf_counter()
result = asyncio.run(main(4))
print(f"Result obtained in {time.perf_counter() - init} seconds")
print(f"Possible max time waiting: {max_time_waited} seconds")
print(f"Total coroutine calls: {call_counter}")
print(result)

Example output

Result obtained in 1.009999111003708 seconds
Possible max time waiting: 10 seconds
Total coroutine calls: 100
['foo', 'foo', 'foo', 'foo']
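
Note that the output above reports 100 coroutine calls: asyncio.as_completed starts every coroutine, and whatever is still pending after the break keeps running until it is cancelled or the loop shuts down. If that matters, one option is to cancel the leftovers explicitly. Below is a minimal sketch under a few assumptions: the deterministic delays in coro and the extra cancelled count in the return value are made up for illustration, and "foo" is treated as the interesting result, as in the example above.

```python
import asyncio


async def coro(task_number):
    # Staggered delays so the completion order is predictable.
    await asyncio.sleep(task_number * 0.05)
    return "foo" if task_number % 2 else "bar"


async def process(coros, num):
    # Wrap the coroutines in tasks so they can be cancelled later.
    tasks = [asyncio.ensure_future(c) for c in coros]
    interesting = []
    try:
        for fut in asyncio.as_completed(tasks):
            result = await fut
            if result == "foo":
                interesting.append(result)
            if len(interesting) >= num:
                break
    finally:
        # Cancel whatever has not finished and wait for the
        # cancellations to be processed.
        pending = [t for t in tasks if not t.done()]
        for t in pending:
            t.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
    cancelled = sum(t.cancelled() for t in tasks)
    return interesting, cancelled


async def demo():
    return await process([coro(i) for i in range(10)], 2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With these staggered delays the loop should stop after the fourth task completes and cancel the remaining six. Cancelling does not undo work a task has already started, so this limits wasted effort rather than eliminating it.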
