¿Difunde el objeto Annoy en Spark (para vecinos más cercanos)?

Como el mllib de Spark no tiene la funcionalidad de vecinos más cercanos, estoy tratando de usar Annoy para los vecinos más cercanos. Intento transmitir el objeto Annoy y pasárselo a los trabajadores; Sin embargo, no funciona como se esperaba.

A continuación se muestra el código de reproducibilidad (que se ejecutará en PySpark). El problema se resalta en la diferencia que se observa cuando se usa Annoy con vs sin Spark.

from annoy import AnnoyIndex import random random.seed(42) f = 40 t = AnnoyIndex(f) # Length of item vector that will be indexed allvectors = [] for i in xrange(20): v = [random.gauss(0, 1) for z in xrange(f)] t.add_item(i, v) allvectors.append((i, v)) t.build(10) # 10 trees # Use Annoy with Spark sparkvectors = sc.parallelize(allvectors) bct = sc.broadcast(t) x = sparkvectors.map(lambda x: bct.value.get_nns_by_vector(vector=x[1], n=5)) print "Five closest neighbors for first vector with Spark:", print x.first() # Use Annoy without Spark print "Five closest neighbors for first vector without Spark:", print(t.get_nns_by_vector(vector=allvectors[0][1], n=5)) 

Salida vista:

Cinco vecinos más cercanos para el primer vector con Spark: Ninguno

Cinco vecinos más cercanos para el primer vector sin chispa: [0, 13, 12, 6, 4]

Nunca he usado Annoy, pero estoy bastante seguro de que la descripción del paquete explica lo que está pasando aquí:

También crea grandes estructuras de datos basadas en archivos de solo lectura que se colocan en la memoria de modo que muchos procesos puedan compartir los mismos datos.

Dado que utiliza índices asignados en memoria cuando lo serializa y lo pasa a los trabajadores, todos los datos se pierden en el camino.

Intenta algo como esto en su lugar:

 from pyspark import SparkFiles t.save("index.ann") sc.addPyFile("index.ann") def find_neighbors(iter): t = AnnoyIndex(f) t.load(SparkFiles.get("index.ann")) return (t.get_nns_by_vector(vector=x[1], n=5) for x in iter) sparkvectors.mapPartitions(find_neighbors).first() ## [0, 13, 12, 6, 4] 

En caso de que alguien más lo esté siguiendo aquí, como yo lo estaba mapPartitions , deberá importar Annoy en la función mapPartitions , de lo contrario, seguirá recibiendo errores de decapado. Aquí está mi ejemplo completo basado en lo anterior:

 from annoy import AnnoyIndex from pyspark import SparkFiles from pyspark import SparkContext from pyspark import SparkConf import random random.seed(42) f = 1024 t = AnnoyIndex(f) allvectors = [] for i in range(100): v = [random.gauss(0, 1) for z in range(f)] t.add_item(i, v) allvectors.append((i, v)) t.build(10) t.save("index.ann") def find_neighbors(i): from annoy import AnnoyIndex ai = AnnoyIndex(f) ai.load(SparkFiles.get("index.ann")) return (ai.get_nns_by_vector(vector=x[1], n=5) for x in i) with SparkContext(conf=SparkConf().setAppName("myannoy")) as sc: sc.addFile("index.ann") sparkvectors = sc.parallelize(allvectors) sparkvectors.mapPartitions(find_neighbors).first()