Latencia ZMQ con PUB-SUB (suscriptor lento)

He encontrado muchas preguntas sobre un tema similar, pero no me ayudaron a resolver mi problema.

Utilizando :

  • Linux Ubuntu 14.04
  • python 3.4
  • zmq: 4.0.4 // pyZMQ 14.3.1

TL; DR

La cola del receptor en el zócalo SUB de ZMQ está creciendo indefinidamente incluso después de que se establezca HWM. Esto sucede cuando el suscriptor es más lento que el editor. ¿Qué puedo hacer para prevenirlo?

Fondo

Yo trabajo en la interacción de la computadora humana archivada. Tenemos una base de código enorme para controlar el cursor del mouse, este tipo de cosas. Quería “romperlo” en varios módulos, comunicándome con ZMQ. Debe tener la menor latencia posible, pero no es tan importante soltar (perder) mensajes.

Otro aspecto interesante es la posibilidad de agregar “espías” entre los nodos. Así, los sockets PUB / SUB parecen ser los más adecuados.

Algo como esto :

+----------+ +-----------+ +------------+ | | PUB | | PUB | | | Input | +----+------> | Filter | +----+------> | Output | | | | SUB | | | SUB | | +----------+ v +-----------+ v +------------+ +-----+ +-----+ |Spy 1| |Spy 2| +-----+ +-----+ 

Problema

Todo funciona bien, excepto cuando agregamos los espías. Si agregamos un espía haciendo “cosas pesadas”, como visualizaciones en tiempo real con matplotlib, notamos una latencia creciente en las ttwigs. IE: en el gráfico anterior, el filtro y la salida son rápidos, no se ve latencia, pero en Spy 2, la latencia puede llegar a 10 min después de correr 20 min (!!)

Parece que la cola en el receptor crece indefinidamente. Investigamos las funcionalidades de la marca High Water (HWM) de ZMQ para establecer un nivel bajo para eliminar mensajes antiguos, pero nada cambió.

Codigo minimo

La architecture

 +------------+ +-------------+ | | PUB | | | sender | -------------> | receiver | | | SUB| | +------------+ +-------------+ 

El receptor es un receptor lento (actuando como un espía en el primer gráfico)

Código:

Sender.py

 import time import zmq ctx = zmq.Context() sender = ctx.socket(zmq.PUB) sender.setsockopt(zmq.SNDBUF, 256) sender.set_hwm(10) sender.bind('tcp://127.0.0.1:1500') print(zmq.zmq_version()) ## 4.0.4 print(zmq.__version__) ## 14.3.1 print(sender.get_hwm()) ## 10 i = 0 while True: mess = "{} {}".format(i, time.time()) sender.send_string(mess) print("Send : {}".format(mess)) i+= 1 

receiver.py:

 import time import zmq ctx = zmq.Context() front_end = ctx.socket(zmq.SUB) front_end.set_hwm(1) front_end.setsockopt(zmq.RCVBUF, 8) front_end.setsockopt_string(zmq.SUBSCRIBE, '') front_end.connect('tcp://127.0.0.1:1500') print(zmq.zmq_version()) ## 4.0.4 print(zmq.__version__) ## 14.3.1 print(front_end.get_hwm()) ## 1 while True: mess = front_end.recv_string() i, t = mess.split(" ") mess = "{} {}".format(i, time.time() - float(t)) print("received : {}".format(mess)) time.sleep(1) # slow 

No creo que este sea un comportamiento normal para ZMQ Pub / Sub. Intenté establecer el HWM en el receptor, en el suscriptor, en ambos, pero nada cambió.

Qué me estoy perdiendo ?

Edit :

No creo que haya sido claro cuando expliqué mi problema. Hice una implementación moviendo el cursor del mouse. La entrada fue la posición del cursor del mouse en ZMQ a 200Hz (con un .sleep( 1.0 / 200 ) ), se realizó un procesamiento y se actualizó la posición del cursor del mouse (no tengo este sleep en mi ejemplo mínimo).

Todo fue suave, incluso cuando lancé a los espías. Sin embargo, los espías tenían una latencia creciente (debido al procesamiento lento). La latencia no aparece en el cursor, al final de la “tubería”.

Creo que el problema proviene de la lentitud del suscriptor que pone en cola los mensajes.

En mi ejemplo, si matamos al remitente y dejamos que el receptor esté vivo, los mensajes se seguirán mostrando hasta que se muestren todos (?) Los mensajes enviados.

El espía está trazando la posición del cursor para proporcionar algo de retroalimentación, aún es muy inconveniente tener un retraso así … Solo quiero que me envíen el último mensaje, es por eso que intenté bajar el HWM.

Falta un mejor diseño / validación en tiempo real

ZeroMQ es una potente capa de mensajería.

Dicho esto, verifique cuántos mensajes envía realmente por segundo en el original while True: killer-loop

Medirlo Diseño sobre hechos, no sobre sentimientos.

Los hechos importan.

 start_CLK = time.time() # .SET _CLK time.sleep( 0.001) # .NOP avoid DIV/0! i = 0 # .SET CTR while True: # .LOOP sender.send_string( "{} {}".format( i, time.time() ) ) # .SND ZMQ-PUB print i / ( time.time() - start_CLK ) # .GUI perf [msg/sec] i+= 1 # .INC CTR 

ZeroMQ hace lo mejor para rellenar esa avalancha en el esquema.

Y es bastante bueno en esto.

Su [ Filter ] + [ Spy1 ] + [ Output ] + [ Spy2 ] procesamiento de la tubería, de extremo a extremo, tiene cualquiera

  • ser más rápido, incl. ambos gastos generales de .send () + .recv_string () que el [ Input ] -sender

o

  • ser el principal elemento de locking de desecho, lo que hace que la cola interna de PUB / SUB crezca, crezca, crezca

Este problema de la cadena de colas se puede resolver con otro diseño de architecture.

Cosas para volver a pensar:

  1. muestree la cadencia [ Filter ] .send () (el factor de intercalación depende de los problemas de estabilidad del proceso en tiempo real bajo su control, ya sea 1 mseg (por una resolución de temporizador O / S, por lo que no se realizan experimentos de física cuántica son posibles con los controles de temporizador COTS O / S: o)), 10 ms para transmisión de voz bidireccional, 50 ms para transmisión de TV / GUI, 300 ms para transmisión de eventos de teclado y otros)

  2. en línea v / s post-procesamiento / visualización fuera de línea (notó un procesamiento pesado de matplotlib , en el que normalmente se llevan unos 800 – 1600 – 3600 mseg , incluso en gráficos 2D simples – mídalo antes de decidir sobre un cambio en PUB / SUB- < proc1 > -PUB / SUB- < proc2 > architecture de procesamiento (ya notó, que < spy2 > causa problemas al boost < proc2 > -PUB: alimentación y envío de gastos generales).

  3. número de subprocesos en comparación con el número de núcleos localhost, que los ejecutan, como se ve desde la dirección de correo localhost, todos los procesos residen en el mismo host local. Además, agregue + un subproceso por ZMQ.Contexto utilizado, y revise la sobrecarga de locking de Python GIL, si todos los subprocesos se crearon desde el mismo intérprete de Python … El locking crece. Bloqueo duele. Una architecture mejor distribuida puede mejorar estos aspectos de rendimiento. Sin embargo, revise primero [1] y [2]

nb llamando a un retraso de 20 minutos en el procesamiento de la tubería (un sesgo TimeDOMAIN del sistema en tiempo real) una latencia es mucho eufemística

De http://zguide.zeromq.org/page:all#toc50 :

Cuando su socket scope su HWM, bloqueará o eliminará los datos según el tipo de socket. Los sockets PUB y ROUTER eliminarán los datos si alcanzan su HWM, mientras que otros tipos de sockets se bloquearán. Durante el transporte en proceso, el remitente y el receptor comparten los mismos buffers, por lo que el HWM real es la sum del HWM establecido por ambos lados.

Así que las tomas SUB realmente no dejan caer los mensajes antiguos. Puede hacer algunos trucos con un enrutador para implementar un abonado de suscripción, o pensar en un diseño que pueda atender los elementos que son lentos. Una de las cosas buenas de Zero es que gran parte de su código central puede seguir siendo el mismo, y es probable que se mueva alrededor de los envoltorios que se ocupan de los sockets.