Error al utilizar reducebykey: el objeto int no se puede suscribir

Recibo un error “el objeto int no se puede suscribir” al ejecutar el siguiente script:

element.reduceByKey( lambda x , y : x[1]+y[1]) 

with element es un RDD de clave-valor y el valor es una tupla. Ejemplo de entrada:

 (A, (toto , 10)) (A, (titi , 30)) (5, (tata, 10)) (A, (toto, 10)) 

Entiendo que la función reduceByKey toma (K, V) tuplas y aplico una función en todos los valores para obtener el resultado final de la reducción. Como el ejemplo dado en ReduceByKey Apache .

¿Alguna ayuda por favor?

Aquí hay un ejemplo que ilustrará lo que está pasando.

Consideremos qué sucede cuando se llama reduce en una lista con alguna función f :

 reduce(f, [a,b,c]) = f(f(a,b),c) 

Si tomamos su ejemplo, f = lambda u, v: u[1] + v[1] , la expresión anterior se divide en:

 reduce(f, [a,b,c]) = f(f(a,b),c) = f(a[1]+b[1],c) 

Pero a[1] + b[1] es un número entero, por lo que no existe un método __getitem__ , por lo tanto, su error.

En general, el mejor enfoque (como se muestra a continuación) es utilizar map() para extraer primero los datos en el formato que desee, y luego aplicar reduceByKey() .


Un MCVE con tus datos.

 element = sc.parallelize( [ ('A', ('toto' , 10)), ('A', ('titi' , 30)), ('5', ('tata', 10)), ('A', ('toto', 10)) ] ) 

Casi puede obtener la salida deseada con una función de reducción más sofisticada:

 def add_tuple_values(a, b): try: u = a[1] except: u = a try: v = b[1] except: v = b return u + v print(element.reduceByKey(add_tuple_values).collect()) 

Excepto que esto resulta en:

 [('A', 50), ('5', ('tata', 10))] 

¿Por qué? Debido a que solo hay un valor para la clave '5' , entonces no hay nada que reducir.

Por estas razones, es mejor llamar primero al map . Para obtener la salida deseada, puedes hacer:

 >>> print(element.map(lambda x: (x[0], x[1][1])).reduceByKey(lambda u, v: u+v).collect()) [('A', 50), ('5', 10)] 

Actualización 1

Aquí hay un enfoque más:

Podría crear tuple en su función de reduce y luego llamar al map para extraer el valor que desea. (Esencialmente invierte el orden del map y reduce ).

 print( element.reduceByKey(lambda u, v: (0,u[1]+v[1])) .map(lambda x: (x[0], x[1][1])) .collect() ) [('A', 50), ('5', 10)] 

Notas

  • Si hubiera habido al menos 2 registros para cada clave, el uso de add_tuple_values() le habría dado el resultado correcto.

Otro enfoque sería utilizar Dataframe.

 rdd = sc.parallelize([('A', ('toto', 10)),('A', ('titi', 30)),('5', ('tata', 10)),('A', ('toto', 10))]) rdd.map(lambda (a,(b,c)): (a,b,c)).toDF(['a','b','c']).groupBy('a').agg(sum("c")).rdd.map(lambda (a,c): (a,c)).collect() >>>[(u'5', 10), (u'A', 50)]