Articles of spark dataframe

Cómo “reducir” varias tablas json almacenadas en una columna de un RDD a una sola tabla RDD de la manera más eficiente posible

¿El acceso simultáneo para agregar filas usando la unión en un dataframe usando el siguiente código funcionará correctamente? Actualmente mostrando error de tipo from pyspark.sql.types import * schema = StructType([ StructField(“owreg”, StringType(), True),StructField(“we”, StringType(), True) ,StructField(“aa”, StringType(), True) ,StructField(“cc”, StringType(), True) ,StructField(“ss”, StringType(), True) ,StructField(“ss”, StringType(), True) ,StructField(“sss”, StringType(), True) ]) f = sqlContext.createDataFrame(sc.emptyRDD(), schema) […]

¿Cómo podría ordenar por sum, dentro de un DataFrame en PySpark?

Análogamente a: order_items.groupBy(“order_item_order_id”).count().orderBy(desc(“count”)).show() Yo he tratado: order_items.groupBy(“order_item_order_id”).sum(“order_item_subtotal”).orderBy(desc(“sum”)).show() pero esto da un error: Py4JJavaError: Se produjo un error al llamar a o501.sort. : org.apache.spark.sql.AnalysisException: no se puede resolver la ‘sum’ dada las columnas de entrada order_item_order_id, SUM (order_item_subtotal # 429); También he intentado: order_items.groupBy(“order_item_order_id”).sum(“order_item_subtotal”).orderBy(desc(“SUM(order_item_subtotal)”)).show() pero me sale el mismo error: Py4JJavaError: Se produjo un error al […]

Pyspark – ValueError: no se pudo convertir la cadena a un literal flotante / inválido para float ()

Estoy tratando de usar datos de un dataframe de chispa como entrada para mi modelo de k-means. Sin embargo sigo recibiendo errores. (Ver sección después del código) Mi dataframe de chispa y se ve así (y tiene alrededor de 1M filas): ID col1 col2 Latitude Longitude 13 … … 22.2 13.5 62 … … 21.4 […]

Spark Data Frames – Verifica si la columna es de tipo entero

Estoy tratando de averiguar qué tipo de datos es mi columna en un dataframe de chispa y manipular la columna en función de esa reducción. Aquí está lo que tengo hasta ahora: import pyspark from pyspark.sql import SparkSession spark = SparkSession.builder.appName(‘MyApp’).getOrCreate() df = spark.read.csv(‘Path To csv File’,inferSchema=True,header=True) for x in df.columns: if type(x) == ‘integer’: […]

Uso de la función Reducir () de Python para unir múltiples marcos de datos de PySpark

¿Alguien sabe por qué el uso de functools.reduce() de functools.reduce() podría llevar a un peor rendimiento al unir múltiples DataFrames de PySpark que unir iterativamente los mismos DataFrames usando un bucle for ? Específicamente, esto produce una desaceleración masiva seguida de un error de memoria insuficiente: def join_dataframes(list_of_join_columns, left_df, right_df): return left_df.join(right_df, on=list_of_join_columns) joined_df = […]

Modo de datos agrupados en (py) Spark

Tengo un DataFrame de chispa con múltiples columnas. Me gustaría agrupar las filas según una columna y luego encontrar el modo de la segunda columna para cada grupo. Trabajando con un DataFrame de pandas, haría algo como esto: rand_values = np.random.randint(max_value, size=num_values).reshape((num_values/2, 2)) rand_values = pd.DataFrame(rand_values, columns=[‘x’, ‘y’]) rand_values[‘x’] = rand_values[‘x’] > max_value/2 rand_values[‘x’] = […]

Pyspark leyó varios archivos csv en un dataframe (¿O RDD?)

Tengo un clúster Spark 2.0.2 que estoy atacando a través de Pyspark a través de Jupyter Notebook. Tengo varios archivos txt delimitados por tuberías (cargados en HDFS, pero también disponibles en un directorio local) que necesito cargar usando spark-csv en tres marcos de datos separados, dependiendo del nombre del archivo. Veo tres enfoques que puedo […]

Elemento de acceso de un vector en un dataframe de chispa (vector de probabilidad de regresión logística)

Entrené un modelo de LogisticRegression en PySpark (paquete ML) y el resultado de la predicción es un DataSrame de PySpark ( cv_predictions ) (ver [1]). La columna de probability (ver [2]) es un tipo de vector (ver [3]). [1] type(cv_predictions_prod) pyspark.sql.dataframe.DataFrame [2] cv_predictions_prod.select(‘probability’).show(10, False) +—————————————-+ |probability | +—————————————-+ |[0.31559134817066054,0.6844086518293395]| |[0.8937864350711228,0.10621356492887715]| |[0.8615878905395029,0.1384121094604972] | |[0.9594427633777901,0.04055723662220989]| |[0.5391547673698157,0.46084523263018434]| |[0.2820729747752462,0.7179270252247538] […]

La mejor manera de obtener el valor máximo en una columna de dataframe Spark

Estoy tratando de descubrir la mejor manera de obtener el mayor valor en una columna de dataframe Spark. Considere el siguiente ejemplo: df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], [“A”, “B”]) df.show() Lo que crea: +—+—+ | A| B| +—+—+ |1.0|4.0| |2.0|5.0| |3.0|6.0| +—+—+ Mi objective es encontrar el mayor valor en la columna […]

Python / pyspark data frame reorganiza las columnas

Tengo un dataframe en python / pyspark con columnas id time city zip y así sucesivamente …… Ahora agregué un nuevo name columna a este dataframe. Ahora tengo que organizar las columnas de tal manera que la columna del name aparezca después de id Lo he hecho como abajo change_cols = [‘id’, ‘name’] cols = […]