Articles of pyspark sql

Cree un dataframe a partir de una lista en pyspark.sql

Estoy totalmente perdido en una situación cableada. Ahora tengo una lista li li = example_data.map(lambda x: get_labeled_prediction(w,x)).collect() print li, type(li) la salida es como [(0.0, 59.0), (0.0, 51.0), (0.0, 81.0), (0.0, 8.0), (0.0, 86.0), (0.0, 86.0), (0.0, 60.0), (0.0, 54.0), (0.0, 54.0), (0.0, 84.0)] Cuando bash crear un dataframe de esta lista m = sqlContext.createDataFrame(l, […]

Diferencia de fecha entre filas consecutivas – Pyspark Dataframe

Tengo una mesa con la siguiente estructura. USER_ID Tweet_ID Date 1 1001 Thu Aug 05 19:11:39 +0000 2010 1 6022 Mon Aug 09 17:51:19 +0000 2010 1 1041 Sun Aug 19 11:10:09 +0000 2010 2 9483 Mon Jan 11 10:51:23 +0000 2012 2 4532 Fri May 21 11:11:11 +0000 2012 3 4374 Sat Jul 10 […]

PySpark – Agregar una nueva columna con un rango por usuario

Tengo este DataSrame de PySpark df = pd.DataFrame(np.array([ [“aa@gmail.com”,2,3], [“aa@gmail.com”,5,5], [“bb@gmail.com”,8,2], [“cc@gmail.com”,9,3] ]), columns=[‘user’,’movie’,’rating’]) sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1) user movie rating aa@gmail.com 2 3 aa@gmail.com 5 5 bb@gmail.com 8 2 cc@gmail.com 9 3 Necesito agregar una nueva columna con un rango por usuario Quiero tener esta salida user movie rating Rank aa@gmail.com 2 3 1 […]

¿En qué partición df.repartition sin argumentos de columna?

En PySpark, el módulo de reparto tiene un argumento de columnas opcional que, por supuesto, repartirá su dataframe con esa clave. Mi pregunta es: ¿cómo reparte Spark cuando no hay llave? No pude profundizar más en el código fuente para encontrar dónde pasa esto con Spark. def repartition(self, numPartitions, *cols): “”” Returns a new :class:`DataFrame` […]

Selección de valores de matriz vacía de un Spark DataFrame

Dado un DataFrame con las siguientes filas: rows = [ Row(col1=’abc’, col2=[8], col3=[18], col4=[16]), Row(col2=’def’, col2=[18], col3=[18], col4=[]), Row(col3=’ghi’, col2=[], col3=[], col4=[])] Me gustaría eliminar las filas con una matriz vacía para cada uno de col2 , col4 y col4 (es decir, la tercera fila). Por ejemplo, podría esperar que este código funcione: df.where(~df.col2.isEmpty(), ~df.col3.isEmpty(), […]

pyspark, compara dos filas en el dataframe

Estoy intentando comparar una fila en un dataframe con la siguiente para ver la diferencia en la marca de tiempo. Actualmente los datos se ven como: itemid | eventid | timestamp —————————- 134 | 30 | 2016-07-02 12:01:40 134 | 32 | 2016-07-02 12:21:23 125 | 30 | 2016-07-02 13:22:56 125 | 32 | 2016-07-02 […]

Cómo crear una tabla como seleccionar en pyspark.sql

¿Es posible crear una tabla en spark usando una statement de selección? Hago lo siguiente import findspark findspark.init() import pyspark from pyspark.sql import SQLContext sc = pyspark.SparkContext() sqlCtx = SQLContext(sc) spark_df = sqlCtx.read.format(‘com.databricks.spark.csv’).options(header=’true’, inferschema=’true’).load(“./data/documents_topics.csv”) spark_df.registerTempTable(“my_table”) sqlCtx.sql(“CREATE TABLE my_table_2 AS SELECT * from my_table”) pero me sale el error / Users / user / anaconda / […]

Obtener el lunes pasado en Spark

Estoy usando Spark 2.0 con la API de Python. Tengo un dataframe con una columna de tipo DateType (). Me gustaría agregar una columna al dataframe que contiene el lunes más reciente. Puedo hacerlo así: reg_schema = pyspark.sql.types.StructType([ pyspark.sql.types.StructField(‘AccountCreationDate’, pyspark.sql.types.DateType(), True), pyspark.sql.types.StructField(‘UserId’, pyspark.sql.types.LongType(), True) ]) reg = spark.read.schema(reg_schema).option(‘header’, True).csv(path_to_file) reg = reg.withColumn(‘monday’, pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,’E’) == ‘Mon’, […]

Convertir una nueva columna derivada en un DataFrame de booleano a entero

Supongamos que tengo un DataFrame x con este esquema: xSchema = StructType([ \ StructField(“a”, DoubleType(), True), \ StructField(“b”, DoubleType(), True), \ StructField(“c”, DoubleType(), True)]) Entonces tengo el DataFrame: DataFrame[a :double, b:double, c:double] Me gustaría tener una columna derivada de enteros. Soy capaz de crear una columna booleana: x = x.withColumn(‘y’, (xa-xb)/xc > 1) Mi nuevo […]

Spark ML Pipeline Causes java.lang.Exception: no se compiló … El código … crece más allá de 64 KB

Utilizando Spark 2.0, estoy tratando de ejecutar un VectorAssembler simple en un pipeline de pyspark ML, de esta manera: feature_assembler = VectorAssembler(inputCols=[‘category_count’, ‘name_count’], \ outputCol=”features”) pipeline = Pipeline(stages=[feature_assembler]) model = pipeline.fit(df_train) model_output = model.transform(df_train) Cuando trato de mirar la salida usando model_output.select(“features”).show(1) Me sale el error Py4JJavaError Traceback (most recent call last) in () 2 […]