¿Puedo usar funciones importadas de archivos .py en Dask / Distributed?

Tengo una pregunta sobre serialización e importaciones.

  • ¿Deberían las funciones tener sus propias importaciones? Como he visto hecho con PySpark
  • ¿Está simplemente mal el siguiente? ¿Necesita mod.py ser un paquete conda / pip? mod.py fue escrito en un sistema de archivos compartido.

 In [1]: from distributed import Executor In [2]: e = Executor('127.0.0.1:8786') In [3]: e Out[3]:  In [4]: import socket In [5]: e.run(socket.gethostname) Out[5]: {'172.20.12.7:53405': 'n1015', '172.20.12.8:53779': 'n1016'} In [6]: %%file mod.py ...: def hostname(): ...: return 'the hostname' ...: Overwriting mod.py In [7]: import mod In [8]: mod.hostname() Out[8]: 'the hostname' In [9]: e.run(mod.hostname) distributed.utils - ERROR - No module named 'mod' 

Respuesta rápida

Sube tu archivo mod.py a todos tus trabajadores. Puede hacer esto usando el mecanismo que utilizó para configurar dask.distributed, o puede usar el método upload_file

 e.upload_file('mod.py') 

Alternativamente, si su función se realiza en IPython, en lugar de ser parte de un módulo, se enviará sin problemas.

Respuesta larga

Todo esto tiene que ver con cómo se serializan las funciones en Python. Las funciones de los módulos se serializan por su nombre de módulo y nombre de función

 In [1]: from math import sin In [2]: import pickle In [3]: pickle.dumps(sin) Out[3]: b'\x80\x03cmath\nsin\nq\x00.' 

Por lo tanto, si la máquina cliente desea referirse a la función math.sin que envía a lo largo de esta serie de bytring (que notará que tiene 'math' y 'sin' en su enterrado entre otros bytes) a la máquina trabajadora. El trabajador mira esta serie de bytestring y dice “OK genial, la función que quiero está en tal y tal módulo, déjame ir y encuentro eso en mi sistema de archivos local. Si el módulo no está presente, generará un error. , muy parecido a lo que recibiste anteriormente.

Para las funciones creadas dinámicamente (funciones que usted realiza en IPython) utiliza un enfoque completamente diferente, agrupando todo el código. Este enfoque generalmente funciona bien.

En general, Dask supone que todos los trabajadores y el cliente tienen el mismo entorno de software. Por lo general, esto es manejado principalmente por quienquiera que configure su clúster, utilizando alguna otra herramienta como Docker. Métodos como upload_file están disponibles para llenar los huecos cuando tiene archivos o scripts que se actualizan con más frecuencia.

Para ejecutar una función importada en su clúster que no está disponible en el entorno de los trabajadores, también puede crear una función local desde la función importada. Esta función local será decapada por cloudpickle . En Python 2 puedes lograr esto con new.function (ver el nuevo módulo ). Para Python 3 esto podría lograrse con el módulo de tipos , pero no lo he probado.

Tu ejemplo anterior se vería así:

 In [3]: import mod In [4]: import new In [5]: def remote(func): ...: return new.function(func.func_code, func.func_globals, closure=func.func_closure) ...: In [6]: e.run(remote(mod.hostname)) Out[6]: {'tcp://10.0.2.15:44208': 'the hostname'} 

agregando el directorio del módulo a PYTHONPATH funcionó para mí