Articles of apache spark sql

PySpark: withColumn () con dos condiciones y tres resultados

Estoy trabajando con Spark y PySpark. Estoy tratando de lograr el resultado equivalente al siguiente pseudocódigo: df = df.withColumn(‘new_column’, IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.) Estoy intentando hacer esto en PySpark, pero no estoy seguro de la syntax. Cualquier punteros? Miré en expr() pero […]

PySpark – Agregar una nueva columna con un rango por usuario

Tengo este DataSrame de PySpark df = pd.DataFrame(np.array([ [“aa@gmail.com”,2,3], [“aa@gmail.com”,5,5], [“bb@gmail.com”,8,2], [“cc@gmail.com”,9,3] ]), columns=[‘user’,’movie’,’rating’]) sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1) user movie rating aa@gmail.com 2 3 aa@gmail.com 5 5 bb@gmail.com 8 2 cc@gmail.com 9 3 Necesito agregar una nueva columna con un rango por usuario Quiero tener esta salida user movie rating Rank aa@gmail.com 2 3 1 […]

Transformación de estilo pandas de datos agrupados en PySpark DataFrame

Si tenemos un dataframe de Pandas que consta de una columna de categorías y una columna de valores, podemos eliminar la media de cada categoría haciendo lo siguiente: df[“DemeanedValues”] = df.groupby(“Category”)[“Values”].transform(lambda g: g – numpy.mean(g)) Según tengo entendido, los marcos de datos de Spark no ofrecen directamente esta operación de agrupación / transformación (estoy usando […]

¿Cómo unirse en múltiples columnas en Pyspark?

Estoy usando Spark 1.3 y me gustaría unirme en varias columnas usando la interfaz de Python (SparkSQL) Los siguientes trabajos: Primero los registro como tablas temporales. numeric.registerTempTable(“numeric”) Ref.registerTempTable(“Ref”) test = numeric.join(Ref, numeric.ID == Ref.ID, joinType=’inner’) Ahora me gustaría unirme a ellos en base a múltiples columnas. Me sale SyntaxError : syntax no válida con esto: […]

¿Métodos para escribir archivos de parquet usando Python?

Tengo problemas para encontrar una biblioteca que permita escribir archivos de Parquet con Python. Puntos de bonificación si puedo usar Snappy o un mecanismo de compresión similar junto con él. Hasta ahora, el único método que he encontrado es usar Spark con el pyspark.sql.DataFrame parquet pyspark.sql.DataFrame . Tengo algunos scripts que necesitan escribir archivos de […]

Agregar columna a PySpark DataFrame dependiendo de si el valor de la columna está en otra columna

Tengo un DataSrame de PySpark con estructura dada por [(‘u1’, 1, [1 ,2, 3]), (‘u1’, 4, [1, 2, 3])].toDF(‘user’, ‘item’, ‘fav_items’) Necesito agregar una columna adicional con 1 o 0 dependiendo de si el “elemento” está en “fav_items” o no. Asi que queria [(‘u1’, 1, [1 ,2, 3], 1), (‘u1’, 4, [1, 2, 3], 0)] […]

Pyspark cambiando el tipo de columna de fecha a cadena

Tengo el siguiente dataframe: corr_temp_df [(‘vacationdate’, ‘date’), (‘valueE’, ‘string’), (‘valueD’, ‘string’), (‘valueC’, ‘string’), (‘valueB’, ‘string’), (‘valueA’, ‘string’)] Ahora me gustaría cambiar el tipo de datos de la columna vacationdate a String, de modo que también el dataframe tome este nuevo tipo y sobrescriba los datos del tipo de datos para todas las entradas. Por ejemplo, […]

PySpark invirtiendo StringIndexer en una matriz anidada

Estoy usando PySpark para hacer un filtrado colaborativo usando ALS. Mi usuario original y los identificadores de elementos son cadenas, así que utilicé StringIndexer para convertirlos a índices numéricos (el modelo ALS de PySpark nos obliga a hacerlo). Una vez que haya instalado el modelo, puedo obtener las 3 recomendaciones principales para cada usuario como: […]

PySpark 1.5 Cómo truncar la marca de tiempo al minuto más cercano en segundos

Estoy usando PySpark. Tengo una columna (‘dt’) en un dataframe (‘canon_evt’) que esta es una marca de tiempo. Estoy tratando de eliminar segundos de un valor DateTime. Originalmente se lee desde parquet como una cadena. Entonces trato de convertirlo a Timestamp a través de canon_evt = canon_evt.withColumn(‘dt’,to_date(canon_evt.dt)) canon_evt= canon_evt.withColumn(‘dt’,canon_evt.dt.astype(‘Timestamp’)) Entonces me gustaría quitar los segundos. […]

Preservar indizador de cadena de chispa de correspondencia de cadena de índice

El StringIndexer de Spark es bastante útil, pero es común tener que recuperar las correspondencias entre los valores de índice generados y las cadenas originales, y parece que debería haber una forma integrada de lograrlo. Ilustraré usando este simple ejemplo de la documentación de Spark : from pyspark.ml.feature import StringIndexer df = sqlContext.createDataFrame( [(0, “a”), […]