python dask DataFrame, ¿es compatible con la fila (trivialmente paralelizable)?

Recientemente encontré el módulo dask que pretende ser un módulo de parallel processing de Python fácil de usar. El gran punto de venta para mí es que funciona con pandas.

Después de leer un poco en su página de manual, no puedo encontrar una manera de hacer esta tarea trivialmente paralelizable:

ts.apply(func) # for pandas series df.apply(func, axis = 1) # for pandas DF row apply 

Por el momento, para lograr esto en dask, AFAIK,

 ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame 

que es una syntax fea y en realidad es más lenta que absoluta

 df.apply(func, axis = 1) # for pandas DF row apply 

¿Cualquier sugerencia?

Edición: Gracias @MRocklin por la función de mapa. Parece ser más lento de lo que se aplican los pandas simples. ¿Está esto relacionado con el tema de la liberación de pandas GIL o lo estoy haciendo mal?

 import dask.dataframe as dd s = pd.Series([10000]*120) ds = dd.from_pandas(s, npartitions = 3) def slow_func(k): A = np.random.normal(size = k) # k = 10000 s = 0 for a in A: if a > 0: s += 1 else: s -= 1 return s s.apply(slow_func) # 0.43 sec ds.map(slow_func).compute() # 2.04 sec 

map_partitions

Puede aplicar su función a todas las particiones de su dataframe con la función map_partitions .

 df.map_partitions(func, columns=...) 

Tenga en cuenta que a la función se le dará solo una parte del conjunto de datos a la vez, no se pandas apply todo el conjunto de datos como con pandas apply (lo que presumiblemente no querría si quisiera hacer paralelismo).

map / apply

Puede asignar una función por filas en una serie con map

 df.mycolumn.map(func) 

Puede asignar una función por filas en un dataframe con apply

 df.apply(func, axis=1) 

Hilos vs Procesos

A partir de la versión 0.6.0, dask.dataframes paraleliza con subprocesos. Las funciones Python personalizadas no recibirán mucho beneficio del paralelismo basado en subprocesos. Podrías probar procesos en su lugar.

 df = dd.read_csv(...) df.map_partitions(func, columns=...).compute(scheduler='processes') 

Pero evita apply

Sin embargo, realmente debes evitar apply con funciones Python personalizadas, tanto en Pandas como en Dask. Esto es a menudo una fuente de mal desempeño. Podría ser que si encuentra una manera de realizar su operación de forma vectorializada, podría ser que su código de Pandas sea 100 veces más rápido y no necesite dask.dataframe en absoluto.

Considerar numba

Para su problema particular puede considerar numba . Esto mejora significativamente su rendimiento.

 In [1]: import numpy as np In [2]: import pandas as pd In [3]: s = pd.Series([10000]*120) In [4]: %paste def slow_func(k): A = np.random.normal(size = k) # k = 10000 s = 0 for a in A: if a > 0: s += 1 else: s -= 1 return s ## -- End pasted text -- In [5]: %time _ = s.apply(slow_func) CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms Wall time: 347 ms In [6]: import numba In [7]: fast_func = numba.jit(slow_func) In [8]: %time _ = s.apply(fast_func) # First time incurs comstacktion overhead CPU times: user 179 ms, sys: 0 ns, total: 179 ms Wall time: 175 ms In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms Wall time: 68.7 ms 

Descargo de responsabilidad, trabajo para la compañía que produce tanto numba como dask y emplea a muchos de los desarrolladores de pandas .

A partir de v dask.dataframe .apply delega la responsabilidad de map_partitions :

 @insert_meta_param_description(pad=12) def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds): """ Parallel version of pandas.Series.apply ... """ if meta is no_default: msg = ("`meta` is not specified, inferred from partial data. " "Please provide `meta` if the result is unexpected.\n" " Before: .apply(func)\n" " After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n" " or: .apply(func, meta=('x', 'f8')) for series result") warnings.warn(msg) meta = _emulate(M.apply, self._meta_nonempty, func, convert_dtype=convert_dtype, args=args, **kwds) return map_partitions(M.apply, self, func, convert_dtype, args, meta=meta, **kwds)