Multiprocesamiento de escritura a pandas dataframe

Entonces, lo que estoy tratando de hacer con el siguiente código es leer una lista de listas y ponerlas en la función llamada checker y luego hacer que log_result ocupe del resultado del checker función. Estoy tratando de hacer esto utilizando subprocesos múltiples porque el nombre de la variable rows_to_parse en realidad tiene millones de filas, por lo que el uso de múltiples núcleos debería acelerar este proceso en una cantidad considerable.

El código en el momento presente no funciona y se bloquea Python.

Preocupaciones y problemas que tengo:

  1. Desea que el df existente que se mantuvo en la variable df mantenga el índice durante todo el proceso porque, de lo contrario, log_result se confundirá en cuanto a qué fila debe actualizarse.
  2. Estoy bastante seguro de que apply_async no es la función de multiprocesamiento adecuada para realizar esta tarea porque creo que la orden en que la computadora lee y escribe el df puede posiblemente corromperlo.
  3. Creo que es necesario configurar una cola para escribir y leer df pero no estoy seguro de cómo hacerlo.

Gracias por cualquier ayuda.

 import pandas as pd import multiprocessing from functools import partial def checker(a,b,c,d,e): match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)] index_of_match = match.index.tolist() if len(index_of_match) == 1: #one match in df return index_of_match elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__: return [index_of_match[0]] else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df return [a,b,c,d,e] def log_result(result, dataf): if len(result) == 1: # dataf.loc[result[0]]['e'] += 1 else: #append new row to exisiting df new_row = pd.DataFrame([result],columns=cols) dataf = dataf.append(new_row,ignore_index=True) def apply_async_with_callback(parsing_material, dfr): pool = multiprocessing.Pool() for var_a, var_b, var_c, var_d, var_e in parsing_material: pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr)) pool.close() pool.join() if __name__ == '__main__': #setting up main dataframe cols = ['a','b','c','d','e'] existing_data = [["YES","A","16052011","13031999",3], ["NO","Q","11022003","15081999",3], ["YES","A","22082010","03012001",9]] #main dataframe df = pd.DataFrame(existing_data,columns=cols) #new data rows_to_parse = [['NO', 'A', '09061997', '06122003', 5], ['YES', 'W', '17061992', '26032012', 6], ['YES', 'G', '01122006', '07082014', 2], ['YES', 'N', '06081992', '21052008', 9], ['YES', 'Y', '18051995', '24011996', 6], ['NO', 'Q', '11022003', '15081999', 3], ['NO', 'O', '20112004', '28062008', 0], ['YES', 'R', '10071994', '03091996', 8], ['NO', 'C', '09091998', '22051992', 1], ['YES', 'Q', '01051995', '02012000', 3], ['YES', 'Q', '26022015', '26092007', 5], ['NO', 'F', '15072002', '17062001', 8], ['YES', 'I', '24092006', '03112003', 2], ['YES', 'A', '22082010', '03012001', 9], ['YES', 'I', '15072016', '30092005', 7], ['YES', 'Y', '08111999', '02022006', 3], ['NO', 'V', '04012016', '10061996', 1], ['NO', 'I', '21012003', '11022001', 6], ['NO', 'P', '06041992', '30111993', 6], ['NO', 'W', '30081992', '02012016', 6]] apply_async_with_callback(rows_to_parse, df) 

Actualizar DataFrames como este en MultiProcessing no va a funcionar:

 dataf = dataf.append(new_row,ignore_index=True) 

Por un lado, esto es muy ineficiente (O (n) para cada anexo, por lo que O (n ^ 2) en total. La forma preferida es concentrar algunos objetos en una sola pasada.

Por otro lado, y más importante, dataf no se bloquea para cada actualización, por lo que no hay garantía de que dos operaciones no entren en conflicto (supongo que esto se está estrellando en Python).

Finalmente, el apéndice no actúa en su lugar, por lo que la variable dataf se descarta una vez que finaliza la callback. y no se realizan cambios en el dataf principal.


Podríamos usar MultiProcessing list o un dict . haga una lista si no le importa el orden o dicta si lo hace (por ejemplo, enumerar), ya que debe tener en cuenta que los valores no se devuelven en un orden bien definido desde async.
(O podríamos crear un objeto que implemente Bloquearnos, consulte Eli Bendersky ).
Así se realizan los siguientes cambios:

 df = pd.DataFrame(existing_data,columns=cols) # becomes df = pd.DataFrame(existing_data,columns=cols) d = MultiProcessing.list([df]) dataf = dataf.append(new_row,ignore_index=True) # becomes d.append(new_row) 

Ahora, una vez que ha finalizado el async, tiene un MultiProcessing.list de DataFrames. Puede concat estos (e ignore_index ) para obtener el resultado deseado:

 pd.concat(d, ignore_index=True) 

Debería hacer el truco.


Nota: crear el nuevo dataframe en cada etapa también es menos eficiente que permitir que los pandas analicen la lista de listas directamente a un dataframe de una sola vez. Esperemos que este sea un ejemplo de juguete, realmente quieres que tus trozos sean bastante grandes para obtener ganancias con MultiProcessing (he escuchado 50kb como regla de oro …), una fila a la vez nunca será una gana aqui


Aparte: debes evitar el uso de elementos globales (como df) en tu código, es mucho más fácil pasarlos por alto en tus funciones (en este caso, como un argumento para el corrector).