¿Consejos para usar correctamente grandes variables de transmisión?

Estoy usando una variable de transmisión de aproximadamente 100 MB de tamaño decapado, que estoy aproximando con

>>> data = list(range(int(10*1e6))) >>> import cPickle as pickle >>> len(pickle.dumps(data)) 98888896 

Se ejecuta en un clúster con 3 ejecutores c3.2xlarge y un controlador m3.large, con el siguiente comando iniciando la sesión interactiva:

 IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g 

En un RDD, si persisto una referencia a esta variable de difusión, el uso de la memoria explota. Para 100 referencias a una variable de 100 MB, incluso si se haya copiado 100 veces, esperaría que el uso de datos no sea más de 10 GB en total (por no hablar de 30 GB en 3 nodos). Sin embargo, veo errores de memoria cuando ejecuto la siguiente prueba:

 data = list(range(int(10*1e6))) metadata = sc.broadcast(data) ids = sc.parallelize(zip(range(100), range(100))) joined_rdd = ids.mapValues(lambda _: metadata.value) joined_rdd.persist() print('count: {}'.format(joined_rdd.count())) 

El rastro de la stack:

 TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func return func(split, prev_func(split, iterator)) File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func return func(split, prev_func(split, iterator)) File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func return f(iterator) File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in  return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in  return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream yield self._read_with_length(stream) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length return self.loads(obj) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads return pickle.loads(obj) MemoryError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) at org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:179) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last)  in () 7 joined_rdd.persist() 8 print('persist called') ----> 9 print('count: {}'.format(joined_rdd.count())) /usr/lib/spark/python/pyspark/rdd.py in count(self) 1004 3 1005 """ -> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 1007 1008 def stats(self): /usr/lib/spark/python/pyspark/rdd.py in sum(self) 995 6.0 996 """ --> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 998 999 def count(self): /usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op) 869 # zeroValue provided to each partition is unique from the one provided 870 # to the final reduce call --> 871 vals = self.mapPartitions(func).collect() 872 return reduce(op, vals, zeroValue) 873 /usr/lib/spark/python/pyspark/rdd.py in collect(self) 771 """ 772 with SCCallSiteSync(self.context) as css: --> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 774 return list(_load_from_socket(port, self._jrdd_deserializer)) 775 /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) 

He visto subprocesos anteriores sobre el uso de memoria de deserialización de salmuera como un problema. Sin embargo, esperaría que una variable de difusión solo se deserializara (y se cargara en la memoria en un ejecutor) una vez, y las referencias posteriores a .value para hacer referencia a esa dirección en memoria. Sin embargo, ese no parece ser el caso. ¿Me estoy perdiendo de algo?

Los ejemplos que he visto con variables de transmisión los tienen como diccionarios, que se usan una vez para transformar un conjunto de datos (es decir, reemplazar las siglas de los aeropuertos con los nombres de los aeropuertos). La motivación detrás de persistirlos aquí es crear objetos con el conocimiento de una variable de transmisión y cómo interactuar con ella, persistir en esos objetos y realizar múltiples cálculos usándolos (con la chispa cuidando de mantenerlos en la memoria).

¿Cuáles son algunos consejos para usar variables de transmisión grandes (100 MB +)? ¿Es persistente una variable de transmisión equivocada? ¿Es este un problema que posiblemente sea específico de PySpark?

¡Gracias! Tu ayuda es apreciada.

Tenga en cuenta, también he publicado esta pregunta en los foros de databricks

Editar – pregunta de seguimiento:

Se sugirió que el serializador Spark predeterminado tiene un tamaño de lote de 65337. Los objetos serializados en diferentes lotes no se identifican como iguales y se asignan diferentes direcciones de memoria, examinadas aquí mediante la función de id integrada. Sin embargo, incluso con una variable de transmisión más grande que en teoría tomaría 256 lotes para serializar, todavía veo solo 2 copias distintas. ¿No debería ver muchos más? ¿Mi comprensión de cómo funciona la serialización por lotes es incorrecta?

 >>> sc.serializer.bestSize 65536 >>> import cPickle as pickle >>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))} >>> len(pickle.dumps(broadcast_data)) 16777786 >>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))})) / sc.serializer.bestSize 256 >>> bd = sc.broadcast(broadcast_data) >>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value) >>> rdd.map(id).distinct().count() 1 >>> rdd.cache().count() 100 >>> rdd.map(id).distinct().count() 2 

Bueno, el diablo está en el detalle. Para comprender la razón por la que esto puede suceder, tendremos que echar un vistazo más de cerca a los serializadores PySpark. Primero vamos a crear SparkContext con la configuración predeterminada:

 from pyspark import SparkContext sc = SparkContext("local", "foo") 

y compruebe qué es un serializador predeterminado:

 sc.serializer ## AutoBatchedSerializer(PickleSerializer()) sc.serializer.bestSize ## 65536 

Nos dice tres cosas diferentes:

  • este es el serializador AutoBatchedSerializer
  • está utilizando PickleSerializer para realizar un trabajo real
  • bestSize del bestSize serializado es 65536 bytes.

Un vistazo rápido al código fuente le mostrará que esta serialización ajusta la cantidad de registros que se serializaron en el momento del tiempo de ejecución y trata de mantener el tamaño del lote inferior a 10 * mejor bestSize . El punto importante es que no todos los registros en la partición única se serializan al mismo tiempo.

Podemos comprobarlo experimentalmente de la siguiente manera:

 from operator import add bd = sc.broadcast({}) rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value) rdd.map(id).distinct().count() ## 1 rdd.cache().count() ## 10 rdd.map(id).distinct().count() ## 2 

Como puede ver, incluso en este sencillo ejemplo, después de la deserialización por serialización, obtenemos dos objetos distintos. Puedes observar un comportamiento similar trabajando directamente con pickle :

 v = {} vs = [v, v, v, v] v1, *_, v4 = pickle.loads(pickle.dumps(vs)) v1 is v4 ## True (v1_, v2_), (v3_, v4_) = ( pickle.loads(pickle.dumps(vs[:2])), pickle.loads(pickle.dumps(vs[2:])) ) v1_ is v4_ ## False v3_ is v4_ ## True 

Valores serializados en la misma referencia de lote, después de desentrañar, el mismo objeto. Los valores de diferentes lotes apuntan a diferentes objetos.

En la práctica Spark múltiple serializa y diferentes estrategias de serialización. Por ejemplo, puedes usar lotes de tamaño infinito:

 from pyspark.serializers import BatchedSerializer, PickleSerializer rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value) ._reserialize(BatchedSerializer(PickleSerializer()))) rdd_.cache().count() rdd_.map(id).distinct().count() ## 1 

Puede cambiar el serializador pasando los parámetros del serializer y / o batchSize al constructor de SparkContext :

 sc = SparkContext( "local", "bar", serializer=PickleSerializer(), # Default serializer # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer batchSize=-1 ) sc.serializer ## BatchedSerializer(PickleSerializer(), -1) 

La elección de diferentes serializadores y estrategias de procesamiento por lotes da como resultado diferentes concesiones (velocidad, capacidad de serializar objetos arbitrarios, requisitos de memoria, etc.).

También debe recordar que las variables de difusión en Spark no se comparten entre los subprocesos ejecutores, por lo que en el mismo trabajador pueden existir múltiples copias deserializadas al mismo tiempo.

Además, verás un comportamiento similar a esto si ejecutas una transformación que requiere barajar.