Python + ZMQ: Operation cannot be accomplished in current state
Q: How can I resolve this?
A: Avoid the known risk of REQ/REP deadlocking!
While ZeroMQ is a powerful framework, understanding its internal composition is necessary for designing and prototyping robust, reliable distributed systems.
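On closer inspection, the common REQ/REP Formal Communication Pattern may leave (and in practice does leave) the counter-parties in a mutual deadlock: each one waits for the other to take a step that will never happen, and there is no way to escape from the deadlocked state. The underlying rule is that a REQ socket must strictly alternate send() and recv(), and a REP socket must strictly alternate recv() and send(). Calling either method out of turn fails with EFSM, which is exactly the "Operation cannot be accomplished in current state" error.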
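A minimal sketch that reproduces the error on purpose, assuming pyzmq is installed (the inproc endpoint name and the demonstrate_efsm() helper are made up for illustration). A REQ socket that calls send() twice without a recv() in between is refused, and the exception carries errno zmq.EFSM:

```python
import zmq


def demonstrate_efsm():
    """Send twice on a REQ socket without a recv() in between; return the errno raised."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    req = ctx.socket(zmq.REQ)
    try:
        rep.bind("inproc://efsm-demo")    # hypothetical in-process endpoint
        req.connect("inproc://efsm-demo")
        req.send(b"first")                # REQ is now in its "waiting for reply" state
        try:
            req.send(b"second")           # out of turn: a strict REQ socket refuses this
        except zmq.ZMQError as exc:
            print("ZMQError:", exc)
            return exc.errno
        return None                       # not reached with a strict (default) REQ socket
    finally:
        req.close(linger=0)
        rep.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(demonstrate_efsm() == zmq.EFSM)
```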
For more illustrated details and an FSA-schematic diagram, see this post.
Next, a fail-over system has to survive any collision of its own components. Thus, one has to design the distributed system's state-signalling well and avoid as many dependencies on element-FSA-design/stepping/blocking as possible; otherwise, the fail-safe behaviour remains just an illusion.
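One way to avoid depending on a blocking step is to never call a bare recv() on a peer that may have vanished. A minimal sketch, assuming pyzmq; recv_with_timeout() and demo() are hypothetical helpers, not part of the ZeroMQ API. The first wait times out because the REP side has not answered yet; the second one gets the reply:

```python
import zmq


def recv_with_timeout(sock, timeout_ms):
    """Hypothetical helper: return the next message, or None if nothing arrives in time.

    Unlike a bare sock.recv(), this never blocks forever on a silent peer.
    """
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    events = dict(poller.poll(timeout_ms))   # timeout in milliseconds
    if events.get(sock, 0) & zmq.POLLIN:
        return sock.recv()
    return None  # the caller decides: wait again, rebuild the socket, or give up


def demo():
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    req = ctx.socket(zmq.REQ)
    try:
        rep.bind("inproc://timeout-demo")    # hypothetical endpoint
        req.connect("inproc://timeout-demo")
        req.send(b"ping")
        early = recv_with_timeout(req, 100)  # REP has not answered yet
        rep.send(rep.recv())                 # REP echoes the request
        late = recv_with_timeout(req, 1000)  # now the reply is available
        return early, late
    finally:
        req.close(linger=0)
        rep.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(demo())
```

The timeout does not reset the REQ state machine: the socket keeps waiting for that one reply, and a second send() before it arrives would still fail with EFSM. If the peer is gone for good, the socket itself has to be replaced.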
Always handle resources with care: do not treat components of the ZeroMQ smart-signalling/messaging as any kind of "expendable disposables". Doing so might be tolerated in classroom examples, but not in production system environments. You still have to pay the costs (time, resource allocations/de-allocations, garbage collection(s)). As noted in the comments, never let resources be created/allocated without due control. A loop such as

while True: .socket(); .bind(); .send();

is brutally wrong in principle and deteriorates the rest of the design.
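A sketch of the disciplined alternative, assuming pyzmq; echo_batch() and its inproc endpoint are hypothetical. The context and sockets are created, bound and connected once, the loop only moves messages, and teardown happens exactly once in the finally clause:

```python
import zmq


def echo_batch(payloads, endpoint="inproc://echo-demo"):
    """Round-trip every payload over ONE REQ/REP pair created outside the loop."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    req = ctx.socket(zmq.REQ)
    try:
        rep.bind(endpoint)                # bind/connect once, before the loop
        req.connect(endpoint)
        replies = []
        for payload in payloads:          # the loop only moves messages
            req.send(payload)
            rep.send(rep.recv())          # REP completes its recv/send pair
            replies.append(req.recv())    # REQ completes its send/recv pair
        return replies
    finally:
        # deterministic teardown instead of leaving it to garbage collection
        req.close(linger=0)
        rep.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(echo_batch([b"a", b"b", b"c"]))
```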
Implement the Lazy Pirate pattern: when an error is caught, create a new socket from your context before trying to send the message again.
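A minimal sketch of this reactive variant, assuming pyzmq; send_with_reset() and demo() are hypothetical names. It reacts to the EFSM error itself: the stuck REQ socket is closed with linger 0 and the same payload is resent on a fresh REQ socket from the same context:

```python
import zmq


def send_with_reset(ctx, sock, endpoint, payload):
    """Hypothetical helper: send on a REQ socket; if it is in the wrong state (EFSM),
    replace it with a fresh REQ socket and resend. Returns the socket to keep using."""
    try:
        sock.send(payload)
        return sock
    except zmq.ZMQError as exc:
        if exc.errno != zmq.EFSM:
            raise                          # only the "wrong state" error is handled here
        sock.close(linger=0)               # drop the confused socket and its queue
        fresh = ctx.socket(zmq.REQ)
        fresh.connect(endpoint)
        fresh.send(payload)
        return fresh


def demo(endpoint="inproc://reset-demo"):
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    req = ctx.socket(zmq.REQ)
    current = req
    try:
        rep.bind(endpoint)                 # hypothetical endpoint
        req.connect(endpoint)
        req.send(b"lost")                  # never answered, so req stays "waiting for reply"
        current = send_with_reset(ctx, req, endpoint, b"retry")
        while True:                        # REP may still receive the orphaned request first
            msg = rep.recv()
            rep.send(msg)                  # a reply to the closed peer is silently dropped
            if msg == b"retry":
                break
        return current is not req, current.recv()
    finally:
        for s in (req, current, rep):
            s.close(linger=0)              # closing an already-closed socket is a no-op
        ctx.term()


if __name__ == "__main__":
    print(demo())
```

Catching EFSM only covers the case where the application itself got out of step; a peer that silently stops replying still needs a receive timeout to be detected.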
A pretty good brute-force solution is to close and reopen the REQ socket after an error.
Here is a Python example.
#
# Lazy Pirate client: poll the REQ socket with a timeout; if no reply arrives,
# discard the confused socket and resend the request on a fresh one.
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from __future__ import print_function

import zmq

REQUEST_TIMEOUT = 2500   # milliseconds to wait for each reply
REQUEST_RETRIES = 3      # attempts before giving up
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print("I: Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()
    print("I: Sending (%s)" % request)
    client.send(request)

    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply)
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)
        else:
            print("W: No response from server, retrying…")
            # Socket is confused (still waiting for a reply). Remove it from the poller, then close it.
            poll.unregister(client)
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            retries_left -= 1
            if retries_left == 0:
                print("E: Server seems to be offline, abandoning")
                break
            print("I: Reconnecting and resending (%s)" % request)
            # Create a new connection and resend the same request
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()
On the server side, the "receive" and "send" pair is critical. I was facing a similar issue because a socket.send() call was missing.
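import json
import logging
import zmq
logger = logging.getLogger(__name__)
socket_ = zmq.Context.instance().socket(zmq.REP)
socket_.bind("tcp://*:5555")  # endpoint assumed for illustration
counter = 0

def zmq_listen():
    global counter
    counter += 1  # assumed: a fresh msg_id per received request
    message = socket_.recv().decode("utf-8")
    logger.info(f"[{counter}] Message: {message}")
    request = json.loads(message)
    request["msg_id"] = f"m{counter}"
    ack = {"msg_id": request["msg_id"]}
    socket_.send(json.dumps(ack).encode("utf-8"))  # the send() that completes the REP pair
    return request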
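A sketch of the same idea with the failure path covered, assuming pyzmq and JSON payloads; handle_one(), demo() and the reply format are hypothetical. If json.loads() raises before send() is reached, the REP socket never completes its pair and its next recv() fails with EFSM, so here a reply is sent on every path:

```python
import json

import zmq


def handle_one(sock):
    """Receive one request on a REP socket and always send exactly one reply."""
    raw = sock.recv()
    try:
        reply = {"ok": True, "echo": json.loads(raw.decode("utf-8"))}
    except ValueError as exc:              # covers UnicodeDecodeError and JSONDecodeError
        reply = {"ok": False, "error": str(exc)}
    sock.send(json.dumps(reply).encode("utf-8"))  # completes the recv/send pair on every path
    return reply["ok"]


def demo(endpoint="inproc://server-demo"):
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    req = ctx.socket(zmq.REQ)
    try:
        rep.bind(endpoint)                 # hypothetical endpoint
        req.connect(endpoint)
        results = []
        for payload in (b"not json", b'{"x": 1}'):
            req.send(payload)
            handled = handle_one(rep)
            results.append((handled, json.loads(req.recv())))
        return results
    finally:
        req.close(linger=0)
        rep.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    for handled, reply in demo():
        print(handled, reply)
```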