Articles of parque de atracciones

Agregue PySpark RDD como nueva columna a pyspark.sql.dataframe

Tengo un pyspark.sql.dataframe donde cada fila es un artículo de noticias. Luego tengo un RDD que representa las palabras contenidas en cada artículo. Quiero agregar el RDD de las palabras como una columna denominada ‘palabras’ a mi dataframe de nuevos artículos. Lo intenté df.withColumn(‘words’, words_rdd ) pero me sale el error AssertionError: col should be […]

sobrescribiendo una salida de chispa usando pyspark

Estoy intentando sobrescribir un dataframe de Spark usando la siguiente opción en PySpark pero no tengo éxito spark_df.write.format(‘com.databricks.spark.csv’).option(“header”, “true”,mode=’overwrite’).save(self.output_file_path) el comando mode = overwrite no es exitoso

¿Cómo desactivar INFO de los registros en PySpark sin cambios en log4j.properties?

Estoy trabajando en un clúster en el que no tengo permiso para cambiar el archivo log4j.properties para detener el registro de información mientras uso pyspark (como se explica en la primera respuesta aquí ). -shell (scala) import org.apache.log4j.Logger import org.apache.log4j.Level Pero para la chispa con python (es decir, pyspark), no funcionó ni lo siguiente Logger.getLogger(“org”).setLevel(Level.OFF) […]

Cómo cambiar las propiedades de SparkContext en la sesión de PySpark interactivo

¿Cómo puedo cambiar spark.driver.maxResultSize en el shell interactivo pyspark? He usado el siguiente código from pyspark import SparkConf, SparkContext conf = (SparkConf() .set(“spark.driver.maxResultSize”, “10g”)) sc.stop() sc=SparkContext(conf) pero me da el error AttributeError: ‘SparkConf’ object has no attribute ‘_get_object_id’

pyspark EOFError después de llamar al mapa

Soy nuevo en spark & ​​pyspark. Estoy leyendo un pequeño archivo csv (~ 40k) en un dataframe. from pyspark.sql import functions as F df = sqlContext.read.format(‘com.databricks.spark.csv’).options(header=’true’, inferschema=’true’).load(‘/tmp/sm.csv’) df = df.withColumn(‘verified’, F.when(df[‘verified’] == ‘Y’, 1).otherwise(0)) df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF() Me sale un error extraño que no ocurre todas las veces, pero ocurre con bastante […]

¿Cómo eliminar un RDD en PySpark con el fin de liberar recursos?

Si tengo un RDD que ya no necesito, ¿cómo lo borro de la memoria? ¿Sería suficiente lo siguiente para hacer esto? del thisRDD ¡Gracias!

Marco de datos Pyspark convertir varias columnas a flotar

Estoy tratando de convertir varias columnas de un dataframe de una cadena a flotar como esta df_temp = sc.parallelize([(“1”, “2”, “3.4555”), (“5.6”, “6.7”, “7.8”)]).toDF((“x”, “y”, “z”)) df_temp.select(*(float(col(c)).alias(c) for c in df_temp.columns)).show() pero estoy recibiendo el error select() argument after * must be a sequence, not generator No puedo entender por qué se está lanzando este […]

Spark: variables de difusión: parece que está intentando hacer referencia a SparkContext desde una variable de difusión, una acción o una transformación

Class ProdsTransformer: def __init__(self): self.products_lookup_hmap = {} self.broadcast_products_lookup_map = None def create_broadcast_variables(self): self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap) def create_lookup_maps(self): // The code here builds the hashmap that maps Prod_ID to another space. pt = ProdsTransformer () pt.create_broadcast_variables() pairs = distinct_users_projected.map(lambda x: (x.user_id, pt.broadcast_products_lookup_map.value[x.Prod_ID])) Obtuve el siguiente error: “Excepción: parece que está intentando hacer referencia a SparkContext desde […]

Spark RuntimeError: objeto de método de clase no inicializado

Escribí un código Spark muy simple en Python: import collections Person = collections.namedtuple(‘Person’, [‘name’, ‘age’, ‘gender’]) a = sc.parallelize([[‘Barack Obama’, 54, ‘M’], [‘Joe Biden’, 74, ‘M’]]) a = a.map(lambda row: Person(*row)) print a.collect() def func(row): tmp = row._replace(name=’Jack Rabbit’) return tmp print a.map(func).collect() Me sale la siguiente salida y error: [Person(name=’Barack Obama’, age=29, gender=’M’), Person(name=’Joe […]

Escribir datos a Redis desde PySpark

En Scala, escribiríamos un RDD a Redis así: datardd.foreachPartition(iter => { val r = new RedisClient(“hosturl”, 6379) iter.foreach(i => { val (str, it) = i val map = it.toMap r.hmset(str, map) }) }) Intenté hacer esto en PySpark de esta manera: datardd.foreachPartition(storeToRedis) , donde la función storeToRedis se define como: def storeToRedis(x): r = redis.StrictRedis(host […]