Articles of pyspark sql

PySpark – Agregar una nueva columna con un rango por usuario

Tengo este DataSrame de PySpark df = pd.DataFrame(np.array([ [“aa@gmail.com”,2,3], [“aa@gmail.com”,5,5], [“bb@gmail.com”,8,2], [“cc@gmail.com”,9,3] ]), columns=[‘user’,’movie’,’rating’]) sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1) user movie rating aa@gmail.com 2 3 aa@gmail.com 5 5 bb@gmail.com 8 2 cc@gmail.com 9 3 Necesito agregar una nueva columna con un rango por usuario Quiero tener esta salida user movie rating Rank aa@gmail.com 2 3 1 […]

¿En qué partición df.repartition sin argumentos de columna?

En PySpark, el módulo de reparto tiene un argumento de columnas opcional que, por supuesto, repartirá su dataframe con esa clave. Mi pregunta es: ¿cómo reparte Spark cuando no hay llave? No pude profundizar más en el código fuente para encontrar dónde pasa esto con Spark. def repartition(self, numPartitions, *cols): “”” Returns a new :class:`DataFrame` […]

Selección de valores de matriz vacía de un Spark DataFrame

Dado un DataFrame con las siguientes filas: rows = [ Row(col1=’abc’, col2=[8], col3=[18], col4=[16]), Row(col2=’def’, col2=[18], col3=[18], col4=[]), Row(col3=’ghi’, col2=[], col3=[], col4=[])] Me gustaría eliminar las filas con una matriz vacía para cada uno de col2 , col4 y col4 (es decir, la tercera fila). Por ejemplo, podría esperar que este código funcione: df.where(~df.col2.isEmpty(), ~df.col3.isEmpty(), […]

pyspark, compara dos filas en el dataframe

Estoy intentando comparar una fila en un dataframe con la siguiente para ver la diferencia en la marca de tiempo. Actualmente los datos se ven como: itemid | eventid | timestamp —————————- 134 | 30 | 2016-07-02 12:01:40 134 | 32 | 2016-07-02 12:21:23 125 | 30 | 2016-07-02 13:22:56 125 | 32 | 2016-07-02 […]

Cómo crear una tabla como seleccionar en pyspark.sql

¿Es posible crear una tabla en spark usando una statement de selección? Hago lo siguiente import findspark findspark.init() import pyspark from pyspark.sql import SQLContext sc = pyspark.SparkContext() sqlCtx = SQLContext(sc) spark_df = sqlCtx.read.format(‘com.databricks.spark.csv’).options(header=’true’, inferschema=’true’).load(“./data/documents_topics.csv”) spark_df.registerTempTable(“my_table”) sqlCtx.sql(“CREATE TABLE my_table_2 AS SELECT * from my_table”) pero me sale el error / Users / user / anaconda / […]

Obtener el lunes pasado en Spark

Estoy usando Spark 2.0 con la API de Python. Tengo un dataframe con una columna de tipo DateType (). Me gustaría agregar una columna al dataframe que contiene el lunes más reciente. Puedo hacerlo así: reg_schema = pyspark.sql.types.StructType([ pyspark.sql.types.StructField(‘AccountCreationDate’, pyspark.sql.types.DateType(), True), pyspark.sql.types.StructField(‘UserId’, pyspark.sql.types.LongType(), True) ]) reg = spark.read.schema(reg_schema).option(‘header’, True).csv(path_to_file) reg = reg.withColumn(‘monday’, pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,’E’) == ‘Mon’, […]

Convertir una nueva columna derivada en un DataFrame de booleano a entero

Supongamos que tengo un DataFrame x con este esquema: xSchema = StructType([ \ StructField(“a”, DoubleType(), True), \ StructField(“b”, DoubleType(), True), \ StructField(“c”, DoubleType(), True)]) Entonces tengo el DataFrame: DataFrame[a :double, b:double, c:double] Me gustaría tener una columna derivada de enteros. Soy capaz de crear una columna booleana: x = x.withColumn(‘y’, (xa-xb)/xc > 1) Mi nuevo […]

Spark ML Pipeline Causes java.lang.Exception: no se compiló … El código … crece más allá de 64 KB

Utilizando Spark 2.0, estoy tratando de ejecutar un VectorAssembler simple en un pipeline de pyspark ML, de esta manera: feature_assembler = VectorAssembler(inputCols=[‘category_count’, ‘name_count’], \ outputCol=”features”) pipeline = Pipeline(stages=[feature_assembler]) model = pipeline.fit(df_train) model_output = model.transform(df_train) Cuando trato de mirar la salida usando model_output.select(“features”).show(1) Me sale el error Py4JJavaError Traceback (most recent call last) in () 2 […]

Uso de la función Reducir () de Python para unir múltiples marcos de datos de PySpark

¿Alguien sabe por qué el uso de functools.reduce() de functools.reduce() podría llevar a un peor rendimiento al unir múltiples DataFrames de PySpark que unir iterativamente los mismos DataFrames usando un bucle for ? Específicamente, esto produce una desaceleración masiva seguida de un error de memoria insuficiente: def join_dataframes(list_of_join_columns, left_df, right_df): return left_df.join(right_df, on=list_of_join_columns) joined_df = […]

Cómo convertir la cadena a ArrayType del diccionario (JSON) en PySpark

Intentando convertir StringType en ArrayType de JSON para un CSV generado a partir de un dataframe. Usando pyspark en Spark2 El archivo CSV que estoy tratando; es como sigue – date,attribute2,count,attribute3 2017-09-03,’attribute1_value1′,2,'[{“key”:”value”,”key2″:2},{“key”:”value”,”key2″:2},{“key”:”value”,”key2″:2}]’ 2017-09-04,’attribute1_value2′,2,'[{“key”:”value”,”key2″:20},{“key”:”value”,”key2″:25},{“key”:”value”,”key2″:27}]’ Como se muestra arriba, contiene un atributo “attribute3” en cadena literal, que técnicamente es una lista de diccionario (JSON) con una longitud […]