¿Combinando itertools y multiprocesamiento?

Tengo una matriz de 256x256x256 Numpy, en la que cada elemento es una matriz. Necesito hacer algunos cálculos en cada una de estas matrices, y quiero usar el módulo de multiprocessing para acelerar las cosas.

Los resultados de estos cálculos deben almacenarse en una matriz de 256x256x256 como la original, de modo que el resultado de la matriz en el elemento [i,j,k] en la matriz original se debe colocar en el elemento [i,j,k] de la nueva matriz.

Para hacer esto, quiero hacer una lista que se pueda escribir de forma pseudo-ish como [array[i,j,k], (i, j, k)] y pasarla a una función para que sea “multiprocesada” . Suponiendo que las matrices son una lista de todas las matrices extraídas de la matriz original y myfunc es la función que realiza los cálculos, el código se parecería a esto:

 import multiprocessing import numpy as np from itertools import izip def myfunc(finput): # Do some calculations... ... # ... and return the result and the index: return (result, finput[1]) # Make indices: inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) # Make function input from the matrices and the indices: finput = izip(matrices, inds) pool = multiprocessing.Pool() async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

Sin embargo, parece que map_async realidad está creando esta enorme finput finalidades: mi CPU no está haciendo mucho, pero la memoria y el intercambio se consumen completamente en cuestión de segundos, lo que obviamente no es lo que quiero.

¿Hay alguna forma de pasar esta enorme lista a una función de multiprocesamiento sin la necesidad de crearla explícitamente primero? ¿O conoces otra forma de resolver este problema?

¡Gracias un montón! 🙂

Todos multiprocessing.Pool.map* métodos multiprocessing.Pool.map* consumen iteradores completamente (código de demostración) tan pronto como se llama a la función. Para alimentar los fragmentos de la función de mapa del iterador un fragmento a la vez, use grouper_nofill :

 def grouper_nofill(n, iterable): '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] ''' it=iter(iterable) def take(): while 1: yield list(itertools.islice(it,n)) return iter(take().next,[]) chunksize=256 async_results=[] for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): async_results.extend(pool.map_async(myfunc, finput).get()) async_results=np.array(async_results) 

PD. El parámetro chunksize hace algo diferente: divide el iterable en trozos, luego le da a cada trozo un proceso de trabajo que llama map(func,chunk) . Esto puede dar al proceso de trabajo más datos para masticar si la func(item) finaliza demasiado rápido, pero no ayuda en su situación ya que el iterador aún se consume completamente inmediatamente después de que se map_async llamada map_async .

Me encontré con este problema también. en lugar de esto:

 res = p.map(func, combinations(arr, select_n)) 

hacer

 res = p.imap(func, combinations(arr, select_n)) 

¡imap no lo consume!

Pool.map_async() necesita saber la longitud del iterable para enviar el trabajo a varios trabajadores. Ya que __len__ no tiene __len__ , __len__ convierte el iterable en una lista, causando el enorme uso de memoria que está experimentando.

Puedes intentar evitar esto creando tu propio iterador de estilo __len__ con __len__ .