¿Cómo paralelizar la iteración en un rango, usando StdLib y Python 3?

He estado buscando una respuesta a esto ahora por días sin éxito. Probablemente no entiendo las piezas que flotan por ahí y la documentación de Python en el módulo de multiprocessing es bastante grande y no está clara para mí.

Digamos que tienes lo siguiente para bucle:

 import timeit numbers = [] start = timeit.default_timer() for num in range(100000000): numbers.append(num) end = timeit.default_timer() print('TIME: {} seconds'.format(end - start)) print('SUM:', sum(numbers)) 

Salida:

 TIME: 23.965870224497916 seconds SUM: 4999999950000000 

Para este ejemplo, digamos que tienes un procesador de 4 núcleos. ¿Hay alguna forma de crear 4 procesos en total, donde cada proceso se ejecuta en un núcleo de CPU separado y finaliza aproximadamente 4 veces más rápido, por lo que 24s / 4 procesos = ~ 6 segundos?

De alguna manera, divida el bucle for en 4 partes iguales y luego agregue las 4 partes en la lista de números para igualar la misma sum. Había un hilo de stackoverflow: Parallel Simple For Loop pero no lo entiendo. Gracias a todos.

Sí, eso es factible. Su cálculo no depende de resultados intermedios, por lo que puede dividir fácilmente la tarea en partes y distribuirla en múltiples procesos. Es lo que se llama un

Un problema vergonzosamente paralelo .

La única parte difícil aquí podría ser, en primer lugar, dividir el rango en partes bastante iguales. Directamente mis dos funciones personales para lidiar con esto:

 # mp_utils.py from itertools import accumulate def calc_batch_sizes(n_tasks: int, n_workers: int) -> list: """Divide `n_tasks` optimally between n_workers to get batch_sizes. Guarantees batch sizes won't differ for more than 1. Example: # >>>calc_batch_sizes(23, 4) # Out: [6, 6, 6, 5] In case you're going to use numpy anyway, use np.array_split: [len(a) for a in np.array_split(np.arange(23), 4)] # Out: [6, 6, 6, 5] """ x = int(n_tasks / n_workers) y = n_tasks % n_workers batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y) return batch_sizes def build_batch_ranges(batch_sizes: list) -> list: """Build batch_ranges from list of batch_sizes. Example: # batch_sizes [6, 6, 6, 5] # >>>build_batch_ranges(batch_sizes) # Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)] """ upper_bounds = [*accumulate(batch_sizes)] lower_bounds = [0] + upper_bounds[:-1] batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)] return batch_ranges 

Entonces tu guión principal se vería así:

 import time from multiprocessing import Pool from mp_utils import calc_batch_sizes, build_batch_ranges def target_foo(batch_range): return sum(batch_range) # ~ 6x faster than target_foo1 def target_foo1(batch_range): numbers = [] for num in batch_range: numbers.append(num) return sum(numbers) if __name__ == '__main__': N = 100000000 N_CORES = 4 batch_sizes = calc_batch_sizes(N, n_workers=N_CORES) batch_ranges = build_batch_ranges(batch_sizes) start = time.perf_counter() with Pool(N_CORES) as pool: result = pool.map(target_foo, batch_ranges) r_sum = sum(result) print(r_sum) print(f'elapsed: {time.perf_counter() - start:.2f} s') 

Tenga en cuenta que también cambié su for-loop por una sum simple sobre el objeto de rango, ya que ofrece un rendimiento mucho mejor. Si no puedes hacer esto en tu aplicación real, una comprensión de la lista aún sería ~ 60% más rápida que llenar tu lista manualmente como en tu ejemplo.

Ejemplo de salida:

 4999999950000000 elapsed: 0.51 s Process finished with exit code 0 
 import timeit from multiprocessing import Pool def appendNumber(x): return x start = timeit.default_timer() with Pool(4) as p: numbers = p.map(appendNumber, range(100000000)) end = timeit.default_timer() print('TIME: {} seconds'.format(end - start)) print('SUM:', sum(numbers)) 

Entonces Pool.map es como la función de map incorporada. Toma una función y una iterable y produce una lista del resultado de llamar a esa función en cada elemento de la iterable. Aquí, ya que en realidad no queremos cambiar los elementos del rango iterable, simplemente devolvemos el argumento.

Lo crucial es que Pool.map divide el iterable proporcionado ( range(1000000000) aquí) en trozos y los envía al número de procesos que tiene (definido aquí como 4 en el Pool(4) ) y luego vuelve a unir los resultados en uno. lista.

La salida que obtengo al ejecutar esto es

 TIME: 8.748245699999984 seconds SUM: 4999999950000000 

Hice una comparación, el tiempo necesario para dividir las tareas a veces puede llevar más tiempo:

Archivo multiprocessing_summation.py :

 def summation(lst): sum = 0 for x in range(lst[0], lst[1]): sum += x return sum 

Archivo multiprocessing_summation_master.py :

 %%file ./examples/multiprocessing_summation_master.py import multiprocessing as mp import timeit import os import sys import multiprocessing_summation as mps if __name__ == "__main__": if len(sys.argv) == 1: print(f'{sys.argv[0]} ') sys.exit(1) else: args = [int(x) for x in sys.argv[1:]] nBegin = 1 nCore = os.cpu_count() for nEnd in args: ### Approach 1 #### #################### start = timeit.default_timer() answer1 = mps.summation((nBegin, nEnd+1)) end = timeit.default_timer() print(f'Answer1 = {answer1}') print(f'Time taken = {end - start}') ### Approach 2 #### #################### start = timeit.default_timer() lst = [] for x in range(nBegin, nEnd, int((nEnd-nBegin+1)/nCore)): lst.append(x) lst.append(nEnd+1) lst2 = [] for x in range(1, len(lst)): lst2.append((lst[x-1], lst[x])) with mp.Pool(processes=nCore) as pool: answer2 = pool.map(mps.summation, lst2) end = timeit.default_timer() print(f'Answer2 = {sum(answer2)}') print(f'Time taken = {end - start}') 

Ejecuta el segundo script:

python multiprocessing_summation_master.py 1000 100000 10000000 1000000000

Las salidas son:

 Answer1 = 500500 Time taken = 4.558405389566795e-05 Answer2 = 500500 Time taken = 0.15728066685459452 Answer1 = 5000050000 Time taken = 0.005781152051264199 Answer2 = 5000050000 Time taken = 0.14532123447452705 Answer1 = 50000005000000 Time taken = 0.4903863230334036 Answer2 = 50000005000000 Time taken = 0.49744346392131533 Answer1 = 500000000500000000 Time taken = 50.825169837068 Answer2 = 500000000500000000 Time taken = 26.603663061636567