¿Es posible pasar argumentos adicionales a la función de mapeo en pySpark? Específicamente, tengo la siguiente receta de código: raw_data_rdd = sc.textFile(“data.json”, use_unicode=True) json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line)) mapped_rdd = json_data_rdd.flatMap(processDataLine) La función processDataLine toma argumentos adicionales además del objeto JSON, como: def processDataLine(dataline, arg1, arg2) ¿Cómo puedo pasar los argumentos adicionales arg1 y arg2 […]
Estoy ejecutando spark a través de pycharm y pyspark shell respectivamente. He astackdo con este error: : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:416) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:748) Mi código es: from pyspark import SparkContext, […]
Tengo un RDD de (clave, valor) elementos. Las claves son matrices NumPy. Los arreglos de NumPy no son hashable, y esto causa un problema cuando bash hacer una operación reduceByKey . ¿Hay alguna forma de proporcionar el contexto Spark con mi función hash manual? ¿O hay alguna otra forma de solucionar este problema (aparte de […]
Aquí está mi RDD [String] M1 module1 PIP a ZA PIP b ZB PIP c Y n4 M2 module2 PIP a I n4 PIP b OD PIP c O n5 y así. Básicamente, necesito un RDD de clave (que contenga la segunda palabra en la línea 1) y los valores de las siguientes líneas PIP […]
Tengo una estructura RDD como: rdd = [[[1],[2],[3]], [[4],[5]], [[6]], [[7],[8],[9],[10]]] Y quiero que se convierta en: rdd = [1,2,3,4,5,6,7,8,9,10] ¿Cómo escribo un mapa o función de reducción para que funcione?
Puede ver la implementación aquí: https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L804 ¿En qué se diferencia de la función de reduce ‘normal’? ¿Qué significa depth = 2 ? No quiero que la función del reductor pase linealmente en las particiones, sino que primero reduzca cada uno de los pares disponibles, y luego iteraré así hasta que tenga un solo par y […]
Digamos que tengo los siguientes dos RDD, con los siguientes valores de par de claves. rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ] y rdd2 = [ (key1, [value5, value6]), (key2, [value7]) ] Ahora, quiero unirlos por valores clave, por ejemplo, quiero devolver lo siguiente ret = [ (key1, [value1, value2, value5, value6]), […]
Mi objective es agrupar objetos en función de la superposición de tiempo. Cada objeto en mi rdd contiene un start_time y end_time . Probablemente estoy haciendo esto de manera ineficiente, pero lo que planeo hacer es asignar una identificación de superposición a cada objeto en función de si se ha superpuesto en algún momento con […]
Tengo dos RDD dicen rdd1 = id | created | destroyed | price 1 | 1 | 2 | 10 2 | 1 | 5 | 11 3 | 2 | 3 | 11 4 | 3 | 4 | 12 5 | 3 | 5 | 11 rdd2 = [1,2,3,4,5] # lets call these […]
Tengo un RDD y quiero convertirlo en un dataframe pandas . Sé que para convertir y RDD a un dataframe normal podemos hacer df = rdd1.toDF() Pero quiero convertir el RDD a un dataframe pandas y no a un dataframe normal. ¿Cómo puedo hacerlo?