Articles of rdd

Devuelva un RDD de takeOrdered, en lugar de una lista

Estoy usando pyspark para hacer un poco de limpieza de datos. Una operación muy común es tomar un subconjunto pequeño de un archivo y exportarlo para su inspección: (self.spark_context.textFile(old_filepath+filename) .takeOrdered(100) .saveAsTextFile(new_filepath+filename)) Mi problema es que takeOrdered está devolviendo una lista en lugar de un RDD, por lo que saveAsTextFile no funciona. AttributeError: ‘list’ object has […]

Spark RDD – Mapeo con argumentos extra

¿Es posible pasar argumentos adicionales a la función de mapeo en pySpark? Específicamente, tengo la siguiente receta de código: raw_data_rdd = sc.textFile(“data.json”, use_unicode=True) json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line)) mapped_rdd = json_data_rdd.flatMap(processDataLine) La función processDataLine toma argumentos adicionales además del objeto JSON, como: def processDataLine(dataline, arg1, arg2) ¿Cómo puedo pasar los argumentos adicionales arg1 y arg2 […]

La aplicación PySpark falla con java.lang.OutOfMemoryError: espacio de almacenamiento dynamic de Java

Estoy ejecutando spark a través de pycharm y pyspark shell respectivamente. He astackdo con este error: : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:416) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:748) Mi código es: from pyspark import SparkContext, […]

Chispa: ¿Cómo “reducirByKey” cuando las claves son matrices numpy que no son hashable?

Tengo un RDD de (clave, valor) elementos. Las claves son matrices NumPy. Los arreglos de NumPy no son hashable, y esto causa un problema cuando bash hacer una operación reduceByKey . ¿Hay alguna forma de proporcionar el contexto Spark con mi función hash manual? ¿O hay alguna otra forma de solucionar este problema (aparte de […]

Análisis de registros multilínea en Scala

Aquí está mi RDD [String] M1 module1 PIP a ZA PIP b ZB PIP c Y n4 M2 module2 PIP a I n4 PIP b OD PIP c O n5 y así. Básicamente, necesito un RDD de clave (que contenga la segunda palabra en la línea 1) y los valores de las siguientes líneas PIP […]

¿Cómo aplanar listas anidadas en PySpark?

Tengo una estructura RDD como: rdd = [[[1],[2],[3]], [[4],[5]], [[6]], [[7],[8],[9],[10]]] Y quiero que se convierta en: rdd = [1,2,3,4,5,6,7,8,9,10] ¿Cómo escribo un mapa o función de reducción para que funcione?

Entendiendo treeReduce () en Spark

Puede ver la implementación aquí: https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L804 ¿En qué se diferencia de la función de reduce ‘normal’? ¿Qué significa depth = 2 ? No quiero que la función del reductor pase linealmente en las particiones, sino que primero reduzca cada uno de los pares disponibles, y luego iteraré así hasta que tenga un solo par y […]

¿Qué función en la chispa se utiliza para combinar dos RDD por teclas?

Digamos que tengo los siguientes dos RDD, con los siguientes valores de par de claves. rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ] y rdd2 = [ (key1, [value5, value6]), (key2, [value7]) ] Ahora, quiero unirlos por valores clave, por ejemplo, quiero devolver lo siguiente ret = [ (key1, [value1, value2, value5, value6]), […]

PySpark – Superposición de tiempo para objeto en RDD

Mi objective es agrupar objetos en función de la superposición de tiempo. Cada objeto en mi rdd contiene un start_time y end_time . Probablemente estoy haciendo esto de manera ineficiente, pero lo que planeo hacer es asignar una identificación de superposición a cada objeto en función de si se ha superpuesto en algún momento con […]

Chispa – Operación RDD anidada

Tengo dos RDD dicen rdd1 = id | created | destroyed | price 1 | 1 | 2 | 10 2 | 1 | 5 | 11 3 | 2 | 3 | 11 4 | 3 | 4 | 12 5 | 3 | 5 | 11 rdd2 = [1,2,3,4,5] # lets call these […]