Pyspark – TypeError: el objeto ‘float’ no es un subíndice al calcular la media usando reduceByKey

mi archivo “asdasd.csv” tiene la siguiente estructura.

Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt 0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand 1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand 2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand 3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand 

Ok, obtengo la siguiente tupla {clave, valor} para operar con ella.

 # xyz [(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))] # part A (key) part B (value) 

Mi código para calcular la media es el siguiente, tengo que calcular la media de cada columna, X, YZ para cada clave.

 rdd_ori = sc.textFile("asdasd.csv") \ .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5])))) meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \ .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\ .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3])) 

Mi problema es que probé ese código y funciona bien en otra PC con el mismo MV que estoy usando para desarrollarlo (PySpark Py3)

Aquí hay un ejemplo, que este código es correcto:

introduzca la descripción de la imagen aquí

Pero no sé por qué estoy recibiendo este error, la parte importante está en Strong .

————————————————– ————————- Py4JJavaError Traceback (última llamada más reciente) en () 9 # sum_1 = count_.reduceByKey (lambda x, y: ( x [0] [0] + y [0] [0], x 0 + y 0 , x [0] [2] + y [0] [2])) 10 —> 11 impresos (meanRDD.take (1))

/opt/spark/current/python/pyspark/rdd.py en take (self, num) 1341
1342 p = rango (partsScanned, min (partsScanned + numPartsToTry, totalParts)) -> 1343 res = self.context.runJob (self, takeUpToNumLeft, p) 1344 1345 elementos + = res

/opt/spark/current/python/pyspark/context.py en runJob (self, rdd, partitionFunc, particiones, allowLocal) 990 # SparkContext # runJob. 991 mappedRDD = rdd.mapPartitions (partitionFunc) -> 992 port = self._jvm.PythonRDD.runJob (self._jsc.sc (), mappedRDD._jrdd, particiones) lista 993 de retorno (_load_from_socket (puerto, mappedRDD._jrdd_deserializer)) 994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in call (self, * args) 1131 answer = self.gateway_client.send_command (command) 1132 return_value = get_return_value ( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 para temp_arg in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py en deco (* a, ** kw) 61 def deco (* a, ** kw): 62 try: —> 63 return f (* a, ** kw) 64 excepto py4j.protocol.Py4JJavaError como e: 65 s = e.java_exception.toString ()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py en get_return_value (respuesta, gateway_client, target_id, nombre) 317 genera Py4JJavaError (318 “Se produjo un error al llamar {0 } {1} {2}. \ N “. -> 319 formato (target_id,”. “, Nombre), valor) 320 más: 321 aumenta Py4JError (

Py4JJavaError: se produjo un error al llamar a z: org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: trabajo cancelado debido a una falla en la etapa: la tarea 0 en la etapa 127.0 falló 1 vez, la falla más reciente: la tarea perdida 0.0 en la etapa 127.0 (TID 102, localhost, controlador del ejecutor): org.apache.spark .api.python.PythonException: Traceback (última llamada más reciente): Archivo “/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py”, línea 177, en el proceso principal () Archivo “/ opt / spark / current / python / lib / pyspark.zip / pyspark / worker.py “, línea 172, en el proceso serializer.dump_stream (func (split_index, iterator), outfile) Archivo” / opt / spark / current / python / pyspark / rdd.py “, línea 2423, en pipeline_func return func (split, prev_func (split, iterator)) Archivo” /opt/spark/current/python/pyspark/rdd.py “, línea 2423, en pipeline_func return func ( split, prev_func (split, iterator)) Archivo “/opt/spark/current/python/pyspark/rdd.py”, línea 346, in func return f (iterator) Archivo “/ opt / spark / current / python / pyspark / rdd.py “, línea 1842, en combineLocally merger.mergeValues ​​(iterador) Archivo” / opt / spark / current / python / lib / p yspark.zip/pyspark/shuffle.py “, línea 238, en mergeValues d [k] = comb (d [k], v) si k en d else creator (v) Archivo” “, línea 3, en TypeError: ‘ objeto flotante no es un subíndice

Aquí es cómo funciona reduceByKey . Estoy tomando su ejemplo para ilustración, es decir, con los siguientes datos que pasa a reduceByKey

 # xyz [(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))] # part A (key) part B (value) counter 

Déjame ir paso a paso

Después de realizar la siguiente función mapValues

 rdd_ori.mapValues(lambda x: (x,1)) 

los datos de rdd se verán como

 ((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1)) ((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1)) ((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1)) ((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1)) 

Así que cuando se invoca reduceByKey como

 .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])) 

Y todas las filas con la misma clave se agrupan y los valores se pasan a la función lambda de reducyByKey .

Como en su caso, todas las claves son iguales , los valores se pasan a las variables b en las siguientes iteraciones.

En la primera iteración, a es ((-5.9427185, 0.6761626999999999, 8.128204), 1) y b es ((-5.958191, 0.6880646, 8.135345), 1) por lo que la parte de cálculo (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) es correcto y pasa .

En la segunda iteración, a es la salida de (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) que es (-11.910430999999999, 1.3582764, 16.271881, 2)

Entonces, si nos fijamos en el formato de los datos, no hay tal a[0][0] en a . Solo puede obtener a[0] , a[1] .. y así sucesivamente. Así que ese es el problema. Y eso es lo que el mensaje de error está sugiriendo también .

TypeError: el objeto ‘float’ no es un subíndice

La solución a esto es formatear los datos para que pueda acceder a como a[0][0] que se puede hacer si formatea su reduceByKey del siguiente formato.

 .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])) 

Pero eso molestaría tu última función mapValues

 .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3])) 

como sus valores, es decir , a función in lambda , son de ((-23.848236199999995, 2.6879882999999998, 32.604461), 4) así que a[0] significa (-23.848236199999995, 2.6879882999999998, 32.604461) y a[1] significa 4 y no existe más por lo que te encontrarás

IndexError: índice de tupla fuera de rango

Así que tus últimos mapValues deberían ser

 .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1])) 

Así que en general, el siguiente código debería funcionar para usted

 rdd_ori = sc.textFile("asdasd.csv") \ .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5])))) meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \ .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))\ .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1])) 

Espero haberlo explicado bastante bien.