Chispa: ¿Cómo “reducirByKey” cuando las claves son matrices numpy que no son hashable?

Tengo un RDD de (clave, valor) elementos. Las claves son matrices NumPy. Los arreglos de NumPy no son hashable, y esto causa un problema cuando bash hacer una operación reduceByKey .

¿Hay alguna forma de proporcionar el contexto Spark con mi función hash manual? ¿O hay alguna otra forma de solucionar este problema (aparte de en realidad “hashing” los arreglos “offline” y pasar a Spark solo la clave de hash)?

Aquí hay un ejemplo:

 import numpy as np from pyspark import SparkContext sc = SparkContext() data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]]) rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y) rd.collect() 

El error es:

Se produjo un error al llamar a z: org.apache.spark.api.python.PythonRDD.collectAndServe.

TypeError: tipo unsashable: ‘numpy.ndarray’

La solución más sencilla es convertirlo en un objeto que se pueda hashable. Por ejemplo:

 from operator import add reduced = sc.parallelize(data).map( lambda x: (tuple(x), x.sum()) ).reduceByKey(add) 

y convertirlo más tarde si es necesario.

¿Hay alguna forma de suministrar el contexto de Spark con mi función hash manual?

No es sencillo. Todo un mecanismo depende del hecho de que el objeto implementa un método __hash__ y las extensiones C no pueden ser parcheadas. Podría intentar usar el despacho para anular pyspark.rdd.portable_hash pero dudo que valga la pena incluso si considera el costo de las conversiones.