Articles of pyspark sql

Spark ML Pipeline Causes java.lang.Exception: no se compiló … El código … crece más allá de 64 KB

Utilizando Spark 2.0, estoy tratando de ejecutar un VectorAssembler simple en un pipeline de pyspark ML, de esta manera: feature_assembler = VectorAssembler(inputCols=[‘category_count’, ‘name_count’], \ outputCol=”features”) pipeline = Pipeline(stages=[feature_assembler]) model = pipeline.fit(df_train) model_output = model.transform(df_train) Cuando trato de mirar la salida usando model_output.select(“features”).show(1) Me sale el error Py4JJavaError Traceback (most recent call last) in () 2 […]

Uso de la función Reducir () de Python para unir múltiples marcos de datos de PySpark

¿Alguien sabe por qué el uso de functools.reduce() de functools.reduce() podría llevar a un peor rendimiento al unir múltiples DataFrames de PySpark que unir iterativamente los mismos DataFrames usando un bucle for ? Específicamente, esto produce una desaceleración masiva seguida de un error de memoria insuficiente: def join_dataframes(list_of_join_columns, left_df, right_df): return left_df.join(right_df, on=list_of_join_columns) joined_df = […]

Cómo convertir la cadena a ArrayType del diccionario (JSON) en PySpark

Intentando convertir StringType en ArrayType de JSON para un CSV generado a partir de un dataframe. Usando pyspark en Spark2 El archivo CSV que estoy tratando; es como sigue – date,attribute2,count,attribute3 2017-09-03,’attribute1_value1′,2,'[{“key”:”value”,”key2″:2},{“key”:”value”,”key2″:2},{“key”:”value”,”key2″:2}]’ 2017-09-04,’attribute1_value2′,2,'[{“key”:”value”,”key2″:20},{“key”:”value”,”key2″:25},{“key”:”value”,”key2″:27}]’ Como se muestra arriba, contiene un atributo “attribute3” en cadena literal, que técnicamente es una lista de diccionario (JSON) con una longitud […]

¿Puedo leer varios archivos en un Spark Dataframe desde S3, pasando por los que no existen?

Me gustaría leer varios archivos de parquet en un dataframe desde S3. Actualmente, estoy usando el siguiente método para hacer esto: files = [‘s3a://dev/2017/01/03/data.parquet’, ‘s3a://dev/2017/01/02/data.parquet’] df = session.read.parquet(*files) Esto funciona si todos los archivos existen en S3, pero me gustaría solicitar que se cargue una lista de archivos en un dataframe sin interrupciones cuando no […]

¿Cómo agregar vectores dispersos después de agruparlos usando Spark SQL?

Estoy haciendo un sistema de recomendación de Noticias y necesito construir una tabla para los usuarios y las noticias que lean. mis datos en bruto como este: 001436800277225 [“9161492″,”9161787″,”9378531”] 009092130698762 [“9394697”] 010003000431538 [“9394697″,”9426473″,”9428530”] 010156461231357 [“9350394″,”9414181”] 010216216021063 [“9173862″,”9247870”] 010720006581483 [“9018786”] 011199797794333 [“9017977″,”9091134″,”9142852″,”9325464″,”9331913”] 011337201765123 [“9161294″,”9198693”] 011414545455156 [“9168185″,”9178348″,”9182782″,”9359776”] 011425002581540 [“9083446″,”9161294″,”9309432”] y uso chispa de SQL explotar y una […]

Caché ordenado Spark DataFrame crea un trabajo no deseado

Quiero convertir un RDD en un DataFrame y quiero almacenar en caché los resultados del RDD: from pyspark.sql import * from pyspark.sql.types import * import pyspark.sql.functions as fn schema = StructType([StructField(‘t’, DoubleType()), StructField(‘value’, DoubleType())]) df = spark.createDataFrame( sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(), schema=schema, verifySchema=False ).orderBy(“t”) #.cache() Si no utiliza una función de […]

PySpark: cuando funciona con múltiples salidas

Estoy tratando de usar una función de “encadenado cuando”. En otras palabras, me gustaría obtener más de dos salidas. Intenté usar la misma lógica de la función de concatenar IF en Excel: df.withColumn(“device_id”, when(col(“device”)==”desktop”,1)).otherwise(when(col(“device”)==”mobile”,2)).otherwise(null)) Pero eso no funciona porque no puedo poner una tupla en la función “de lo contrario”.

Cómo usar matplotlib para trazar los resultados de pyspark sql

Soy nuevo en pyspark. Quiero trazar el resultado utilizando matplotlib, pero no estoy seguro de qué función usar. Busqué una forma de convertir el resultado de sql a pandas y luego usar plot.

¿Cómo crear un DataFrame fuera de filas mientras se conserva el esquema existente?

Si llamo map o mapPartition y mi función recibe filas de PySpark, ¿cuál es la forma natural de crear un PySpark local o Pandas DataFrame? ¿Algo que combine las filas y retenga el esquema? Actualmente hago algo como: def combine(partition): rows = [x for x in partition] dfpart = pd.DataFrame(rows,columns=rows[0].keys()) pandafunc(dfpart) mydf.mapPartition(combine)

¿Cómo pivotar en múltiples columnas en Spark SQL?

Necesito pivotar más de una columna en un dataframe de pyspark. Marco de datos de muestra, >>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)] >>> mydf = spark.createDataFrame(d,[‘id’,’day’,’price’,’units’]) >>> mydf.show() +—+—+—–+—–+ | id|day|price|units| +—+—+—–+—–+ |100| 1| 23| 10| |100| 2| 45| 11| |100| 3| 67| 12| |100| 4| 78| 13| |101| 1| 23| 10| |101| 2| 45| 13| |101| […]