Estoy usando PySpark. Tengo una columna (‘dt’) en un dataframe (‘canon_evt’) que esta es una marca de tiempo. Estoy tratando de eliminar segundos de un valor DateTime. Originalmente se lee desde parquet como una cadena. Entonces trato de convertirlo a Timestamp a través de canon_evt = canon_evt.withColumn(‘dt’,to_date(canon_evt.dt)) canon_evt= canon_evt.withColumn(‘dt’,canon_evt.dt.astype(‘Timestamp’)) Entonces me gustaría quitar los segundos. […]
El StringIndexer de Spark es bastante útil, pero es común tener que recuperar las correspondencias entre los valores de índice generados y las cadenas originales, y parece que debería haber una forma integrada de lograrlo. Ilustraré usando este simple ejemplo de la documentación de Spark : from pyspark.ml.feature import StringIndexer df = sqlContext.createDataFrame( [(0, “a”), […]
Tenemos un almacén de hives, y queríamos usar spark para varias tareas (principalmente clasificación). A veces escribe los resultados de nuevo como una tabla de hive. Por ejemplo, escribimos la siguiente función de Python para encontrar la sum total de la columna original_table dos, agrupada por la columna original_table uno. La función funciona, pero nos […]
Tengo un paquete .egg creado localmente que depende de boto==2.38.0. Utilicé setuptools para crear la distribución de comstackción. Todo funciona en mi propio entorno local, ya que recupera boto correctamente de PiP . Sin embargo, en databricks no recupera automáticamente las dependencias cuando adjunto una biblioteca al clúster. Realmente luché por unos días intentando instalar […]
En PySpark, el módulo de reparto tiene un argumento de columnas opcional que, por supuesto, repartirá su dataframe con esa clave. Mi pregunta es: ¿cómo reparte Spark cuando no hay llave? No pude profundizar más en el código fuente para encontrar dónde pasa esto con Spark. def repartition(self, numPartitions, *cols): “”” Returns a new :class:`DataFrame` […]
Tengo un dataframe pyspark que consta de una columna, llamada json , donde cada fila es una cadena unicode de json. Me gustaría analizar cada fila y devolver un nuevo dataframe donde cada fila es el json analizado. # Sample Data Frame jstr1 = u'{“header”:{“id”:12345,”foo”:”bar”},”body”:{“id”:111000,”name”:”foobar”,”sub_json”:{“id”:54321,”sub_sub_json”:{“col1″:20,”col2″:”somethong”}}}}’ jstr2 = u'{“header”:{“id”:12346,”foo”:”baz”},”body”:{“id”:111002,”name”:”barfoo”,”sub_json”:{“id”:23456,”sub_sub_json”:{“col1″:30,”col2″:”something else”}}}}’ jstr3 = u'{“header”:{“id”:43256,”foo”:”foobaz”},”body”:{“id”:20192,”name”:”bazbar”,”sub_json”:{“id”:39283,”sub_sub_json”:{“col1″:50,”col2″:”another thing”}}}}’ df = […]
¿Es posible pasar argumentos adicionales a la función de mapeo en pySpark? Específicamente, tengo la siguiente receta de código: raw_data_rdd = sc.textFile(“data.json”, use_unicode=True) json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line)) mapped_rdd = json_data_rdd.flatMap(processDataLine) La función processDataLine toma argumentos adicionales además del objeto JSON, como: def processDataLine(dataline, arg1, arg2) ¿Cómo puedo pasar los argumentos adicionales arg1 y arg2 […]
Tengo una carpeta que contiene mis archivos de datos. Cada archivo tiene un tamaño de aproximadamente 1 GB. Lo que necesito es el nombre de archivo dentro del RDD. Lo siguiente no funciona como se esperaba: import glob rdds = [] for filename in glob.iglob(‘/data/*’): rdd = sc.textFile(filename).map(lambda row: (filename, row)) rdds.append(rdd) allData = sc.union(rdds) […]
Dado un DataFrame con las siguientes filas: rows = [ Row(col1=’abc’, col2=[8], col3=[18], col4=[16]), Row(col2=’def’, col2=[18], col3=[18], col4=[]), Row(col3=’ghi’, col2=[], col3=[], col4=[])] Me gustaría eliminar las filas con una matriz vacía para cada uno de col2 , col4 y col4 (es decir, la tercera fila). Por ejemplo, podría esperar que este código funcione: df.where(~df.col2.isEmpty(), ~df.col3.isEmpty(), […]
Hola tengo el siguiente problema: numeric.registerTempTable(“numeric”). Todos los valores que quiero filtrar son cadenas nulas literales y no valores N / A o nulos. Probé estas tres opciones: numeric_filtered = numeric.filter(numeric[‘LOW’] != ‘null’).filter(numeric[‘HIGH’] != ‘null’).filter(numeric[‘NORMAL’] != ‘null’) numeric_filtered = numeric.filter(numeric[‘LOW’] != ‘null’ AND numeric[‘HIGH’] != ‘null’ AND numeric[‘NORMAL’] != ‘null’) sqlContext.sql(“SELECT * from numeric WHERE […]