easypubsub package

proxy module

class easypubsub.proxy.Proxy(publishers_address: str, subscribers_address: str)

The EasyPubSub Proxy acts as an intermediary between Publishers and Subscribers.

publishers_address

The address that publishers will use to connect to the Proxy.

Type

str

subscribers_address

The address that subscribers will use to connect to the Proxy.

Type

str

Example

>>> from easypubsub.proxy import Proxy
>>> proxy = Proxy("tcp://localhost:5555", "tcp://localhost:5556")
>>> proxy.launch()
...
>>> proxy.stop()

launch() → None

Launch the Proxy.

This method will launch the Proxy in a separate thread, and return immediately. To stop the Proxy, call the Proxy.stop() method.

stop(timeout=5.0) → None

Stop the Proxy thread.

Parameters
  • timeout (float) – The maximum time, in seconds, to wait for the Proxy to stop. If the Proxy does not stop cleanly within this time, it will be terminated forcefully.
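
The launch/stop lifecycle described above (start in a separate thread, return immediately, then wait up to a timeout when stopping) can be sketched with the standard library alone. This is only an illustration of the pattern, not easypubsub's implementation: BackgroundWorker and its internals are hypothetical names, and because Python threads cannot be killed from outside, the sketch reports whether the thread stopped in time rather than terminating it forcefully.

```python
import threading
import time


class BackgroundWorker:
    """Hypothetical stand-in for a proxy-like object; not part of easypubsub."""

    def __init__(self) -> None:
        self._stop_requested = threading.Event()
        # daemon=True so a stuck loop cannot keep the interpreter alive.
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Placeholder loop: a real proxy would forward messages here.
        while not self._stop_requested.is_set():
            time.sleep(0.01)

    def launch(self) -> None:
        # Start the loop in its own thread and return immediately.
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> bool:
        # Ask the loop to exit, then wait at most `timeout` seconds.
        self._stop_requested.set()
        self._thread.join(timeout)
        # True means the thread exited cleanly within the timeout.
        return not self._thread.is_alive()


worker = BackgroundWorker()
worker.launch()  # returns right away; the loop runs in the background
stopped_cleanly = worker.stop(timeout=1.0)
```

Pairing launch() with stop() in a try/finally block is a reasonable way to make sure the background thread is always shut down.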

publisher module

class easypubsub.publisher.Publisher(name: str, proxy_publishers_address: str, default_topic: str = '')

The EasyPubSub Publisher provides an interface to publish messages to a topic.

name

The name of the publisher. This will be used as a prefix to all topics used by this publisher.

Type

str

proxy_publishers_address

The address that the publisher will use to connect to the Proxy.

Type

str

default_topic

The default topic to use when publishing messages. By default, it is "" (the empty string), which means that messages are published using name alone as the topic.

Type

str

Example

>>> from easypubsub.publisher import Publisher
>>> publisher = Publisher("my_publisher", "tcp://127.0.0.1:5555")
>>> publisher.publish("Hello world!")  # Published to the topic "my_publisher"
>>> publisher.publish("Hello again, world.", "my_topic")  # Published to the topic "my_publisher.my_topic"

publish(message: Any, topic: Optional[str] = None) → None

Publish a message to a topic.

Parameters
  • message (Any) – The message to publish. This can be any type of data that can be pickled.

  • topic (Optional[str]) – The topic to publish the message to. If not specified, the default_topic will be used.
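
The topic naming rule and the pickling requirement can be illustrated without a running Proxy. The sketch below is an assumption-laden illustration, not the library's code: resolve_topic and encode are hypothetical helpers, the "." separator is inferred from the example above, and the (topic, payload) pair is not necessarily the format easypubsub puts on the wire.

```python
import pickle
from typing import Any, Optional, Tuple


def resolve_topic(name: str, default_topic: str = "", topic: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the documented naming rule."""
    # Fall back to the default topic when no topic is passed.
    chosen = default_topic if topic is None else topic
    # An empty topic means the publisher name alone is the topic.
    return f"{name}.{chosen}" if chosen else name


def encode(name: str, message: Any, topic: Optional[str] = None) -> Tuple[str, bytes]:
    # Pickle the payload; objects that cannot be pickled raise an error here.
    return resolve_topic(name, topic=topic), pickle.dumps(message)


topic_a, payload_a = encode("my_publisher", "Hello world!")
topic_b, payload_b = encode("my_publisher", {"n": 1}, "my_topic")
```

Calling pickle.dumps on a candidate message before publishing is a quick way to check that it satisfies the "can be pickled" requirement.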

subscriber module

class easypubsub.subscriber.Subscriber(name: str, proxy_subscribers_address: str, topics: Union[str, List[str]] = '', receive_timeout: float = 0.1)

The EasyPubSub Subscriber provides an interface to subscribe to one or more topics.

name

The name of the subscriber. This is used for logging purposes.

Type

str

proxy_subscribers_address

The address that the subscriber will use to connect to the Proxy.

Type

str

topics

The topics to subscribe to. If not specified, the subscriber will subscribe to all topics. If specified, it can be a string or a list of strings.

Type

str | List[str]

receive_timeout

The timeout for receiving messages in the receive() method.

Type

float

Example

>>> from easypubsub.subscriber import Subscriber
>>> subscriber = Subscriber("my_subscriber", "tcp://127.0.0.1:5556")
>>> subscriber.receive()
[("my_publisher", "Hello world!"), ("my_publisher.my_topic", "Hello again, world.")]

receive() → List[Tuple[str, Any]]

Receive one or more messages from the subscriptions.

Returns

A list of tuples, each containing the topic and the message.

Return type

List[Tuple[str, Any]]

Example

>>> subscriber.receive()
[("my_publisher", "Hello world!"), ("my_publisher.my_topic", "Hello again, world.")]
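
Because receive() returns a flat list of (topic, message) tuples, callers often want to regroup the results by topic. The following is a minimal sketch of that post-processing step; group_by_topic is a hypothetical helper, not part of easypubsub, and the sample data is copied from the example above so that no running Proxy is needed.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple


def group_by_topic(messages: List[Tuple[str, Any]]) -> Dict[str, List[Any]]:
    """Hypothetical helper: collect messages under their topic, keeping order."""
    grouped: Dict[str, List[Any]] = defaultdict(list)
    for topic, message in messages:
        grouped[topic].append(message)
    # Return a plain dict so missing topics raise KeyError instead of appearing.
    return dict(grouped)


# Sample data in the shape documented for receive().
received = [
    ("my_publisher", "Hello world!"),
    ("my_publisher.my_topic", "Hello again, world."),
]
grouped = group_by_topic(received)
```

An empty input list produces an empty dict, so the helper can be called on every result without a special case.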

decorator module

class easypubsub.decorator.publish_this(name: str, topic: str, address: str)

Decorator for publishing values returned by functions. See Publisher for more information about the arguments.

Example

>>> import random
>>> from easypubsub.decorator import publish_this
>>> @publish_this(name="lottery", topic="winning_number", address="tcp://localhost:5555")
... def my_random_number_generator():
...     return random.randint(1, 100)
...
>>> my_random_number_generator()  # This just got published to the topic "lottery.winning_number"
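
To show how a publishing decorator of this kind might work, here is a self-contained sketch that does not use easypubsub at all. The names publish_result, send, and published are hypothetical: send stands in for a real Publisher, and published stands in for the network. Returning the wrapped function's value unchanged to the caller is an assumption, since the source does not say what publish_this returns.

```python
import functools
import random
from typing import Any, Callable, List, Tuple

# Stand-in for the network: every "published" item is recorded here.
published: List[Tuple[str, Any]] = []


def publish_result(name: str, topic: str, send: Callable[[str, Any], None]):
    """Hypothetical decorator factory; `send` replaces a real Publisher."""
    # Same naming rule as the Publisher: the name prefixes the topic.
    full_topic = f"{name}.{topic}" if topic else name

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            send(full_topic, result)  # publish whatever the function returned
            return result  # assumption: the caller still gets the value
        return wrapper

    return decorator


@publish_result("lottery", "winning_number", send=lambda t, m: published.append((t, m)))
def my_random_number_generator() -> int:
    return random.randint(1, 100)


number = my_random_number_generator()
```

Injecting the send callable keeps the sketch testable without sockets; a real decorator would presumably create its Publisher once, when the function is decorated, rather than on every call.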