Agregue PySpark RDD como nueva columna a pyspark.sql.dataframe

Tengo un pyspark.sql.dataframe donde cada fila es un artículo de noticias. Luego tengo un RDD que representa las palabras contenidas en cada artículo. Quiero agregar el RDD de las palabras como una columna denominada ‘palabras’ a mi dataframe de nuevos artículos. Lo intenté

df.withColumn('words', words_rdd ) 

pero me sale el error

 AssertionError: col should be Column 

El DataFrame se ve algo como esto

 Articles the cat and dog ran we went to the park today it will rain 

Pero tengo artículos de noticias 3k.

Apliqué una función para limpiar el texto, como eliminar palabras de parada y tengo un RDD que tiene este aspecto:

 [[cat, dog, ran],[we, went, park],[today, will, rain]] 

Estoy intentando que mi Dataframe se vea así:

 Articles Words the cat and dog ran [cat, dog, ran] we went to the park [we, went, park] today it will rain [today, will, rain] 

Descargo de responsabilidad :

Spark DataFrame en general no tiene un orden estrictamente definido. Úselo bajo su propio riesgo.

Agregar índice al DataFrame existente:

 from pyspark.sql.types import * df_index = spark.createDataFrame( df.rdd.zipWithIndex(), StructType([StructField("data", df.schema), StructField("id", LongType())]) ) 

Agregue un índice a RDD y DataFrame a DataFrame :

 words_df = spark.createDataFrame( words_rdd.zipWithIndex(), StructType([ StructField("words", ArrayType(StringType())), StructField("id", LongType()) ]) ) 

Unir ambos y seleccionar los campos requeridos:

 df_index.join(words_df, "id").select("data.*", "words") 

Precaución

Existen diferentes soluciones, que pueden funcionar en casos específicos, pero no garantizan el rendimiento ni la corrección. Éstos incluyen:

  • Usar monotonically_increasing_id como una clave de join , en general no es correcto.
  • El uso de la función de ventana row_number() como una clave de unión – implicación de rendimiento inaceptable y en general no es correcta si no hay un orden específico definido.
  • El uso de zip en RDDs puede funcionar solo si ambas estructuras tienen la misma distribución de datos (en este caso debería funcionar).

Nota :

En este caso específico no debería necesitar RDD . pyspark.ml.feature proporciona una variedad de Transformers , que deberían funcionar bien para usted.

 from pyspark.ml.feature import * from pyspark.ml import Pipeline df = spark.createDataFrame( ["the cat and dog ran", "we went to the park", "today it will rain"], "string" ).toDF("Articles") Pipeline(stages=[ RegexTokenizer(inputCol="Articles", outputCol="Tokens"), StopWordsRemover(inputCol="Tokens", outputCol="Words") ]).fit(df).transform(df).show() # +-------------------+--------------------+---------------+ # | Articles| Tokens| Words| # +-------------------+--------------------+---------------+ # |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]| # |we went to the park|[we, went, to, th...| [went, park]| # | today it will rain|[today, it, will,...| [today, rain]| # +-------------------+--------------------+---------------+ 

La lista de palabras de parada puede proporcionarse utilizando el parámetro stopWords de StopWordsRemover , por ejemplo:

 StopWordsRemover( inputCol="Tokens", outputCol="Words", stopWords=["the", "and", "we", "to", "it"] ) 

¿Por qué desea unir el rdd de nuevo al dataframe, prefiero crear una nueva columna de “Artículos” directamente. Hay varias maneras de hacerlo, aquí están mis 5 centavos:

 from pyspark.sql import Row from pyspark.sql.context import SQLContext sqlCtx = SQLContext(sc) # sc is the sparkcontext x = [Row(Articles='the cat and dog ran'),Row(Articles='we went to the park'),Row(Articles='today it will rain')] df = sqlCtx.createDataFrame(x) df2 = df.map(lambda x:tuple([x.Articles,x.Articles.split(' ')])).toDF(['Articles','words']) df2.show() 

Obtienes la siguiente salida:

 Articles words the cat and dog ran [the, cat, and, dog, ran] we went to the park [we, went, to, the, park] today it will rain [today, it, will, rain] 

Déjame saber si estabas buscando lograr algo más.

Un enfoque simple pero efectivo sería usar udf . Usted puede:

 from pyspark.sql.functions import udf from pyspark.sql.types import StringType df = spark.createDataFrame(["the cat and dog ran", "we went to the park", "today it will rain", None], "string" ).toDF("Articles") split_words = udf(lambda x : x.split(' ') if x is not None else x, StringType()) df = df.withColumn('Words', split_words(df['Articles'])) df.show(10,False) >> +-------------------+-------------------------+ |Articles |Words | +-------------------+-------------------------+ |the cat and dog ran|[the, cat, and, dog, ran]| |we went to the park|[we, went, to, the, park]| |today it will rain |[today, it, will, rain] | |null |null | +-------------------+-------------------------+ 

Agregué la checkbox de Ninguno porque es muy común que aparezcan en sus datos líneas erróneas. Puedes soltarlos fácilmente después de dividirlos o antes, con dropna.

Pero en mi opinión, si desea hacer esto como una tarea de preparación para el análisis de texto, probablemente le convenga construir un Pipeline como sugiere @ user9613318 en su respuesta.

 rdd1 = spark.sparkContext.parallelize([1, 2, 3, 5]) # make some transformation on rdd1: rdd2 = rdd.map(lambda n: True if n % 2 else False) # Append each row in rdd2 to those in rdd1. rdd1.zip(rdd2).collect()