Articles of apache spark sql

Agregar columna a PySpark DataFrame dependiendo de si el valor de la columna está en otra columna

Tengo un DataSrame de PySpark con estructura dada por [(‘u1’, 1, [1 ,2, 3]), (‘u1’, 4, [1, 2, 3])].toDF(‘user’, ‘item’, ‘fav_items’) Necesito agregar una columna adicional con 1 o 0 dependiendo de si el “elemento” está en “fav_items” o no. Asi que queria [(‘u1’, 1, [1 ,2, 3], 1), (‘u1’, 4, [1, 2, 3], 0)] […]

Pyspark cambiando el tipo de columna de fecha a cadena

Tengo el siguiente dataframe: corr_temp_df [(‘vacationdate’, ‘date’), (‘valueE’, ‘string’), (‘valueD’, ‘string’), (‘valueC’, ‘string’), (‘valueB’, ‘string’), (‘valueA’, ‘string’)] Ahora me gustaría cambiar el tipo de datos de la columna vacationdate a String, de modo que también el dataframe tome este nuevo tipo y sobrescriba los datos del tipo de datos para todas las entradas. Por ejemplo, […]

PySpark invirtiendo StringIndexer en una matriz anidada

Estoy usando PySpark para hacer un filtrado colaborativo usando ALS. Mi usuario original y los identificadores de elementos son cadenas, así que utilicé StringIndexer para convertirlos a índices numéricos (el modelo ALS de PySpark nos obliga a hacerlo). Una vez que haya instalado el modelo, puedo obtener las 3 recomendaciones principales para cada usuario como: […]

PySpark 1.5 Cómo truncar la marca de tiempo al minuto más cercano en segundos

Estoy usando PySpark. Tengo una columna (‘dt’) en un dataframe (‘canon_evt’) que esta es una marca de tiempo. Estoy tratando de eliminar segundos de un valor DateTime. Originalmente se lee desde parquet como una cadena. Entonces trato de convertirlo a Timestamp a través de canon_evt = canon_evt.withColumn(‘dt’,to_date(canon_evt.dt)) canon_evt= canon_evt.withColumn(‘dt’,canon_evt.dt.astype(‘Timestamp’)) Entonces me gustaría quitar los segundos. […]

Preservar indizador de cadena de chispa de correspondencia de cadena de índice

El StringIndexer de Spark es bastante útil, pero es común tener que recuperar las correspondencias entre los valores de índice generados y las cadenas originales, y parece que debería haber una forma integrada de lograrlo. Ilustraré usando este simple ejemplo de la documentación de Spark : from pyspark.ml.feature import StringIndexer df = sqlContext.createDataFrame( [(0, “a”), […]

Selección de valores de matriz vacía de un Spark DataFrame

Dado un DataFrame con las siguientes filas: rows = [ Row(col1=’abc’, col2=[8], col3=[18], col4=[16]), Row(col2=’def’, col2=[18], col3=[18], col4=[]), Row(col3=’ghi’, col2=[], col3=[], col4=[])] Me gustaría eliminar las filas con una matriz vacía para cada uno de col2 , col4 y col4 (es decir, la tercera fila). Por ejemplo, podría esperar que este código funcione: df.where(~df.col2.isEmpty(), ~df.col3.isEmpty(), […]

Filtrado de Sparksql (selección con la cláusula where) con múltiples condiciones

Hola tengo el siguiente problema: numeric.registerTempTable(“numeric”). Todos los valores que quiero filtrar son cadenas nulas literales y no valores N / A o nulos. Probé estas tres opciones: numeric_filtered = numeric.filter(numeric[‘LOW’] != ‘null’).filter(numeric[‘HIGH’] != ‘null’).filter(numeric[‘NORMAL’] != ‘null’) numeric_filtered = numeric.filter(numeric[‘LOW’] != ‘null’ AND numeric[‘HIGH’] != ‘null’ AND numeric[‘NORMAL’] != ‘null’) sqlContext.sql(“SELECT * from numeric WHERE […]

Tipo de datos para manejar grandes números en pyspark

Estoy usando chispa con python. Después de cargar un archivo csv, necesitaba analizar una columna en un archivo csv que tiene números que tienen 22 dígitos de longitud. Para analizar esa columna utilicé LongType () . Utilicé la función map () para definir la columna. Los siguientes son mis comandos en pyspark. >>> test=sc.textFile(“test.csv”) >>> […]

pyspark, compara dos filas en el dataframe

Estoy intentando comparar una fila en un dataframe con la siguiente para ver la diferencia en la marca de tiempo. Actualmente los datos se ven como: itemid | eventid | timestamp —————————- 134 | 30 | 2016-07-02 12:01:40 134 | 32 | 2016-07-02 12:21:23 125 | 30 | 2016-07-02 13:22:56 125 | 32 | 2016-07-02 […]

Agregue una columna vacía a Spark DataFrame

Como se mencionó en muchas otras ubicaciones en la web, agregar una nueva columna a un DataFrame existente no es sencillo. Desafortunadamente, es importante tener esta funcionalidad (aunque sea ineficiente en un entorno distribuido), especialmente cuando se trata de concatenar dos DataFrame usando unionAll . ¿Cuál es la solución más elegante para agregar una columna […]