ZeroMQ – Connecting computers via message queue

The goal

ZeroMQ is a simple to implement way to transfer data between server & client application. It is available for a vast range of programming languages. Including Python PyZMQ. It has a lot of options, including broadcasting. I will only present a very simple example because most of it we don’t need.

Questions to David Rotermund

Commands

   
zmq.Context Create a zmq Context
zmq.Context.socket Create a Socket associated with this Context.
zmq.Socket The ZMQ socket object
zmq.Socket.bind Bind the socket to an address.
zmq.Socket.connect Connect to a remote 0MQ socket.
zmq.Socket.recv Receive a message
zmq.Socket.recv_json Receive a Python object as a message using json to serialize.
zmq.Socket.recv_pyobj Receive a Python object as a message using pickle to serialize.
zmq.Socket.recv_string Receive a unicode string, as sent by send_string.
zmq.Socket.recv_multipart Receive a multipart message as a list of bytes or Frame objects
zmq.Socket.send Send a single zmq message frame on this socket.
zmq.Socket.send_json Send a Python object as a message using json to serialize.
zmq.Socket.send_pyobj Send a Python object as a message using pickle to serialize.
zmq.Socket.send_string Send a Python unicode string as a message with an encoding.
zmq.Socket.send_multipart Send a sequence of buffers as a multipart message.

Socket Types

There are many socket types. I will only quote the two relevant ones for this example from the API documentation:

ZMQ_REQ

A socket of type ZMQ_REQ is used by a client to send requests to and receive replies from a service. This socket type allows only an alternating sequence of zmq_send(request) and subsequent zmq_recv(reply) calls. Each request sent is round-robined among all services, and each reply received is matched with the last issued request.

If no services are available, then any send operation on the socket shall block until at least one service becomes available. The REQ socket shall not discard messages.

Summary of ZMQ_REQ characteristics

   
Compatible peer sockets ZMQ_REP, ZMQ_ROUTER
Direction Bidirectional
Send/receive pattern Send, Receive, Send, Receive, …
Outgoing routing strategy Round-robin
Incoming routing strategy Last peer
Action in mute state Block

ZMQ_REP

A socket of type ZMQ_REP is used by a service to receive requests from and send replies to a client. This socket type allows only an alternating sequence of zmq_recv(request) and subsequent zmq_send(reply) calls. Each request received is fair-queued from among all clients, and each reply sent is routed to the client that issued the last request. If the original requester does not exist any more the reply is silently discarded.

Summary of ZMQ_REP characteristics

   
Compatible peer sockets ZMQ_REQ, ZMQ_DEALER
Direction Bidirectional
Send/receive pattern Receive, Send, Receive, Send, …
Incoming routing strategy Fair-queued
Outgoing routing strategy Last peer

Simple example where we move np.ndarrays around

Client

import zmq
import numpy as np

context = zmq.Context()

socket = context.socket(zmq.REQ)
# We connect to the localhost at port 5555
socket.connect("tcp://localhost:5555")

# Generating a random np matrix
rng = np.random.default_rng()
np_array = rng.random((100, 10))

# Send it over the wire
socket.send_pyobj(np_array)

# We get the squared version back 
np_array_return = socket.recv_pyobj()

# Check if we get a correct return message 
print(type(np_array_return))

np_array = np_array**2

print("Difference:")
print(np.sum(np.abs(np_array_return - np_array)))

Output:

<class 'numpy.ndarray'>
Difference:
0.0

Server

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
# Listen at port 5555
socket.bind("tcp://*:5555")

while True:
    # We should receive a numpy array
    input_np = socket.recv_pyobj()

    # Let's do something with this information
    print(type(input_np))
    print(input_np.shape)
    input_np = input_np**2

    # We send the results back
    socket.send_pyobj(input_np)

Output:

<class 'numpy.ndarray'>
(100, 10)

Non-blocking

There is this option to receive packages in a non blocking way:

    confirm = False
    while not confirm:
        try:
            packet_in = socket.recv_pyobj(zmq.NOBLOCK)
            confirm = True
        except zmq.error.Again:
            pass

The source code is Open Source and can be found on GitHub.