Source code for naz.correlater

import abc
import time
import typing


[docs]class BaseCorrelater(abc.ABC): """ Interface that must be implemented to satisfy naz's Correlater. User implementations should inherit this class and implement the :func:`get <BaseCorrelater.get>` and :func:`put <BaseCorrelater.put>` methods with the type signatures shown. A Correlater is class that naz uses to store relations between SMPP sequence numbers and user applications' log_id's and/or hook_metadata. | Note: This correlation is on a BEST effort basis; it is not guaranteed to be reliable. | One reason, among others, is that the SMPP specifiation mandates sequence numbers to wrap around after ≈ 2billion. | Another reason is that we use receipted_message_id tag from deliver_sm to correlate a delivery notification with a submit_sm_resp. However receipted_message_id is an optional parameter that SMSC may omit. """
[docs] @abc.abstractmethod async def put( self, smpp_command: str, sequence_number: int, log_id: str, hook_metadata: str, smsc_message_id: typing.Union[None, str] = None, ) -> None: """ called by naz to put/store the correlation of a given SMPP sequence number to log_id and/or hook_metadata. Parameters: smpp_command: any one of the SMSC commands eg submit_sm sequence_number: SMPP sequence_number log_id: an ID that a user's application had previously supplied to naz to track/correlate different messages. hook_metadata: a string that a user's application had previously supplied to naz that it may want to be correlated with the log_id. smsc_message_id: a unique identifier of a particular message on the SMSC. It comes from SMSC """ raise NotImplementedError("put method must be implemented.")
[docs] @abc.abstractmethod async def get( self, smpp_command: str, sequence_number: int, smsc_message_id: typing.Union[None, str] = None, ) -> typing.Tuple[str, str]: """ called by naz to get the correlation of a given SMPP sequence number to log_id and/or hook_metadata. Parameters: smpp_command: any one of the SMSC commands eg submit_sm sequence_number: SMPP sequence_number smsc_message_id: a unique identifier of a particular message on the SMSC. It comes from SMSC Returns: log_id and hook_metadata """ raise NotImplementedError("get method must be implemented.")
[docs]class SimpleCorrelater(BaseCorrelater): """ A simple implementation of BaseCorrelater. It stores the correlation/relation between a given SMPP sequence_number(and/or smsc_message_id) and a user supplied log_id and/or hook_metadata. SimpleCorrelater also features an auto-expiration of dictionary keys(and their values) based on time. The storage is done in memory using a python dictionary. The storage looks like: .. highlight:: python .. code-block:: python { "sequence_number1": { "log_id": "log_id1", "hook_metadata": "hook_metadata1", "stored_at": 681.109023565 }, "smsc_message_id1": { "log_id": "log_id1", "hook_metadata": "hook_metadata1", "stored_at": 681.109023565 }, "sequence_number1": { "log_id": "log_id2", "hook_metadata": "hook_metadata2", "stored_at": 682.109023565 } ... } """
[docs] def __init__(self, max_ttl: float = 15.00) -> None: """ Parameters: max_ttl: The time in seconds that an item is going to be stored. After the expiration of max_ttl seconds, that item will be deleted. """ if not isinstance(max_ttl, float): raise ValueError( "`max_ttl` should be of type:: `float` You entered: {0}".format(type(max_ttl)) ) self.store: dict = {} self.max_ttl: float = max_ttl
[docs] async def put( self, smpp_command: str, sequence_number: int, log_id: str, hook_metadata: str, smsc_message_id: typing.Union[None, str] = None, ) -> None: stored_at = time.monotonic() if smpp_command == "submit_sm_resp": # TODO: dict with smsc_message_id should replace dict with corresponding sequence_number # currently we are not deduping data; we should self.store[smsc_message_id] = { "log_id": log_id, "hook_metadata": hook_metadata, "stored_at": stored_at, } else: self.store[sequence_number] = { "log_id": log_id, "hook_metadata": hook_metadata, "stored_at": stored_at, } # garbage collect await self._delete_after_ttl()
[docs] async def get( self, smpp_command: str, sequence_number: int, smsc_message_id: typing.Union[None, str] = None, ) -> typing.Tuple[str, str]: if smpp_command == "deliver_sm": item = self.store.get(smsc_message_id) else: item = self.store.get(sequence_number) if not item: # garbage collect await self._delete_after_ttl() return "", "" # garbage collect await self._delete_after_ttl() return item["log_id"], item["hook_metadata"]
async def _delete_after_ttl(self) -> None: """ iterate over all stored items and delete any that are older than self.max_ttl seconds """ now = time.monotonic() for key in list(self.store.keys()): stored_at = self.store[key]["stored_at"] time_diff = now - stored_at if time_diff > self.max_ttl: del self.store[key]