Función que multiprocesa otra función.

Estoy realizando análisis de series de tiempo de simulaciones. Básicamente, está haciendo las mismas tareas para cada paso de tiempo. Como hay un gran número de pasos de tiempo, y como el análisis de cada uno de ellos es independiente, quise crear una función que pueda multiprocesar otra función. Este último tendrá argumentos, y devolverá un resultado.

Usando un diccionario compartido y el lib concurrent.futures, logré escribir esto:

import concurrent.futures as Cfut def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args): # function : function that is running in parallel # param_list : list of items # group_size : size of the groups # Nworkers : number of group/items running in the same time # **param_fixed : passing parameters manager = mlp.Manager() dic = manager.dict() executor = Cfut.ProcessPoolExecutor(Nworkers) futures = [executor.submit(function, param, dic, *args) for param in grouper(param_list, group_size)] Cfut.wait(futures) return [dic[i] for i in sorted(dic.keys())] 

Típicamente, puedo usarlo así:

 def read_file(files, dictionnary): for file in files: i = int(file[4:9]) #print(str(i)) if 'bz2' in file: os.system('bunzip2 ' + file) file = file[:-4] dictionnary[i] = np.loadtxt(file) os.system('bzip2 ' + file) Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread)) 

o así :

 def autocorr(x): result = np.correlate(x, x, mode='full') return result[result.size//2:] def find_lambda_finger(indexes, dic, Deviation): for i in indexes : #print(str(i)) # Beach = Deviation[i,:] - np.mean(Deviation[i,:]) dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax = True) args = [Deviation] Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args) 

Básicamente, está funcionando. Pero no está funcionando bien. A veces se estrella. A veces, en realidad, se inician varios procesos de Python iguales a Nworkers, y otras veces solo se ejecutan 2 o 3 de ellos a la vez mientras especificaba Nworkers = 15 .

Por ejemplo, un error clásico que obtengo se describe en el siguiente tema que planteé: Llamar a matplotlib DESPUÉS del multiprocesamiento a veces resulta en un error: el hilo principal no está en el bucle principal

¿Cuál es la forma más pythonica de lograr lo que quiero? ¿Cómo puedo mejorar el control de esta función? ¿Cómo puedo controlar más el número de procesos de Python en ejecución?

Uno de los conceptos básicos para el multiprocesamiento de Python es usar colas. Funciona bastante bien cuando tiene una lista de entrada que se puede iterar y que no necesita ser modificada por los subprocesos. También le da un buen control sobre todos los procesos, ya que genera el número que desea, puede ejecutarlos inactivos o detenerlos.

También es mucho más fácil de depurar. Compartir datos explícitamente es generalmente un enfoque que es mucho más difícil de configurar correctamente.

Las colas pueden contener cualquier cosa, ya que son iterables por definición. Así que puede rellenarlos con cadenas de ruta de archivo para leer archivos, números no iterables para hacer cálculos o incluso imágenes para dibujar.

En su caso, un diseño podría verse así:

 import multiprocessing as mp import numpy as np import itertools as it def worker1(in_queue, out_queue): #holds when nothing is available, stops when 'STOP' is seen for a in iter(in_queue.get, 'STOP'): #do something out_queue.put({a: result}) #return your result linked to the input def worker2(in_queue, out_queue): for a in iter(in_queue.get, 'STOP'): #do something differently out_queue.put({a: result}) //return your result linked to the input def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args): # your final result result = {} in_queue = mp.Queue() out_queue = mp.Queue() # fill your input for a in param_list: in_queue.put(a) # stop command at end of input for n in range(Nworkers): in_queue.put('STOP') # setup your worker process doing task as specified process = [mp.Process(target=function, args=(in_queue, out_queue), daemon=True) for x in range(Nworkers)] # run processes for p in process: p.start() # wait for processes to finish for p in process: p.join() # collect your results from the calculations for a in param_list: result.update(out_queue.get()) return result temp = multiprocess_loop_grouped(worker1, param_list, group_size, Nworkers, *args) map = multiprocess_loop_grouped(worker2, param_list, group_size, Nworkers, *args) 

Se puede hacer un poco más dynamic si teme que las colas se queden sin memoria. De lo que necesita rellenar y vaciar las colas mientras se ejecutan los procesos. Vea este ejemplo aquí .

Palabras finales: no es más Pythonic como pediste. Pero es más fácil de entender para un novato 😉