Parece que no puedo hacer que –py-files en Spark funcionen

Tengo un problema con el uso de Python en Spark. Mi aplicación tiene algunas dependencias, como numpy, pandas, astropy, etc. No puedo usar virtualenv para crear un entorno con todas las dependencias, ya que los nodos del clúster no tienen ningún punto de assembly o sistema de archivos común, además de HDFS. Por lo tanto, estoy atascado con el uso de la spark-submit --py-files . Empaqueté el contenido de los paquetes de sitio en un archivo ZIP y envié el trabajo como con la --py-files=dependencies.zip (como se sugiere en ¿La forma más fácil de instalar dependencias de Python en los nodos ejecutores de Spark? ). Sin embargo, los nodos en el clúster aún no parecen ver los módulos en el interior y lanzan ImportError como este al importar números.

 File "/path/anonymized/module.py", line 6, in  import numpy File "/tmp/pip-build-4fjFLQ/numpy/numpy/__init__.py", line 180, in  File "/tmp/pip-build-4fjFLQ/numpy/numpy/add_newdocs.py", line 13, in  File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/__init__.py", line 8, in  # File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/type_check.py", line 11, in  File "/tmp/pip-build-4fjFLQ/numpy/numpy/core/__init__.py", line 14, in  ImportError: cannot import name multiarray 

Cuando cambio a virtualenv y uso el shell pyspark local, todo funciona bien, así que todas las dependencias están ahí. ¿Alguien sabe, qué podría causar este problema y cómo solucionarlo?

¡Gracias!

En primer lugar, asumiré que sus dependencias se enumeran en requirements.txt . Para empaquetar y comprimir las dependencias, ejecute lo siguiente en la línea de comando:

 pip install -t dependencies -r requirements.txt cd dependencies zip -r ../dependencies.zip . 

Arriba, el comando de cd dependencies es crucial para garantizar que los módulos se encuentren en el nivel superior del archivo zip. Gracias a la publicación de Dan Corin por heads up.

A continuación, envíe el trabajo a través de:

 spark-submit --py-files dependencies.zip spark_job.py 

La directiva --py-files envía el archivo zip a los trabajadores de Spark pero no lo agrega a PYTHONPATH (fuente de confusión para mí). Para agregar las dependencias a PYTHONPATH para arreglar ImportError , agregue la siguiente línea al trabajo de Spark, spark_job.py :

 sc.addPyFile("dependencies.zip") 

Una advertencia de este post de Cloudera :

La suposición de que cualquier persona que realice computación distribuida con hardware básico debe asumir que el hardware subyacente es potencialmente heterogéneo. Un huevo de Python construido en una máquina cliente será específico para la architecture de la CPU del cliente debido a la comstackción de C requerida. La distribución de un huevo para un paquete complejo y comstackdo como NumPy, SciPy o pandas es una solución frágil que es probable que falle en la mayoría de los clusters, al menos eventualmente.

Aunque la solución anterior no crea un huevo, se aplica la misma pauta.

  • Primero necesita pasar sus archivos a través de –py-files o –files

    • Cuando pase sus archivos zip / archivos con los indicadores anteriores, básicamente sus recursos se transferirán al directorio temporal creado en HDFS solo durante la vida útil de esa aplicación.
  • Ahora en su código, agregue esos archivos / zip usando el siguiente comando

    sc.addPyFile("your zip/file")

    • Lo que se hace anteriormente es que carga los archivos en el entorno de ejecución, como JVM.
  • Ahora importe su archivo zip en su código con un alias como el siguiente para comenzar a referenciarlo

    import zip/file as your-alias

    Nota: no es necesario utilizar la extensión de archivo al importar, como .py al final

Espero que esto sea útil.

Puede localizar todos los .pys que necesita y agregarlos relativamente. Vea aquí para esta explicación:

 import os, sys, inspect # realpath() will make your script run, even if you symlink it :) cmd_folder = os.path.realpath(os.path.abspath(os.path.split(inspect.getfile( inspect.currentframe() ))[0])) if cmd_folder not in sys.path: sys.path.insert(0, cmd_folder) # use this if you want to include modules from a subfolder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"subfolder"))) if cmd_subfolder not in sys.path: sys.path.insert(0, cmd_subfolder) # Info: # cmd_folder = os.path.dirname(os.path.abspath(__file__)) # DO NOT USE __file__ !!! # __file__ fails if script is called in different ways on Windows # __file__ fails if someone does os.chdir() before # sys.argv[0] also fails because it doesn't not always contains the path 

Spark también fallará silenciosamente al cargar un archivo zip creado con el módulo zipfile Python. Los archivos zip deben crearse usando una utilidad zip.