python concurrent.futures.ProcessPoolExecutor: Rendimiento de .submit () vs .map ()

Estoy usando concurrent.futures.ProcessPoolExecutor para encontrar la ocurrencia de un número de un rango de números. La intención es investigar la cantidad de rendimiento de aceleración obtenida de la concurrencia. Para comparar el rendimiento, tengo un control: un código de serie para realizar dicha tarea (que se muestra a continuación). He escrito 2 códigos concurrentes, uno usando concurrent.futures.ProcessPoolExecutor.submit() y el otro usando concurrent.futures.ProcessPoolExecutor.map() para realizar la misma tarea. Se muestran a continuación. Los consejos para redactar el primero y el segundo se pueden ver aquí y aquí , respectivamente.

La tarea emitida para los tres códigos fue encontrar el número de ocurrencias del número 5 en el rango de números de 0 a 1E8. A .submit() y .map() les asignaron 6 trabajadores, y .map() tenía un tamaño de 10.000. La manera de discretizar la carga de trabajo fue idéntica en los códigos concurrentes. Sin embargo, la función utilizada para encontrar ocurrencias en ambos códigos era diferente. Esto se debió a que la forma en que se pasaron los argumentos a una función llamada por .submit () y .map () eran diferentes.

Los 3 códigos reportaron el mismo número de ocurrencias, es decir, 56,953,279 veces. Sin embargo, el tiempo necesario para completar la tarea fue muy diferente. .submit() realizó 2 veces más rápido que el control, mientras que .map() tomó el doble de tiempo que el control para completar su tarea.

Preguntas:

  1. Me gustaría saber si el rendimiento lento de .map() es un artefacto de mi encoding o es inherentemente lento “. Si es el primero, ¿cómo puedo mejorarlo? Me sorprende que haya funcionado más lento que el control, ya que No habrá mucho incentivo para usarlo.
  2. Me gusta saber si hay .submit() hacer que el código .submit() funcione aún más rápido. Una condición que tengo es que la función _concurrent_submit() debe devolver un iterable con los números / apariciones que contienen el número 5.

Resultados de referencia
resultados de referencia

concurrent.futures.ProcessPoolExecutor.submit ()

 #!/usr/bin/python3.5 # -*- coding: utf-8 -*- import concurrent.futures as cf from time import time from traceback import print_exc def _findmatch(nmin, nmax, number): '''Function to find the occurrence of number in range nmin to nmax and return the found occurrences in a list.''' print('\n def _findmatch', nmin, nmax, number) start = time() match=[] for n in range(nmin, nmax): if number in str(n): match.append(n) end = time() - start print("found {0} in {1:.4f}sec".format(len(match),end)) return match def _concurrent_submit(nmax, number, workers): '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to find the occurences of a given number in a number range in a parallelised manner.''' # 1. Local variables start = time() chunk = nmax // workers futures = [] found =[] #2. Parallelization with cf.ProcessPoolExecutor(max_workers=workers) as executor: # 2.1. Discretise workload and submit to worker pool for i in range(workers): cstart = chunk * i cstop = chunk * (i + 1) if i != workers - 1 else nmax futures.append(executor.submit(_findmatch, cstart, cstop, number)) # 2.2. Instruct workers to process results as they come, when all are # completed or ..... cf.as_completed(futures) # faster than cf.wait() # 2.3. Consolidate result as a list and return this list. for future in futures: for f in future.result(): try: found.append(f) except: print_exc() foundsize = len(found) end = time() - start print('within statement of def _concurrent_submit():') print("found {0} in {1:.4f}sec".format(foundsize, end)) return found if __name__ == '__main__': nmax = int(1E8) # Number range maximum. number = str(5) # Number to be found in number range. workers = 6 # Pool of workers start = time() a = _concurrent_submit(nmax, number, workers) end = time() - start print('\n main') print('workers = ', workers) print("found {0} in {1:.4f}sec".format(len(a),end)) 

concurrent.futures.ProcessPoolExecutor.map ()

 #!/usr/bin/python3.5 # -*- coding: utf-8 -*- import concurrent.futures as cf import itertools from time import time from traceback import print_exc def _findmatch(listnumber, number): '''Function to find the occurrence of number in another number and return a string value.''' #print('def _findmatch(listnumber, number):') #print('listnumber = {0} and ref = {1}'.format(listnumber, number)) if number in str(listnumber): x = listnumber #print('x = {0}'.format(x)) return x def _concurrent_map(nmax, number, workers): '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to find the occurrences of a given number in a number range in a parallelised manner.''' # 1. Local variables start = time() chunk = nmax // workers futures = [] found =[] #2. Parallelization with cf.ProcessPoolExecutor(max_workers=workers) as executor: # 2.1. Discretise workload and submit to worker pool for i in range(workers): cstart = chunk * i cstop = chunk * (i + 1) if i != workers - 1 else nmax numberlist = range(cstart, cstop) futures.append(executor.map(_findmatch, numberlist, itertools.repeat(number), chunksize=10000)) # 2.3. Consolidate result as a list and return this list. for future in futures: for f in future: if f: try: found.append(f) except: print_exc() foundsize = len(found) end = time() - start print('within statement of def _concurrent(nmax, number):') print("found {0} in {1:.4f}sec".format(foundsize, end)) return found if __name__ == '__main__': nmax = int(1E8) # Number range maximum. number = str(5) # Number to be found in number range. workers = 6 # Pool of workers start = time() a = _concurrent_map(nmax, number, workers) end = time() - start print('\n main') print('workers = ', workers) print("found {0} in {1:.4f}sec".format(len(a),end)) 

Código de serie:

 #!/usr/bin/python3.5 # -*- coding: utf-8 -*- from time import time def _serial(nmax, number): start = time() match=[] nlist = range(nmax) for n in nlist: if number in str(n):match.append(n) end=time()-start print("found {0} in {1:.4f}sec".format(len(match),end)) return match if __name__ == '__main__': nmax = int(1E8) # Number range maximum. number = str(5) # Number to be found in number range. start = time() a = _serial(nmax, number) end = time() - start print('\n main') print("found {0} in {1:.4f}sec".format(len(a),end)) 

Actualización 13 de febrero de 2017:

Además de la respuesta de @niemmi, proporciono una respuesta después de algunas investigaciones personales para mostrar:

  1. cómo acelerar las soluciones .map () y .submit () de @ niemmi, y
  2. cuando ProcessPoolExecutor.map () puede llevar a más velocidad que ProcessPoolExecutor.submit ().

Visión general:

Hay 2 partes a mi respuesta:

  • La Parte 1 muestra cómo ganar más velocidad con la solución ProcessPoolExecutor.map() de @ niemmi.
  • La Parte 2 muestra cuándo las subclases .submit() y .map() ProcessPoolExecutor producen tiempos de cálculo no equivalentes.

================================================== =====================

Parte 1: Más velocidad para ProcessPoolExecutor.map ()

Antecedentes: esta sección se basa en la solución .map() @ niemmi, que por sí misma es excelente. Al hacer una investigación sobre su esquema de discretización para comprender mejor cómo interactúa con el argumento de .map (), encontré esta solución interesante.

Considero que la definición de @ niemmi de chunk = nmax // workers es una definición de chunksize, es decir, un tamaño más pequeño del rango de número real (tarea dada) que debe abordar cada trabajador en el grupo de trabajadores. Ahora, esta definición se basa en el supuesto de que si una computadora tiene x número de trabajadores, dividir la tarea de manera equitativa entre cada trabajador resultará en un uso óptimo de cada trabajador y, por lo tanto, la tarea total se completará más rápido. Por lo tanto, la cantidad de partes para dividir una tarea determinada siempre debe ser igual a la cantidad de trabajadores de la piscina. Sin embargo, ¿es correcto este supuesto?

Proposición: aquí, propongo que la suposición anterior no siempre conduce al tiempo de cálculo más rápido cuando se utiliza con ProcessPoolExecutor.map() . Más bien, la discretización de una tarea a una cantidad mayor que la cantidad de trabajadores de la agrupación puede acelerar, es decir, completar más rápidamente una tarea determinada .

Experimento: He modificado el código de @ niemmi para permitir que la cantidad de tareas discretizadas exceda la cantidad de trabajadores de la piscina. Este código se proporciona a continuación y se usa para encontrar el número de veces que aparece el número 5 en el rango de números de 0 a 1E8. He ejecutado este código utilizando 1, 2, 4 y 6 trabajadores de grupo y para varias proporciones de número de tareas discretizadas frente al número de trabajadores de grupo. Para cada escenario, se realizaron 3 ejecuciones y se tabularon los tiempos de cálculo. ” Aceleración ” se define aquí como el tiempo de cálculo promedio utilizando el mismo número de trozos y trabajadores de la agrupación sobre el tiempo de cómputo promedio de cuando el número de tareas discretizadas es mayor que el número de trabajadores de la agrupación.

Recomendaciones:

nchunk sobre nworkers

  1. La figura de la izquierda muestra el tiempo de cálculo tomado por todos los escenarios mencionados en la sección del experimento. Muestra que el tiempo de cálculo tomado por número de trozos / número de trabajadores = 1 es siempre mayor que el tiempo de cálculo tomado por número de trozos> número de trabajadores . Es decir, el primer caso es siempre menos eficiente que el segundo.

  2. La figura de la derecha muestra que se ganó una aceleración de 1.2 veces o más cuando el número de trozos / número de trabajadores alcanza un valor de umbral de 14 o más . Es interesante observar que la tendencia de aceleración también se produjo cuando ProcessPoolExecutor.map() se ejecutó con 1 trabajador.

Conclusión: cuando personalice la cantidad de tareas discretas que ProcessPoolExecutor.map () `debe usar para resolver una tarea determinada, es prudente asegurarse de que este número sea mayor que el número de trabajadores del grupo, ya que esta práctica acorta el tiempo de cálculo.

concurrent.futures.ProcessPoolExecutor.map () code. (solo piezas revisadas)

 def _concurrent_map(nmax, number, workers, num_of_chunks): '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to find the occurrences of a given number in a number range in a parallelised manner.''' # 1. Local variables start = time() chunksize = nmax // num_of_chunks futures = [] found =[] #2. Parallelization with cf.ProcessPoolExecutor(max_workers=workers) as executor: # 2.1. Discretise workload and submit to worker pool cstart = (chunksize * i for i in range(num_of_chunks)) cstop = (chunksize * i if i != num_of_chunks else nmax for i in range(1, num_of_chunks + 1)) futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number)) # 2.2. Consolidate result as a list and return this list. for future in futures: #print('type(future)=',type(future)) for f in future: if f: try: found.append(f) except: print_exc() foundsize = len(found) end = time() - start print('\n within statement of def _concurrent(nmax, number):') print("found {0} in {1:.4f}sec".format(foundsize, end)) return found if __name__ == '__main__': nmax = int(1E8) # Number range maximum. number = str(5) # Number to be found in number range. workers = 4 # Pool of workers chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance num_of_chunks = chunks_vs_workers * workers start = time() a = _concurrent_map(nmax, number, workers, num_of_chunks) end = time() - start print('\n main') print('nmax={}, workers={}, num_of_chunks={}'.format( nmax, workers, num_of_chunks)) print('workers = ', workers) print("found {0} in {1:.4f}sec".format(len(a),end)) 

================================================== =====================

Parte 2: el tiempo total de cómputo por el uso de las subclases ProcessPoolExecutor .submit () y .map () pueden ser diferentes al devolver una lista de resultados ordenados / ordenados.

Antecedentes: he modificado los .submit() y .map() para permitir una comparación “manzana a manzana” de su tiempo de cómputo y la capacidad de visualizar el tiempo de cómputo del código principal, el tiempo de cómputo del El método _concurrent llamado por el código principal para realizar las operaciones concurrentes, y el tiempo de cómputo para cada tarea / trabajador discretizado llamado por el método _concurrent. Además, el método concurrente en estos códigos se estructuró para devolver una lista ordenada y no ordenada del resultado directamente del objeto futuro de .submit() y el iterador de .map() . El código fuente se proporciona a continuación ( Espero que te ayude ).

Experimentos Estos dos códigos recientemente mejorados se utilizaron para realizar el mismo experimento descrito en la Parte 1, a excepción de que solo se consideraron 6 trabajadores de la agrupación y se usaron la list integrada de python y los métodos sorted para devolver una lista ordenada y no ordenada de los resultados al Sección principal del código, respectivamente.

Recomendaciones: .submit vs .map plus list vs sorted

  1. Del resultado del método _concurrent, podemos ver los tiempos de cálculo del método _concurrent usado para crear todos los objetos Future de ProcessPoolExecutor.submit() , y para crear el iterador de ProcessPoolExecutor.map() , como una función del número de Tarea sobre el número de trabajadores de la piscina, son equivalentes. Este resultado simplemente significa que las subclases ProcessPoolExecutor .submit() y .map() son igualmente eficientes / rápidas.
  2. Comparando los tiempos de cálculo de main y su método _stream actual, podemos ver que main se ejecuta más tiempo que su método _stream actual. Esto es de esperar ya que su diferencia de tiempo refleja la cantidad de tiempos de cálculo de la list y los métodos sorted (y la de los otros métodos incluidos en estos métodos). Claramente, el método de la list tardó menos tiempo en calcular para devolver una lista de resultados que el método sorted . Los tiempos de cálculo promedio del método de la list para los códigos .submit () y .map () fueron similares, a ~ 0.47sec. El tiempo de cálculo promedio del método ordenado para los códigos .submit () y .map () fue de 1.23 seg y 1.01 seg, respectivamente. En otras palabras, el método de la list se realizó 2.62 veces y 2.15 veces más rápido que el método sorted para los códigos .submit () y .map (), respectivamente.
  3. No está claro por qué el método sorted generó una lista ordenada desde .map() más rápido que desde .submit() , ya que el número de tareas discretizadas aumentó más que el número de trabajadores del grupo, excepto cuando el número de tareas discretizadas igualó el número de trabajadores de la piscina. Dicho esto, estos hallazgos muestran que la decisión de utilizar las .submit() o .map() igualmente rápidas puede verse afectada por el método ordenado. Por ejemplo, si la intención es generar una lista ordenada en el menor tiempo posible, el uso de ProcessPoolExecutor.map () debería preferirse a ProcessPoolExecutor.submit() ya que .map() puede permitir el tiempo de cálculo total más corto.
  4. El esquema de discretización mencionado en la Parte 1 de mi respuesta se muestra aquí para acelerar el rendimiento de las .submit() y .map() . La cantidad de aceleración puede ser de hasta un 20% en el caso cuando el número de tareas discretizadas iguala al número de trabajadores de la piscina.

Código .map () mejorado

 #!/usr/bin/python3.5 # -*- coding: utf-8 -*- import concurrent.futures as cf from time import time from itertools import repeat, chain def _findmatch(nmin, nmax, number): '''Function to find the occurence of number in range nmin to nmax and return the found occurences in a list.''' start = time() match=[] for n in range(nmin, nmax): if number in str(n): match.append(n) end = time() - start #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec". # format(nmin, nmax, number, len(match),end)) return match def _concurrent(nmax, number, workers, num_of_chunks): '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to find the occurrences of a given number in a number range in a concurrent manner.''' # 1. Local variables start = time() chunksize = nmax // num_of_chunks #2. Parallelization with cf.ProcessPoolExecutor(max_workers=workers) as executor: # 2.1. Discretise workload and submit to worker pool cstart = (chunksize * i for i in range(num_of_chunks)) cstop = (chunksize * i if i != num_of_chunks else nmax for i in range(1, num_of_chunks + 1)) futures = executor.map(_findmatch, cstart, cstop, repeat(number)) end = time() - start print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):') print("found in {0:.4f}sec".format(end)) return list(chain.from_iterable(futures)) #Return an unordered result list #return sorted(chain.from_iterable(futures)) #Return an ordered result list if __name__ == '__main__': nmax = int(1E8) # Number range maximum. number = str(5) # Number to be found in number range. workers = 6 # Pool of workers chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance num_of_chunks = chunks_vs_workers * workers start = time() found = _concurrent(nmax, number, workers, num_of_chunks) end = time() - start print('\n main') print('nmax={}, workers={}, num_of_chunks={}'.format( nmax, workers, num_of_chunks)) #print('found = ', found) print("found {0} in {1:.4f}sec".format(len(found),end)) 

Código .submit () mejorado.
Este código es el mismo que el código .map, excepto que reemplaza el método _concurrent con lo siguiente:

 def _concurrent(nmax, number, workers, num_of_chunks): '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to find the occurrences of a given number in a number range in a concurrent manner.''' # 1. Local variables start = time() chunksize = nmax // num_of_chunks futures = [] #2. Parallelization with cf.ProcessPoolExecutor(max_workers=workers) as executor: # 2.1. Discretise workload and submit to worker pool for i in range(num_of_chunks): cstart = chunksize * i cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax futures.append(executor.submit(_findmatch, cstart, cstop, number)) end = time() - start print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):') print("found in {0:.4f}sec".format(end)) return list(chain.from_iterable(f.result() for f in cf.as_completed( futures))) #Return an unordered list #return list(chain.from_iterable(f.result() for f in cf.as_completed( # futures))) #Return an ordered list 

================================================== =====================

Estás comparando manzanas con naranjas aquí. Al usar el map se producen todos los números 1E8 y se transfieren a los procesos de trabajo. Esto lleva mucho tiempo en comparación con la ejecución real. Cuando se usa submit , solo creas 6 conjuntos de parámetros que se transfieren.

Si cambias el map para operar con el mismo principio, obtendrás números cercanos entre sí:

 def _findmatch(nmin, nmax, number): '''Function to find the occurrence of number in range nmin to nmax and return the found occurrences in a list.''' print('\n def _findmatch', nmin, nmax, number) start = time() match=[] for n in range(nmin, nmax): if number in str(n): match.append(n) end = time() - start print("found {0} in {1:.4f}sec".format(len(match),end)) return match def _concurrent_map(nmax, number, workers): '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to find the occurrences of a given number in a number range in a parallelised manner.''' # 1. Local variables start = time() chunk = nmax // workers futures = [] found =[] #2. Parallelization with cf.ProcessPoolExecutor(max_workers=workers) as executor: # 2.1. Discretise workload and submit to worker pool cstart = (chunk * i for i in range(workers)) cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1)) futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number)) # 2.3. Consolidate result as a list and return this list. for future in futures: for f in future: try: found.append(f) except: print_exc() foundsize = len(found) end = time() - start print('within statement of def _concurrent(nmax, number):') print("found {0} in {1:.4f}sec".format(foundsize, end)) return found 

Podría mejorar el rendimiento del envío utilizando as_completed correctamente. Para determinados iterativos de futuros, devolverá un iterador que generará futuros en el orden en que se completen.

También puede omitir la copia de los datos a otra matriz y usar itertools.chain.from_iterable para combinar los resultados de futuros a iterativos individuales:

 import concurrent.futures as cf import itertools from time import time from traceback import print_exc from itertools import chain def _findmatch(nmin, nmax, number): '''Function to find the occurrence of number in range nmin to nmax and return the found occurrences in a list.''' print('\n def _findmatch', nmin, nmax, number) start = time() match=[] for n in range(nmin, nmax): if number in str(n): match.append(n) end = time() - start print("found {0} in {1:.4f}sec".format(len(match),end)) return match def _concurrent_map(nmax, number, workers): '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to find the occurrences of a given number in a number range in a parallelised manner.''' # 1. Local variables chunk = nmax // workers futures = [] found =[] #2. Parallelization with cf.ProcessPoolExecutor(max_workers=workers) as executor: # 2.1. Discretise workload and submit to worker pool for i in range(workers): cstart = chunk * i cstop = chunk * (i + 1) if i != workers - 1 else nmax futures.append(executor.submit(_findmatch, cstart, cstop, number)) return chain.from_iterable(f.result() for f in cf.as_completed(futures)) if __name__ == '__main__': nmax = int(1E8) # Number range maximum. number = str(5) # Number to be found in number range. workers = 6 # Pool of workers start = time() a = _concurrent_map(nmax, number, workers) end = time() - start print('\n main') print('workers = ', workers) print("found {0} in {1:.4f}sec".format(sum(1 for x in a),end))