Articles of apache spark

java.lang.OutOfMemoryError: no se puede adquirir 100 bytes de memoria, se obtuvo 0

Estoy invocando Pyspark con Spark 2.0 en modo local con el siguiente comando: pyspark –executor-memory 4g –driver-memory 4g El dataframe de entrada se está leyendo desde un archivo tsv y tiene 580 K x 28 columnas. Estoy realizando algunas operaciones en el dataframe y luego trato de exportarlo a un archivo tsv y obtengo este […]

Filtro RDD basado en row_number

sc.textFile (ruta) permite leer un archivo HDFS pero no acepta parámetros (como omitir varias filas, has_headers, …). en el libro electrónico O’Reilly de “Learning Spark”, se sugiere usar la siguiente función para leer un CSV (Ejemplo 5-12. Ejemplo de carga de Python CSV) import csv import StringIO def loadRecord(line): “””Parse a CSV line””” input = […]

¿Cómo pivotar en múltiples columnas en Spark SQL?

Necesito pivotar más de una columna en un dataframe de pyspark. Marco de datos de muestra, >>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)] >>> mydf = spark.createDataFrame(d,[‘id’,’day’,’price’,’units’]) >>> mydf.show() +—+—+—–+—–+ | id|day|price|units| +—+—+—–+—–+ |100| 1| 23| 10| |100| 2| 45| 11| |100| 3| 67| 12| |100| 4| 78| 13| |101| 1| 23| 10| |101| 2| 45| 13| |101| […]

No se puede encontrar la función col en pyspark

En pyspark 1.6.2, puedo importar la función col por from pyspark.sql.functions import col pero cuando trato de buscarlo en el código fuente de Github, no encuentro ninguna función col en el archivo functions.py , ¿cómo puede Python importar una función que no existe?

¿Cómo estimar el tamaño real del dataframe en pyspark?

¿Cómo determinar un tamaño de dataframe? En este momento, estimo el tamaño real de un dataframe de la siguiente manera: headers_size = key for key in df.first().asDict() rows_size = df.map(lambda row: len(value for key, value in row.asDict()).sum() total_size = headers_size + rows_size Es demasiado lento y estoy buscando una mejor manera.

Spark puede acceder a la tabla Hive desde pyspark pero no desde spark-submit

Por lo tanto, cuando se ejecuta desde pyspark yo escribiría (sin especificar ningún contexto): df_openings_latest = sqlContext.sql(‘select * from experian_int_openings_latest_orc’) .. y funciona bien. Sin embargo, cuando ejecuto mi script desde spark-submit , como spark-submit script.py pongo lo siguiente en from pyspark.sql import SQLContext from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName(‘inc_dd_openings’) sc = SparkContext(conf=conf) […]

¿Cómo convertir ArrayType a DenseVector en PySpark DataFrame?

Recibo el siguiente error al intentar construir un Pipeline ML: pyspark.sql.utils.IllegalArgumentException: ‘requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType(DoubleType,true).’ La columna Mis features contiene una matriz de valores de punto flotante. Parece que necesito convertirlos en algún tipo de vector (no es escaso, ¿es un DenseVector?). ¿Hay alguna forma de […]

¿Cómo usar las funciones de ventana en PySpark?

Estoy tratando de usar algunas funciones de Windows ( ntile y percentRank ) para un dataframe pero no sé cómo usarlas. ¿Puede alguien ayudarme con esto, por favor? En la documentación de la API de Python no hay ejemplos al respecto. Específicamente, estoy tratando de obtener cuantiles de un campo numérico en mi dataframe. Estoy […]

Spark dataframe transformar varias filas en columna

Soy un principiante para encender, y quiero transformarme debajo del dataframe de origen (cargar desde un archivo JSON): +–+—–+—–+ |A |count|major| +–+—–+—–+ | a| 1| m1| | a| 1| m2| | a| 2| m3| | a| 3| m4| | b| 4| m1| | b| 1| m2| | b| 2| m3| | c| 3| m1| | […]

Ejemplo de PySpark y difusión unir

Estoy usando Spark 1.3 # Read from text file, parse it and then do some basic filtering to get data1 data1.registerTempTable(‘data1’) # Read from text file, parse it and then do some basic filtering to get data1 data2.registerTempTable(‘data2’) # Perform join data_joined = data1.join(data2, data1.id == data2.id); Mis datos son bastante sesgados y data2 (pocos […]