Articles of parque de atracciones

Apache Spark con Python: error

Nuevo en Spark. Descargué todo bien, pero cuando ejecuto pyspark obtengo los siguientes errores: Type “help”, “copyright”, “credits” or “license” for more information. Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to “WARN”. To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/02/05 20:46:58 WARN NativeCodeLoader: Unable to load native-hadoop library for […]

PySpark groupByKey devolviendo pyspark.resultiterable.ResultIterable

Estoy tratando de averiguar por qué mi groupByKey está devolviendo lo siguiente: [(0, ), (1, ), (2, ), (3, ), (4, ), (5, ), (6, ), (7, ), (8, ), (9, )] Tengo valores flatMapped que se ven así: [(0, u’D’), (0, u’D’), (0, u’D’), (0, u’D’), (0, u’D’), (0, u’D’), (0, u’D’), (0, u’D’), […]

Manejo de datos faltantes en Pyspark

Utilizando PySpark 1.6 / Python 2.7 Tengo datos en el siguiente formato, que se obtiene de Hive en un dataframe: date, stock, price 1388534400, GOOG, 50 1388534400, FB, 60 1388534400, MSFT, 55 1388620800, GOOG, 52 1388620800, FB, 61 1388620800, MSFT, 55 Me gustaría terminar con un json del siguiente formato: GOOG.json: { ‘symbol’: ‘GOOG’, ‘first_epoch’: […]

Comparando columnas en Pyspark

Estoy trabajando en un DataSrame de PySpark con n columnas. Tengo un conjunto de m columnas (m <n) y mi tarea es elegir la columna con los valores máximos en ella. Por ejemplo: Entrada: Marco de datos de PySpark que contiene col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5]. Ouput = col_4 = max […]

`combineByKey`, pyspark

Sólo me pregunto qué hace exactamente esto? Entiendo keyBy , pero lucho por lo que exactamente es que combineByKey . He leído las páginas (enlace) y todavía no entiendo. df.rdd.keyBy( lambda row: row[‘id’] ).combineByKey( lambda row: [row], lambda rows, row: rows + [row], lambda rows1, rows2: rows1 + rows2, ) )

Problema al crear una lista global desde el mapa usando PySpark

Tengo este código donde estoy leyendo un archivo en ipython usando pyspark . Lo que estoy tratando de hacer es agregarle una pieza que forme una lista basada en una columna particular leída del archivo, pero cuando bash ejecutarlo, la lista aparece vacía y no se le anexa nada. Mi código es: list1 = [] […]

Concatenar dos marcos de datos de PySpark

Estoy tratando de concatenar dos marcos de datos de PySpark con algunas columnas que solo están en cada una de ellas: from pyspark.sql.functions import randn, rand df_1 = sqlContext.range(0, 10) +–+ |id| +–+ | 0| | 1| | 2| | 3| | 4| | 5| | 6| | 7| | 8| | 9| +–+ df_2 […]

Combine dos filas en chispa según una condición en pyspark

Tengo registro de entrada en el siguiente formato: Formato de datos de entrada Quiero que los datos se transmitan en el siguiente formato: Formato de datos de salida Quiero combinar mis 2 filas según el tipo de condición. Según mi conocimiento, debo tomar la clave compuesta de los 3 campos de datos y comparar los […]

¿Cómo procesar RDDs usando una clase de Python?

Estoy implementando un modelo en Spark como una clase de python, y cada vez que bash asignar un método de clase a un RDD falla. Mi código real es más complicado, pero esta versión simplificada se encuentra en el centro del problema: class model(object): def __init__(self): self.data = sc.textFile(‘path/to/data.csv’) # other misc setup def run_model(self): […]

Spark __getnewargs__ error

Estoy intentando limpiar un Spark DataFrame asignándolo a RDD y luego a DataFrame. Aquí hay un ejemplo de juguete: def replace_values(row,sub_rules): d = row.asDict() for col,old_val,new_val in sub_rules: if d[col] == old_val: d[col] = new_val return Row(**d) ex = sc.parallelize([{‘name’: ‘Alice’, ‘age’: 1},{‘name’: ‘Bob’, ‘age’: 2}]) ex = sqlContext.createDataFrame(ex) (ex.map(lambda row: replace_values(row,[(col,1,3) for col in […]