Source code for naz.throttle

import abc
import time
import typing
import logging

from . import log


[docs]class BaseThrottleHandler(abc.ABC): """ This is the interface that must be implemented to satisfy naz's throttle handling. User implementations should inherit this class and implement the :func:`throttled <BaseThrottleHandler.throttled>`, :func:`not_throttled <BaseThrottleHandler.not_throttled>`, :func:`allow_request <BaseThrottleHandler.allow_request>` and :func:`throttle_delay <BaseThrottleHandler.throttle_delay>` methods with the type signatures shown. When an SMPP client exceeds it's rate limit, or when the SMSC is under load or for whatever reason; The SMSC may decide to start throtlling requests from that particular client. When it does so, it replies to the client with a throttling status. Under such conditions, it is important for the client to start rate limiting itself. The way naz implements this self imposed self-regulation is via Throttle Handlers. The methods in this class are also called when the SMSC is under load and is responding with `ESME_RMSGQFUL`(message queue full) responses """
[docs] @abc.abstractmethod async def throttled(self) -> None: """ this method will be called by naz everytime we get a throttling response from SMSC. """ raise NotImplementedError("throttled method must be implemented.")
[docs] @abc.abstractmethod async def not_throttled(self) -> None: """ this method will be called by naz everytime we get any response from SMSC that is not a throttling response. """ raise NotImplementedError("not_throttled method must be implemented.")
[docs] @abc.abstractmethod async def allow_request(self) -> bool: """ this method will be called by naz just before sending a request to SMSC. The response from this method will determine wether naz will send the request or not. """ raise NotImplementedError("allow_request method must be implemented.")
[docs] @abc.abstractmethod async def throttle_delay(self) -> float: """ if the last :func:`allow_request <BaseThrottleHandler.allow_request>` method call returned False(thus denying sending a request), naz will call the throttle_delay method to determine how long in seconds to wait before calling allow_request again. """ raise NotImplementedError("throttle_delay method must be implemented.")
[docs]class SimpleThrottleHandler(BaseThrottleHandler): """ This is an implementation of BaseThrottleHandler. It works by: - calculating the percentage of responses from the SMSC that are THROTTLING(or ESME_RMSGQFUL) responses. - if that percentage goes above :attr:`deny_request_at <SimpleThrottleHandler.deny_request_at>` percent AND \ total number of responses from SMSC is greater than :attr:`sample_size <SimpleThrottleHandler.sample_size>` over \ :attr:`sampling_period <SimpleThrottleHandler.sampling_period>` seconds - then deny making anymore requests to SMSC """
[docs] def __init__( self, sampling_period: float = 180.00, sample_size: float = 50.00, deny_request_at: float = 1.00, throttle_wait: float = 3.00, logger: typing.Union[None, logging.Logger] = None, ) -> None: """ Parameters: sampling_period: the duration in seconds over which we will calculate the percentage of throttled responses. sample_size: the minimum number of responses we should have got from SMSC over :sampling_period duration to enable us make a decision. deny_request_at: the percent of throtlled responses above which we will deny naz from sending more requests to SMSC. throttle_wait: the time in seconds to wait before calling allow_request after the last allow_request that returned False. """ if not isinstance(sampling_period, float): raise ValueError( "`sampling_period` should be of type:: `float` You entered: {0}".format( type(sampling_period) ) ) if not isinstance(sample_size, float): raise ValueError( "`sample_size` should be of type:: `float` You entered: {0}".format( type(sample_size) ) ) if not isinstance(deny_request_at, float): raise ValueError( "`deny_request_at` should be of type:: `float` You entered: {0}".format( type(deny_request_at) ) ) if not isinstance(throttle_wait, float): raise ValueError( "`throttle_wait` should be of type:: `float` You entered: {0}".format( type(throttle_wait) ) ) if not isinstance(logger, (type(None), logging.Logger)): raise ValueError( "`logger` should be of type:: `None` or `logging.Logger` You entered: {0}".format( type(logger) ) ) self.NON_throttle_responses: int = 0 self.throttle_responses: int = 0 self.updated_at: float = time.monotonic() self.sampling_period: float = sampling_period self.sample_size: float = sample_size self.deny_request_at: float = deny_request_at self.throttle_wait: float = throttle_wait if logger is not None: self.logger = logger else: self.logger = log.SimpleLogger("naz.SimpleThrottleHandler")
@property def percent_throttles(self) -> float: total_smsc_responses: int = self.NON_throttle_responses + self.throttle_responses if total_smsc_responses < self.sample_size: # we do not have enough data to make decision, so asssume happy case return 0.0 return round((self.throttle_responses / (total_smsc_responses)) * 100, 2)
[docs] async def allow_request(self) -> bool: self.logger.log( logging.DEBUG, {"event": "naz.SimpleThrottleHandler.allow_request", "stage": "start"} ) # calculat percentage of throttles before resetting NON_throttle_responses and throttle_responses current_percent_throttles: float = self.percent_throttles _throttle_responses: int = self.throttle_responses _NON_throttle_responses: int = self.NON_throttle_responses now: float = time.monotonic() time_since_update: float = now - self.updated_at if time_since_update > self.sampling_period: # we are only interested in percent throttles in buckets of self.sampling_period seconds. # so reset values after self.sampling_period seconds. self.NON_throttle_responses = 0 self.throttle_responses = 0 self.updated_at = now if current_percent_throttles > self.deny_request_at: self.logger.log( logging.INFO, { "event": "naz.SimpleThrottleHandler.allow_request", "stage": "end", "percent_throttles": current_percent_throttles, "throttle_responses": _throttle_responses, "NON_throttle_responses": _NON_throttle_responses, "sampling_period": self.sampling_period, "sample_size": self.sample_size, "deny_request_at": self.deny_request_at, "state": "deny_request", }, ) return False self.logger.log( logging.DEBUG, { "event": "naz.SimpleThrottleHandler.allow_request", "stage": "end", "percent_throttles": current_percent_throttles, "throttle_responses": _throttle_responses, "NON_throttle_responses": _NON_throttle_responses, "sampling_period": self.sampling_period, "sample_size": self.sample_size, "deny_request_at": self.deny_request_at, "state": "allow_request", }, ) return True
[docs] async def not_throttled(self) -> None: self.NON_throttle_responses += 1
[docs] async def throttled(self) -> None: self.throttle_responses += 1
[docs] async def throttle_delay(self) -> float: # todo: sleep in an exponetial manner upto a maximum then wrap around. return self.throttle_wait