Articles of rdd

ERROR AL EJECUTAR RECOGER () EN PYSPARK

Estoy tratando de separar el nombre del sitio web de la URL. Por ejemplo, si la URL es www.google.com, la salida debería ser “google”. Probé el siguiente código y todo funciona bien, excepto la última línea: “websites.collect ()”. Utilicé un dataframe para almacenar los nombres de los sitios web y luego lo convertí en un […]

Seleccionar valores superiores de chispa en RDD

El conjunto de datos original es: # (numbersofrating,title,avg_rating) newRDD =[(3,’monster’,4),(4,’minions 3D’,5),….] Quiero seleccionar las mejores N avg_ratings en newRDD. Utilizo el siguiente código, tiene un error. selectnewRDD = (newRDD.map(x, key =lambda x: x[2]).sortBy(……)) TypeError: map() takes no keyword arguments Los datos esperados deben ser: # (numbersofrating,title,avg_rating) selectnewRDD =[(4,’minions 3D’,5),(3,’monster’,4)….]

¿Cómo acceder a un elemento individual en una tupla en un RDD en pyspark?

Digamos que tengo un RDD como [(u’Some1′, (u’ABC’, 9989)), (u’Some2′, (u’XYZ’, 235)), (u’Some3′, (u’BBB’, 5379)), (u’Some4′, (u’ABC’, 5379))] Estoy usando el map para obtener una tupla a la vez, pero ¿cómo puedo acceder al elemento individual de una tupla para ver si una tupla contiene algún carácter? En realidad quiero filtrar aquellos que contienen algún […]

Devuelva un RDD de takeOrdered, en lugar de una lista

Estoy usando pyspark para hacer un poco de limpieza de datos. Una operación muy común es tomar un subconjunto pequeño de un archivo y exportarlo para su inspección: (self.spark_context.textFile(old_filepath+filename) .takeOrdered(100) .saveAsTextFile(new_filepath+filename)) Mi problema es que takeOrdered está devolviendo una lista en lugar de un RDD, por lo que saveAsTextFile no funciona. AttributeError: ‘list’ object has […]

Spark RDD – Mapeo con argumentos extra

¿Es posible pasar argumentos adicionales a la función de mapeo en pySpark? Específicamente, tengo la siguiente receta de código: raw_data_rdd = sc.textFile(“data.json”, use_unicode=True) json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line)) mapped_rdd = json_data_rdd.flatMap(processDataLine) La función processDataLine toma argumentos adicionales además del objeto JSON, como: def processDataLine(dataline, arg1, arg2) ¿Cómo puedo pasar los argumentos adicionales arg1 y arg2 […]

La aplicación PySpark falla con java.lang.OutOfMemoryError: espacio de almacenamiento dynamic de Java

Estoy ejecutando spark a través de pycharm y pyspark shell respectivamente. He astackdo con este error: : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:416) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:748) Mi código es: from pyspark import SparkContext, […]

Chispa: ¿Cómo “reducirByKey” cuando las claves son matrices numpy que no son hashable?

Tengo un RDD de (clave, valor) elementos. Las claves son matrices NumPy. Los arreglos de NumPy no son hashable, y esto causa un problema cuando bash hacer una operación reduceByKey . ¿Hay alguna forma de proporcionar el contexto Spark con mi función hash manual? ¿O hay alguna otra forma de solucionar este problema (aparte de […]

Análisis de registros multilínea en Scala

Aquí está mi RDD [String] M1 module1 PIP a ZA PIP b ZB PIP c Y n4 M2 module2 PIP a I n4 PIP b OD PIP c O n5 y así. Básicamente, necesito un RDD de clave (que contenga la segunda palabra en la línea 1) y los valores de las siguientes líneas PIP […]

¿Cómo aplanar listas anidadas en PySpark?

Tengo una estructura RDD como: rdd = [[[1],[2],[3]], [[4],[5]], [[6]], [[7],[8],[9],[10]]] Y quiero que se convierta en: rdd = [1,2,3,4,5,6,7,8,9,10] ¿Cómo escribo un mapa o función de reducción para que funcione?

Entendiendo treeReduce () en Spark

Puede ver la implementación aquí: https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L804 ¿En qué se diferencia de la función de reduce ‘normal’? ¿Qué significa depth = 2 ? No quiero que la función del reductor pase linealmente en las particiones, sino que primero reduzca cada uno de los pares disponibles, y luego iteraré así hasta que tenga un solo par y […]