PySpark agrega una columna a un DataFrame desde una columna TimeStampType

Tengo un DataFrame que se parece a eso. Quiero operar el día del campo date_time .

 root |-- host: string (nullable = true) |-- user_id: string (nullable = true) |-- date_time: timestamp (nullable = true) 

Intenté agregar una columna para extraer el día. Hasta ahora mis bashs han fallado.

 df = df.withColumn("day", df.date_time.getField("day")) org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type TimestampType; 

Esto también ha fallado

 df = df.withColumn("day", df.select("date_time").map(lambda row: row.date_time.day)) AttributeError: 'PipelinedRDD' object has no attribute 'alias' 

¿Alguna idea de cómo se puede hacer esto?

Puedes usar un map simple:

 df.rdd.map(lambda row: Row(row.__fields__ + ["day"])(row + (row.date_time.day, )) ) 

Otra opción es registrar una función y ejecutar la consulta SQL:

 sqlContext.registerFunction("day", lambda x: x.day) sqlContext.registerDataFrameAsTable(df, "df") sqlContext.sql("SELECT *, day(date_time) as day FROM df") 

Finalmente puedes definir udf así:

 from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType day = udf(lambda date_time: date_time.day, IntegerType()) df.withColumn("day", day(df.date_time)) 

EDITAR :

En realidad, si utiliza la función de day SQL sin formato ya está definida (al menos en Spark 1.4), por lo que puede omitir el registro udf. También proporciona una serie de diferentes funciones de procesamiento de fecha, incluyendo:

  • Captadores como year , month , dayofmonth month

  • Herramientas de aritmética de fecha como date_add , datediff

  • analizadores como from_unixtime y formateadores como date_format

También es posible usar expresiones de fecha simples como:

 current_timestamp() - expr("INTERVAL 1 HOUR") 

Significa que puede generar consultas relativamente complejas sin pasar datos a Python. Por ejemplo:

 df = sc.parallelize([ (1, "2016-01-06 00:04:21"), (2, "2016-05-01 12:20:00"), (3, "2016-08-06 00:04:21") ]).toDF(["id", "ts_"]) now = lit("2016-06-01 00:00:00").cast("timestamp") five_months_ago = now - expr("INTERVAL 5 MONTHS") (df # Cast string to timestamp # For Spark 1.5 use cast("double").cast("timestamp") .withColumn("ts", unix_timestamp("ts_").cast("timestamp")) # Find all events in the last five months .where(col("ts").between(five_months_ago, now)) # Find first Sunday after the event .withColumn("next_sunday", next_day(col("ts"), "Sun")) # Compute difference in days .withColumn("diff", datediff(col("ts"), col("next_sunday"))))