Spark dataframe transformar varias filas en columna

Soy un principiante para encender, y quiero transformarme debajo del dataframe de origen (cargar desde un archivo JSON):

+--+-----+-----+ |A |count|major| +--+-----+-----+ | a| 1| m1| | a| 1| m2| | a| 2| m3| | a| 3| m4| | b| 4| m1| | b| 1| m2| | b| 2| m3| | c| 3| m1| | c| 4| m3| | c| 5| m4| | d| 6| m1| | d| 1| m2| | d| 2| m3| | d| 3| m4| | d| 4| m5| | e| 4| m1| | e| 5| m2| | e| 1| m3| | e| 1| m4| | e| 1| m5| +--+-----+-----+ 

En el siguiente dataframe de resultados :

 +--+--+--+--+--+--+ |A |m1|m2|m3|m4|m5| +--+--+--+--+--+--+ | a| 1| 1| 2| 3| 0| | b| 4| 2| 1| 0| 0| | c| 3| 0| 4| 5| 0| | d| 6| 1| 2| 3| 4| | e| 4| 5| 1| 1| 1| +--+--+--+--+--+--+ 

Aquí está la regla de transformación :

  1. El dataframe de resultados consta de A + (n major columns) donde los nombres de las columnas major se especifican por:

     sorted(src_df.map(lambda x: x[2]).distinct().collect()) 
  2. El dataframe del resultado contiene m filas donde los valores para la columna A son proporcionados por:

     sorted(src_df.map(lambda x: x[0]).distinct().collect()) 
  3. El valor para cada columna principal en el dataframe de resultados es el valor del dataframe de origen en la A correspondiente y mayor (por ejemplo, el recuento en la Fila 1 en el dataframe de origen se asigna al box donde A es ay columna m1 )

  4. Las combinaciones de A y major en el dataframe de origen no tienen duplicación (considérela una clave principal en las dos columnas en SQL)

Vamos a empezar con datos de ejemplo:

 df = sqlContext.createDataFrame([ ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"), ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"), ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"), ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"), ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), ("e", 1, "m4"), ("e", 1, "m5")], ("a", "cnt", "major")) 

Tenga en cuenta que he cambiado de count a cnt . Count es una palabra clave reservada en la mayoría de los dialectos de SQL y no es una buena opción para un nombre de columna.

Hay al menos dos formas de remodelar estos datos:

  • agregando sobre DataFrame

     from pyspark.sql.functions import col, when, max majors = sorted(df.select("major") .distinct() .map(lambda row: row[0]) .collect()) cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m) for m in majors] maxs = [max(col(m)).alias(m) for m in majors] reshaped1 = (df .select(col("a"), *cols) .groupBy("a") .agg(*maxs) .na.fill(0)) reshaped1.show() ## +---+---+---+---+---+---+ ## | a| m1| m2| m3| m4| m5| ## +---+---+---+---+---+---+ ## | a| 1| 1| 2| 3| 0| ## | b| 4| 1| 2| 0| 0| ## | c| 3| 0| 4| 5| 0| ## | d| 6| 1| 2| 3| 4| ## | e| 4| 5| 1| 1| 1| ## +---+---+---+---+---+---+ 
  • groupBy sobre RDD

     from pyspark.sql import Row grouped = (df .map(lambda row: (row.a, (row.major, row.cnt))) .groupByKey()) def make_row(kv): k, vs = kv tmp = dict(list(vs) + [("a", k)]) return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors}) reshaped2 = sqlContext.createDataFrame(grouped.map(make_row)) reshaped2.show() ## +---+---+---+---+---+---+ ## | a| m1| m2| m3| m4| m5| ## +---+---+---+---+---+---+ ## | a| 1| 1| 2| 3| 0| ## | e| 4| 5| 1| 1| 1| ## | c| 3| 0| 4| 5| 0| ## | b| 4| 1| 2| 0| 0| ## | d| 6| 1| 2| 3| 4| ## +---+---+---+---+---+---+ 

Usando el dataframe de zero323,

 df = sqlContext.createDataFrame([ ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"), ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"), ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"), ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"), ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), ("e", 1, "m4"), ("e", 1, "m5")], ("a", "cnt", "major")) 

también podrías usar

 reshaped_df = df.groupby('a').pivot('major').max('cnt').fillna(0)