
My use case is the following: I'm using Python 3.8.

I have an async function analyse_doc that is a wrapper for an HTTP request to a web service. I have approximately 1000 docs to analyse as fast as possible. The service allows 15 transactions per second (and not 15 concurrent requests at any second). So the first second I can send 15, then the 2nd second I can send 15 again, and so on. If I try to hit the service more than 15 times per second I get a 429 error, or sometimes 503/504 errors (server is busy…).

My question is: is it possible to implement something in Python that effectively sends 15 requests per second asynchronously, then waits 1 second, then does it again until the queue is empty? Also, some tasks might fail, and those failing tasks might need a rerun at some point.
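
Something like this is roughly the flow I have in mind (just a sketch, with analyse_async as my HTTP wrapper; retries are left out here, and the 15-per-second constant is only for illustration):

import asyncio

RATE_LIMIT = 15  # requests allowed per second

async def process_all(documents):
    queue = list(documents)
    in_flight = []

    # schedule one batch per second without waiting for the responses
    while queue:
        batch, queue = queue[:RATE_LIMIT], queue[RATE_LIMIT:]
        in_flight.extend(asyncio.create_task(analyse_async(doc)) for doc in batch)
        await asyncio.sleep(1)

    # wait for everything that was scheduled; exceptions are returned, not raised
    results = await asyncio.gather(*in_flight, return_exceptions=True)
    return results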

So far, my code is the following (unbounded parallelism… not even a semaphore), but it handles retries.

import asyncio

tasks = {asyncio.create_task(analyse_async(doc)): doc for doc in documents}
pending = set(tasks)

# Handle retry
while pending:
    # backoff in case of 429
    await asyncio.sleep(1)

    # wait for all concurrent calls to complete
    finished, pending = await asyncio.wait(
        pending, return_when=asyncio.ALL_COMPLETED
    )

    # check if a task raised an exception and register it for a new run
    for task in finished:
        arg = tasks[task]

        if task.exception():
            new_task = asyncio.create_task(analyse_async(arg))
            tasks[new_task] = arg
            pending.add(new_task)
   

1 Answer


You could try adding another sleep task into the mix to drive the request generation. Something like this:

import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    print(f"i am worker {index} and i am starting")
    await asyncio.sleep(index)
    print(f"i am worker {index} and i am done")

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []

    i = 0

    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1

    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    
    await asyncio.gather(*work_in_progress)

    loop.stop()
    
    # closes out the program
    print('done')

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))

        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        
        # every 'tick' we add this task onto the loop again unless there isn't any more to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()


Updated version with a simulated exception

import asyncio
import random

ONE_SECOND = 1
CONCURRENT_TASK_LIMIT = 2
TASKS_TO_CREATE = 10

loop = asyncio.new_event_loop()

work_todo = []
work_in_progress = []

# just creates arbitrary work to do
def create_tasks():
    for i in range(TASKS_TO_CREATE):
        work_todo.append(worker_task(i))

    # muddle this up to see how drain works
    random.shuffle(work_todo)

# represents the actual work
async def worker_task(index):
    try:
        print(f"i am worker {index} and i am starting")
        await asyncio.sleep(index)

        if index % 9 == 0:
            print('simulating error')
            raise NotImplementedError("some error happened")

        print(f"i am worker {index} and i am done")
    except Exception:
        # put this work back on the pile (fudge the index so it doesn't throw this time)
        work_todo.append(worker_task(index + 1))
        

# gets the next 'concurrent' workload segment (if there is one)
def get_next_tasks():
    todo = []

    i = 0

    while i < CONCURRENT_TASK_LIMIT and len(work_todo) > 0:
        todo.append(work_todo.pop())
        i += 1

    return todo

# drains down any outstanding tasks and closes the loop
async def are_we_done_yet():
    print('draining')
    
    await asyncio.gather(*work_in_progress)

    if len(work_todo) > 0:
        loop.create_task(work())
        print('found some retries')
    else:
        loop.stop()
        # closes out the program
        print('done')
    
    

# puts work on the queue every tick (1 second)
async def work():
    next_tasks = get_next_tasks()
    if len(next_tasks) > 0:
        print(f'found {len(next_tasks)} tasks to do')
        for task in next_tasks:
            # schedules the work, puts it in the in-progress pile
            work_in_progress.append(loop.create_task(task))

        # this is the 'tick' or speed work gets scheduled on
        await asyncio.sleep(ONE_SECOND)
        
        # every 'tick' we add this task onto the loop again unless there isn't any more to do...
        loop.create_task(work())
    else:
        # ... if there isn't any to do we just enter drain mode
        await are_we_done_yet()

# bootstrap the process
create_tasks()
loop.create_task(work())
loop.run_forever()

This just simulates something going wrong and re-queues the failed task. If the error happens after the main work method has finished, it won't get re-queued, so the are_we_done_yet method would need to check for and rerun any failed tasks. This isn't particularly optimal, as it waits for the drain before checking everything else, but it gives you an idea of an implementation.
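
One way to close that gap, sketched against the same globals as the example above (work_todo, work_in_progress, get_next_tasks, loop), is to keep draining in a loop until both piles are genuinely empty, so any retries the failed workers queue up during the drain still get scheduled. It serializes the drain batches, so it's slower than the main work loop, but nothing gets lost:

# drains repeatedly so retries queued while draining are not lost
async def are_we_done_yet():
    print('draining')

    while work_in_progress or work_todo:
        # wait for everything currently scheduled
        await asyncio.gather(*work_in_progress)
        work_in_progress.clear()

        # anything the failed workers put back on the pile gets scheduled again
        for task in get_next_tasks():
            work_in_progress.append(loop.create_task(task))

        # keep respecting the tick between batches
        if work_in_progress:
            await asyncio.sleep(ONE_SECOND)

    loop.stop()
    print('done')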


4 Comments

I like this approach. Would it be possible to handle exceptions at the task level? Like if a worker_task fails, then I can add it back to work_todo.
Added a second example with a simulated exception. It doesn't really meet your requirement for 'as fast as possible' in some scenarios, but it does demo the concept a bit.
Looks really great, thanks! What's the purpose of the if block in are_we_done_yet? Because asyncio.gather is supposed to resolve every coroutine, can't we directly write loop.stop after the asyncio.gather?
I think gather will only do the ones it knew about at the time it was called (it takes an iterable of tasks rather than a live list). This can get into a situation where retries queued at the very end of a run might not get picked up, so it just checks them again at the end, just in case.
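
A small standalone sketch of that behaviour: gather works on the snapshot of awaitables it was handed, so anything appended to the source list afterwards isn't waited on by that gather call.

import asyncio
import time

async def main():
    work = [asyncio.create_task(asyncio.sleep(1))]
    gathering = asyncio.gather(*work)                    # snapshot of 'work' taken here
    work.append(asyncio.create_task(asyncio.sleep(3)))   # the gather above never waits on this
    start = time.monotonic()
    await gathering
    print(f"gather returned after ~{time.monotonic() - start:.0f}s")  # ~1s, not ~3s
    await work[1]                                        # still has to be awaited separately

asyncio.run(main())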
