Articles of spark dataframe

¿Cómo puedo escribir un archivo de parquet utilizando Spark (pyspark)?

Soy bastante nuevo en Spark y he estado intentando convertir un Dataframe en un archivo de parquet en Spark, pero aún no he tenido éxito. La documentación dice que puedo usar la función write.parquet para crear el archivo. Sin embargo, cuando ejecuto el script, me muestra: AttributeError: el objeto ‘RDD’ no tiene ningún atributo ‘write’ […]

Cómo dividir un dataframe pyspark en dos filas

Estoy trabajando en Databricks. Tengo un dataframe que contiene 500 filas, me gustaría crear dos marcos de datos que contengan 100 filas y el otro que contenga las 400 filas restantes. +——————–+———-+ | userid| eventdate| +——————–+———-+ |00518b128fc9459d9…|2017-10-09| |00976c0b7f2c4c2ca…|2017-12-16| |00a60fb81aa74f35a…|2017-12-04| |00f9f7234e2c4bf78…|2017-05-09| |0146fe6ad7a243c3b…|2017-11-21| |016567f169c145ddb…|2017-10-16| |01ccd278777946cb8…|2017-07-05| He intentado lo de abajo pero recibo un error. df1 = df[:99] […]

PySpark – Pase la lista como parámetro a UDF

Necesito pasar una lista a un UDF, la lista determinará la puntuación / categoría de la distancia. Por ahora, estoy progtwigndo todas las distancias para que sea la cuarta puntuación. a= spark.createDataFrame([(“A”, 20), (“B”, 30), (“D”, 80)],[“Letter”, “distances”]) from pyspark.sql.functions import udf def cate(label, feature_list): if feature_list == 0: return label[4] label_list = [“Great”, “Good”, […]

Configuración de la sesión de spark 2.1.0 (pyspark)

Estoy tratando de sobrescribir las configuraciones predeterminadas del contexto de la sesión de chispa / chispa, pero está recogiendo el recurso completo del nodo / clúster. spark = SparkSession.builder .master(“ip”) .enableHiveSupport() .getOrCreate() spark.conf.set(“spark.executor.memory”, ‘8g’) spark.conf.set(‘spark.executor.cores’, ‘3’) spark.conf.set(‘spark.cores.max’, ‘3’) spark.conf.set(“spark.driver.memory”,’8g’) sc = spark.sparkContext Funciona bien cuando pongo la configuración en chispa enviar spark-submit –master ip –executor-cores=3 […]

Cómo “reducir” varias tablas json almacenadas en una columna de un RDD a una sola tabla RDD de la manera más eficiente posible

¿El acceso simultáneo para agregar filas usando la unión en un dataframe usando el siguiente código funcionará correctamente? Actualmente mostrando error de tipo from pyspark.sql.types import * schema = StructType([ StructField(“owreg”, StringType(), True),StructField(“we”, StringType(), True) ,StructField(“aa”, StringType(), True) ,StructField(“cc”, StringType(), True) ,StructField(“ss”, StringType(), True) ,StructField(“ss”, StringType(), True) ,StructField(“sss”, StringType(), True) ]) f = sqlContext.createDataFrame(sc.emptyRDD(), schema) […]

¿Cómo podría ordenar por sum, dentro de un DataFrame en PySpark?

Análogamente a: order_items.groupBy(“order_item_order_id”).count().orderBy(desc(“count”)).show() Yo he tratado: order_items.groupBy(“order_item_order_id”).sum(“order_item_subtotal”).orderBy(desc(“sum”)).show() pero esto da un error: Py4JJavaError: Se produjo un error al llamar a o501.sort. : org.apache.spark.sql.AnalysisException: no se puede resolver la ‘sum’ dada las columnas de entrada order_item_order_id, SUM (order_item_subtotal # 429); También he intentado: order_items.groupBy(“order_item_order_id”).sum(“order_item_subtotal”).orderBy(desc(“SUM(order_item_subtotal)”)).show() pero me sale el mismo error: Py4JJavaError: Se produjo un error al […]

Pyspark – ValueError: no se pudo convertir la cadena a un literal flotante / inválido para float ()

Estoy tratando de usar datos de un dataframe de chispa como entrada para mi modelo de k-means. Sin embargo sigo recibiendo errores. (Ver sección después del código) Mi dataframe de chispa y se ve así (y tiene alrededor de 1M filas): ID col1 col2 Latitude Longitude 13 … … 22.2 13.5 62 … … 21.4 […]

Spark Data Frames – Verifica si la columna es de tipo entero

Estoy tratando de averiguar qué tipo de datos es mi columna en un dataframe de chispa y manipular la columna en función de esa reducción. Aquí está lo que tengo hasta ahora: import pyspark from pyspark.sql import SparkSession spark = SparkSession.builder.appName(‘MyApp’).getOrCreate() df = spark.read.csv(‘Path To csv File’,inferSchema=True,header=True) for x in df.columns: if type(x) == ‘integer’: […]

Uso de la función Reducir () de Python para unir múltiples marcos de datos de PySpark

¿Alguien sabe por qué el uso de functools.reduce() de functools.reduce() podría llevar a un peor rendimiento al unir múltiples DataFrames de PySpark que unir iterativamente los mismos DataFrames usando un bucle for ? Específicamente, esto produce una desaceleración masiva seguida de un error de memoria insuficiente: def join_dataframes(list_of_join_columns, left_df, right_df): return left_df.join(right_df, on=list_of_join_columns) joined_df = […]

Modo de datos agrupados en (py) Spark

Tengo un DataFrame de chispa con múltiples columnas. Me gustaría agrupar las filas según una columna y luego encontrar el modo de la segunda columna para cada grupo. Trabajando con un DataFrame de pandas, haría algo como esto: rand_values = np.random.randint(max_value, size=num_values).reshape((num_values/2, 2)) rand_values = pd.DataFrame(rand_values, columns=[‘x’, ‘y’]) rand_values[‘x’] = rand_values[‘x’] > max_value/2 rand_values[‘x’] = […]