Limitar la longitud de la cola con PyZMQ

Quiero limitar la cantidad de memoria consumida por mis colas de mensajes de ZeroMQ en una aplicación Python. Sé que establecer la marca de límite superior limitará la cantidad que se pondrá en cola en el lado del remitente, pero ¿hay una manera de controlar cuánto se pondrá en cola en el lado del receptor? El enlace Python ZeroMQ parece tenerlo establecido en ilimitado.

Mi escenario de prueba: Tengo dos terminales python que estoy usando para probar. Uno es el receptor:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import zmq >>> context = zmq.Context() >>> socket = context.socket(zmq.PULL) >>> socket.setsockopt(zmq.RCVBUF, 256) >>> socket.bind("tcp://127.0.0.1:12345") 

El otro es el remitente:

 Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import zmq >>> context=zmq.Context() >>> socket = context.socket(zmq.PUSH) >>> socket.setsockopt(zmq.SNDBUF, 2048) >>> socket.setsockopt(zmq.HWM, 1) >>> socket.connect("tcp://127.0.0.1:12345") >>> num = 0 >>> while True: ... print num ... socket.send(str(num)) ... num = num + 1 ... 

socket.recv() en el lado del receptor un par de veces para asegurarme de que la cola funcione, pero aparte de eso, deje que los dos terminales se queden allí. El bucle de envío parece que nunca se bloquea y el indicador de recepción parece tener una huella de memoria creciente.

En contradicción con la documentación de ZeroMQ, la marca de límite superior debe establecerse tanto en el lado PUSH como en el lado PULL . Una vez que cambié el PULL , funcionó mejor. El nuevo código PULL es:

 Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import zmq >>> context=zmq.Context() >>> socket = context.socket(zmq.PULL) >>> socket.setsockopt(zmq.RCVBUF, 256) >>> socket.setsockopt(zmq.HWM, 1) >>> socket.bind("tcp://127.0.0.1:12345") 

En realidad, la documentación dice esto:

“Cuando un zócalo ZMQ_PUSH entra en un estado excepcional debido a que ha alcanzado la marca de límite superior para todos los nodos en sentido descendente, o si no hay nodos en sentido descendente en absoluto, entonces cualquier operación zmq_send (3) en el zócalo se bloqueará hasta que el estado excepcional finalice o al menos un nodo posterior está disponible para el envío; los mensajes no se descartan “.

http://api.zeromq.org/2-1:zmq-socket

Lo que indica claramente que puede (y debería) establecer la marca de límite superior de los nodos aguas abajo (también conocido como arrastre), y tal vez implica que establecerla en el lado de empuje no tendrá ningún efecto (aunque sospecho que eso no es cierto, porque todavía existe la caso en el que los nodos intermedios están disponibles pero los mensajes llegan más rápido de lo que se pueden enviar.)

Con las opciones zmq.SNDBUF y zmq.RCVBUF puede establecer un límite en el tamaño del búfer .


Además, estoy usando la opción zmq.CONFLATE en el lado del receptor para limitar el tamaño de la cola ZeroMQ a uno:

Aquí hay un ejemplo con ZMQ PUSH/PULL :

Remitente ( zmq.PUSH ):

 def create_pub_socket(ip, port): try: context = zmq.Context() socket = context.socket(zmq.PUSH) socket.setsockopt(zmq.SNDHWM, 1) zmq_address = "tcp://{}:{}".format(ip, port) socket.connect(zmq_address) return socket except zmq.ZMQError as exp: print(exp) return False sock = create_push_socket('127.0.0.1', 5558) if sock: sock.send_json({'a': 1}) 

Getter ( zmq.PULL ):

 def listen(self): sock = None try: context = zmq.Context() sock = context.socket(zmq.PULL) sock.setsockopt(zmq.RCVHWM, 1) sock.setsockopt(zmq.CONFLATE, 1) # last msg only. sock.bind("tcp://*:5558") except zmq.ZMQError: logger.captureException() configs = None while configs is None: if sock: configs = sock.recv_json() time.sleep(1e-1) else: time.sleep(5) listen() # Recursive. listen()