26

I have a list of awaitables that I want to pass to the asyncio.AbstractEventLoop but I need to throttle the requests to a third party API.

I would like to avoid something that waits to pass the future to the loop because in the meantime I block my loop waiting. What options do I have? Semaphores and ThreadPools will limit how many are running concurrently, but that's not my problem. I need to throttle my requests to 100/sec, but it doesn't matter how long it takes to complete the request.

This is a very concise (non)working example using the standard library, that demonstrates the problem. This is supposed to throttle at 100/sec but throttles at 116.651/sec. What's the best way to throttle the scheduling of an asynchronous request in asyncio?

Working code:

import asyncio
from threading import Lock

class PTBNL:

    def __init__(self):
        self._req_id_seq = 0
        self._futures = {}
        self._results = {}
        self.token_bucket = TokenBucket()
        self.token_bucket.set_rate(100)

    def run(self, *awaitables):

        loop = asyncio.get_event_loop()

        if not awaitables:
            loop.run_forever()
        elif len(awaitables) == 1:
            return loop.run_until_complete(*awaitables)
        else:
            future = asyncio.gather(*awaitables)
            return loop.run_until_complete(future)

    def sleep(self, secs) -> True:

        self.run(asyncio.sleep(secs))
        return True

    def get_req_id(self) -> int:

        new_id = self._req_id_seq
        self._req_id_seq += 1
        return new_id

    def start_req(self, key):

        loop = asyncio.get_event_loop()
        future = loop.create_future()
        self._futures[key] = future
        return future

    def end_req(self, key, result=None):

        future = self._futures.pop(key, None)
        if future:
            if result is None:
                result = self._results.pop(key, [])
            if not future.done():
                future.set_result(result)

    def req_data(self, req_id, obj):
        # Do Some Work Here
        self.req_data_end(req_id)
        pass

    def req_data_end(self, req_id):
        print(req_id, " has ended")
        self.end_req(req_id)

    async def req_data_async(self, obj):

        req_id = self.get_req_id()
        future = self.start_req(req_id)

        self.req_data(req_id, obj)

        await future
        return future.result()

    async def req_data_batch_async(self, contracts):

        futures = []
        FLAG = False

        for contract in contracts:
            req_id = self.get_req_id()
            future = self.start_req(req_id)
            futures.append(future)

            nap = self.token_bucket.consume(1)

            if FLAG is False:
                FLAG = True
                start = asyncio.get_event_loop().time()

            asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract)

        await asyncio.gather(*futures)
        elapsed = asyncio.get_event_loop().time() - start

        return futures, len(contracts)/elapsed

class TokenBucket:

    def __init__(self):
        self.tokens = 0
        self.rate = 0
        self.last = asyncio.get_event_loop().time()
        self.lock = Lock()

    def set_rate(self, rate):
        with self.lock:
            self.rate = rate
            self.tokens = self.rate

    def consume(self, tokens):
        with self.lock:
            if not self.rate:
                return 0

            now = asyncio.get_event_loop().time()
            lapse = now - self.last
            self.last = now
            self.tokens += lapse * self.rate

            if self.tokens > self.rate:
                self.tokens = self.rate

            self.tokens -= tokens

            if self.tokens >= 0:
                return 0
            else:
                return -self.tokens / self.rate


if __name__ == '__main__':

    asyncio.get_event_loop().set_debug(True)
    app = PTBNL()

    objs = [obj for obj in range(500)]

    l,t = app.run(app.req_data_batch_async(objs))

    print(l)
    print(t)

Edit: I've added a simple example of TrottleTestApp here using semaphores, but still can't throttle the execution:

import asyncio
import time


class ThrottleTestApp:

    def __init__(self):
        self._req_id_seq = 0
        self._futures = {}
        self._results = {}
        self.sem = asyncio.Semaphore()

    async def allow_requests(self, sem):
        """Permit 100 requests per second; call 
           loop.create_task(allow_requests())
        at the beginning of the program to start this routine.  That call returns
        a task handle that can be canceled to end this routine.

        asyncio.Semaphore doesn't give us a great way to get at the value other
        than accessing sem._value.  We do that here, but creating a wrapper that
        adds a current_value method would make this cleaner"""

        while True:
            while sem._value < 100: sem.release()
            await asyncio.sleep(1)  # Or spread more evenly 
                                    # with a shorter sleep and 
                                    # increasing the value less

    async def do_request(self, req_id, obj):
        await self.sem.acquire()

        # this is the work for the request
        self.req_data(req_id, obj)

    def run(self, *awaitables):

        loop = asyncio.get_event_loop()

        if not awaitables:
            loop.run_forever()
        elif len(awaitables) == 1:
            return loop.run_until_complete(*awaitables)
        else:
            future = asyncio.gather(*awaitables)
            return loop.run_until_complete(future)

    def sleep(self, secs: [float]=0.02) -> True:

        self.run(asyncio.sleep(secs))
        return True

    def get_req_id(self) -> int:

        new_id = self._req_id_seq
        self._req_id_seq += 1
        return new_id

    def start_req(self, key):

        loop = asyncio.get_event_loop()
        future = loop.create_future()
        self._futures[key] = future
        return future

    def end_req(self, key, result=None):

        future = self._futures.pop(key, None)
        if future:
            if result is None:
                result = self._results.pop(key, [])
            if not future.done():
                future.set_result(result)

    def req_data(self, req_id, obj):
        # This is the method that "does" something
        self.req_data_end(req_id)
        pass

    def req_data_end(self, req_id):

        print(req_id, " has ended")
        self.end_req(req_id)

    async def req_data_batch_async(self, objs):

        futures = []
        FLAG = False

        for obj in objs:
            req_id = self.get_req_id()
            future = self.start_req(req_id)
            futures.append(future)

            if FLAG is False:
                FLAG = True
                start = time.time()

            self.do_request(req_id, obj)

        await asyncio.gather(*futures)
        elapsed = time.time() - start
        print("Roughly %s per second" % (len(objs)/elapsed))

        return futures


if __name__ == '__main__':

    asyncio.get_event_loop().set_debug(True)
    app = ThrottleTestApp()

    objs = [obj for obj in range(10000)]

    app.run(app.req_data_batch_async(objs))
2
  • Are you trying to limit the number of requests which were in progress per second, or requests started in a particular second. For example, if you start 100 requests that take 3 seconds each, can you start 200 more requests in the following 2 seconds? Commented Aug 3, 2017 at 22:51
  • @AaronSchif It does not matter when they initiate, just that over any 1-sec rolling window no more than 100 have been initiated. Commented Aug 4, 2017 at 4:00

4 Answers 4

80
+50

You can do this by implementing the leaky bucket algorithm:

import asyncio
from contextlib import AbstractAsyncContextManager
from functools import partial
from heapq import heappop, heappush
from itertools import count
from types import TracebackType
from typing import List, Optional, Tuple, Type


class AsyncLimiter(AbstractAsyncContextManager):
    """A leaky bucket rate limiter.

    This is an :ref:`asynchronous context manager <async-context-managers>`;
    when used with :keyword:`async with`, entering the context acquires
    capacity::

        limiter = AsyncLimiter(10)
        for foo in bar:
            async with limiter:
                # process foo elements at 10 items per minute

    :param max_rate: Allow up to `max_rate` / `time_period` acquisitions before
       blocking.
    :param time_period: duration, in seconds, of the time period in which to
       limit the rate. Note that up to `max_rate` acquisitions are allowed
       within this time period in a burst.

    """

    __slots__ = (
        "max_rate",
        "time_period",
        "_rate_per_sec",
        "_level",
        "_last_check",
        "_event_loop",
        "_waiters",
        "_next_count",
        "_waker_handle",
    )

    max_rate: float  #: The configured `max_rate` value for this limiter.
    time_period: float  #: The configured `time_period` value for this limiter.

    def __init__(self, max_rate: float, time_period: float = 60) -> None:
        self.max_rate = max_rate
        self.time_period = time_period
        self._rate_per_sec = max_rate / time_period
        self._level = 0.0
        self._last_check = 0.0

        # timer until next waiter can resume
        self._waker_handle: asyncio.TimerHandle | None = None
        # min-heap with (amount requested, order, future) for waiting tasks
        self._waiters: List[Tuple[float, int, "asyncio.Future[None]"]] = []
        # counter used to order waiting tasks
        self._next_count = partial(next, count())

    @property
    def _loop(self) -> asyncio.AbstractEventLoop:
        self._event_loop: asyncio.AbstractEventLoop
        try:
            loop = self._event_loop
        except AttributeError:
            loop = self._event_loop = asyncio.get_running_loop()
        return loop

    def _leak(self) -> None:
        """Drip out capacity from the bucket."""
        now = self._loop.time()
        if self._level:
            # drip out enough level for the elapsed time since
            # we last checked
            elapsed = now - self._last_check
            decrement = elapsed * self._rate_per_sec
            self._level = max(self._level - decrement, 0)
        self._last_check = now

    def has_capacity(self, amount: float = 1) -> bool:
        """Check if there is enough capacity remaining in the limiter

        :param amount: How much capacity you need to be available.

        """
        self._leak()
        return self._level + amount <= self.max_rate

    async def acquire(self, amount: float = 1) -> None:
        """Acquire capacity in the limiter.

        If the limit has been reached, blocks until enough capacity has been
        freed before returning.

        :param amount: How much capacity you need to be available.
        :exception: Raises :exc:`ValueError` if `amount` is greater than
           :attr:`max_rate`.

        """
        if amount > self.max_rate:
            raise ValueError("Can't acquire more than the maximum capacity")

        loop = self._loop
        while not self.has_capacity(amount):
            # Add a future to the _waiters heapq to be notified when capacity
            # has come up. The future callback uses call_soon so other tasks
            # are checked *after* completing capacity acquisition in this task.
            fut = loop.create_future()
            fut.add_done_callback(partial(loop.call_soon, self._wake_next))
            heappush(self._waiters, (amount, self._next_count(), fut))
            self._wake_next()
            await fut

        self._level += amount
        # reset the waker to account for the new, lower level.
        self._wake_next()

        return None

    def _wake_next(self, *_args: object) -> None:
        """Wake the next waiting future or set a timer"""
        # clear timer and any cancelled futures at the top of the heap
        heap, handle, self._waker_handle = self._waiters, self._waker_handle, None
        if handle is not None:
            handle.cancel()
        while heap and heap[0][-1].done():
            heappop(heap)

        if not heap:
            # nothing left waiting
            return

        amount, _, fut = heap[0]
        self._leak()
        needed = amount - self.max_rate + self._level
        if needed <= 0:
            heappop(heap)
            fut.set_result(None)
            # fut.set_result triggers another _wake_next call
            return

        wake_next_at = self._last_check + (1 / self._rate_per_sec * needed)
        self._waker_handle = self._loop.call_at(wake_next_at, self._wake_next)

    def __repr__(self) -> str:  # pragma: no cover
        args = f"max_rate={self.max_rate!r}, time_period={self.time_period!r}"
        state = f"level: {self._level:f}, waiters: {len(self._waiters)}"
        if (handle := self._waker_handle) and not handle.cancelled():
            microseconds = int((handle.when() - self._loop.time()) * 10**6)
            if microseconds > 0:
                state += f", waking in {microseconds} \N{MICRO SIGN}s"
        return f"<AsyncLimiter({args}) at {id(self):#x} [{state}]>"

    async def __aenter__(self) -> None:
        await self.acquire()
        return None

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        return None

Note that we leak capacity from the bucket opportunistically, there is no need to run a separate async task just to lower the level; instead, capacity are leaked out when testing for sufficient remaining capacity.

Note that tasks that wait for capacity are kept in a min-heap, and when there might be capacity to spare again, the first still-waiting task is woken up early.

You can use this as a context manager; trying to acquire the bucket when it is full blocks until enough capacity has been freed again:

bucket = AsyncLimiter(100)

# ...

async with bucket:
    # only reached once the bucket is no longer full

or you can call acquire() directly:

await bucket.acquire()  # blocks until there is space in the bucket

or you can simply test if there is space first:

if bucket.has_capacity():
    # reject a request due to rate limiting

Note that you can count some requests as 'heavier' or 'lighter' by increasing or decreasing the amount you 'drip' into the bucket:

await bucket.acquire(10)
if bucket.has_capacity(0.5):

Do be careful with this though; when mixing large and small drips, small drips tend to get run before large drips when at or close to the maximum rate, because there is a greater likelyhood that there is enough free capacity for a smaller drip before there is space for a larger one.

Demo:

>>> import asyncio, time
>>> bucket = AsyncLimiter(5, 10)
>>> async def task(id):
...     await asyncio.sleep(id * 0.01)
...     async with bucket:
...         print(f'{id:>2d}: Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task(i) for i in range(15)]
>>> result = asyncio.run(asyncio.wait(tasks))
 0: Drip!  0.00
 1: Drip!  0.02
 2: Drip!  0.02
 3: Drip!  0.03
 4: Drip!  0.04
 5: Drip!  2.05
 6: Drip!  4.06
 7: Drip!  6.06
 8: Drip!  8.06
 9: Drip! 10.07
10: Drip! 12.07
11: Drip! 14.08
12: Drip! 16.08
13: Drip! 18.08
14: Drip! 20.09

The bucket is filled up quickly at the start in a burst, causing the rest of the tasks to be spread out more evenly; every 2 seconds enough capacity is freed for another task to be handled.

The maximum burst size is equal to the maximum rate value, in the above demo that was set to 5. If you do not want to permit bursts, set the maximum rate to 1, and the time period to the minimum time between drips:

>>> bucket = AsyncLimiter(1, 1.5)  # no bursts, drip every 1.5 seconds
>>> async def task():
...     async with bucket:
...         print(f'Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task() for _ in range(5)]
>>> result = asyncio.run(asyncio.wait(tasks))
Drip!  0.00
Drip!  1.50
Drip!  3.01
Drip!  4.51
Drip!  6.02

I've gotten round to packaging this up as a Python project: https://github.com/mjpieters/aiolimiter, I've kept the implementation in this answer up-to-date with improvements made in that project.

Sign up to request clarification or add additional context in comments.

8 Comments

Looking at your demo with bucket = AsyncLimiter(5, 10) I expect that in any 10 second window there should be no more than 5 print statements executed. Yet print statements are produced at times 0.0, 0.02, 0.02, 0.03, 0.04, 2.05, 4.06, etc. That is, In a period of 4.06 seconds you have issued the print statement 7 times. Do you not see a problem with this? If not, then please explain to me why these results are valid.
@Booboo please read the section about burst behaviour.
I did read that section prior to commenting. Clearly at startup in that initial 10-second period you are "dripping" more than 5 times. Perhaps I am missing the obvious, but I still don't understand why you believe that behavior to be correct.
What you are missing is that the leaky bucket algorithm has a full-up time; the max rate is the number of requests that go through unimpeded until the bucket is full and things get slown down. If you sr the max rate to 1 then the delays kick in immediately.
So there is no way to benefit from the rate limit of 5 reqs per 10 secs, without bursting, and without restricting to the lower rate limit of 1 req per 2 secs ??
Over time a rate limit of 1 req per 2 secs is the same as 5 reqs per 10 secs.
Even if the request take so long to respond ?
you need to be more specific with your question. I do not understand it. The duration to get a response is not part of the rate limit. The duration can be 1 hr or 1 ms, and the rate limit will still be 5 reqs per 10 secs.
5

Another solution - using bounded semaphores - by a coworker, mentor, and friend, is the following:

import asyncio


class AsyncLeakyBucket(object):

    def __init__(self, max_tasks: float, time_period: float = 60, loop: asyncio.events=None):
        self._delay_time = time_period / max_tasks
        self._sem = asyncio.BoundedSemaphore(max_tasks)
        self._loop = loop or asyncio.get_event_loop()
        self._loop.create_task(self._leak_sem())

    async def _leak_sem(self):
        """
        Background task that leaks semaphore releases based on the desired rate of tasks per time_period
        """
        while True:
            await asyncio.sleep(self._delay_time)
            try:
                self._sem.release()
            except ValueError:
                pass

    async def __aenter__(self) -> None:
        await self._sem.acquire()

    async def __aexit__(self, exc_type, exc, tb) -> None:
        pass

Can still be used with the same async with bucket code as in @Martijn's answer

1 Comment

Note that this’ll still allow bursts, at up to max_tasks at a time. That’s because when below the set rate, the semaphore freely let’s tasks acquire it. You can then end up with series of bursts if the time that tasks hold the semaphore is uniform (all tasks hold the semaphore lock for an equal amount of time).
0

A simple solution to manage max requests per second and max simultaneous connections to an API, which I use with Interactive Brokers API.

import asyncio
import datetime as dt
import random


async def send_request(num):
    print(f"Request  {num:>2} at {dt.datetime.now()}")
    await asyncio.sleep(random.choice([0.1, 0.2]))
    print(f"Response {num:>2} at {dt.datetime.now()}")


def requests_per_second(request_datetimes):
    rps = 0
    if len(request_datetimes) > 0:
        rps = 1 / (dt.datetime.now() - request_datetimes[-1]).total_seconds()
    return rps


async def rate_limited_gather(*args, rate_limit=50, max_connections=10):
    """Manage max requests per second and max open connections for an API"""
    awaitables = []
    request_datetimes = []
    loop = asyncio.get_event_loop()
    connections = 0
    for arg in args:
        while (
            requests_per_second(request_datetimes) > rate_limit or connections >= max_connections
        ):
            await asyncio.sleep(1 / rate_limit)
            connections = sum([not a.done() for a in awaitables])
        print(f"Requests per second: {requests_per_second(request_datetimes)}")
        request_datetimes.append(dt.datetime.now())
        awaitables.append(loop.create_task(arg))
        connections = sum([not a.done() for a in awaitables])
    await asyncio.gather(*awaitables, return_exceptions=True)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(rate_limited_gather(*[send_request(x) for x in range(10)]))

Example output:

Requests per second: 0
Request   0 at 2023-03-11 10:34:49.348671
Requests per second: 49.696849219759464
Request   1 at 2023-03-11 10:34:49.368800
Requests per second: 49.69931911932807
Request   2 at 2023-03-11 10:34:49.388930
Requests per second: 49.69931911932807
Request   3 at 2023-03-11 10:34:49.409057
Requests per second: 49.72403162448411
Request   4 at 2023-03-11 10:34:49.429170
Response  0 at 2023-03-11 10:34:49.449260
Requests per second: 49.691910157026435
Request   5 at 2023-03-11 10:34:49.449298
Response  1 at 2023-03-11 10:34:49.469389
Requests per second: 49.68450340338848
Request   6 at 2023-03-11 10:34:49.469436
Response  2 at 2023-03-11 10:34:49.489529
Requests per second: 49.67956679417755
Request   7 at 2023-03-11 10:34:49.489566
Requests per second: 49.73392350922564
Request   8 at 2023-03-11 10:34:49.509682
Requests per second: 49.7116723006562
Request   9 at 2023-03-11 10:34:49.529858
Response  6 at 2023-03-11 10:34:49.569973
Response  7 at 2023-03-11 10:34:49.590072
Response  3 at 2023-03-11 10:34:49.609170
Response  4 at 2023-03-11 10:34:49.629267
Response  5 at 2023-03-11 10:34:49.650361
Response  8 at 2023-03-11 10:34:49.710456
Response  9 at 2023-03-11 10:34:49.730560

Comments

0

Here is my solution where we want to throttle "API requests". In this demo the "requests" are just print statements and we want to make sure that we can execute no more than 5 requests in any 10 second period. If the initializer to the AsyncRateLimiter class specifies evenly_space=True, then we treat this as 1 request every 2 seconds, which forces a minimum separation of 2 seconds between successive requests.

from collections import deque
import time
import asyncio

class AsyncRateLimiter:
    def __init__(self, rate_limit, period, evenly_space=False):
        """
        We will throttle "requests" so that in period seconds we
        will not exceed rate_limit requests. If evenly_space is set to True,
        then requests will be made no more frequently than one every
        period / rate_limit seconds."""

        self._rate_limit = int(rate_limit)
        if self._rate_limit != rate_limit or rate_limit < 1:
            raise ValueError('rate_limit must be an integer >= 1')
        if period <= 0.0:
            raise ValueError('period must be > 0.0')
        if evenly_space and self._rate_limit != 1:
            self._period = period / self._rate_limit
            self._rate_limit = 1
        else:
            self._period = float(period)
        self._called_timestamps = deque()
        self._lock = asyncio.Lock()

    async def throttle(self):
        async with self._lock:
            # The length of self._called_timestamps <= self._rate_limit
            while True:
                now = time.monotonic()
                while self._called_timestamps:
                    time_left = self._called_timestamps[0] + self._period - now
                    if time_left > 0:
                        break
                    self._called_timestamps.popleft()
                if len(self._called_timestamps) < self._rate_limit:
                    break
                # The length of self._called_timestamps == self._rate_limit
                # and we were unable to pop any timestamps:
                await asyncio.sleep(time_left)
            self._called_timestamps.append(now)

    async def __aenter__(self):
        await self.throttle()
        return None

    async def __aexit__(self, exc_type, esc, tb):
        return None


if __name__ == '__main__':
    async def main():
        print('burst mode allowed:')
        rate_limiter = AsyncRateLimiter(5, 10, False)
        ref = time.time()

        for _ in range(15):
            async with rate_limiter:
                print(time.time() - ref)
                time.sleep(.1)

        print()

        print('burst mode not allowed:')
        rate_limiter = AsyncRateLimiter(5, 10, True)
        ref = time.time()

        for _ in range(15):
            async with rate_limiter:
                print(time.time() - ref)
                time.sleep(.1)

    asyncio.run(main())

Prints:

burst mode allowed:
0.0
0.10148024559020996
0.20262718200683594
0.30342888832092285
0.4049217700958252
10.00919795036316
10.113298177719116
10.21420693397522
10.314610004425049
10.416192770004272
20.019639253616333
20.12737250328064
20.228044509887695
20.329019784927368
20.430496215820312

burst mode not allowed:
0.0
2.017353057861328
4.047292709350586
6.056902885437012
8.057133197784424
10.056851625442505
12.062262535095215
14.057326078414917
16.057450771331787
18.05731225013733
20.06695556640625
22.077364683151245
24.076985597610474
26.082133293151855
28.107062339782715

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.