error de memoria distribuida dask

Obtuve el siguiente error en el progtwigdor al ejecutar Dask en un trabajo distribuido:

distributed.core - ERROR - Traceback (most recent call last): File "/usr/local/lib/python3.4/dist-packages/distributed/core.py", line 269, in write frames = protocol.dumps(msg) File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 81, in dumps frames = dumps_msgpack(small) File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 153, in dumps_msgpack payload = msgpack.dumps(msg, use_bin_type=True) File "/usr/local/lib/python3.4/dist-packages/msgpack/__init__.py", line 47, in packb return Packer(**kwargs).pack(o) File "msgpack/_packer.pyx", line 231, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:231) File "msgpack/_packer.pyx", line 239, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:239) MemoryError 

¿Se está quedando sin memoria en el progtwigdor o en uno de los trabajadores? ¿¿O ambos??

La causa más común de este error es tratar de recostackr demasiados datos, como ocurre en el siguiente ejemplo utilizando dask.dataframe:

 df = dd.read_csv('s3://bucket/lots-of-data-*.csv') df.compute() 

Esto carga todos los datos en la RAM a través del clúster (lo cual está bien), y luego intenta devolver todo el resultado a la máquina local a través del progtwigdor (que probablemente no puede manejar sus 100 GB de datos). un solo lugar.) Las comunicaciones del trabajador al cliente pasan a través del Progtwigdor, por lo que es la primera máquina única en recibir todos los datos y la primera máquina que probablemente falle.

Si este es el caso, entonces es probable que desee utilizar el método Executor.persist para activar el cálculo pero dejarlo en el clúster.

 df = dd.read_csv('s3://bucket/lots-of-data-*.csv') df = e.persist(df) 

En general, solo usamos df.compute() para resultados pequeños que queremos ver en nuestra sesión local.