Generación de números aleatorios en PySpark

Comencemos con una función simple que siempre devuelve un entero aleatorio:

import numpy as np def f(x): return np.random.randint(1000) 

y un RDD lleno de ceros y mapeado usando f :

 rdd = sc.parallelize([0] * 10).map(f) 

Como el RDD anterior no es persistente, espero obtener una salida diferente cada vez que recopile:

 > rdd.collect() [255, 512, 512, 512, 255, 512, 255, 512, 512, 255] 

Si ignoramos el hecho de que la distribución de valores no parece realmente aleatoria, es más o menos lo que sucede. El problema comienza cuando tomamos solo un primer elemento:

 assert len(set(rdd.first() for _ in xrange(100))) == 1 

o

 assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1 

Parece que vuelve el mismo número cada vez. He podido reproducir este comportamiento en dos máquinas diferentes con Spark 1.2, 1.3 y 1.4. Aquí estoy usando np.random.randint pero se comporta de la misma manera con random.randint .

Este problema, al igual que los resultados no aleatorios con collect , parece ser específico de Python y no pude reproducirlo utilizando Scala:

 def f(x: Int) = scala.util.Random.nextInt(1000) val rdd = sc.parallelize(List.fill(10)(0)).map(f) (1 to 100).map(x => rdd.first).toSet.size rdd.collect() 

¿Me perdí algo obvio aquí?

Editar :

Resulta que la fuente del problema es la implementación de Python RNG. Para citar la documentación oficial :

Las funciones proporcionadas por este módulo son en realidad métodos enlazados de una instancia oculta de la clase random.Random. Puede crear instancias de Random para crear generadores que no compartan el estado.

Supongo que NumPy funciona de la misma manera y reescribe f usando la instancia de RandomState siguiente manera

 import os import binascii def f(x, seed=None): seed = ( seed if seed is not None else int(binascii.hexlify(os.urandom(4)), 16)) rs = np.random.RandomState(seed) return rs.randint(1000) 

lo hace más lento pero resuelve el problema.

Mientras que lo anterior explica los resultados no aleatorios de la recostackción, todavía no entiendo cómo afecta first / take(1) entre múltiples acciones.

Así que el problema real aquí es relativamente simple. Cada subproceso en Python hereda su estado de su padre:

 len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect())) # 1 

Dado que el estado principal no tiene motivos para cambiar en este escenario particular y los trabajadores tienen una vida útil limitada, el estado de cada niño será exactamente el mismo en cada ejecución.

Esto parece ser un error (o característica) de randint . Veo el mismo comportamiento, pero tan pronto como cambio la f , los valores sí cambian. Por lo tanto, no estoy seguro de la aleatoriedad real de este método … No puedo encontrar ninguna documentación, pero parece estar usando algún algoritmo matemático determinista en lugar de usar más características variables de la máquina en ejecución. Incluso si voy de un lado a otro, los números parecen ser los mismos al volver al valor original …