Tarea distribuida paralela de apio con multiprocesamiento.

Tengo una tarea de apio intensivo de CPU. Me gustaría usar toda la potencia de procesamiento (núcleos) en muchas instancias de EC2 para hacer este trabajo más rápido (una tarea distribuida en paralelo de apio con multiprocesamiento, creo ) .

Los términos, subprocesos , multiprocesamiento , computación distribuida , parallel processing distribuido son todos los términos que estoy tratando de entender mejor.

Ejemplo de tarea:

@app.task for item in list_of_millions_of_ids: id = item # do some long complicated equation here very CPU heavy!!!!!!! database.objects(newid=id).save() 

Utilizando el código anterior (con un ejemplo si es posible), ¿cómo se haría para distribuir esta tarea utilizando Celery permitiendo que esta única tarea se divida utilizando toda la potencia de la CPU de cómputo en toda la máquina disponible en la nube?

Tus metas son:

  1. Distribuya su trabajo a muchas máquinas (computación distribuida / parallel processing distribuido)
  2. Distribuir el trabajo en una máquina determinada en todas las CPU (multiprocesamiento / subprocesamiento)

El apio puede hacer ambas cosas por ti con bastante facilidad. Lo primero que hay que entender es que cada trabajador de apio está configurado de manera predeterminada para ejecutar tantas tareas como haya núcleos de CPU disponibles en un sistema:

La concurrencia es la cantidad de procesos de trabajo de Prefork utilizados para procesar sus tareas simultáneamente, cuando todos estos están ocupados haciendo trabajo, las nuevas tareas tendrán que esperar a que una de las tareas termine antes de que pueda procesarse.

El número de concurrencia predeterminado es el número de CPU en esa máquina (incluidos los núcleos) , puede especificar un número personalizado usando la opción -c. No hay un valor recomendado, ya que el número óptimo depende de una serie de factores, pero si sus tareas están mayormente vinculadas a la E / S, puede intentar boostlo, la experimentación ha demostrado que agregar más del doble del número de CPU rara vez eficaz, y es probable que se degrade el rendimiento en su lugar.

Esto significa que cada tarea individual no necesita preocuparse por el uso de multiprocesamiento / subprocesos para hacer uso de múltiples CPU / núcleos. En su lugar, apio ejecutará suficientes tareas simultáneamente para usar cada CPU disponible.

Con eso fuera del camino, el siguiente paso es crear una tarea que maneje el procesamiento de algún subconjunto de su list_of_millions_of_ids . Aquí tiene un par de opciones: una es que cada tarea maneje una única ID, de modo que ejecute N tareas, donde N == len(list_of_millions_of_ids) . Esto garantizará que el trabajo se distribuya uniformemente entre todas sus tareas, ya que nunca habrá un caso en el que un trabajador termine antes y solo esté esperando; si necesita trabajo, puede sacar una identificación de la cola. Puedes hacer esto (como lo menciona John Doe) usando un group apio.

tareas.py:

 @app.task def process_id(item): id = item #long complicated equation here database.objects(newid=id).save() 

Y para ejecutar las tareas:

 from celery import group from tasks import process_id jobs = group(process_id.s(item) for item in list_of_millions_of_ids) result = jobs.apply_async() 

Otra opción es dividir la lista en partes más pequeñas y distribuirlas a sus trabajadores. Este enfoque corre el riesgo de perder algunos ciclos, ya que puede terminar con algunos trabajadores esperando mientras otros todavía están trabajando. Sin embargo, la documentación del apio señala que esta preocupación a menudo es infundada:

Algunos pueden preocuparse de que dividir sus tareas resulte en una degradación del paralelismo, pero esto rara vez es cierto para un clúster ocupado y en la práctica, ya que está evitando la sobrecarga de mensajería, ya que puede boost considerablemente el rendimiento.

Por lo tanto, es posible que el hecho de dividir la lista y distribuir los fragmentos a cada tarea funcione mejor, debido a la reducción de la sobrecarga de mensajes. Probablemente también puede aligerar la carga en la base de datos de esta manera, calculando cada ID, almacenándola en una lista y luego agregando la lista completa a la base de datos una vez que haya terminado, en lugar de hacerlo de una en una. . El enfoque fragmentario se vería así.

tareas.py:

 @app.task def process_ids(items): for item in items: id = item #long complicated equation here database.objects(newid=id).save() # Still adding one id at a time, but you don't have to. 

Y para iniciar las tareas:

 from tasks import process_ids jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here. jobs.apply_async() 

Puedes experimentar un poco con qué tamaño de trozo te da el mejor resultado. Desea encontrar un punto dulce en el que esté reduciendo la sobrecarga de mensajes, al mismo tiempo que mantiene el tamaño lo suficientemente pequeño como para que los trabajadores no terminen su parte mucho más rápido que otro trabajador, y luego simplemente esperen sin nada que hacer.

En el mundo de la distribución solo hay una cosa que debes recordar sobre todo:

La optimización prematura es la fuente de todos los males. Por D. Knuth

Sé que suena evidente, pero antes de distribuir la doble verificación, está utilizando el mejor algoritmo (si existe …). Dicho esto, optimizar la distribución es un acto de equilibrio entre 3 cosas:

  1. Escribir / Leer datos desde un medio persistente,
  2. Mover datos del medio A al medio B,
  3. Procesando datos,

Las computadoras están hechas para que cuanto más se acerque a su unidad de procesamiento (3), más rápido y eficiente sea (1) y (2). El orden en un clúster clásico será: disco duro de red, disco duro local, RAM, dentro del territorio de la unidad de procesamiento … Hoy en día, los procesadores se están volviendo lo suficientemente sofisticados como para ser considerados como un conjunto de unidades de procesamiento de hardware independientes comúnmente denominadas núcleos, estos núcleos se procesan Datos (3) a través de hilos (2). Imagine que su núcleo es tan rápido que cuando envía datos con un subproceso está usando el 50% de la potencia de la computadora, si el núcleo tiene 2 subprocesos, utilizará el 100%. Dos subprocesos por núcleo se denominan hipervínculos, y su sistema operativo verá 2 CPU por núcleo de hipervínculos.

La gestión de subprocesos en un procesador se denomina comúnmente subprocesos múltiples. La gestión de las CPU desde el sistema operativo se denomina comúnmente multiprocesamiento. La gestión de tareas simultáneas en un clúster se denomina comúnmente progtwigción paralela. La gestión de tareas dependientes en un clúster se denomina comúnmente progtwigción distribuida.

Entonces, ¿dónde está su cuello de botella?

  • En (1): intente persistir y transmitir desde el nivel superior (el que está más cerca de su unidad de procesamiento, por ejemplo, si el disco duro de la red es lento, primero guarde en el disco duro local)
  • En (2): este es el más común, intente evitar los paquetes de comunicación que no son necesarios para la distribución o comprima los paquetes “sobre la marcha” (por ejemplo, si la HD es lenta, guarde solo un mensaje “computarizado por lotes” y mantenga la resultados intermedios en la memoria RAM).
  • En (3): ¡Ya terminaste! Usted está utilizando toda la potencia de procesamiento a su disposición.

¿Qué pasa con el apio?

Celery es un marco de mensajería para la progtwigción distribuida, que utilizará un módulo de intermediario para la comunicación (2) y un módulo de backend para la persistencia (1), esto significa que podrá cambiar la configuración para evitar la mayoría de los cuellos de botella (si es posible) en su red y solo en su red. Primero haga un perfil de su código para lograr el mejor rendimiento en una sola computadora. Luego use apio en su grupo con la configuración predeterminada y establezca CELERY_RESULT_PERSISTENT=True :

 from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//', backend='redis://localhost') @app.task def process_id(all_the_data_parameters_needed_to_process_in_this_computer): #code that does stuff return result 

Durante la ejecución, abra sus herramientas de monitoreo favoritas, uso el valor predeterminado para rabbitMQ y flower para apio y top para cpus, sus resultados se guardarán en su servidor. Un ejemplo de cuello de botella en la red es que las tareas en la cola aumentan tanto que demoran la ejecución, puede cambiar los módulos o la configuración del apio, si no, el cuello de botella se encuentra en otra parte.

¿Por qué no usar la tarea de apio group para esto?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

Básicamente, debes dividir los ids en trozos (o rangos) y darles un montón de tareas en group .

Para algo más sofisticado, como la agregación de resultados de tareas particulares de apio, he utilizado con éxito la tarea de chord para propósitos similares:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Aumente la settings.CELERYD_CONCURRENCY a un número que sea razonable y que pueda pagar, luego esos trabajadores de apio continuarán ejecutando sus tareas en grupo o acorde hasta que terminen.

Nota: debido a un error en kombu hubo problemas con la reutilización de los trabajadores para una gran cantidad de tareas en el pasado, no sé si está arreglado ahora. Tal vez sea, pero si no, reduzca CELERYD_MAX_TASKS_PER_CHILD.

Ejemplo basado en código simplificado y modificado que ejecuto:

 @app.task def do_matches(): match_data = ... result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s()) 

summarize obtiene los resultados de todas single_batch_processor tareas single_batch_processor . Cada tarea se ejecuta en cualquier trabajador de apio, kombu coordina eso.

Ahora lo entiendo: single_batch_processor y el summarize TAMBIÉN tienen que ser tareas de apio, no funciones normales; de lo contrario, por supuesto, no estará en paralelo (ni siquiera estoy seguro de que el constructor de acordes lo acepte si no es una tarea de apio).

Agregar más trabajadores de apio ciertamente acelerará la ejecución de la tarea. Es posible que tenga otro cuello de botella sin embargo: la base de datos. Asegúrese de que puede manejar las inserciones / actualizaciones simultáneas.

Con respecto a su pregunta: Usted está agregando trabajadores de apio al asignar otro proceso en sus instancias de EC2 como celeryd . Dependiendo de cuántos trabajadores necesite, es posible que desee agregar incluso más instancias.