Usage
Installation
To use EasyPubSub, first install it using pip:
$ pip install easypubsub
Starting a Proxy
EasyPubSub uses a single, central Proxy as an intermediary between publishers and subscribers.
The underlying library (ZeroMQ) does not require a proxy, but EasyPubSub does, in order to simplify setup and usage.
The job of the proxy is to forward messages between publishers and subscribers, without them having to directly connect with each other.
import time

from easypubsub.proxy import Proxy

PUBLISHERS_ADDRESS = "tcp://127.0.0.1:5555"
SUBSCRIBERS_ADDRESS = "tcp://127.0.0.1:5556"

proxy = Proxy(PUBLISHERS_ADDRESS, SUBSCRIBERS_ADDRESS)
proxy.launch()

try:
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    proxy.stop()
The proxy will accept connections from publishers on PUBLISHERS_ADDRESS and from subscribers on SUBSCRIBERS_ADDRESS.
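The forwarding role can be illustrated without any networking. The following is a toy in-memory sketch, not EasyPubSub's or ZeroMQ's implementation; the class ToyProxy, its methods, and the weather.temperature topic are hypothetical names chosen for illustration. It shows only the key idea: publishers and subscribers each know the proxy, never each other.

```python
class ToyProxy:
    """Hypothetical in-memory stand-in for a pub/sub proxy (illustration only)."""

    def __init__(self):
        # Maps a topic name to the callbacks of subscribers interested in it.
        self._subscribers = {}

    def subscribe(self, topic, callback):
        """Register a callback to be invoked for every message on `topic`."""
        self._subscribers.setdefault(topic, []).append(callback)

    def publish(self, topic, message):
        """Forward `message` to every subscriber of `topic`, if any."""
        for callback in self._subscribers.get(topic, []):
            callback(topic, message)


received = []
proxy = ToyProxy()
proxy.subscribe("lottery.winning_number", lambda t, m: received.append((t, m)))

proxy.publish("lottery.winning_number", 42)
proxy.publish("weather.temperature", 21.5)  # No subscriber, so nothing is delivered.
```

In this sketch a message published to a topic without subscribers is simply dropped, and the publisher never learns who (if anyone) received its message.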
Publishing
An EasyPubSub Publisher is used to publish data over the network. In the example below, a publisher called lottery publishes random numbers to the topic winning_number every ten seconds. Publishing can happen whether or not any subscribers, or even the proxy, are present. Once the proxy is running, the publisher establishes a connection and pushes data to it.
If the connection is lost (for example, because the proxy is restarted), the publisher reconnects automatically.
Note
Since the connection is established asynchronously, data is not guaranteed to be sent to the proxy. Messages could be lost before a connection is established. For more details, see ZeroMQ's documentation.
import random
import time

from easypubsub.publisher import Publisher

PUBLISHERS_ADDRESS = "tcp://127.0.0.1:5555"
PUBLISH_INTERVAL = 10  # seconds.

publisher = Publisher("lottery", PUBLISHERS_ADDRESS, default_topic="winning_number")
try:
    while True:
        publisher.publish(message=random.randint(1, 100))
        time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt:
    pass
While in this case the publisher sends a simple integer as the message, the message can be any Python object that can be pickled: for example, lists, dictionaries, or NumPy arrays.
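To check in advance whether an object qualifies, one can try to pickle it. The helper below is a minimal sketch that uses only the standard pickle module; is_picklable is a hypothetical name and not part of EasyPubSub, and it assumes the default pickle protocol is representative of what the library uses.

```python
import pickle


def is_picklable(obj) -> bool:
    """Return True if `obj` can be serialized with the standard pickle module."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


candidates = {
    "list": [1, 2, 3],
    "dict": {"winning_number": 42},
    "lambda": lambda: 42,  # Lambdas cannot be pickled by the standard pickle module.
}
results = {name: is_picklable(obj) for name, obj in candidates.items()}
```

Plain containers of numbers and strings pass the check, while objects such as lambdas, open files, or sockets do not, so they would need to be converted to plain data before publishing.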
Publishing using the publish_this decorator
Often, one wants to publish the result of a function every time that function is called.
EasyPubSub provides a decorator, publish_this, that can be used to automatically publish the result of a function call.
import random
import time

from easypubsub.decorator import publish_this

PUBLISHERS_ADDRESS = "tcp://127.0.0.1:5555"
PUBLISH_INTERVAL = 10  # seconds.


@publish_this(name="lottery", topic="winning_number", address=PUBLISHERS_ADDRESS)
def my_random_number_generator():
    """Generate a random number."""
    return random.randint(1, 100)


try:
    while True:
        my_random_number_generator()
        time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt:
    pass
In the example above, every time my_random_number_generator is called, the result is published to the topic lottery.winning_number.
A Publisher is built and managed automatically by the decorator.
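For readers curious about the general pattern, a decorator that publishes return values can be sketched with the standard library alone. This is not EasyPubSub's implementation: publish_result is a hypothetical name, and the publish callable stands in for whatever a real Publisher would do with the value.

```python
import functools


def publish_result(publish):
    """Hypothetical decorator factory: pass each return value to `publish`."""

    def decorator(func):
        @functools.wraps(func)  # Preserve the wrapped function's name and docstring.
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            publish(result)  # Side effect: hand the result to the publisher.
            return result  # In this sketch the caller still gets the return value.

        return wrapper

    return decorator


published = []  # Stand-in for a network publisher: it just records values.


@publish_result(published.append)
def double(x):
    """Return twice the input."""
    return 2 * x


value = double(21)
```

Here the decorated function behaves as before for its caller, and publishing happens as a side effect of each call.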
Subscribing
As you can imagine, subscribing is very similar to publishing! In the example below, a Subscriber called lottery_player subscribes to every available topic (by omitting the topics argument).
import time

from easypubsub.subscriber import Subscriber

SUBSCRIBERS_ADDRESS = "tcp://127.0.0.1:5556"
subscriber = Subscriber("lottery_player", SUBSCRIBERS_ADDRESS)

try:
    while True:
        result = subscriber.receive()
        if len(result) > 0:
            print("Received:")
            for topic, message in result:
                print(f"{topic}: {message}")
        else:
            # No message received.
            time.sleep(1.0)
except KeyboardInterrupt:
    pass
When calling receive(), a list of publications is returned (all the ones collected since the last call).
Each publication is a tuple of the form (topic, message), for example ("lottery.winning_number", 42).
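Because the returned value is a plain list of (topic, message) tuples, it can be processed with ordinary Python. The sketch below groups messages by topic; group_by_topic is a hypothetical helper, and the sample list (including the weather.temperature topic) is made-up data standing in for the output of receive().

```python
from collections import defaultdict


def group_by_topic(publications):
    """Group a list of (topic, message) tuples into {topic: [messages...]}."""
    grouped = defaultdict(list)
    for topic, message in publications:
        grouped[topic].append(message)  # Arrival order is kept within each topic.
    return dict(grouped)


# Made-up sample shaped like the list described above.
sample = [
    ("lottery.winning_number", 42),
    ("lottery.winning_number", 7),
    ("weather.temperature", 21.5),
]
grouped = group_by_topic(sample)
```

An empty list, which is what the subscriber example treats as "no message received", yields an empty dictionary.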
EasyPubSub over a LAN
So far, EasyPubSub has only been tested connecting Python instances on a single machine (localhost), but as long as the network is configured correctly (port forwarding, etc.), it should be possible to connect to other machines.
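When trying this, a quick first check is whether the proxy's TCP ports are reachable from the other machine at all. The helper below is a sketch using only the standard socket module; is_port_reachable is a hypothetical name and not part of EasyPubSub. A successful connection only shows that something is listening on that port, not that it is an EasyPubSub proxy.

```python
import socket


def is_port_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # Covers refused connections, timeouts, and unreachable hosts.
        return False


# Self-check against a throwaway local listener on an OS-assigned port.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)
    local_port = listener.getsockname()[1]
    reachable = is_port_reachable("127.0.0.1", local_port)
```

On a LAN, the same function would be called from the publisher's or subscriber's machine with the proxy host's address and port. Note that the earlier examples use 127.0.0.1, which accepts only local connections, so a LAN setup would presumably need the proxy addresses changed to one that other machines can reach.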