Articles of apache spark

PySpark 1.5 Cómo truncar la marca de tiempo al minuto más cercano en segundos

Estoy usando PySpark. Tengo una columna (‘dt’) en un dataframe (‘canon_evt’) que esta es una marca de tiempo. Estoy tratando de eliminar segundos de un valor DateTime. Originalmente se lee desde parquet como una cadena. Entonces trato de convertirlo a Timestamp a través de canon_evt = canon_evt.withColumn(‘dt’,to_date(canon_evt.dt)) canon_evt= canon_evt.withColumn(‘dt’,canon_evt.dt.astype(‘Timestamp’)) Entonces me gustaría quitar los segundos. […]

Preservar indizador de cadena de chispa de correspondencia de cadena de índice

El StringIndexer de Spark es bastante útil, pero es común tener que recuperar las correspondencias entre los valores de índice generados y las cadenas originales, y parece que debería haber una forma integrada de lograrlo. Ilustraré usando este simple ejemplo de la documentación de Spark : from pyspark.ml.feature import StringIndexer df = sqlContext.createDataFrame( [(0, “a”), […]

Lectura y escritura de tablas de hives con chispa después de la agregación.

Tenemos un almacén de hives, y queríamos usar spark para varias tareas (principalmente clasificación). A veces escribe los resultados de nuevo como una tabla de hive. Por ejemplo, escribimos la siguiente función de Python para encontrar la sum total de la columna original_table dos, agrupada por la columna original_table uno. La función funciona, pero nos […]

Databricks (Spark): ¿Las dependencias de .egg no se instalan automáticamente?

Tengo un paquete .egg creado localmente que depende de boto==2.38.0. Utilicé setuptools para crear la distribución de comstackción. Todo funciona en mi propio entorno local, ya que recupera boto correctamente de PiP . Sin embargo, en databricks no recupera automáticamente las dependencias cuando adjunto una biblioteca al clúster. Realmente luché por unos días intentando instalar […]

¿En qué partición df.repartition sin argumentos de columna?

En PySpark, el módulo de reparto tiene un argumento de columnas opcional que, por supuesto, repartirá su dataframe con esa clave. Mi pregunta es: ¿cómo reparte Spark cuando no hay llave? No pude profundizar más en el código fuente para encontrar dónde pasa esto con Spark. def repartition(self, numPartitions, *cols): “”” Returns a new :class:`DataFrame` […]

Pyspark: analizar una columna de cadenas json

Tengo un dataframe pyspark que consta de una columna, llamada json , donde cada fila es una cadena unicode de json. Me gustaría analizar cada fila y devolver un nuevo dataframe donde cada fila es el json analizado. # Sample Data Frame jstr1 = u'{“header”:{“id”:12345,”foo”:”bar”},”body”:{“id”:111000,”name”:”foobar”,”sub_json”:{“id”:54321,”sub_sub_json”:{“col1″:20,”col2″:”somethong”}}}}’ jstr2 = u'{“header”:{“id”:12346,”foo”:”baz”},”body”:{“id”:111002,”name”:”barfoo”,”sub_json”:{“id”:23456,”sub_sub_json”:{“col1″:30,”col2″:”something else”}}}}’ jstr3 = u'{“header”:{“id”:43256,”foo”:”foobaz”},”body”:{“id”:20192,”name”:”bazbar”,”sub_json”:{“id”:39283,”sub_sub_json”:{“col1″:50,”col2″:”another thing”}}}}’ df = […]

Spark RDD – Mapeo con argumentos extra

¿Es posible pasar argumentos adicionales a la función de mapeo en pySpark? Específicamente, tengo la siguiente receta de código: raw_data_rdd = sc.textFile(“data.json”, use_unicode=True) json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line)) mapped_rdd = json_data_rdd.flatMap(processDataLine) La función processDataLine toma argumentos adicionales además del objeto JSON, como: def processDataLine(dataline, arg1, arg2) ¿Cómo puedo pasar los argumentos adicionales arg1 y arg2 […]

Adjuntar nombre de archivo a RDD

Tengo una carpeta que contiene mis archivos de datos. Cada archivo tiene un tamaño de aproximadamente 1 GB. Lo que necesito es el nombre de archivo dentro del RDD. Lo siguiente no funciona como se esperaba: import glob rdds = [] for filename in glob.iglob(‘/data/*’): rdd = sc.textFile(filename).map(lambda row: (filename, row)) rdds.append(rdd) allData = sc.union(rdds) […]

Selección de valores de matriz vacía de un Spark DataFrame

Dado un DataFrame con las siguientes filas: rows = [ Row(col1=’abc’, col2=[8], col3=[18], col4=[16]), Row(col2=’def’, col2=[18], col3=[18], col4=[]), Row(col3=’ghi’, col2=[], col3=[], col4=[])] Me gustaría eliminar las filas con una matriz vacía para cada uno de col2 , col4 y col4 (es decir, la tercera fila). Por ejemplo, podría esperar que este código funcione: df.where(~df.col2.isEmpty(), ~df.col3.isEmpty(), […]

Filtrado de Sparksql (selección con la cláusula where) con múltiples condiciones

Hola tengo el siguiente problema: numeric.registerTempTable(“numeric”). Todos los valores que quiero filtrar son cadenas nulas literales y no valores N / A o nulos. Probé estas tres opciones: numeric_filtered = numeric.filter(numeric[‘LOW’] != ‘null’).filter(numeric[‘HIGH’] != ‘null’).filter(numeric[‘NORMAL’] != ‘null’) numeric_filtered = numeric.filter(numeric[‘LOW’] != ‘null’ AND numeric[‘HIGH’] != ‘null’ AND numeric[‘NORMAL’] != ‘null’) sqlContext.sql(“SELECT * from numeric WHERE […]