Articles of parque de atracciones

Cómo cambiar las propiedades de SparkContext en la sesión de PySpark interactivo

¿Cómo puedo cambiar spark.driver.maxResultSize en el shell interactivo pyspark? He usado el siguiente código from pyspark import SparkConf, SparkContext conf = (SparkConf() .set(“spark.driver.maxResultSize”, “10g”)) sc.stop() sc=SparkContext(conf) pero me da el error AttributeError: ‘SparkConf’ object has no attribute ‘_get_object_id’

pyspark EOFError después de llamar al mapa

Soy nuevo en spark & ​​pyspark. Estoy leyendo un pequeño archivo csv (~ 40k) en un dataframe. from pyspark.sql import functions as F df = sqlContext.read.format(‘com.databricks.spark.csv’).options(header=’true’, inferschema=’true’).load(‘/tmp/sm.csv’) df = df.withColumn(‘verified’, F.when(df[‘verified’] == ‘Y’, 1).otherwise(0)) df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF() Me sale un error extraño que no ocurre todas las veces, pero ocurre con bastante […]

¿Cómo eliminar un RDD en PySpark con el fin de liberar recursos?

Si tengo un RDD que ya no necesito, ¿cómo lo borro de la memoria? ¿Sería suficiente lo siguiente para hacer esto? del thisRDD ¡Gracias!

Marco de datos Pyspark convertir varias columnas a flotar

Estoy tratando de convertir varias columnas de un dataframe de una cadena a flotar como esta df_temp = sc.parallelize([(“1”, “2”, “3.4555”), (“5.6”, “6.7”, “7.8”)]).toDF((“x”, “y”, “z”)) df_temp.select(*(float(col(c)).alias(c) for c in df_temp.columns)).show() pero estoy recibiendo el error select() argument after * must be a sequence, not generator No puedo entender por qué se está lanzando este […]

Spark: variables de difusión: parece que está intentando hacer referencia a SparkContext desde una variable de difusión, una acción o una transformación

Class ProdsTransformer: def __init__(self): self.products_lookup_hmap = {} self.broadcast_products_lookup_map = None def create_broadcast_variables(self): self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap) def create_lookup_maps(self): // The code here builds the hashmap that maps Prod_ID to another space. pt = ProdsTransformer () pt.create_broadcast_variables() pairs = distinct_users_projected.map(lambda x: (x.user_id, pt.broadcast_products_lookup_map.value[x.Prod_ID])) Obtuve el siguiente error: “Excepción: parece que está intentando hacer referencia a SparkContext desde […]

Spark RuntimeError: objeto de método de clase no inicializado

Escribí un código Spark muy simple en Python: import collections Person = collections.namedtuple(‘Person’, [‘name’, ‘age’, ‘gender’]) a = sc.parallelize([[‘Barack Obama’, 54, ‘M’], [‘Joe Biden’, 74, ‘M’]]) a = a.map(lambda row: Person(*row)) print a.collect() def func(row): tmp = row._replace(name=’Jack Rabbit’) return tmp print a.map(func).collect() Me sale la siguiente salida y error: [Person(name=’Barack Obama’, age=29, gender=’M’), Person(name=’Joe […]

Escribir datos a Redis desde PySpark

En Scala, escribiríamos un RDD a Redis así: datardd.foreachPartition(iter => { val r = new RedisClient(“hosturl”, 6379) iter.foreach(i => { val (str, it) = i val map = it.toMap r.hmset(str, map) }) }) Intenté hacer esto en PySpark de esta manera: datardd.foreachPartition(storeToRedis) , donde la función storeToRedis se define como: def storeToRedis(x): r = redis.StrictRedis(host […]

Pyspark: analizar una columna de cadenas json

Tengo un dataframe pyspark que consta de una columna, llamada json , donde cada fila es una cadena unicode de json. Me gustaría analizar cada fila y devolver un nuevo dataframe donde cada fila es el json analizado. # Sample Data Frame jstr1 = u'{“header”:{“id”:12345,”foo”:”bar”},”body”:{“id”:111000,”name”:”foobar”,”sub_json”:{“id”:54321,”sub_sub_json”:{“col1″:20,”col2″:”somethong”}}}}’ jstr2 = u'{“header”:{“id”:12346,”foo”:”baz”},”body”:{“id”:111002,”name”:”barfoo”,”sub_json”:{“id”:23456,”sub_sub_json”:{“col1″:30,”col2″:”something else”}}}}’ jstr3 = u'{“header”:{“id”:43256,”foo”:”foobaz”},”body”:{“id”:20192,”name”:”bazbar”,”sub_json”:{“id”:39283,”sub_sub_json”:{“col1″:50,”col2″:”another thing”}}}}’ df = […]

Adjuntar nombre de archivo a RDD

Tengo una carpeta que contiene mis archivos de datos. Cada archivo tiene un tamaño de aproximadamente 1 GB. Lo que necesito es el nombre de archivo dentro del RDD. Lo siguiente no funciona como se esperaba: import glob rdds = [] for filename in glob.iglob(‘/data/*’): rdd = sc.textFile(filename).map(lambda row: (filename, row)) rdds.append(rdd) allData = sc.union(rdds) […]

Construyendo una fila a partir de un dict en pySpark

Estoy tratando de construir dinámicamente una fila en pySpark 1.6.1, luego construirla en un dataframe. La idea general es extender los resultados de la describe para incluir, por ejemplo, sesgo y curtosis. Esto es lo que pensé que debería funcionar: from pyspark.sql import Row row_dict = {‘C0’: -1.1990072635132698, ‘C3’: 0.12605772684660232, ‘C4’: 0.5760856026559944, ‘C5’: 0.1951877800894315, ‘C6’: […]