¿Cómo enviar secuencias de video OpenCV a través de sockets ZeroMQ?

Tengo una cámara web simple que leo con OpenCV y ahora estoy tratando de enviar este video a un progtwig diferente (Python) usando ZeroMQ . Así que tengo el siguiente script simple para leer la cámara web y enviarla utilizando un socket ZeroMQ:

import cv2 import os import zmq import base64 context = zmq.Context() footage_socket = context.socket(zmq.PUB) footage_socket.connect('tcp://localhost:5555') # init the camera camera = cv2.VideoCapture(0) while True: try: (grabbed, frame) = camera.read() # grab the current frame frame = cv2.resize(frame, (640, 480)) # resize the frame footage_socket.send_string(base64.b64encode(frame)) # Show the video in a window cv2.imshow("Frame", frame) # show the frame to our screen cv2.waitKey(1) # Display it at least one ms # # before going to the next frame except KeyboardInterrupt: camera.release() cv2.destroyAllWindows() print "\n\nBye bye\n" break 

Esto funciona bien porque muestra el video y no da ningún error.

cv2.imshow() las dos líneas que muestran la imagen ( cv2.imshow() y cv2.waitKey(1) ). Entonces empecé el guión a continuación en paralelo. Este segundo guión debe recibir el video y mostrarlo.

 import cv2 import zmq import base64 import numpy as np context = zmq.Context() footage_socket = context.socket(zmq.SUB) footage_socket.bind('tcp://*:5555') footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode('')) # camera = cv2.VideoCapture("output.avi") while True: try: frame = footage_socket.recv_string() frame = np.fromstring(base64.b64decode(frame), dtype=np.uint8) cv2.imshow("Frame", frame) # show the frame to our screen cv2.waitKey(1) # Display it at least one ms # # before going to the next frame except KeyboardInterrupt: cv2.destroyAllWindows() break print "\n\nBye bye\n" 

Desafortunadamente, esto se congela en cv2.waitKey(1) .

¿Alguien sabe lo que estoy haciendo mal aquí? ¿Necesito decodificar las imágenes de manera diferente? Todos los consejos son bienvenidos!

Paso 0: Inventario de Riesgos.

Dado que el objective es claro, su creación rápida de prototipos de la infraestructura de aplicaciones distribuidas depende de varios puntos de riesgo.

0) cv2 módulo OpenCV cv2 utiliza en gran medida los componentes subyacentes basados ​​en C y Python cv2.imshow() cuelga con bastante frecuencia si se intenta utilizar los servicios cv2.imshow() y cv2 (FSA externo) y los servicios de visualización de marcos y ventanas.

1) El marco de trabajo de ZeroMQ puede ayudarlo mucho más que tratar de imponer que los datos se conviertan en (string) solo para usar .send_string() / .recv_string() . Puede mejorar aquí una vez, en lugar de mover la imagen conocida ( pxW * pxH ) geometría * Profundidad RGB dentro de algún objeto BLOB mapeado más inteligente (hablará sobre este aspecto un poco más en la perspectiva de la architecture a continuación).

2) Teniendo en cuenta los puntos 0 y 1, la infraestructura de ZeroMQ (más en prototipos) debería ser robusta contra las excepciones no manejadas (lo que dejaría zmq.Context() instancias y sus .Socket() asociados .Socket() cuelgan por sí solos en los recursos asignados en caso cv2.imshow() bloquea, como ocurre a menudo en los bucles de creación rápida de prototipos.

Por lo tanto, un encuadre minucioso y autodisciplinado del código dentro de un try: except: finally: manejador y .setsockopt( zmq.LINGER, 0 ) explícito inicial .setsockopt( zmq.LINGER, 0 ) inmediatamente después de la .setsockopt( zmq.LINGER, 0 ) instancias de socket + final .close() + context.term() dentro del finally: sección de manejador es una necesidad justa.


Paso 1: Valide la parte ZeroMQ enviando un SEQ de int

El mejor primer nivel de aislamiento de problemas es configurar el flujo, solo para entregar una SEQ no controlada de enteros, siendo .send( ) transmitido desde el lado PUB .

 ... # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE SEQ += 1 footage_socket.send( SEQ, zmq.NOBLOCK ) # PUB.send( SEQ ) -> *SUB* ... 

A menos que su lado receptor demuestre que es un manejo robusto de un flujo de int-s, no tiene sentido seguir adelante.

 ... aMsgIN = footage_socket.recv( zmq.NOBLOCK ) # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE print "{0:}".format( aMsgIN if len( aMsgIN ) > 0 else "." ), # sleep(...) # backthrottle the loop a bit ... 

Paso 2: .imshow() de .recv() data & event-loops

En caso de que sus bombas de datos funcionen según sea necesario, la pantalla remota comenzará a tener sentido como próximo objective.

ZeroMQ entrega un mensaje completo (un BLOB) o nada. Esto es un hecho. A continuación, ZeroMQ no garantiza a nadie la entrega sin errores. Estos son los hechos, su diseño tiene que vivir.

La adquisición es la parte más simple, solo tome los datos (tal vez algunas conversiones de espacio de colores pueden ser centrales aquí, pero de lo contrario, la tarea es (para procesamiento de imágenes sub 4K / sub 30 fps) generalmente sin errores en este lado.

frame , como se adquirió anteriormente, es una instancia numpy.ndarray . El mejor rendimiento se obtendría al enviar un BLOB codificado de manera binaria, sin ninguna conversión “inteligente”, ya que, obviamente, el frame es solo un gran paquete de bits (aunque hay algunas delicadezas más avanzadas posibles con Cero copia Mecánica de ZeroMQ, pero esto no haría ningún beneficio directo en esta etapa en el lado del remitente).

 # struct.unpack() / .pack() seems "just"-enough for fast & smart custom Payload protocol designs DecodeWireMSG_DATA( aMSG_DATA = struct.unpack( "!" + ( aPayloadHEADER + ( ( pxW * pxH ) * RGB_DEPTH * CELL_SIZE ) ) * "I", aMsgIN ) ) 

La parte más dura está en el lado del receptor. Si uno intenta usar el motor de bucle de eventos incorporado cv2 , oculto dentro de .imshow() , este bucle chocará con su lógica externa de leer un flujo de actualizaciones a través de .recv() tal como se publicó desde el lado de PUB .

Como compromiso razonable, uno puede ignorar todas las frame “retrasadas”, que no obtuvieron procesos en sincronía con la cadencia de adquisición / transmisión del lado de PUB, y simplemente mostrar la más reciente … usando la opción zmq.CONFLATE en la infraestructura de transporte ZeroMQ (las imágenes “antiguas” retrasadas han perdido su sentido si el propósito de la reconstrucción del flujo de eventos es solo una precepción visual, por el contrario, si el propósito es documentar la adquisición completa 1: 1, zmq.CONFLATE descartaría frame instancias de frame , que deberían procesarse, por lo que se debería agregar alguna otra architecture para un propósito documental 1: 1, mejor separado de la twig “sólo visual” del flujo de datos / procesamiento).

Una vez hecho esto, el .recv() (podría ser una composición de un bucle Poller() + .recv() ) proporcionará al lado SUB una bomba de datos apropiada, que es independiente de las herramientas cv2 y es .imshow() (oculto) -FSA evento-loop.


Consejos de architecture y rendimiento:

  • para proyectos con mayor fps + FullHD / 2K / 4K, cv2 sistemáticamente las latencias / retrasos de procesamiento acumulados del procesamiento de cv2 , utilizando los métodos ‘ { .start(), .stop() } instancias de { .start(), .stop() } .

  • teniendo datos duros, puede detectar a tiempo, cuando surjan necesidades adicionales para resolver algunas restricciones aún más difíciles en tiempo real, así que piense en:

  • Principalmente evite cualquier riesgo de caer en un agujero negro incontrolable de recolección de basura de Python: controle siempre { gc.disable() | gc.enable();gc.collect() } { gc.disable() | gc.enable();gc.collect() } rodeando las secciones de ruta crítica y lanzando un gc.collect() explícito gc.collect() donde su diseño sabe que es factible.

  • evite nuevos retrasos en la asignación de memoria: puede pre-asignar todos los arreglos numpy necesarios y, más tarde, simplemente imponer el numpy para usarlos en un modo in situ de modificaciones de datos, evitando así cualquier otro estado de espera asociado a la administración de memoria ad-hoc

  • diseñe el tráfico para boost la inmunidad a errores con actualizaciones separadas, de múltiples flujos e independientes de solo secciones (franjas) de toda la imagen grande / (color) -deep (recuerde la Garantía Cero: obtenga un mensaje completo “grueso” o None )

  • zmq.AFFINITY rendimiento de zmq.AFFINITY .Context() utilizando zmq.AFFINITY para asignar diferentes clases de prioridades de tráfico de E / S en zmq.Context( N ) segregado. zmq.Context( N ) entrada / salida zmq.Context( N ) .

  • sintonice con zmq.SNDBUF + zmq.SNDHWM en el lado PUB, si se esperan múltiples suscriptores y no se usa zmq.CONFLATE .

  • Por último, pero no por ello menos importante, puede aprovechar numba.jit() aceleración precomstackda de LLVM del código reutilizable para las funciones de ruta crítica (normalmente el procesamiento de numpy pesados), donde microsegundos adicionales numpy traen sus efectos más beneficiosos en su canalización de procesamiento de video, mientras aún permanece en la comodidad de python puro (bueno, claro, aún con algunas advertencias de cv2 ).


Algunos consejos y trucos más con cv2 en la fase de creación de prototipos:

Puede gustar esto para el cv2 imágenes basado en cv2 .

Puede ser así para el cv2 simple de parámetros de la interfaz gráfica de usuario cv2 .

Puede gustar esto para el procesamiento de perfiles de canalización zmq.Stopwatch() con detalles de zmq.Stopwatch() hasta [usec]

El objeto de marco contiene estados de memoria para el estado del servidor y cuando se envía al cliente, se congela porque el marco que recibe es una copia superficial. Intenta buscar cómo puedes hacer una copia profunda para el marco.

Al final resolví el problema tomando pasos intermedios. Primero escribí imágenes individuales en el disco, y luego volví a leer esas imágenes. Eso me llevó al hecho de que necesitaba codificar el marco como una imagen (opté por jpg), y con los métodos mágicos cv2.imencode('.jpg', frame) y cv2.imdecode(npimg, 1) pude hazlo funcionar. He pegado el código de trabajo completo a continuación.

Este primer script lee la cámara web y envía el material de archivo a través de un zeromq socket:

 import cv2 import zmq import base64 context = zmq.Context() footage_socket = context.socket(zmq.PUB) footage_socket.connect('tcp://localhost:5555') camera = cv2.VideoCapture(0) # init the camera while True: try: (grabbed, frame) = camera.read() # grab the current frame frame = cv2.resize(frame, (640, 480)) # resize the frame encoded, buffer = cv2.imencode('.jpg', frame) footage_socket.send_string(base64.b64encode(buffer)) except KeyboardInterrupt: camera.release() cv2.destroyAllWindows() print "\n\nBye bye\n" break 

y esta segunda secuencia de comandos recibe las imágenes de marco y las muestra:

 import cv2 import zmq import base64 import numpy as np context = zmq.Context() footage_socket = context.socket(zmq.SUB) footage_socket.bind('tcp://*:5555') footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode('')) while True: try: frame = footage_socket.recv_string() img = base64.b64decode(frame) npimg = np.fromstring(img, dtype=np.uint8) source = cv2.imdecode(npimg, 1) cv2.imshow("image", source) cv2.waitKey(1) except KeyboardInterrupt: cv2.destroyAllWindows() print "\n\nBye bye\n" break 

En cualquier caso te deseo un hermoso día!