Articles of apache spark sql

Obtener el lunes pasado en Spark

Estoy usando Spark 2.0 con la API de Python. Tengo un dataframe con una columna de tipo DateType (). Me gustaría agregar una columna al dataframe que contiene el lunes más reciente. Puedo hacerlo así: reg_schema = pyspark.sql.types.StructType([ pyspark.sql.types.StructField(‘AccountCreationDate’, pyspark.sql.types.DateType(), True), pyspark.sql.types.StructField(‘UserId’, pyspark.sql.types.LongType(), True) ]) reg = spark.read.schema(reg_schema).option(‘header’, True).csv(path_to_file) reg = reg.withColumn(‘monday’, pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,’E’) == ‘Mon’, […]

Filtrado de columnas en PySpark

Tengo un df dataframe cargado de la tabla Hive y tiene una columna de marca de tiempo, por ejemplo, ts , con el tipo de cadena de formato dd-MMM-yy hh.mm.ss.MS a (convertido a la biblioteca datetime de python, esto es %d-%b-%y %I.%M.%S.%f %p ). Ahora quiero filtrar las filas del dataframe que son de los […]

Composición de la función de fila PySpark

Como ejemplo simplificado, tengo un dataframe “df” con las columnas “col1, col2” y quiero calcular un máximo de filas después de aplicar una función a cada columna: def f(x): return (x+1) max_udf=udf(lambda x,y: max(x,y), IntegerType()) f_udf=udf(f, IntegerType()) df2=df.withColumn(“result”, max_udf(f_udf(df.col1),f_udf(df.col2))) Así que si df: col1 col2 1 2 3 0 Entonces df2: col1 col2 result 1 […]

Convertir una nueva columna derivada en un DataFrame de booleano a entero

Supongamos que tengo un DataFrame x con este esquema: xSchema = StructType([ \ StructField(“a”, DoubleType(), True), \ StructField(“b”, DoubleType(), True), \ StructField(“c”, DoubleType(), True)]) Entonces tengo el DataFrame: DataFrame[a :double, b:double, c:double] Me gustaría tener una columna derivada de enteros. Soy capaz de crear una columna booleana: x = x.withColumn(‘y’, (xa-xb)/xc > 1) Mi nuevo […]

¿Qué tipo debería ser, después de usar .toArray () para un vector Spark?

Quiero transferir mi vector a array, entonces uso get_array = udf(lambda x: x.toArray(),ArrayType(DoubleType())) result3 = result2.withColumn(‘list’,get_array(‘features’)) result3.show() donde las features la columna es dtype vector. Pero Spark me dice que net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct) Sé que la razón debe ser del tipo que uso en la UDF, así que […]

Columna de conversión que contiene varios formatos de fecha de cadena a DateTime en Spark

Tengo una columna de fecha en mi Spark DataDrame que contiene varios formatos de cadena. Me gustaría lanzar estos a DateTime. Los dos formatos en mi columna son: mm/dd/yyyy ; y yyyy-mm-dd Mi solución hasta ahora es utilizar un UDF para cambiar el primer formato de fecha para que coincida con el segundo de la […]

Convertir un RDD complejo en un RDD plano con PySpark

Tengo el siguiente CSV (muestra) id timestamp routeid creationdate parameters 1000 21-11-2016 22:55 14 21-11-2016 22:55 RSRP=-102, 1002 21-11-2016 22:55 14 21-11-2016 22:55 RA Req. SN=-146,TPC=4,RX Antennas=-8, 1003 21-11-2016 22:55 14 21-11-2016 22:55 RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191, Básicamente quiero separar los parámetros de una columna en varias columnas de la siguiente manera: id , timestamp, […]

¿Cómo llegar hoy? ¿Fecha de “1 día” en Sparksql?

Cómo obtener current_date – 1 día en sparksql, igual que cur_date()-1 en mysql.

Cómo ejecutar un archivo .sql en spark usando python

from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext conf = SparkConf().setAppName(“Test”).set(“spark.driver.memory”, “1g”) sc = SparkContext(conf = conf) sqlContext = SQLContext(sc) results = sqlContext.sql(“/home/ubuntu/workload/queryXX.sql”) Cuando ejecuto este comando usando: python test.py me da un error . y4j.protocol.Py4JJavaError: Se produjo un error al llamar a o20.sql. : java.lang.RuntimeException: [1.1] error: “ con ” esperado pero `/ […]

Spark ML Pipeline Causes java.lang.Exception: no se compiló … El código … crece más allá de 64 KB

Utilizando Spark 2.0, estoy tratando de ejecutar un VectorAssembler simple en un pipeline de pyspark ML, de esta manera: feature_assembler = VectorAssembler(inputCols=[‘category_count’, ‘name_count’], \ outputCol=”features”) pipeline = Pipeline(stages=[feature_assembler]) model = pipeline.fit(df_train) model_output = model.transform(df_train) Cuando trato de mirar la salida usando model_output.select(“features”).show(1) Me sale el error Py4JJavaError Traceback (most recent call last) in () 2 […]