pyspark – Agrupando y calculando datos

Tengo el siguiente archivo csv.

Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt 0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand 1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand 2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand 3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand 

Tengo que crear un RDD donde USER MODELO Y GT SON CLAVE PRINCIPAL, no sé si tengo que hacerlo usándolos como una tupla.

Luego, cuando tengo el campo de clave principal, tengo que calcular AVG, MAX y MIN a partir de ‘x’, ‘y’ y ‘z’.

Aquí hay una salida:

 User,Model,gt,media(x,y,z),desviacion(x,y,z),max(x,y,z),min(x,y,z) a, nexus4,stand,-3.0,0.7,8.2,2.8,0.14,0.0,-1.0,0.8,8.2,-5.0,0.6,8.2 

Cualquier idea sobre cómo agruparlos y, por ejemplo, obtener los valores de medios de “x”

Con mi código actual obtengo lo siguiente.

 # Data loading lectura = sc.textFile("Phones_accelerometer.csv") datos = lectura.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(x.split(",")[3], x.split(",")[4], x.split(",")[5]))) sumCount = datos.combineByKey(lambda value: (value, 1), lambda x, value: (x[0] + value, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1])) 

Un ejemplo de mis tuplas:

  [(('a', 'nexus4', 'stand'), ('-5.958191', '0.6880646', '8.135345'))] 

Si tiene datos csv en un archivo como se indica en la pregunta, entonces puede usar sqlContext para leerlo como un dataframe y convertir los tipos apropiados como

 df = sqlContext.read.format("com.databricks.spark.csv").option("header", True).load("path to csv file") import pyspark.sql.functions as F import pyspark.sql.types as T df = df.select(F.col('User'), F.col('Model'), F.col('gt'), F.col('x').cast('float'), F.col('y').cast('float'), F.col('z').cast('float')) 

He seleccionado las claves primarias y las columnas necesarias solamente que deberían darle

 +----+------+-----+----------+---------+--------+ |User|Model |gt |x |y |z | +----+------+-----+----------+---------+--------+ |a |nexus4|stand|-5.958191 |0.6880646|8.135345| |a |nexus4|stand|-5.95224 |0.6702118|8.136536| |a |nexus4|stand|-5.9950867|0.6535492|8.204376| |a |nexus4|stand|-5.9427185|0.6761627|8.128204| +----+------+-----+----------+---------+--------+ 

Todos sus requisitos: mediana, desviación, máximo y mínimo dependen de la lista de x , y y z cuando se agrupan por claves primarias: User, Model and gt .

Por lo tanto, necesitaría la función incorporada groupBy y collect_list y una función udf para calcular todos sus requisitos. El paso final es separarlos en diferentes columnas que se dan a continuación.

 from math import sqrt def calculation(array): num_items = len(array) print num_items, sum(array) mean = sum(array) / num_items differences = [x - mean for x in array] sq_differences = [d ** 2 for d in differences] ssd = sum(sq_differences) variance = ssd / (num_items - 1) sd = sqrt(variance) return [mean, sd, max(array), min(array)] calcUdf = F.udf(calculation, T.ArrayType(T.FloatType())) df.groupBy('User', 'Model', 'gt')\ .agg(calcUdf(F.collect_list(F.col('x'))).alias('x'), calcUdf(F.collect_list(F.col('y'))).alias('y'), calcUdf(F.collect_list(F.col('z'))).alias('z'))\ .select(F.col('User'), F.col('Model'), F.col('gt'), F.col('x')[0].alias('median_x'), F.col('y')[0].alias('median_y'), F.col('z')[0].alias('median_z'), F.col('x')[1].alias('deviation_x'), F.col('y')[1].alias('deviation_y'), F.col('z')[1].alias('deviation_z'), F.col('x')[2].alias('max_x'), F.col('y')[2].alias('max_y'), F.col('z')[2].alias('max_z'), F.col('x')[3].alias('min_x'), F.col('y')[3].alias('min_y'), F.col('z')[3].alias('min_z'))\ .show(truncate=False) 

Así que finalmente deberías tener

 +----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+ |User|Model |gt |median_x |median_y |median_z|deviation_x|deviation_y|deviation_z|max_x |max_y |max_z |min_x |min_y |min_z | +----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+ |a |nexus4|stand|-5.962059|0.6719971|8.151115|0.022922019|0.01436464 |0.0356973 |-5.9427185|0.6880646|8.204376|-5.9950867|0.6535492|8.128204| +----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+ 

Espero que la respuesta sea útil.

Tendrás que usar groupByKey para obtener la mediana. Si bien generalmente no es preferible por razones de rendimiento , no se puede paralelizar fácilmente el valor mediano de una lista de números. La lógica para calcular la mediana requiere la lista completa de números. groupByKey es el método de agregación que se utiliza cuando necesita procesar todos los valores de una clave al mismo tiempo

Además, como se mencionó en los comentarios, esta tarea sería más fácil usando Spark DataFrames.