El tiempo de iteración de la chispa aumenta exponencialmente cuando se usa la combinación

Soy bastante nuevo en Spark y estoy tratando de implementar algún algoritmo iterativo para agrupar (expectativa-maximización) con centroide representado por el modelo de Markov. Así que necesito hacer iteraciones y uniones.

Un problema que experimento es que cada iteración crece de manera exponencial.
Después de algunos experimentos, descubrí que al hacer iteraciones es necesario que persista un RDD que se reutilizará en la siguiente iteración, de lo contrario, cada chispa de iteración creará un plan de ejecución que recalculará el RDD desde el inicio, lo que boostá el tiempo de cálculo.

init = sc.parallelize(xrange(10000000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init = init2.map(lambda n: n[0]) # init.cache() print init.count() print str(datetime.datetime.now() - start) 

Resultados en:

 0 10000000 0:00:04.283652 1 10000000 0:00:05.998830 2 10000000 0:00:08.771984 3 10000000 0:00:11.399581 4 10000000 0:00:14.206069 5 10000000 0:00:16.856993 

Así que agregar cache () ayuda y el tiempo de iteración se vuelve constante.

 init = sc.parallelize(xrange(10000000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init = init2.map(lambda n: n[0]) init.cache() print init.count() print str(datetime.datetime.now() - start) 0 10000000 0:00:04.966835 1 10000000 0:00:04.609885 2 10000000 0:00:04.324358 3 10000000 0:00:04.248709 4 10000000 0:00:04.218724 5 10000000 0:00:04.223368 

Pero al hacer Join dentro de la iteración el problema vuelve. Aquí hay un código simple que demuestra el problema. Incluso hacer caché en cada transformación RDD no resuelve el problema:

 init = sc.parallelize(xrange(10000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init2.cache() init3 = init.map(lambda n: (n, n*2)) init3.cache() init4 = init2.join(init3) init4.count() init4.cache() init = init4.map(lambda n: n[0]) init.cache() print init.count() print str(datetime.datetime.now() - start) 

Y aquí está la salida. Como puedes ver, el tiempo de iteración crece exponencialmente 🙁

 0 10000 0:00:00.674115 1 10000 0:00:00.833377 2 10000 0:00:01.525314 3 10000 0:00:04.194715 4 10000 0:00:08.139040 5 10000 0:00:17.852815 

Realmente apreciaré cualquier ayuda 🙂

Resumen :

En términos generales, los algoritmos iterativos, especialmente aquellos con auto-unión o auto-unión, requieren un control sobre:

  • Longitud del linaje (consulte, por ejemplo, Stackoverflow debido a un largo Lineage RDD y unionAll que dan como resultado StackOverflow ).
  • Número de particiones.

El problema descrito aquí es el resultado de la falta del anterior. En cada iteración, el número de particiones aumenta con la auto-unión que conduce a un patrón exponencial. Para resolver el problema, debe controlar el número de particiones en cada iteración (ver a continuación) o usar herramientas globales como spark.default.parallelism (ver una respuesta proporcionada por Travis ). En general, el primer enfoque proporciona mucho más control en general y no afecta a otras partes del código.

Respuesta original :

Por lo que puedo decir, hay dos problemas intercalados aquí: un número cada vez mayor de particiones y la sobrecarga de datos durante las combinaciones. Ambos se pueden manejar fácilmente, así que vamos a ir paso a paso.

Primero vamos a crear un ayudante para recostackr las estadísticas:

 import datetime def get_stats(i, init, init2, init3, init4, start, end, desc, cache, part, hashp): return { "i": i, "init": init.getNumPartitions(), "init1": init2.getNumPartitions(), "init2": init3.getNumPartitions(), "init4": init4.getNumPartitions(), "time": str(end - start), "timen": (end - start).seconds + (end - start).microseconds * 10 **-6, "desc": desc, "cache": cache, "part": part, "hashp": hashp } 

otro ayudante para manejar el almacenamiento en caché / partición

 def procRDD(rdd, cache=True, part=False, hashp=False, npart=16): rdd = rdd if not part else rdd.repartition(npart) rdd = rdd if not hashp else rdd.partitionBy(npart) return rdd if not cache else rdd.cache() 

extraer la lógica de la tubería:

 def run(init, description, cache=True, part=False, hashp=False, npart=16, n=6): times = [] for i in range(n): start = datetime.datetime.now() init2 = procRDD( init.map(lambda n: (n, n*3)), cache, part, hashp, npart) init3 = procRDD( init.map(lambda n: (n, n*2)), cache, part, hashp, npart) # If part set to True limit number of the output partitions init4 = init2.join(init3, npart) if part else init2.join(init3) init = init4.map(lambda n: n[0]) if cache: init4.cache() init.cache() init.count() # Force computations to get time end = datetime.datetime.now() times.append(get_stats( i, init, init2, init3, init4, start, end, description, cache, part, hashp )) return times 

y crear datos iniciales:

 ncores = 8 init = sc.parallelize(xrange(10000), ncores * 2).cache() 

Únase a la operación por sí misma, si no se proporciona el argumento numPartitions , ajuste el número de particiones en la salida en función del número de particiones de los RDD de entrada. Significa un número creciente de particiones con cada iteración. Si el número de particiones es demasiado grande, las cosas se ponen feas. Puede lidiar con estos proporcionando el argumento numPartitions para unir o volver a particionar los RDD con cada iteración.

 timesCachePart = sqlContext.createDataFrame( run(init, "cache + partition", True, True, False, ncores * 2)) timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+-----------------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+-----------------+ |0| 16| 16| 16|0:00:01.145625|cache + partition| |1| 16| 16| 16|0:00:01.090468|cache + partition| |2| 16| 16| 16|0:00:01.059316|cache + partition| |3| 16| 16| 16|0:00:01.029544|cache + partition| |4| 16| 16| 16|0:00:01.033493|cache + partition| |5| 16| 16| 16|0:00:01.007598|cache + partition| +-+-----+-----+-----+--------------+-----------------+ 

Como puede ver, cuando repartimos el tiempo de ejecución es más o menos constante. El segundo problema es que los datos anteriores se particionan al azar. Para garantizar el rendimiento de la combinación, nos gustaría tener las mismas claves en una sola partición. Para lograrlo podemos usar el particionador hash:

 timesCacheHashPart = sqlContext.createDataFrame( run(init, "cache + hashpart", True, True, True, ncores * 2)) timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+----------------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+----------------+ |0| 16| 16| 16|0:00:00.946379|cache + hashpart| |1| 16| 16| 16|0:00:00.966519|cache + hashpart| |2| 16| 16| 16|0:00:00.945501|cache + hashpart| |3| 16| 16| 16|0:00:00.986777|cache + hashpart| |4| 16| 16| 16|0:00:00.960989|cache + hashpart| |5| 16| 16| 16|0:00:01.026648|cache + hashpart| +-+-----+-----+-----+--------------+----------------+ 

El tiempo de ejecución es constante como antes y hay una pequeña mejora con respecto a la partición básica.

Ahora vamos a usar el caché solo como referencia:

 timesCacheOnly = sqlContext.createDataFrame( run(init, "cache-only", True, False, False, ncores * 2)) timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+----------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+----------+ |0| 16| 16| 32|0:00:00.992865|cache-only| |1| 32| 32| 64|0:00:01.766940|cache-only| |2| 64| 64| 128|0:00:03.675924|cache-only| |3| 128| 128| 256|0:00:06.477492|cache-only| |4| 256| 256| 512|0:00:11.929242|cache-only| |5| 512| 512| 1024|0:00:23.284508|cache-only| +-+-----+-----+-----+--------------+----------+ 

Como puede ver, la cantidad de particiones (init2, init3, init4) para la versión de solo caché se duplica con cada iteración y el tiempo de ejecución es proporcional al número de particiones.

Finalmente, podemos verificar si podemos mejorar el rendimiento con un gran número de particiones si usamos el particionador hash:

 timesCacheHashPart512 = sqlContext.createDataFrame( run(init, "cache + hashpart 512", True, True, True, 512)) timesCacheHashPart512.select( "i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+--------------------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+--------------------+ |0| 512| 512| 512|0:00:14.492690|cache + hashpart 512| |1| 512| 512| 512|0:00:20.215408|cache + hashpart 512| |2| 512| 512| 512|0:00:20.408070|cache + hashpart 512| |3| 512| 512| 512|0:00:20.390267|cache + hashpart 512| |4| 512| 512| 512|0:00:20.362354|cache + hashpart 512| |5| 512| 512| 512|0:00:19.878525|cache + hashpart 512| +-+-----+-----+-----+--------------+--------------------+ 

La mejora no es tan impresionante, pero si tiene un grupo pequeño y muchos datos, vale la pena intentarlo.

Supongo que llevar el mensaje aquí es cuestión de partición. Hay contextos en los que se maneja para usted ( mllib , sql ) pero si usa operaciones de bajo nivel es su responsabilidad.

El problema es (como cero323 señaló en su respuesta completa) que la combinación de llamadas sin especificar el número de particiones puede (hace) dar como resultado un número creciente de particiones. El número de particiones puede crecer (aparentemente) sin límite. Hay (al menos) dos formas de evitar que el número de particiones crezca (sin límite) al llamar repetidamente a unirse.

Método 1:

Como señaló cero323, puede especificar el número de particiones manualmente cuando llama unirse. Por ejemplo

 rdd1.join(rdd2, numPartitions) 

Esto asegurará que el número de particiones no supere las numParticiones y, en particular, el número de particiones no boostá continuamente.

Método 2:

Cuando creas tu SparkConf puedes especificar el nivel predeterminado de paralelismo. Si se establece este valor, entonces, cuando llame a funciones como join sin especificar numPartitions, se utilizará el paralelismo predeterminado en su lugar, limitando efectivamente el número de particiones y evitando que crezcan. Puede configurar este parámetro como

 conf=SparkConf.set("spark.default.parallelism", numPartitions) sc = SparkContex(conf=conf) 

Rdds son inmutables. Intenta hacer rdd = rdd.cache()