Source code for naz.ratelimiter

import abc
import time
import typing
import asyncio
import logging

from . import log


[docs]class BaseRateLimiter(abc.ABC): """ This is the interface that must be implemented to satisfy naz's rate limiting. User implementations should inherit this class and implement the :func:`limit <BaseRateLimiter.limit>` methods with the type signatures shown. It may be important to control the rate at which the client(naz) sends requests to an SMSC/server. naz lets you do this, by allowing you to specify a custom rate limiter. """
[docs] @abc.abstractmethod async def limit(self) -> None: """ rate limit sending of messages to SMSC. """ raise NotImplementedError("limit method must be implemented.")
[docs]class SimpleRateLimiter(BaseRateLimiter): """ This is an implementation of BaseRateLimiter. It does rate limiting using a `token bucket rate limiting algorithm <https://en.wikipedia.org/wiki/Token_bucket>`_ example usage: .. highlight:: python .. code-block:: python rate_limiter = SimpleRateLimiter(send_rate=10) await rate_limiter.limit() send_messsages() """
[docs] def __init__( self, send_rate: float = 100_000.00, logger: typing.Union[None, logging.Logger] = None ) -> None: """ Parameters: send_rate: the maximum rate, in messages/second, at which naz can send messages to SMSC. """ if not isinstance(send_rate, float): raise ValueError( "`send_rate` should be of type:: `float` You entered: {0}".format(type(send_rate)) ) if not isinstance(logger, (type(None), logging.Logger)): raise ValueError( "`logger` should be of type:: `None` or `logging.Logger` You entered: {0}".format( type(logger) ) ) self.send_rate: float = send_rate self.max_tokens: float = self.send_rate self.tokens: float = self.max_tokens self.delay_for_tokens: float = 1.0 self.updated_at: float = time.monotonic() self.messages_delivered: int = 0 self.effective_send_rate: float = 0.00 if logger is not None: self.logger = logger else: self.logger = log.SimpleLogger("naz.SimpleRateLimiter")
[docs] async def limit(self) -> None: self.logger.log(logging.DEBUG, {"event": "naz.SimpleRateLimiter.limit", "stage": "start"}) while self.tokens < 1: self._add_new_tokens() # todo: sleep in an exponetial manner upto a maximum then wrap around. await asyncio.sleep(self.delay_for_tokens) self.logger.log( logging.DEBUG, { "event": "naz.SimpleRateLimiter.limit", "stage": "end", "state": "limiting rate", "send_rate": self.send_rate, "delay": self.delay_for_tokens, "effective_send_rate": self.effective_send_rate, }, ) self.messages_delivered += 1 self.tokens -= 1
def _add_new_tokens(self) -> None: now = time.monotonic() time_since_update = now - self.updated_at self.effective_send_rate = self.messages_delivered / time_since_update new_tokens = time_since_update * self.send_rate if new_tokens > 1: self.tokens = min(self.tokens + new_tokens, self.max_tokens) self.updated_at = now self.messages_delivered = 0