Source code for naz.broker

import abc
import asyncio

from . import protocol


class BaseBroker(abc.ABC):
    """
    Abstract interface that every naz broker has to satisfy.

    Custom brokers should subclass this class and provide their own
    :func:`enqueue <BaseBroker.enqueue>` and :func:`dequeue <BaseBroker.dequeue>`
    coroutines, keeping the type signatures shown here.

    naz calls an implementation of this class whenever it needs to
    enqueue and/or dequeue an item.
    """

    @abc.abstractmethod
    async def enqueue(self, message: protocol.Message) -> None:
        """
        Enqueue/save an item.

        Parameters:
            message: The item to be enqueued/saved.
                     It is a `naz.protocol.Message` class instance. Serializing it
                     (if necessary) so that it can be stored is the responsibility
                     of the broker implementation. `naz.protocol.Message` has a
                     `to_json()` method that can be used to serialize an instance
                     into json.
        """
        # Concrete brokers must override this; the base version only signals that.
        raise NotImplementedError("enqueue method must be implemented.")

    @abc.abstractmethod
    async def dequeue(self) -> protocol.Message:
        """
        Dequeue an item.

        Returns:
            The item that was dequeued, as a `naz.protocol.Message` class instance.
            De-serializing the stored form (if necessary) is the responsibility of
            the broker implementation. The `naz.protocol` module has a utility
            function :func:`json_to_Message <naz.protocol.json_to_Message>` that can
            be used to turn a json string into a `naz.protocol.Message` instance.
        """
        # Concrete brokers must override this; the base version only signals that.
        raise NotImplementedError("dequeue method must be implemented.")
class SimpleBroker(BaseBroker):
    """
    In-memory implementation of BaseBroker, backed by an `asyncio.Queue`.

    Note:
        It should only be used for tests and demo purposes.
    """

    def __init__(self, maxsize: int = 2500) -> None:
        """
        Parameters:
            maxsize: the maximum number of items(not size) that can be put in the queue.

        Raises:
            ValueError: if `maxsize` is not an `int`.
        """
        maxsize_is_int = isinstance(maxsize, int)
        if not maxsize_is_int:
            error_template = "`maxsize` should be of type:: `int` You entered: {0}"
            raise ValueError(error_template.format(type(maxsize)))

        # The queue is the only state this broker keeps.
        self.queue: asyncio.queues.Queue = asyncio.Queue(maxsize=maxsize)

    async def enqueue(self, message: protocol.Message) -> None:
        """
        Put `message` on the in-memory queue without waiting.

        Parameters:
            message: The item to be enqueued.

        Raises:
            asyncio.QueueFull: if the queue has no free slot, since the
                non-blocking `put_nowait` is used.
        """
        pending_queue = self.queue
        pending_queue.put_nowait(message)

    async def dequeue(self) -> protocol.Message:
        """
        Wait until an item is available on the in-memory queue and return it.

        Returns:
            The item that was removed from the queue.
        """
        next_item = await self.queue.get()
        return next_item