Indicador de progreso durante las operaciones de pandas (python)

Regularmente realizo operaciones de pandas en marcos de datos de más de 15 millones de filas y me encantaría tener acceso a un indicador de progreso para operaciones particulares.

¿Existe un indicador de progreso basado en texto para las operaciones de combinar-aplicar-combinar pandas?

Por ejemplo, en algo como:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup) 

donde feature_rollup es una función un tanto complicada que toma muchas columnas del DF y crea nuevas columnas de usuarios a través de varios métodos. Estas operaciones pueden demorar un tiempo para los marcos de datos grandes, por lo que me gustaría saber si es posible tener una salida basada en texto en un cuaderno iPython que me actualice sobre el progreso.

Hasta ahora, he probado los indicadores de progreso de bucle canónico para Python, pero no interactúan con los pandas de ninguna manera significativa.

Espero que haya algo que haya pasado por alto en la biblioteca / documentación de pandas que nos permita conocer el progreso de una combinación de aplicación dividida. Una implementación simple tal vez analice el número total de subconjuntos de marcos de datos en los que está funcionando la función de apply e informe el progreso como la fracción completa de esos subconjuntos.

¿Es esto quizás algo que necesita ser agregado a la biblioteca?

Debido a la demanda popular, tqdm ha agregado soporte para pandas . A diferencia de las otras respuestas, esto no ralentizará notablemente a los pandas : aquí hay un ejemplo para DataFrameGroupBy.progress_apply :

 import pandas as pd import numpy as np from tqdm import tqdm # from tqdm.auto import tqdm # for notebooks df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000))) # Create and register a new `tqdm` instance with `pandas` # (can use tqdm_gui, optional kwargs, etc.) tqdm.pandas() # Now you can use `progress_apply` instead of `apply` df.groupby(0).progress_apply(lambda x: x**2) 

En caso de que esté interesado en cómo funciona esto (y cómo modificarlo para sus propias devoluciones de llamada), consulte los ejemplos en github , la documentación completa en pypi , o importe el módulo y ejecute la help(tqdm) .

EDITAR


Para responder directamente a la pregunta original, reemplace:

 df_users.groupby(['userID', 'requestDate']).apply(feature_rollup) 

con:

 from tqdm import tqdm tqdm.pandas() df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup) 

Nota: tqdm <= v4.8 : para versiones de tqdm por debajo de 4.8, en lugar de tqdm.pandas() tenía que hacer:

 from tqdm import tqdm, tqdm_pandas tqdm_pandas(tqdm()) 

Para modificar la respuesta de Jeff (y tener esto como una función reutilizable).

 def logged_apply(g, func, *args, **kwargs): step_percentage = 100. / len(g) import sys sys.stdout.write('apply progress: 0%') sys.stdout.flush() def logging_decorator(func): def wrapper(*args, **kwargs): progress = wrapper.count * step_percentage sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%') sys.stdout.flush() wrapper.count += 1 return func(*args, **kwargs) wrapper.count = 0 return wrapper logged_func = logging_decorator(func) res = g.apply(logged_func, *args, **kwargs) sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n') sys.stdout.flush() return res 

Nota: el porcentaje de progreso aplicado se actualiza en línea . Si su función funciona, entonces esto no funcionará.

 In [11]: g = df_users.groupby(['userID', 'requestDate']) In [12]: f = feature_rollup In [13]: logged_apply(g, f) apply progress: 100% Out[13]: ... 

Como de costumbre, puede agregar esto a sus objetos groupby como un método:

 from pandas.core.groupby import DataFrameGroupBy DataFrameGroupBy.logged_apply = logged_apply In [21]: g.logged_apply(f) apply progress: 100% Out[21]: ... 

Como se mencionó en los comentarios, esta no es una característica que los pandas centrales estén interesados ​​en implementar. Pero Python te permite crearlos para muchos objetos / métodos pandas (hacerlo sería un poco de trabajo … aunque deberías poder generalizar este enfoque).

En caso de que necesite ayuda sobre cómo usar esto en un cuaderno Jupyter / ipython, como hice, aquí hay una guía útil y una fuente del artículo relevante :

 from tqdm._tqdm_notebook import tqdm_notebook import pandas as pd tqdm_notebook.pandas() df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000))) df.groupby(0).progress_apply(lambda x: x**2) 

Note el guión bajo en la statement de importación para _tqdm_notebook . Como se menciona en el artículo de referencia, el desarrollo se encuentra en fase beta tardía.

Puedes hacerlo fácilmente con un decorador.

 from functools import wraps def logging_decorator(func): @wraps def wrapper(*args, **kwargs): wrapper.count += 1 print "The function I modify has been called {0} times(s).".format( wrapper.count) func(*args, **kwargs) wrapper.count = 0 return wrapper modified_function = logging_decorator(feature_rollup) 

luego solo usa la función de modificación (y cambia cuando quieras que se imprima)

Para cualquier persona que quiera aplicar tqdm en su código de aplicación de pandas paralelo personalizado.

(Probé algunas de las bibliotecas para la paralelización a lo largo de los años, pero nunca encontré una solución de paralelización al 100%, principalmente para la función de aplicación, y siempre tuve que volver para mi código “manual”).

df_multi_core – este es el que llamas. Acepta:

  1. Tu df objeto
  2. El nombre de la función al que te gustaría llamar
  3. El subconjunto de columnas en las que se puede realizar la función (ayuda a reducir el tiempo / memoria)
  4. La cantidad de trabajos que se ejecutarán en paralelo (-1 u omitir para todos los núcleos)
  5. Cualquier otro kwarg que acepte la función de df (como “eje”)

_df_split : esta es una función auxiliar interna que se debe colocar globalmente en el módulo en ejecución (Pool.map es “dependiente de la ubicación”), de lo contrario, lo ubicaría internamente.

Aquí está el código de mi esencia ( agregaré más pruebas de función de pandas allí):

 import pandas as pd import numpy as np import multiprocessing from functools import partial def _df_split(tup_arg, **kwargs): split_ind, df_split, df_f_name = tup_arg return (split_ind, getattr(df_split, df_f_name)(**kwargs)) def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs): if njobs == -1: njobs = multiprocessing.cpu_count() pool = multiprocessing.Pool(processes=njobs) try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs) pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)] results = pool.map(partial(_df_split, **kwargs), pool_data) pool.close() pool.join() results = sorted(results, key=lambda x:x[0]) results = pd.concat([split[1] for split in results]) return results 

A continuación se muestra un código de prueba para una aplicación en paralelo con tqtm “progress_apply”.

 from time import time from tqdm import tqdm tqdm.pandas() if __name__ == '__main__': sep = '-' * 50 # tqdm progress_apply test def apply_f(row): return row['c1'] + 0.1 N = 1000000 np.random.seed(0) df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)}) print('testing pandas apply on {}\n{}'.format(df.shape, sep)) t1 = time() res = df.progress_apply(apply_f, axis=1) t2 = time() print('result random sample\n{}'.format(res.sample(n=3, random_state=0))) print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep)) t3 = time() # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1) res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1) t4 = time() print('result random sample\n{}'.format(res.sample(n=3, random_state=0))) print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep)) 

En la salida puede ver 1 barra de progreso para ejecutar sin paralelización, y barras de progreso por núcleo cuando se ejecuta con paralelización. Hay un ligero problema y, a veces, el rest de los núcleos aparecen a la vez, pero incluso así creo que es útil ya que se obtienen las estadísticas de progreso por núcleo (it / sec y registros totales, por ejemplo)

introduzca la descripción de la imagen aquí

Gracias @abcdaa por esta gran biblioteca!

He cambiado la respuesta de Jeff , para incluir un total, de modo que pueda hacer un seguimiento del progreso y una variable para simplemente imprimir cada X iteraciones (esto en realidad mejora el rendimiento en gran medida, si el “print_at” es razonablemente alto)

 def count_wrapper(func,total, print_at): def wrapper(*args): wrapper.count += 1 if wrapper.count % wrapper.print_at == 0: clear_output() sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) ) sys.stdout.flush() return func(*args) wrapper.count = 0 wrapper.total = total wrapper.print_at = print_at return wrapper 

la función clear_output () es de

 from IPython.core.display import clear_output 

Si no está en IPython, la respuesta de Andy Hayden lo hace sin él.