Spark 1.4 aumenta la memoria maxResultSize

Estoy usando Spark 1.4 para mi investigación y tengo problemas con la configuración de la memoria. Mi máquina tiene 16 GB de memoria, así que no hay problema, ya que el tamaño de mi archivo es de solo 300 MB. Sin embargo, cuando bash convertir Spark RDD a un dataframe panda usando la función toPandas() , recibo el siguiente error:

 serialized results of 9 tasks (1096.9 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) 

Intenté arreglar esto cambiando el archivo de configuración de chispa y aún obteniendo el mismo error. He oído que este es un problema con la chispa 1.4 y me pregunto si sabe cómo resolverlo. Cualquier ayuda es muy apreciada.

Puede establecer el parámetro spark.driver.maxResultSize en el objeto SparkConf :

 from pyspark import SparkConf, SparkContext # In Jupyter you have to stop the current context first sc.stop() # Create new config conf = (SparkConf() .set("spark.driver.maxResultSize", "2g")) # Create new context sc = SparkContext(conf=conf) 

Probablemente debería crear un nuevo SQLContext también:

 from pyspark.sql import SQLContext sqlContext = SQLContext(sc) 

Desde la línea de comandos, como con pyspark, --conf spark.driver.maxResultSize=3g también se puede usar para boost el tamaño máximo del resultado.

Ajustar spark.driver.maxResultSize es una buena práctica considerando el entorno de ejecución. Sin embargo, no es la solución a su problema, ya que la cantidad de datos puede cambiar de tiempo en tiempo. Como mencionó @ Zia-Kayani, es mejor recostackr datos de manera inteligente. Entonces, si tiene un df DataFrame, puede llamar a df.rdd y hacer todas las cosas mágicas en el clúster, no en el controlador. Sin embargo, si necesita recostackr los datos, sugeriría:

  • No encienda spark.sql.parquet.binaryAsString . Los objetos de cuerda ocupan más espacio.
  • Utilice spark.rdd.compress para comprimir los RDD cuando los recopile
  • Trate de recogerlo mediante paginación. (código en Scala, de otra respuesta Scala: cómo obtener un rango de filas en un dataframe )

    long count = df.count() int limit = 50; while(count > 0){ df1 = df.limit(limit); df1.show(); //will print 50, next 50, etc rows df = df.except(df1); count = count - limit; }

Parece que está recolectando el RDD, así que definitivamente recostackrá todos los datos al nodo del controlador, por eso se enfrenta a este problema. Debe evitar recostackr datos si no es necesario para un rdd, o si es necesario, especifique spark.driver.maxResultSize . Hay dos formas de definir esta variable.

1 – crea Spark Config configurando esta variable como
conf.set("spark.driver.maxResultSize", "3g")
2 – o establezca esta variable en el archivo spark-defaults.conf presente en la carpeta conf de spark. like spark.driver.maxResultSize 3g y reinicia la chispa.

También hay un error de Spark https://issues.apache.org/jira/browse/SPARK-12837 que da el mismo error

  serialized results of X tasks (Y MB) is bigger than spark.driver.maxResultSize 

incluso aunque no esté extrayendo datos al controlador explícitamente.

SPARK-12837 aborda un error de Spark que los acumuladores / variables de transmisión anteriores a Spark 2 se llevaron al controlador innecesario, lo que causó este problema.

Al iniciar el trabajo o terminal, puede utilizar

 --conf spark.driver.maxResultSize="0" 

para eliminar el cuello de botella

Puede establecer spark.driver.maxResultSize en 2GB cuando inicie el shell pyspark:

 pyspark --conf "spark.driver.maxResultSize=2g" 

Esto es para permitir 2Gb para spark.driver.maxResultSize