¿Qué tipo de problemas (si los hay) serían la combinación de asyncio con multiprocesamiento?

Como casi todo el mundo es consciente de la primera vez que observan el subprocesamiento en Python, existe la GIL que hace que la vida sea miserable para las personas que realmente quieren procesar en paralelo, o al menos darle una oportunidad.

Actualmente estoy buscando implementar algo como el patrón de Reactor. Efectivamente, quiero escuchar las conexiones de socket entrantes en un hilo, y cuando alguien intenta conectarse, acepte esa conexión y pásela a otro hilo para su procesamiento.

No estoy (todavía) seguro de qué tipo de carga podría estar enfrentando. Sé que actualmente hay un límite de 2MB en los mensajes entrantes. Teóricamente podríamos obtener miles por segundo (aunque no sé si prácticamente hemos visto algo así). La cantidad de tiempo empleado en procesar un mensaje no es terriblemente importante, aunque obviamente más rápido sería mejor.

Estaba buscando en el patrón de Reactor, y desarrollé un pequeño ejemplo utilizando la biblioteca de multiprocessing que (al menos en las pruebas) parece funcionar bien. Sin embargo, ahora / pronto tendremos la biblioteca de asyncio disponible, que manejaría el bucle de eventos para mí.

¿Hay algo que pueda asyncio combinando asyncio y multiprocessing ?

Debe poder combinar de forma segura asyncio y multiprocessing sin demasiados problemas, aunque no debe usar multiprocessing directamente. El pecado cardinal de asyncio (y cualquier otro marco asíncrono basado en bucle de evento) está bloqueando el bucle de evento. Si intenta utilizar el multiprocessing directamente, cada vez que bloquee para esperar un proceso secundario, bloqueará el bucle de eventos. Obviamente, esto es malo.

La forma más sencilla de evitar esto es usar BaseEventLoop.run_in_executor para ejecutar una función en un concurrent.futures.ProcessPoolExecutor . ProcessPoolExecutor es un grupo de procesos implementado mediante multiprocessing.Process , pero asyncio tiene soporte incorporado para ejecutar una función sin bloquear el bucle de eventos. Aquí hay un ejemplo simple:

 import time import asyncio from concurrent.futures import ProcessPoolExecutor def blocking_func(x): time.sleep(x) # Pretend this is expensive calculations return x * 5 @asyncio.coroutine def main(): #pool = multiprocessing.Pool() #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop. executor = ProcessPoolExecutor() out = yield from loop.run_in_executor(executor, blocking_func, 10) # This does not print(out) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) 

Para la mayoría de los casos, esta función solo es suficiente. Si necesita otras construcciones de multiprocessing , como Queue , Event , Manager , etc., hay una biblioteca de terceros llamada aioprocessing (revelación completa: lo escribí), que proporciona versiones compatibles con asyncio de todas las estructuras de datos de multiprocessing . Aquí hay un ejemplo demostrando que:

 import time import asyncio import aioprocessing import multiprocessing def func(queue, event, lock, items): with lock: event.set() for item in items: time.sleep(3) queue.put(item+5) queue.close() @asyncio.coroutine def example(queue, event, lock): l = [1,2,3,4,5] p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) p.start() while True: result = yield from queue.coro_get() if result is None: break print("Got result {}".format(result)) yield from p.coro_join() @asyncio.coroutine def example2(queue, event, lock): yield from event.coro_wait() with (yield from lock): yield from queue.coro_put(78) yield from queue.coro_put(None) # Shut down the worker if __name__ == "__main__": loop = asyncio.get_event_loop() queue = aioprocessing.AioQueue() lock = aioprocessing.AioLock() event = aioprocessing.AioEvent() tasks = [ asyncio.async(example(queue, event, lock)), asyncio.async(example2(queue, event, lock)), ] loop.run_until_complete(asyncio.wait(tasks)) loop.close() 

Sí, hay algunos bits que pueden (o no) morderlo.

  • Cuando ejecutas algo como asyncio se espera que se ejecute en un subproceso o proceso. Esto no (por sí mismo) funciona con el parallel processing. De alguna manera, tiene que distribuir el trabajo mientras deja las operaciones de IO (específicamente aquellas en sockets) en un solo proceso / subproceso.
  • Si bien su idea de transferir conexiones individuales a un proceso de controlador diferente es buena, es difícil de implementar. El primer obstáculo es que necesita una forma de desconectar la conexión de asyncio sin cerrarla. El siguiente obstáculo es que no puede simplemente enviar un descriptor de archivo a un proceso diferente a menos que use un código específico de la plataforma (probablemente Linux) desde una extensión C.
  • Tenga en cuenta que se sabe que el módulo de multiprocessing crea una serie de subprocesos para la comunicación. La mayoría de las veces, cuando utiliza estructuras de comunicación (como las Queue ), se genera un subproceso. Desafortunadamente esos hilos no son completamente invisibles. Por ejemplo, pueden fallar en derribar de forma limpia (cuando intenta terminar su progtwig), pero dependiendo de su número, el uso de recursos puede ser notable por sí solo.

Si realmente pretende manejar conexiones individuales en procesos individuales, sugiero examinar diferentes enfoques. Por ejemplo, puede poner un zócalo en modo de escucha y luego aceptar simultáneamente conexiones de múltiples procesos de trabajo en paralelo. Una vez que un trabajador ha terminado de procesar una solicitud, puede aceptar la siguiente conexión, por lo que aún usa menos recursos que el proceso de bifurcación para cada conexión. Spamassassin y Apache (mpm prefork) pueden usar este modelo de trabajador, por ejemplo. Podría terminar más fácil y más robusto dependiendo de su caso de uso. Específicamente, puede hacer que sus trabajadores mueran después de atender un número configurado de solicitudes y ser reaparecido por un proceso maestro, eliminando así gran parte de los efectos negativos de las memory leaks.

Ver PEP 3156, en particular la sección sobre interacción de hilos:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

Esto documenta claramente los nuevos métodos asíncticos que podría usar, incluyendo run_in_executor (). Tenga en cuenta que el Ejecutor está definido en concurrent.futures, le sugiero que también eche un vistazo allí.