
I want to implement an asynchronous token bucket algorithm which manages when to run tasks (co-routines). Simply put, this would control the number of co-routines to run in any given timespan.

I've tried this leaky bucket and semaphores, but neither is quite a true token bucket algorithm. The goal is to exhaust the bucket (i.e. run co-routines) if there is capacity to do so, and otherwise wait just long enough before running the next routine.
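For comparison, here is a minimal sketch of a conventional token bucket that does not subclass Semaphore. It assumes `capacity` is the largest burst allowed and `rate` is the number of tokens refilled per second (the question's code treats `rate` both as a window in seconds and as a frequency, so this is one interpretation). `TokenBucket` and `demo()` are hypothetical names. A coroutine runs immediately while a whole token is available and otherwise sleeps just long enough for one to accrue. Not touching Semaphore's private attributes (`_waiters`, `_loop`) also sidesteps changes in asyncio internals; for instance, the `loop` argument was removed from `Semaphore()` in Python 3.10.

```python
import asyncio
import time


class TokenBucket:
    """Minimal asyncio token bucket (a sketch, not a library API).

    capacity: maximum number of tokens, i.e. the largest burst allowed.
    rate:     tokens added per second (assumed semantics).
    """

    def __init__(self, capacity: int, rate: float) -> None:
        if capacity < 1 or rate <= 0:
            raise ValueError("capacity must be >= 1 and rate must be > 0")
        self.capacity = capacity
        self.rate = rate
        self._tokens = float(capacity)   # start with a full bucket
        self._last = time.monotonic()    # time of the last refill
        self._lock = asyncio.Lock()      # waiters take turns, one at a time

    def _refill(self) -> None:
        # Add the tokens earned since the last refill, capped at capacity.
        now = time.monotonic()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self) -> None:
        async with self._lock:
            self._refill()
            while self._tokens < 1:
                # Sleep just long enough for the missing fraction of a token.
                await asyncio.sleep((1 - self._tokens) / self.rate)
                self._refill()
            self._tokens -= 1


async def demo(n: int = 5, capacity: int = 3, rate: float = 10.0) -> list:
    """Return the elapsed time (seconds) at which each of n coroutines got a token."""
    bucket = TokenBucket(capacity, rate)  # created inside the running loop
    start = time.monotonic()
    stamps = []

    async def worker() -> None:
        await bucket.acquire()
        stamps.append(time.monotonic() - start)

    await asyncio.gather(*(worker() for _ in range(n)))
    return sorted(stamps)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With `capacity=3` and `rate=10`, the first three coroutines start at once and the fourth and fifth follow roughly 0.1 s and 0.2 s later. Holding the lock while sleeping means only one coroutine waits for the next token at a time, so several sleepers cannot all wake up and claim the same token.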

So what I've decided to do is subclass asyncio's Semaphore class to control how many co-routines I run in any time period, like so:

TokenSemaphore.py

import datetime
from asyncio.locks import Semaphore
import asyncio
import collections
from math import floor


class TokenSemaphore(Semaphore):
    """A Semaphore implementation.
    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other thread calls release().
    Semaphores also support the context management protocol.
    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """    
    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._capacity = capacity
        self._rate = rate
        self._time_table = collections.deque(maxlen=self._capacity)
        super().__init__(value=capacity, loop=loop) 

    @property
    def capacity(self):
        return self._capacity

    def _wake_up_next(self):
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    def has_capacity(self):
        if len(self._time_table) < self._capacity:
            self._time_table.append(datetime.datetime.now())
            return True
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[0]).total_seconds()
        if delta < self._rate:
            return False
        else:
            self._time_table.append(tf)
            return True

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._capacity == 0    

    async def acquire(self):
        """Acquire a semaphore.
        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        while not self.has_capacity():
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
            except:
                # See the similar code in Queue.get.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._capacity -= 1
        return True    

    async def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[-1]).total_seconds()
        result = self._rate * floor(delta)
        sleep_time = 1.0/float(self._rate) - result if result < 1.0/float(self._rate) else 0
        await asyncio.sleep(sleep_time)
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[-1]).total_seconds()
        self._capacity += result
        self._wake_up_next()

Note that my release() is async def because I think I need to sleep here if I don't have enough tokens in my bucket. The Semaphore's release is not async def. I have a feeling this is where I'm messing up but I don't know for sure.
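The suspicion about release() can be checked in isolation. asyncio's `Semaphore` gets its `async with` support from a mixin whose `__aexit__` calls `self.release()` without awaiting it, so a subclass that turns `release()` into a coroutine never returns its permit when used as a context manager (Python typically also warns that the coroutine was never awaited). The question's run.py awaits release() explicitly, so it does not hit this, but the hypothetical `AsyncReleaseSemaphore` below shows the trap in a few lines.

```python
import asyncio
import warnings


class AsyncReleaseSemaphore(asyncio.Semaphore):
    """Hypothetical subclass mirroring the question's design: release() is async."""

    async def release(self) -> None:
        super().release()


async def context_manager_releases() -> bool:
    sem = AsyncReleaseSemaphore(1)
    with warnings.catch_warnings():
        # Silence the "coroutine ... was never awaited" RuntimeWarning for this demo.
        warnings.simplefilter("ignore", RuntimeWarning)
        async with sem:  # __aexit__ calls self.release() but never awaits it
            pass
    # A working release would leave the semaphore unlocked again.
    return not sem.locked()


if __name__ == "__main__":
    print(asyncio.run(context_manager_releases()))  # False: the permit was never returned
```

Keeping release() synchronous and doing any waiting in acquire() avoids the problem.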

To test my implementation, I've written this:

run.py

import asyncio
import aiohttp
import re
import datetime
from TokenSemaphore import TokenSemaphore
SITE = "https://example.com"

async def myWorker(semaphore):
    await semaphore.acquire()
    print("Successfully acquired the semaphore")
    async with aiohttp.ClientSession() as session:
        async with session.get(SITE, verify_ssl=False) as resp:
            print(resp.status, datetime.datetime.now() - ref, semaphore.capacity)
    print("Releasing Semaphore")
    await semaphore.release()   


async def main(loop):
    mySemaphore = TokenSemaphore(capacity=40, rate=2)
    # mySemaphore = asyncio.Semaphore(value=40)
    tasks = [myWorker(mySemaphore) for _ in range(44)]
    await asyncio.wait(tasks)
    print("Main Coroutine") 

ref = datetime.datetime.now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
print("All Workers Completed")
loop.close()

The Issue

So it seems like the TokenSemaphore works, but it doesn't exhaust the bucket when there is capacity. My print statement shows the available capacity of the bucket, and it shows plenty of capacity (i.e. availability to run more tasks). I cannot understand why my token semaphore isn't running more co-routines when there is sufficient capacity to do so.

$ python run.py 
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:00.177742 20
Releasing Semaphore
200 0:00:00.178944 20
Releasing Semaphore
200 0:00:00.184608 20
Releasing Semaphore
200 0:00:01.103417 20
Releasing Semaphore
200 0:00:01.105539 22
Releasing Semaphore
200 0:00:01.106280 22
Releasing Semaphore
200 0:00:01.106929 22
Releasing Semaphore
200 0:00:01.107701 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:01.110719 29
Releasing Semaphore
200 0:00:01.111228 29
Releasing Semaphore
200 0:00:01.111801 29
Releasing Semaphore
200 0:00:01.112366 29
Releasing Semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:01.116581 25
Releasing Semaphore
200 0:00:01.153321 25
Releasing Semaphore
200 0:00:01.155235 25
Releasing Semaphore
200 0:00:01.155791 25
Releasing Semaphore
200 0:00:01.156530 25
Releasing Semaphore
200 0:00:01.157258 25
Releasing Semaphore
200 0:00:01.221712 25
Releasing Semaphore
200 0:00:01.223267 25
Releasing Semaphore
200 0:00:01.223724 25
Releasing Semaphore
200 0:00:01.224246 25
Releasing Semaphore
200 0:00:01.224745 25
Releasing Semaphore
200 0:00:01.228829 25
Releasing Semaphore
200 0:00:04.326125 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.361430 30
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.910990 29
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.440614 28
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.974999 27
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:06.516174 26
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.051482 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.601656 24
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.147306 23
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.682823 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.216370 21
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.752510 20
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.302981 19
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.843989 18
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.384492 17
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.939925 16
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:12.485116 15
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.016098 14
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.554884 13
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:14.096828 12
Releasing Semaphore
Main Coroutine
All Workers Completed
Okay, I see one error in my self._capacity: I decrement the capacity in acquire(), and that affects has_capacity(). I can fix that, and then it exhausts my capacity to zero! But this is not quite finished. The first 40 tasks run just fine, but any outstanding tasks produce the error "Task was destroyed but it is pending!". (Commented Aug 12, 2018 at 18:39)

Answer


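There are 3 problems:

  1. We expect _time_table to grow to size _capacity, but _capacity is decremented in acquire() while has_capacity() compares len(_time_table) against it. After 20 acquisitions both are 20, which is why only 20 coroutines start immediately in your output. It would probably be better to move the update of _time_table out of has_capacity().
  2. In release(), result is rate * floor(delta), which evaluates to 0 whenever less than a second has passed since the last entry in _time_table, so the capacity isn't increased after the coroutine wakes up. Just increase the capacity by 1.
  3. In general you probably want to sleep in acquire() rather than release(), so that you don't end up with a pointless wait at the end of your execution.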
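Point 2 can be seen by replaying the question's release() arithmetic on its own. The helper names `release_increment` and `release_sleep` are hypothetical; they copy the expression `self._rate * floor(delta)` and the `sleep_time` line verbatim. For every delta under one second the increment is 0, so with `rate=2` a release arriving within a second of the last acquisition adds no capacity back while still sleeping 0.5 s.

```python
from math import floor


def release_increment(rate: float, delta: float) -> float:
    # Same expression as `result = self._rate * floor(delta)` in the question.
    return rate * floor(delta)


def release_sleep(rate: float, result: float) -> float:
    # Same expression as the question's `sleep_time = ...` line.
    return 1.0 / float(rate) - result if result < 1.0 / float(rate) else 0


if __name__ == "__main__":
    for delta in (0.2, 0.5, 0.99, 1.2):
        inc = release_increment(2, delta)
        print(f"delta={delta}s -> capacity += {inc}, sleep {release_sleep(2, inc)}s")
```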

Take a look at this and see if it helps:

import asyncio
import collections
import datetime
from asyncio.locks import Semaphore


class TokenSemaphore(Semaphore):
    """A Semaphore implementation.
    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other thread calls release().
    Semaphores also support the context management protocol.
    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """
    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._capacity = capacity
        # Number of coroutines currently waiting in acquire().
        self._waiting = 0
        self._rate = rate
        self._time_table = collections.deque(maxlen=self._capacity)
        # Time of last token that was issued.
        self._last_token = None
        super().__init__(value=capacity, loop=loop)

    @property
    def capacity(self):
        return max(self._capacity - self._waiting, 0)

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self.capacity == 0

    def _get_sleep_time(self):
        now = datetime.datetime.now()
        token_freq = datetime.timedelta(seconds=(1.0/float(self._rate)))
        if self._last_token is None:
            delta = now - self._time_table[-1]
            sleep_time = token_freq - delta
            self._last_token = now + sleep_time
            return sleep_time.total_seconds()
        elif self._last_token < now:
            self._last_token += token_freq
            return 0
        else:
            self._last_token += token_freq
            return (self._last_token - now).total_seconds()

    async def acquire(self):
        """Acquire a semaphore.
        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        if self.locked():
            self._waiting += 1
            fut = self._loop.create_future()
            self._waiters.append(fut)
            sleep_time = self._get_sleep_time()
            # Schedule the execution.
            await asyncio.sleep(sleep_time)
            try:
                # Wait for the corresponding task that's already executing to
                # finish.
                await fut
            except:
                # See the similar code in Queue.get.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
            finally:
                self._waiting -= 1
        else:
            self._last_token = None
        self._capacity -= 1
        self._time_table.append(datetime.datetime.now())
        return True

    def _wake_up_next(self):
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    async def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._capacity += 1
        self._wake_up_next()

Comments

This helps, but I now think I may need to lock self._capacity. I think each coroutine is increasing the capacity too much:

200 0:00:00.201525 0
Releasing Semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:00.698236 28
Releasing Semaphore
200 0:00:00.702178 29
Releasing Semaphore
200 0:00:00.704255 31
Releasing Semaphore
200 0:00:00.707803 36
Releasing Semaphore
Main Coroutine
All Workers Completed
What do you mean by "lock self._capacity"? Synchronization is not required here for updating self._capacity since all coroutines execute in the same thread. The different numbers traced for semaphore.capacity are due to other coroutines completing in-between the completions of the last 4 items.
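The single-thread argument can be demonstrated with a small, hypothetical experiment. Coroutines only switch at an `await`, so an update like `counter.value += 1` with no `await` in the middle is never interleaved, whereas a read and a write separated by an `await` can lose updates. The `Counter` class and the `safe_bump`/`racy_bump` coroutines exist only for this illustration.

```python
import asyncio


class Counter:
    def __init__(self) -> None:
        self.value = 0


async def safe_bump(counter: Counter) -> None:
    # Read and write happen with no await in between: no other coroutine can run here.
    counter.value += 1
    await asyncio.sleep(0)  # yield control only afterwards


async def racy_bump(counter: Counter) -> None:
    value = counter.value       # read
    await asyncio.sleep(0)      # other coroutines run here and read the same old value
    counter.value = value + 1   # write back a stale result


async def run(bump, n: int = 50) -> int:
    counter = Counter()
    await asyncio.gather(*(bump(counter) for _ in range(n)))
    return counter.value


if __name__ == "__main__":
    print(asyncio.run(run(safe_bump)))  # 50
    print(asyncio.run(run(racy_bump)))  # far fewer than 50 updates survive
```

In both versions of TokenSemaphore, `_capacity` is changed with a single `+=` or `-=` and no `await` in between, which is the safe pattern.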
What's happening is that the extra coroutines in the example are running before they should. My thinking is that self._capacity gets +1 before it should. Of the 44 tasks in the example, the first 40 run just fine and the 41st waits just long enough, but the 42nd to 44th tasks do not wait long enough.
@torrho, I understand - it's not just rate limiting but also smoothing. I've updated the answer to reflect this and also fix another issue (unnecessary sleep at the end of the executions).
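The smoothing requirement (each coroutine past the initial burst should start one token interval after the previous one, instead of several starting together) can be modelled without sleeping. The sketch below is a hypothetical virtual-clock token bucket: all requests arrive at t=0, the bucket starts full, and `rate` tokens accrue per second. With the question's numbers (`capacity=40`, `rate=2`, 44 tasks), 40 tasks start immediately and the remaining four start at 0.5 s intervals, the same 1/rate spacing that `token_freq` encodes in the answer's `_get_sleep_time()`.

```python
def start_times(n: int, capacity: int, rate: float) -> list:
    """Virtual-clock token bucket: when would each of n simultaneous requests start?

    Assumes every request arrives at t=0, the bucket starts full, and
    `rate` tokens are added per second.
    """
    tokens = float(capacity)
    now = 0.0
    times = []
    for _ in range(n):
        if tokens < 1:
            wait = (1 - tokens) / rate  # time until one whole token has accrued
            now += wait
            tokens += wait * rate       # tokens earned while waiting
        tokens -= 1                     # spend one token on this request
        times.append(now)
    return times


if __name__ == "__main__":
    times = start_times(44, capacity=40, rate=2)
    print(times[:3], times[40:])  # [0.0, 0.0, 0.0] [0.5, 1.0, 1.5, 2.0]
```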
"it's not just rate limiting but also smoothing." Yeah holy cow this was way harder than I anticipated it would be. thank you for explaining the smoothing issue.
