import pickle
from typing import Any, List, Tuple, Union
import zmq
from easypubsub.logging import getLogger
[docs]class Subscriber:
"""The EasyPubSub Subscriber provides an interface to subscribe to one or more topics.
Attributes:
name (str): The name of the subscriber. This is used for logging purposes.
proxy_subscribers_address (str): The address that the subscriber will use to
connect to the :obj:`~easypubsub.proxy.Proxy`.
topics (str | List[str]): The topics to subscribe to. If not specified, the subscriber
will subscribe to all topics. If specified, it can be a string or a list of strings.
receive_timeout (float): The timeout for receiving messages in the :meth:`receive` method.
Example:
>>> from easypubsub.subscriber import Subscriber
>>> subscriber = Subscriber("my_subscriber", "tcp://127.0.0.1:5556")
>>> subscriber.receive()
[("my_publisher", "Hello world!"), ("my_publisher.my_topic", "Hello again, world.")]
"""
def __init__(
self,
name: str,
proxy_subscribers_address: str,
topics: Union[str, List[str]] = "",
receive_timeout: float = 0.1,
) -> None:
self.name = name
self.subscribers_address = proxy_subscribers_address
self.receive_timeout_ms = round(receive_timeout * 1000)
if topics == "":
self.topics = []
elif isinstance(topics, str):
self.topics = [topics.encode("utf-8")]
else:
self.topics = [topic.encode("utf-8") for topic in topics]
self._logger = getLogger(f"EasyPubSub.Subscriber({name})")
self._connect()
def __del__(self) -> None:
self.poller.unregister(self.socket)
def _connect(self) -> None:
self.ctx = zmq.Context.instance()
self.socket = self.ctx.socket(zmq.SUB)
self._logger.info(f"Connecting to {self.subscribers_address}.")
self.socket.connect(self.subscribers_address)
if len(self.topics) > 0:
for topic in self.topics:
self._logger.info(f"Subscribing to {topic.decode('utf-8')}.")
self.socket.setsockopt(zmq.SUBSCRIBE, topic)
else:
self._logger.info("Subscribing to all topics.")
self.socket.setsockopt(zmq.SUBSCRIBE, b"")
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
[docs] def receive(self) -> List[Tuple[str, Any]]:
"""Receive one or more messages from the subscriptions.
Returns:
List[Tuple[str, Any]]: A list of tuples, each containing the topic and the message.
Example:
>>> subscriber.receive()
[("my_publisher", "Hello world!"), ("my_publisher.my_topic", "Hello again, world.")]
"""
messages: List[Any] = []
messages_available = True
while messages_available:
sockets = dict(self.poller.poll(self.receive_timeout_ms))
if self.socket in sockets:
topic, message = self.socket.recv_multipart()
messages.append((topic.decode("utf-8"), pickle.loads(message)))
else:
messages_available = False
return messages