¿Cómo tener un tamaño de búfer de cola ZMQ (ZeroMQ – PyZMQ) limitado en Python?

Yo uso la biblioteca pyzmq con pub / sub en python .

Tengo un editor de ZMQ rápido mediante el .connect() de .connect() Y un suscriptor de ZMQ más lento mediante el .bind() de .bind() .

Luego, después de unos minutos, mi suscriptor obtiene los datos antiguos publicados de los editores ( debido al búfer ZMQ ).

Mi pregunta: ¿Existe un enfoque para administrar el tamaño del búfer de cola ZMQ? (establecer un buffer limitado)

Nota : No quiero usar ZMQ PUSH / PULL.
Nota : He leído esta publicación, pero este enfoque solo borra el búfer: borra el búfer ZMQ

Nota : También probé con las opciones de marca de agua alta , pero no funcionó:

 socket.setsockopt(zmq.RCVHWM, 10) # not working socket.setsockopt(zmq.SNDHWM, 10) # not working 

Editor:

 import zmq import time port = "5556" context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:%s" % port) socket.setsockopt(zmq.SNDHWM, 10) # not working while True: data = time.time() print("%d" % data) socket.send("%d" % data) time.sleep(1) 

Abonado:

 import zmq import time port = "5556" context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:%s" % port) socket.setsockopt(zmq.SUBSCRIBE, '') socket.setsockopt(zmq.RCVHWM, 10) # not working while 1: time.sleep(2) data = socket.recv() print data 

Incluso con estas opciones, el tamaño de la cola es más de 10 todavía (con la marca de límite superior de envío / recepción configurada).

Encontré una manera de obtener la opción ” Sólo último mensaje ” en el ZMQ Suscriptor de ZMQ (usando la opción CONFLATE ).

Pero primero debes configurar la opción CONFLATE antes de conectarte:

 import zmq import time port = "5556" context = zmq.Context() socket = context.socket(zmq.SUB) socket.setsockopt(zmq.SUBSCRIBE, '') socket.setsockopt(zmq.CONFLATE, 1) # last msg only. socket.connect("tcp://localhost:%s" % port) # must be placed after above options. while 1: time.sleep(2) data = socket.recv() print data 

Por otra parte, eliminé cualquier cola almacenada en búfer en el código del suscriptor.


[ En Adicional ]:

Con las opciones zmq.SNDBUF y zmq.RCVBUF podríamos establecer un límite en el tamaño del búfer ZMQ. ( Más completo y un ejemplo )


Para establecer el tamaño de la cola / búfer, debe establecer las marcas de límite superior a través de las opciones de socket

 setsockopt(zmq.SNDHWM, 10) setsockopt(zmq.RCVHWM, 10)