Articles of pyspark sql

¿Puedo leer varios archivos en un Spark Dataframe desde S3, pasando por los que no existen?

Me gustaría leer varios archivos de parquet en un dataframe desde S3. Actualmente, estoy usando el siguiente método para hacer esto: files = [‘s3a://dev/2017/01/03/data.parquet’, ‘s3a://dev/2017/01/02/data.parquet’] df = session.read.parquet(*files) Esto funciona si todos los archivos existen en S3, pero me gustaría solicitar que se cargue una lista de archivos en un dataframe sin interrupciones cuando no […]

¿Cómo agregar vectores dispersos después de agruparlos usando Spark SQL?

Estoy haciendo un sistema de recomendación de Noticias y necesito construir una tabla para los usuarios y las noticias que lean. mis datos en bruto como este: 001436800277225 [“9161492″,”9161787″,”9378531”] 009092130698762 [“9394697”] 010003000431538 [“9394697″,”9426473″,”9428530”] 010156461231357 [“9350394″,”9414181”] 010216216021063 [“9173862″,”9247870”] 010720006581483 [“9018786”] 011199797794333 [“9017977″,”9091134″,”9142852″,”9325464″,”9331913”] 011337201765123 [“9161294″,”9198693”] 011414545455156 [“9168185″,”9178348″,”9182782″,”9359776”] 011425002581540 [“9083446″,”9161294″,”9309432”] y uso chispa de SQL explotar y una […]

Caché ordenado Spark DataFrame crea un trabajo no deseado

Quiero convertir un RDD en un DataFrame y quiero almacenar en caché los resultados del RDD: from pyspark.sql import * from pyspark.sql.types import * import pyspark.sql.functions as fn schema = StructType([StructField(‘t’, DoubleType()), StructField(‘value’, DoubleType())]) df = spark.createDataFrame( sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(), schema=schema, verifySchema=False ).orderBy(“t”) #.cache() Si no utiliza una función de […]

PySpark: cuando funciona con múltiples salidas

Estoy tratando de usar una función de “encadenado cuando”. En otras palabras, me gustaría obtener más de dos salidas. Intenté usar la misma lógica de la función de concatenar IF en Excel: df.withColumn(“device_id”, when(col(“device”)==”desktop”,1)).otherwise(when(col(“device”)==”mobile”,2)).otherwise(null)) Pero eso no funciona porque no puedo poner una tupla en la función “de lo contrario”.

Cómo usar matplotlib para trazar los resultados de pyspark sql

Soy nuevo en pyspark. Quiero trazar el resultado utilizando matplotlib, pero no estoy seguro de qué función usar. Busqué una forma de convertir el resultado de sql a pandas y luego usar plot.

¿Cómo crear un DataFrame fuera de filas mientras se conserva el esquema existente?

Si llamo map o mapPartition y mi función recibe filas de PySpark, ¿cuál es la forma natural de crear un PySpark local o Pandas DataFrame? ¿Algo que combine las filas y retenga el esquema? Actualmente hago algo como: def combine(partition): rows = [x for x in partition] dfpart = pd.DataFrame(rows,columns=rows[0].keys()) pandafunc(dfpart) mydf.mapPartition(combine)

¿Cómo pivotar en múltiples columnas en Spark SQL?

Necesito pivotar más de una columna en un dataframe de pyspark. Marco de datos de muestra, >>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)] >>> mydf = spark.createDataFrame(d,[‘id’,’day’,’price’,’units’]) >>> mydf.show() +—+—+—–+—–+ | id|day|price|units| +—+—+—–+—–+ |100| 1| 23| 10| |100| 2| 45| 11| |100| 3| 67| 12| |100| 4| 78| 13| |101| 1| 23| 10| |101| 2| 45| 13| |101| […]

No se puede encontrar la función col en pyspark

En pyspark 1.6.2, puedo importar la función col por from pyspark.sql.functions import col pero cuando trato de buscarlo en el código fuente de Github, no encuentro ninguna función col en el archivo functions.py , ¿cómo puede Python importar una función que no existe?

Pyspark DataFrame UDF en columna de texto

Estoy tratando de hacer un poco de limpieza de texto NLP de algunas columnas Unicode en un DataSrame de PySpark. Lo he probado en Spark 1.3, 1.5 y 1.6 y parece que no puedo hacer que las cosas funcionen para mi vida. También he intentado usar Python 2.7 y Python 3.4. He creado un udf […]

PySpark: tome el promedio de una columna después de usar la función de filtro

Estoy utilizando el siguiente código para obtener la edad promedio de las personas cuyo salario es superior a algún umbral. dataframe.filter(df[‘salary’] > 100000).agg({“avg”: “age”}) la edad de la columna es numérica (flotante) pero aún así estoy recibiendo este error. py4j.protocol.Py4JJavaError: An error occurred while calling o86.agg. : scala.MatchError: age (of class java.lang.String) ¿Conoces alguna otra […]