Articles of parque de atracciones

Pyspark: analizar una columna de cadenas json

Tengo un dataframe pyspark que consta de una columna, llamada json , donde cada fila es una cadena unicode de json. Me gustaría analizar cada fila y devolver un nuevo dataframe donde cada fila es el json analizado. # Sample Data Frame jstr1 = u'{“header”:{“id”:12345,”foo”:”bar”},”body”:{“id”:111000,”name”:”foobar”,”sub_json”:{“id”:54321,”sub_sub_json”:{“col1″:20,”col2″:”somethong”}}}}’ jstr2 = u'{“header”:{“id”:12346,”foo”:”baz”},”body”:{“id”:111002,”name”:”barfoo”,”sub_json”:{“id”:23456,”sub_sub_json”:{“col1″:30,”col2″:”something else”}}}}’ jstr3 = u'{“header”:{“id”:43256,”foo”:”foobaz”},”body”:{“id”:20192,”name”:”bazbar”,”sub_json”:{“id”:39283,”sub_sub_json”:{“col1″:50,”col2″:”another thing”}}}}’ df = […]

Adjuntar nombre de archivo a RDD

Tengo una carpeta que contiene mis archivos de datos. Cada archivo tiene un tamaño de aproximadamente 1 GB. Lo que necesito es el nombre de archivo dentro del RDD. Lo siguiente no funciona como se esperaba: import glob rdds = [] for filename in glob.iglob(‘/data/*’): rdd = sc.textFile(filename).map(lambda row: (filename, row)) rdds.append(rdd) allData = sc.union(rdds) […]

Construyendo una fila a partir de un dict en pySpark

Estoy tratando de construir dinámicamente una fila en pySpark 1.6.1, luego construirla en un dataframe. La idea general es extender los resultados de la describe para incluir, por ejemplo, sesgo y curtosis. Esto es lo que pensé que debería funcionar: from pyspark.sql import Row row_dict = {‘C0’: -1.1990072635132698, ‘C3’: 0.12605772684660232, ‘C4’: 0.5760856026559944, ‘C5’: 0.1951877800894315, ‘C6’: […]

KeyError: SPARK_HOME durante la inicialización de SparkConf

Soy un novato de chispas y quiero ejecutar un script de Python desde la línea de comandos. He probado pyspark interactivamente y funciona. Me sale este error cuando bash crear el sc: File “test.py”, line 10, in conf=(SparkConf().setMaster(‘local’).setAppName(‘a’).setSparkHome(‘/home/dirk/spark-1.4.1-bin-hadoop2.6/bin’)) File “/home/dirk/spark-1.4.1-bin-hadoop2.6/python/pyspark/conf.py”, line 104, in __init__ SparkContext._ensure_initialized() File “/home/dirk/spark-1.4.1-bin-hadoop2.6/python/pyspark/context.py”, line 229, in _ensure_initialized SparkContext._gateway = gateway or […]

Aplique StringIndexer a varias columnas en un dataframe de PySpark

Tengo un dataframe de PySpark +——-+————–+—-+—-+ |address| date|name|food| +——-+————–+—-+—-+ |1111111|20151122045510| Yin|gre | |1111111|20151122045501| Yin|gre | |1111111|20151122045500| Yln|gra | |1111112|20151122065832| Yun|ddd | |1111113|20160101003221| Yan|fdf | |1111111|20160703045231| Yin|gre | |1111114|20150419134543| Yin|fdf | |1111115|20151123174302| Yen|ddd | |2111115| 20123192| Yen|gre | +——-+————–+—-+—-+ que quiero transformar para usar con pyspark.ml. Puedo usar un StringIndexer para convertir la columna de nombre […]

mapear valores en un dataframe desde un diccionario usando pyspark

Quiero saber cómo asignar valores en una columna específica en un dataframe. Tengo un dataframe que se parece a: df = sc.parallelize([(‘india’,’japan’),(‘usa’,’uruguay’)]).toDF([‘col1′,’col2’]) +—–+——-+ | col1| col2| +—–+——-+ |india| japan| | usa|uruguay| +—–+——-+ Tengo un diccionario desde donde quiero mapear los valores. dicts = sc.parallelize([(‘india’,’ind’), (‘usa’,’us’),(‘japan’,’jpn’),(‘uruguay’,’urg’)]) La salida que quiero es: +—–+——-+——–+——–+ | col1| col2|col1_map|col2_map| +—–+——-+——–+——–+ […]

¿Cómo convertir un DataFrame de nuevo a RDD normal en pyspark?

Necesito usar el (rdd.)partitionBy(npartitions, custom_partitioner) Método que no está disponible en el DataFrame. Todos los métodos de DataFrame se refieren solo a los resultados de DataFrame. Entonces, ¿cómo crear un RDD a partir de los datos del DataFrame? Nota: este es un cambio (en 1.3.0) desde 1.2.0. Actualización de la respuesta de @dpangmao: el método […]

OutOfMemoryError cuando se usa PySpark para leer archivos en modo local

Tengo alrededor de una docena de archivos encriptados con gpg que contienen datos que me gustaría analizar utilizando PySpark. Mi estrategia es aplicar una función de descifrado como un mapa plano a cada archivo y luego continuar con el procesamiento en el nivel de registro: def read_fun_generator(filename): with gpg_open(filename[0].split(‘:’)[-1], ‘r’) as f: for line in […]

PySpark: Uso de objetos en RDD

Actualmente estoy aprendiendo Python y quiero aplicarlo en / con Spark. Tengo este script muy simple (e inútil): import sys from pyspark import SparkContext class MyClass: def __init__(self, value): self.v = str(value) def addValue(self, value): self.v += str(value) def getValue(self): return self.v if __name__ == “__main__”: if len(sys.argv) != 1: print(“Usage CC”) exit(-1) data = […]

Fusionar lista de listas en pySpark RDD

Tengo listas de tuplas que quiero combinar en una lista. He podido procesar los datos utilizando lambdas y enumerar la comprensión hasta donde estoy cerca de poder usar reduceByKey pero no estoy seguro de cómo combinar las listas. Así que el formato … [[(0, 14), (0, 24)], [(1, 19), (1, 50)], …] Y me gustaría […]