multiprocessing.Pool () más lento que el uso de funciones ordinarias

(Esta pregunta es sobre cómo hacer que el multiprocesamiento.Pool () ejecute el código más rápido. Finalmente lo resolví, y la solución final se puede encontrar en la parte inferior de la publicación).

Pregunta original:

Estoy tratando de usar Python para comparar una palabra con muchas otras palabras en una lista y recuperar una lista de las más similares. Para hacer eso estoy usando la función difflib.get_close_matches. Estoy en una computadora portátil con Windows 7 relativamente nueva y potente, con Python 2.6.5.

Lo que quiero es acelerar el proceso de comparación porque mi lista de palabras de comparación es muy larga y tengo que repetir el proceso de comparación varias veces. Cuando me enteré del módulo de multiprocesamiento, parecía lógico que si la comparación pudiera dividirse en tareas de trabajo y ejecutarse simultáneamente (y, por lo tanto, utilizar la potencia de la máquina a cambio de una velocidad más rápida) mi tarea de comparación terminaría más rápido.

Sin embargo, incluso después de haber probado muchos métodos diferentes, y se usaron métodos que se han mostrado en los documentos y sugerido en las publicaciones del foro, el método de Pool parece ser increíblemente lento, mucho más lento que simplemente ejecutar la función get_close_matches original en toda la lista en una vez. Me gustaría recibir ayuda para entender por qué Pool () está siendo tan lento y si lo uso correctamente. Solo uso este escenario de comparación de cadenas como ejemplo porque ese es el ejemplo más reciente en el que pude pensar en dónde no podía entender o hacer que el multiprocesamiento funcionara en lugar de hacerlo en mi contra. A continuación se muestra un código de ejemplo del escenario difflib que muestra las diferencias de tiempo entre los métodos ordinarios y los combinados:

from multiprocessing import Pool import random, time, difflib # constants wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)] mainword = "hello" # comparison function def findclosematch(subwordlist): matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7) if matches  []: return matches # pool print "pool method" if __name__ == '__main__': pool = Pool(processes=3) t=time.time() result = pool.map_async(findclosematch, wordlist, chunksize=100) #do something with result for r in result.get(): pass print time.time()-t # normal print "normal method" t=time.time() # run function result = findclosematch(wordlist) # do something with results for r in result: pass print time.time()-t 

La palabra que se encuentra es “hola”, y la lista de palabras para encontrar coincidencias es una larga lista de 1 millón de 5 caracteres unidos al azar (solo con fines ilustrativos). Utilizo 3 núcleos de procesador y la función de mapa con un tamaño de trozo de 100 (¿qué elementos se deben procesar por trabajador, creo?) (También probé bloques de 1000 y 10 000, pero no hubo una diferencia real). Tenga en cuenta que, en ambos métodos, comienzo el temporizador justo antes de activar mi función y lo finalizo justo después de haber repasado los resultados. Como puede ver a continuación, los resultados de los tiempos están claramente a favor del método original que no es Pool:

 >>> pool method 37.1690001488 seconds normal method 10.5329999924 seconds >>> 

El método Pool es casi 4 veces más lento que el método original. ¿Hay algo que me esté perdiendo aquí, o tal vez un malentendido sobre cómo funciona el Pooling / multiprocesamiento? Sospecho que parte del problema aquí podría ser que la función de mapa no devuelve Ninguno y, por lo tanto, agrega miles de elementos innecesarios a la lista de resultados, aunque solo quiero que las coincidencias reales se devuelvan a los resultados y lo escribí como tal en la función. Por lo que entiendo es cómo funciona el mapa. He escuchado acerca de algunas otras funciones como el filtro que solo recostack resultados no falsos, pero no creo que multiproceso / Pool admita el método de filtro. ¿Hay otras funciones además del mapa / imap en el módulo de multiprocesamiento que podrían ayudarme a devolver solo lo que devuelve mi función? Aplicar función es más para dar múltiples argumentos como lo entiendo.

Sé que también está la función imap, que probé pero sin mejoras de tiempo. La razón es la misma razón por la que he tenido problemas para entender qué tiene de bueno el módulo de itertools, supuestamente “rápido como un rayo”, lo que he notado es cierto para llamar a la función, pero en mi experiencia y por lo que he leído, eso es porque llamar a la función en realidad no hace ningún cálculo, por lo que cuando es tiempo de recorrer los resultados para recostackrlos y analizarlos (sin los cuales no tendría sentido llamar a la unidad), se necesita tanto o, a veces, más tiempo que un Simplemente usando la versión normal de la función straightup. Pero supongo que es para otro post.

De todos modos, estoy emocionado de ver si alguien puede empujarme en la dirección correcta aquí, y realmente aprecio cualquier ayuda en esto. Estoy más interesado en comprender el multiprocesamiento en general que en hacer funcionar este ejemplo, aunque sería útil con algunos ejemplos de sugerencias de códigos de soluciones para ayudarme en mi comprensión.

La respuesta:

Parece que la desaceleración tuvo que ver con el lento tiempo de inicio de procesos adicionales. No pude conseguir que la función .Pool () fuera lo suficientemente rápida. Mi solución final para hacerlo más rápido fue dividir manualmente la lista de cargas de trabajo, usar varios .Process () en lugar de .Pool () y devolver las soluciones en una cola. Pero me pregunto si tal vez el cambio más crucial podría haber sido dividir la carga de trabajo en términos de la palabra principal a buscar en lugar de las palabras con las que comparar, quizás porque la función de búsqueda difflib ya es muy rápida. Aquí está el nuevo código que ejecuta 5 procesos al mismo tiempo, y resultó aproximadamente x10 más rápido que ejecutar un código simple (6 segundos frente a 55 segundos). Muy útil para búsquedas rápidas y difusas, además de lo rápido que es difflib.

 from multiprocessing import Process, Queue import difflib, random, time def f2(wordlist, mainwordlist, q): for mainword in mainwordlist: matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7) q.put(matches) if __name__ == '__main__': # constants (for 50 input words, find closest match in list of 100 000 comparison words) q = Queue() wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)] mainword = "hello" mainwordlist = [mainword for each in xrange(50)] # normal approach t = time.time() for mainword in mainwordlist: matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7) q.put(matches) print time.time()-t # split work into 5 or 10 processes processes = 5 def splitlist(inlist, chunksize): return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)] print len(mainwordlist)/processes mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes) print "list ready" t = time.time() for submainwordlist in mainwordlistsplitted: print "sub" p = Process(target=f2, args=(wordlist,submainwordlist,q,)) p.Daemon = True p.start() for submainwordlist in mainwordlistsplitted: p.join() print time.time()-t while True: print q.get() 

Mi mejor conjetura es la sobrecarga de comunicación entre procesos (IPC). En la instancia de proceso único, el proceso único tiene la lista de palabras. Al delegar en varios otros procesos, el proceso principal debe transportar constantemente las secciones de la lista a otros procesos.

Por lo tanto, se deduce que un mejor enfoque podría ser escindir n procesos, cada uno de los cuales es responsable de cargar / generar 1 / n segmento de la lista y verificar si la palabra está en esa parte de la lista.

Sin embargo, no estoy seguro de cómo hacerlo con la biblioteca de multiprocesamiento de Python.

Estos problemas generalmente se reducen a lo siguiente:

¡La función que intenta paralelizar no requiere suficientes recursos de CPU (es decir, tiempo de CPU) para racionalizar la paralelización!

Claro, cuando se paraliza con el multiprocessing.Pool(8) , teóricamente ( pero no en la práctica) podría obtener una aceleración de 8x .

Sin embargo, tenga en cuenta que esto no es gratis: usted obtiene esta paralelización a costa de los siguientes gastos generales:

  1. Creando una task para cada chunk (de tamaño chunksize ) en su iter pasado a Pool.map(f, iter)
  2. Para cada task
    1. Serialice la task y el valor de retorno de la task's ( piense en pickle.dumps() )
    2. Deserialice la task y el valor de retorno de la task's ( piense en pickle.loads() )
    3. Pierda un tiempo significativo en espera de Locks en Queues memoria compartida, mientras que los procesos de trabajo y los procesos primarios get() y put() de / a estas Queues .
  3. Costo por única vez de llamadas a os.fork() para cada proceso de trabajo, lo que es costoso.

En esencia, cuando usas Pool() quieres:

  1. Altos requisitos de recursos de CPU
  2. Baja huella de datos pasada a cada llamada de función
  3. Itinerario razonablemente largo para justificar el costo único del (3) anterior.

Para una exploración más profunda, esta publicación y charla enlazada explican cómo los datos grandes que se pasan a Pool.map() ( y amigos) lo meten en problemas.

Raymond Hettinger también habla sobre el uso adecuado de la concurrencia de Python aquí.

Experimenté algo similar con la piscina en un problema diferente. No estoy seguro de la causa real en este punto …

La edición de respuestas de OP Karim Bahgat es la misma solución que funcionó para mí. Después de cambiar a un sistema Process & Queue, pude ver aceleraciones en línea con la cantidad de núcleos de una máquina.

Aquí hay un ejemplo.

 def do_something(data): return data * 2 def consumer(inQ, outQ): while True: try: # get a new message val = inQ.get() # this is the 'TERM' signal if val is None: break; # unpack the message pos = val[0] # its helpful to pass in/out the pos in the array data = val[1] # process the data ret = do_something(data) # send the response / results outQ.put( (pos, ret) ) except Exception, e: print "error!", e break def process_data(data_list, inQ, outQ): # send pos/data to workers for i,dat in enumerate(data_list): inQ.put( (i,dat) ) # process results for i in range(len(data_list)): ret = outQ.get() pos = ret[0] dat = ret[1] data_list[pos] = dat def main(): # initialize things n_workers = 4 inQ = mp.Queue() outQ = mp.Queue() # instantiate workers workers = [mp.Process(target=consumer, args=(inQ,outQ)) for i in range(n_workers)] # start the workers for w in workers: w.start() # gather some data data_list = [ d for d in range(1000)] # lets process the data a few times for i in range(4): process_data(data_list) # tell all workers, no more data (one msg for each) for i in range(n_workers): inQ.put(None) # join on the workers for w in workers: w.join() # print out final results (i*16) for i,dat in enumerate(data_list): print i, dat