Spark rendimiento para Scala vs Python

Prefiero Python sobre Scala. Pero, como Spark está escrito de forma nativa en Scala, esperaba que mi código se ejecutara más rápido en Scala que en la versión de Python por razones obvias.

Con esa suposición, pensé en aprender y escribir la versión de Scala de un código de preprocesamiento muy común para aproximadamente 1 GB de datos. Los datos se recogen de la competencia SpringLeaf en Kaggle . Solo para dar una visión general de los datos (contiene 1936 dimensiones y 145232 filas). Los datos se componen de varios tipos, por ejemplo, int, float, string, boolean. Estoy usando 6 núcleos de 8 para el procesamiento de Spark; por eso usé minPartitions=6 para que cada núcleo tenga algo que procesar.

Código Scala

 val input = sc.textFile("train.csv", minPartitions=6) val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } val delim1 = "\001" def separateCols(line: String): Array[String] = { val line2 = line.replaceAll("true", "1") val line3 = line2.replaceAll("false", "0") val vals: Array[String] = line3.split(",") for((x,i) <- vals.view.zipWithIndex) { vals(i) = "VAR_%04d".format(i) + delim1 + x } vals } val input3 = input2.flatMap(separateCols) def toKeyVal(line: String): (String, String) = { val vals = line.split(delim1) (vals(0), vals(1)) } val input4 = input3.map(toKeyVal) def valsConcat(val1: String, val2: String): String = { val1 + "," + val2 } val input5 = input4.reduceByKey(valsConcat) input5.saveAsTextFile("output") 

Código Python

 input = sc.textFile('train.csv', minPartitions=6) DELIM_1 = '\001' def drop_first_line(index, itr): if index == 0: return iter(list(itr)[1:]) else: return itr input2 = input.mapPartitionsWithIndex(drop_first_line) def separate_cols(line): line = line.replace('true', '1').replace('false', '0') vals = line.split(',') vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"')) for e, val in enumerate(vals)] return vals2 input3 = input2.flatMap(separate_cols) def to_key_val(kv): key, val = kv.split(DELIM_1) return (key, val) input4 = input3.map(to_key_val) def vals_concat(v1, v2): return v1 + ',' + v2 input5 = input4.reduceByKey(vals_concat) input5.saveAsTextFile('output') 

Scala Performance Stage 0 (38 mins), Stage 1 (18 sec) introduzca la descripción de la imagen aquí

Python Performance Stage 0 (11 minutos), Stage 1 (7 segundos) introduzca la descripción de la imagen aquí

Ambos producen diferentes gráficos de visualización DAG (debido a que ambas imágenes muestran diferentes funciones de etapa 0 para Scala ( map ) y Python ( reduceByKey ))

Pero, esencialmente, ambos códigos intentan transformar datos en RDD (dimensión, cadena de lista de valores) RDD y guardarlos en el disco. La salida se utilizará para calcular varias estadísticas para cada dimensión.

En cuanto al rendimiento, el código Scala para estos datos reales como este parece ejecutarse 4 veces más lento que la versión Python. Una buena noticia para mí es que me dio una buena motivación para permanecer con Python. La mala noticia es que no entendí bien por qué?


La respuesta original que discute el código se puede encontrar a continuación.


En primer lugar, debe distinguir entre diferentes tipos de API, cada uno con sus propias consideraciones de rendimiento.

API RDD

(Estructuras Python puras con orquestación basada en JVM)

Este es el componente que se verá más afectado por el rendimiento del código de Python y los detalles de la implementación de PySpark. Si bien es poco probable que el rendimiento de Python sea un problema, hay al menos algunos factores que debe tener en cuenta:

  • Sobrecarga de comunicación de JVM. Prácticamente todos los datos que vienen desde y hacia el ejecutor de Python deben pasar a través de un socket y un trabajador de JVM. Si bien esta es una comunicación local relativamente eficiente, todavía no es gratuita.
  • Ejecutores basados ​​en procesos (Python) versus ejecutores basados ​​en subprocesos (subprocesos únicos de JVM) (Scala). Cada ejecutor de Python se ejecuta en su propio proceso. Como efecto secundario, proporciona un aislamiento más fuerte que su contraparte de JVM y cierto control sobre el ciclo de vida del ejecutor, pero potencialmente un uso de memoria significativamente mayor:

    • huella de memoria del intérprete
    • huella de las bibliotecas cargadas
    • Difusión menos eficiente (cada proceso requiere su propia copia de una emisión)
  • Rendimiento del propio código Python. En general, Scala es más rápido que Python, pero variará de una tarea a otra. Además, tiene múltiples opciones que incluyen JIT como Numba, extensiones C ( Cython ) o bibliotecas especializadas como Theano . Finalmente, si no usa ML / MLlib (o simplemente NumPy stack) , considere usar PyPy como un intérprete alternativo. Ver SPARK-3094 .

  • La configuración de PySpark proporciona la opción spark.python.worker.reuse que se puede usar para elegir entre forzar el proceso de Python para cada tarea y reutilizar el proceso existente. La última opción parece ser útil para evitar la recolección de basura costosa (es más una impresión que un resultado de pruebas sistemáticas), mientras que la primera (por defecto) es óptima en caso de emisiones e importaciones costosas.
  • El recuento de referencias, utilizado como método de recolección de basura de primera línea en CPython, funciona bastante bien con las cargas de trabajo típicas de Spark (procesamiento similar a un flujo, sin ciclos de referencia) y reduce el riesgo de pausas largas en GC.

MLlib

(ejecución mixta de Python y JVM)

Las consideraciones básicas son prácticamente las mismas que antes con algunos problemas adicionales. Mientras que las estructuras básicas utilizadas con MLlib son objetos RDD de Python, todos los algoritmos se ejecutan directamente utilizando Scala.

Significa un costo adicional de convertir objetos de Python a objetos de Scala y al contrario, mayor uso de memoria y algunas limitaciones adicionales que cubriremos más adelante.

A partir de ahora (Spark 2.x), la API basada en RDD está en modo de mantenimiento y está progtwigda para ser eliminada en Spark 3.0 .

DataFrame API y Spark ML

(Ejecución JVM con código Python limitado al controlador)

Estas son probablemente la mejor opción para tareas estándar de procesamiento de datos. Dado que el código de Python se limita principalmente a operaciones lógicas de alto nivel en el controlador, no debe haber diferencia de rendimiento entre Python y Scala.

Una única excepción es el uso de UDFs de Python que son significativamente menos eficientes que sus equivalentes de Scala. Si bien hay algunas posibilidades de mejoras (ha habido un desarrollo sustancial en Spark 2.0.0), la mayor limitación es la ida y vuelta completa entre la representación interna (JVM) y el intérprete de Python. Si es posible, debería favorecer una composición de expresiones incorporadas ( ejemplo . El comportamiento de UDF de Python se ha mejorado en Spark 2.0.0, pero aún no es óptimo en comparación con la ejecución nativa. Esto puede mejorar en el futuro con la introducción de las UDF vectorizadas (SPARK-21190) .

También asegúrese de evitar pasar datos innecesarios entre DataFrames y RDDs . Esto requiere una serialización y deserialización costosas, sin mencionar la transferencia de datos hacia y desde el intérprete de Python.

Vale la pena señalar que las llamadas Py4J tienen una latencia bastante alta. Esto incluye llamadas simples como:

 from pyspark.sql.functions import col col("foo") 

Por lo general, no debería importar (la sobrecarga es constante y no depende de la cantidad de datos), pero en el caso de aplicaciones de software en tiempo real, puede considerar el almacenamiento en caché / reutilización de los contenedores de Java.

GraphX ​​y Spark DataSets

Por ahora (Spark 1.6 2.1) ninguno proporciona la API de PySpark, por lo que puede decir que PySpark es infinitamente peor que Scala.

GraphX

En la práctica, el desarrollo de GraphX ​​se detuvo casi por completo y el proyecto se encuentra actualmente en modo de mantenimiento con los tickets JIRA relacionados cerrados, ya que no se solucionarán . La biblioteca GraphFrames proporciona una biblioteca alternativa de procesamiento de gráficos con enlaces Python.

Conjunto de datos

Subjetivamente hablando, no hay mucho lugar para los Datasets de Datasets tipificados estáticamente en Python e incluso si existió la implementación actual de Scala es demasiado simplista y no ofrece los mismos beneficios de rendimiento que DataFrame .

Transmisión

Por lo que he visto hasta ahora, recomiendo encarecidamente usar Scala sobre Python. Puede cambiar en el futuro si PySpark obtiene soporte para flujos estructurados, pero en este momento la API de Scala parece ser mucho más robusta, completa y eficiente. Mi experiencia es bastante limitada.

La transmisión estructurada en Spark 2.x parece reducir la brecha entre los idiomas, pero por ahora todavía está en sus inicios. No obstante, la API basada en RDD ya se menciona como “transmisión heredada” en la Documentación de Databricks (fecha de acceso 2017-03-03), por lo que es razonable esperar mayores esfuerzos de unificación.

Consideraciones de incumplimiento

Paridad de características

No todas las características de Spark están expuestas a través de la API de PySpark. Asegúrese de verificar si las partes que necesita ya están implementadas y trate de comprender las posibles limitaciones.

Es particularmente importante cuando utiliza MLlib y contextos mixtos similares (consulte Cómo llamar a la función Java / Scala desde una tarea ). Para ser justos, algunas partes de la API de PySpark, como mllib.linalg , proporcionan un conjunto de métodos más completo que Scala.

Diseño API

La API de PySpark refleja de cerca su contraparte de Scala y, como tal, no es exactamente Pythonic. Significa que es bastante fácil de asignar entre idiomas, pero al mismo tiempo, el código Python puede ser mucho más difícil de entender.

Arquitectura compleja

El flujo de datos de PySpark es relativamente complejo en comparación con la ejecución pura de JVM. Es mucho más difícil razonar acerca de los progtwigs PySpark o la depuración. Además, al menos la comprensión básica de Scala y JVM en general es prácticamente una necesidad.

Chispa 2.x y más allá

El cambio continuo hacia la API del Dataset , con la API RDD congelada brinda oportunidades y desafíos para los usuarios de Python. Si bien las partes de alto nivel de la API son mucho más fáciles de exponer en Python, las funciones más avanzadas son prácticamente imposibles de usar directamente .

Además, las funciones nativas de Python siguen siendo ciudadanos de segunda clase en el mundo SQL. Esperemos que esto mejore en el futuro con la serialización de Apache Arrow ( los esfuerzos actuales apuntan a la collection datos collection pero la búsqueda de UDF es un objective a largo plazo ).

Para proyectos que dependen mucho de la base de código de Python, las alternativas puras de Python (como Dask o Ray ) podrían ser una alternativa interesante.

No tiene que ser uno contra el otro.

La API de Spark DataFrame (SQL, Dataset) proporciona una manera elegante de integrar el código Scala / Java en la aplicación PySpark. Puede usar DataFrames para exponer datos a un código JVM nativo y leer los resultados. He explicado algunas opciones en otro lugar y puede encontrar un ejemplo funcional de ida y vuelta de Python-Scala en Cómo usar una clase Scala dentro de Pyspark .

Se puede boost aún más introduciendo tipos definidos por el usuario (consulte ¿Cómo definir el esquema para el tipo personalizado en Spark SQL? ).


¿Qué hay de malo con el código proporcionado en la pregunta?

(Descargo de responsabilidad: punto de vista de Pythonista. Lo más probable es que me haya perdido algunos trucos de Scala)

En primer lugar, hay una parte en su código que no tiene ningún sentido en absoluto. Si ya tiene pares (key, value) creados usando zipWithIndex o enumerate ¿cuál es el punto en la creación de una cadena para dividirla justo después? flatMap no funciona de forma recursiva, por lo que simplemente puede producir tuplas y omitir el siguiente map .

Otra parte que encuentro problemática es reduceByKey . En términos generales, reduceByKey es útil si la aplicación de la función agregada puede reducir la cantidad de datos que se deben barajar. Ya que simplemente concatena cadenas no hay nada que ganar aquí. Al ignorar las cosas de bajo nivel, como la cantidad de referencias, la cantidad de datos que debe transferir es exactamente la misma que para groupByKey .

Normalmente no me detendría en eso, pero por lo que puedo decir es un cuello de botella en su código Scala. Unirse a cadenas en JVM es una operación bastante costosa (ver por ejemplo: ¿Es la concatenación de cadenas en scala tan costosa como en Java? ). Significa que algo como esto _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) que es equivalente a input4.reduceByKey(valsConcat) en su código no es una buena idea.

Si desea evitar groupByKey , puede intentar usar aggregateByKey con StringBuilder . Algo similar a esto debería hacer el truco:

 rdd.aggregateByKey(new StringBuilder)( (acc, e) => { if(!acc.isEmpty) acc.append(",").append(e) else acc.append(e) }, (acc1, acc2) => { if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2) else acc1.append(",").addString(acc2) } ) 

Pero dudo que valga la pena todo el alboroto.

Teniendo en cuenta lo anterior, he reescrito su código de la siguiente manera:

Scala :

 val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(1) else iter } val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{ case ("true", i) => (i, "1") case ("false", i) => (i, "0") case p => p.swap }) val result = pairs.groupByKey.map{ case (k, vals) => { val valsString = vals.mkString(",") s"$k,$valsString" } } result.saveAsTextFile("scalaout") 

Python :

 def drop_first_line(index, itr): if index == 0: return iter(list(itr)[1:]) else: return itr def separate_cols(line): line = line.replace('true', '1').replace('false', '0') vals = line.split(',') for (i, x) in enumerate(vals): yield (i, x) input = (sc .textFile('train.csv', minPartitions=6) .mapPartitionsWithIndex(drop_first_line)) pairs = input.flatMap(separate_cols) result = (pairs .groupByKey() .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1])))) result.saveAsTextFile("pythonout") 

Resultados

En modo local[6] (CPU Intel (R) Xeon (R) E3-1245 V2 a 3.40GHz) con 4GB de memoria por ejecutor toma (n = 3):

  • Scala – media: 250.00s, stdev: 12.49
  • Python – media: 246.66s, stdev: 1.15

Estoy bastante seguro de que la mayor parte de ese tiempo se dedica a barajar, serializar, deserializar y otras tareas secundarias. Solo por diversión, aquí hay un código ingenuo de un solo hilo en Python que realiza la misma tarea en esta máquina en menos de un minuto:

 def go(): with open("train.csv") as fr: lines = [ line.replace('true', '1').replace('false', '0').split(",") for line in fr] return zip(*lines[1:])