Drip bucket limiter python code example

Example 1: Drip bucket limiter python

import asyncio
import contextlib
import collections
import time

from types import TracebackType
from typing import Dict, Optional, Type

try:  # Python 3.7
    base = contextlib.AbstractAsyncContextManager
    _current_task = asyncio.current_task
except AttributeError:
    base = object  # type: ignore
    _current_task = asyncio.Task.current_task  # type: ignore

class AsyncLeakyBucket(base):
    """A leaky bucket rate limiter.

    Allows up to max_rate / time_period acquisitions before blocking.

    time_period is measured in seconds; the default is 60.

    """
    def __init__(
        self,
        max_rate: float,
        time_period: float = 60,
        loop: Optional[asyncio.AbstractEventLoop] = None
    ) -> None:
        self._loop = loop
        self._max_level = max_rate
        self._rate_per_sec = max_rate / time_period
        self._level = 0.0
        self._last_check = 0.0
        # queue of waiting futures to signal capacity to
        self._waiters: Dict[asyncio.Task, asyncio.Future] = collections.OrderedDict()

    def _leak(self) -> None:
        """Drip out capacity from the bucket."""
        if self._level:
            # drip out enough level for the elapsed time since
            # we last checked
            elapsed = time.time() - self._last_check
            decrement = elapsed * self._rate_per_sec
            self._level = max(self._level - decrement, 0)
        self._last_check = time.time()

    def has_capacity(self, amount: float = 1) -> bool:
        """Check if there is enough space remaining in the bucket"""
        self._leak()
        requested = self._level + amount
        # if there are tasks waiting for capacity, signal to the first
        # there there may be some now (they won't wake up until this task
        # yields with an await)
        if requested < self._max_level:
            for fut in self._waiters.values():
                if not fut.done():
                    fut.set_result(True)
                    break
        return self._level + amount <= self._max_level

    async def acquire(self, amount: float = 1) -> None:
        """Acquire space in the bucket.

        If the bucket is full, block until there is space.

        """
        if amount > self._max_level:
            raise ValueError("Can't acquire more than the bucket capacity")

        loop = self._loop or asyncio.get_event_loop()
        task = _current_task(loop)
        assert task is not None
        while not self.has_capacity(amount):
            # wait for the next drip to have left the bucket
            # add a future to the _waiters map to be notified
            # 'early' if capacity has come up
            fut = loop.create_future()
            self._waiters[task] = fut
            try:
                await asyncio.wait_for(
                    asyncio.shield(fut),
                    1 / self._rate_per_sec * amount,
                    loop=loop
                )
            except asyncio.TimeoutError:
                pass
            fut.cancel()
        self._waiters.pop(task, None)

        self._level += amount

        return None

    async def __aenter__(self) -> None:
        await self.acquire()
        return None

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType]
    ) -> None:
        return None

Example 2: Drip bucket limiter python

import asyncio
from threading import Lock

class PTBNL:

    def __init__(self):
        self._req_id_seq = 0
        self._futures = {}
        self._results = {}
        self.token_bucket = TokenBucket()
        self.token_bucket.set_rate(100)

    def run(self, *awaitables):

        loop = asyncio.get_event_loop()

        if not awaitables:
            loop.run_forever()
        elif len(awaitables) == 1:
            return loop.run_until_complete(*awaitables)
        else:
            future = asyncio.gather(*awaitables)
            return loop.run_until_complete(future)

    def sleep(self, secs) -> True:

        self.run(asyncio.sleep(secs))
        return True

    def get_req_id(self) -> int:

        new_id = self._req_id_seq
        self._req_id_seq += 1
        return new_id

    def start_req(self, key):

        loop = asyncio.get_event_loop()
        future = loop.create_future()
        self._futures[key] = future
        return future

    def end_req(self, key, result=None):

        future = self._futures.pop(key, None)
        if future:
            if result is None:
                result = self._results.pop(key, [])
            if not future.done():
                future.set_result(result)

    def req_data(self, req_id, obj):
        # Do Some Work Here
        self.req_data_end(req_id)
        pass

    def req_data_end(self, req_id):
        print(req_id, " has ended")
        self.end_req(req_id)

    async def req_data_async(self, obj):

        req_id = self.get_req_id()
        future = self.start_req(req_id)

        self.req_data(req_id, obj)

        await future
        return future.result()

    async def req_data_batch_async(self, contracts):

        futures = []
        FLAG = False

        for contract in contracts:
            req_id = self.get_req_id()
            future = self.start_req(req_id)
            futures.append(future)

            nap = self.token_bucket.consume(1)

            if FLAG is False:
                FLAG = True
                start = asyncio.get_event_loop().time()

            asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract)

        await asyncio.gather(*futures)
        elapsed = asyncio.get_event_loop().time() - start

        return futures, len(contracts)/elapsed

class TokenBucket:

    def __init__(self):
        self.tokens = 0
        self.rate = 0
        self.last = asyncio.get_event_loop().time()
        self.lock = Lock()

    def set_rate(self, rate):
        with self.lock:
            self.rate = rate
            self.tokens = self.rate

    def consume(self, tokens):
        with self.lock:
            if not self.rate:
                return 0

            now = asyncio.get_event_loop().time()
            lapse = now - self.last
            self.last = now
            self.tokens += lapse * self.rate

            if self.tokens > self.rate:
                self.tokens = self.rate

            self.tokens -= tokens

            if self.tokens >= 0:
                return 0
            else:
                return -self.tokens / self.rate


if __name__ == '__main__':

    asyncio.get_event_loop().set_debug(True)
    app = PTBNL()

    objs = [obj for obj in range(500)]

    l,t = app.run(app.req_data_batch_async(objs))

    print(l)
    print(t)

Tags:

Misc Example