Creando un diccionario grande en pyspark

Estoy tratando de resolver el siguiente problema utilizando pyspark. Tengo un archivo en hdfs en el formato que es un volcado de tabla de búsqueda.

key1, value1 key2, value2 ... 

Quiero cargar esto en el diccionario de python en pyspark y usarlo para algún otro propósito. Así que traté de hacer:

 table = {} def populateDict(line): (k,v) = line.split(",", 1) table[k] = v kvfile = sc.textFile("pathtofile") kvfile.foreach(populateDict) 

He encontrado que la variable de tabla no se modifica. Entonces, ¿hay una manera de crear un hashtable inmemory grande en spark?

foreach es un cálculo distribuido, por lo que no puede esperar que modifique una estructura de datos solo visible en el controlador. Lo que quieres es.

 kv.map(line => { line.split(" ") match { case Array(k,v) => (k,v) case _ => ("","") }.collectAsMap() 

Esto está en Scala, pero se le collectAsMap() la idea, la función importante es collectAsMap() que devuelve un mapa al controlador.

Si sus datos son muy grandes, puede usar un PairRDD como mapa. Primer mapa a pares

  kv.map(line => { line.split(" ") match { case Array(k,v) => (k,v) case _ => ("","") } 

luego puede acceder con rdd.lookup("key") que devuelve una secuencia de valores asociados con la clave, aunque esto definitivamente no será tan eficiente como otras tiendas KV distribuidas, ya que la chispa no está realmente diseñada para eso.

Para mayor eficiencia, vea: sortByKey () y lookup ()

búsqueda (clave):

Devuelva la lista de valores en el RDD para clave clave. Esta operación se realiza de manera eficiente si el RDD tiene un particionador conocido buscando solo la partición a la que se asigna la clave.

El RDD se volverá a particionar por sortByKey () ( ver: OrderedRDD ) y se buscará de manera eficiente durante las llamadas de lookup() . En código, algo así como

 kvfile = sc.textFile("pathtofile") sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey() sorted_kv.lookup('key1').take(10) 

Hará el truco como un RDD y de manera eficiente.