multiproceso de las funciones de llamada pool.map en cierto orden

¿Cómo puedo hacer que multiprocessing.pool.map distribuya los procesos en orden numérico?


Más información:
Tengo un progtwig que procesa unos pocos miles de archivos de datos, haciendo una gráfica de cada uno. Estoy usando multiprocessing.pool.map para distribuir cada archivo a un procesador y funciona muy bien. A veces esto lleva mucho tiempo, y sería bueno mirar las imágenes de salida mientras se ejecuta el progtwig. Esto sería mucho más fácil si el proceso del mapa distribuyera las instantáneas en orden; en cambio, para la ejecución en particular que acabo de ejecutar, las primeras 8 instantáneas analizadas fueron: 0, 78, 156, 234, 312, 390, 468, 546 . ¿Hay alguna manera de hacer que se distribuyan más estrechamente en orden numérico?


Ejemplo:
Aquí hay un código de muestra que contiene los mismos elementos clave y muestra el mismo resultado básico:

 import sys from multiprocessing import Pool import time num_proc = 4; num_calls = 20; sleeper = 0.1 def SomeFunc(arg): time.sleep(sleeper) print "%5d" % (arg), sys.stdout.flush() # otherwise doesn't print properly on single line proc_pool = Pool(num_proc) proc_pool.map( SomeFunc, range(num_calls) ) 

Rendimientos:

  0 4 2 6 1 5 3 7 8 10 12 14 13 11 9 15 16 18 17 19 

Responder:

De @Hayden: use el parámetro ‘chunksize’, def map(self, func, iterable, chunksize=None) .

Más información :
El chunksize determina cuántas iteraciones se asignan a cada procesador a la vez. Mi ejemplo anterior, por ejemplo, utiliza un tamaño de trozo de 2 — lo que significa que cada procesador se apaga y hace lo suyo por 2 iteraciones de la función, luego regresa por más (“check-in”). La compensación detrás de chunksize es que hay una sobrecarga para el ‘check-in’ cuando el procesador tiene que sincronizarse con los demás, lo que sugiere que desea un gran tamaño . Por otro lado, si tiene trozos grandes, entonces un procesador puede terminar su trozo, mientras que a otro le queda mucho tiempo por delante, por lo que debe usar un trozo pequeño . Supongo que la información útil adicional es la cantidad de scope, el tiempo que puede tomar cada llamada de función. Si realmente deberían tomar la misma cantidad de tiempo, es mucho más eficiente usar un tamaño de trozo grande. Por otro lado, si algunas llamadas de función pueden durar el doble que otras, usted desea un pequeño tamaño para que los procesadores no queden atrapados esperando.

Para mi problema, cada llamada a una función debería tomar casi la misma cantidad de tiempo (creo), así que si quiero que los procesos se procesen en orden, sacrificaré la eficiencia debido a los gastos generales de facturación.

La razón por la que esto ocurre es porque a cada proceso se le asigna una cantidad predefinida de trabajo al inicio de la llamada a la chunksize que depende del chunksize del chunksize . Podemos calcular el chunksize predeterminado de chunksize mirando la fuente de pool.map

 chunksize, extra = divmod(len(iterable), len(self._pool) * 4) if extra: chunksize += 1 

Así que para un rango de 20, y con 4 procesos, obtendremos un chunksize de 2.

Si modificamos su código para reflejar esto, deberíamos obtener resultados similares a los resultados que obtiene ahora:

proc_pool.map(SomeFunc, range(num_calls), chunksize=2)

Esto produce el resultado:

0 2 6 4 1 7 5 3 8 10 12 14 9 13 15 11 16 18 17 19

Ahora, configurar chunksize=1 asegurará que a cada proceso dentro del grupo solo se le chunksize=1 una tarea a la vez.

proc_pool.map(SomeFunc, range(num_calls), chunksize=1)

Esto debería garantizar un orden numérico razonablemente bueno en comparación con eso cuando no se especifica un tamaño de chunksize. Por ejemplo, un tamaño de bloque de 1 produce el resultado:

0 1 2 3 4 5 6 7 9 10 8 11 13 12 15 14 16 17 19 18

¿Qué pasa con el cambio de map para imap :

 import os from multiprocessing import Pool import time num_proc = 4 num_calls = 20 sleeper = 0.1 def SomeFunc(arg): time.sleep(sleeper) print "%s %5d" % (os.getpid(), arg) return arg proc_pool = Pool(num_proc) list(proc_pool.imap(SomeFunc, range(num_calls))) 

La razón quizás es que el chunksize predeterminado de imap de imap es 1, por lo que puede que no se ejecute hasta el map .