Pasando una columna de dataframe y una lista externa a udf bajo withColumn

Tengo un dataframe Spark con la siguiente estructura. El bodyText_token tiene los tokens (procesado / conjunto de palabras). Y tengo una lista anidada de palabras clave definidas

root |-- id: string (nullable = true) |-- body: string (nullable = true) |-- bodyText_token: array (nullable = true) keyword_list=['union','workers','strike','pay','rally','free','immigration',], ['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']] 

Necesitaba verificar cuántos tokens caen en cada lista de palabras clave y agregar el resultado como una nueva columna del dataframe existente. Por ejemplo: si tokens =["become", "farmer","rally","workers","student"] el resultado será -> [1,2,0]

La siguiente función funcionó como se esperaba.

 def label_maker_topic(tokens,topic_words): twt_list = [] for i in range(0, len(topic_words)): count = 0 #print(topic_words[i]) for tkn in tokens: if tkn in topic_words[i]: count += 1 twt_list.append(count) return twt_list 

Usé udf en withColumn para acceder a la función y obtengo un error. Creo que se trata de pasar una lista externa a un udf. ¿Hay alguna manera de pasar la lista externa y la columna del cuadro de datos a un udf y agregar una nueva columna a mi cuadro de datos?

 topicWord = udf(label_maker_topic,StringType()) myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list)) 

La solución más limpia es pasar argumentos adicionales utilizando el cierre:

 def make_topic_word(topic_words): return udf(lambda c: label_maker_topic(c, topic_words)) df = sc.parallelize([(["union"], )]).toDF(["tokens"]) (df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) .show()) 

Esto no requiere ningún cambio en keyword_list o la función que envuelve con UDF. También puede utilizar este método para pasar un objeto arbitrario. Esto se puede usar para pasar, por ejemplo, una lista de sets para búsquedas eficientes.

Si desea utilizar su UDF actual y pasar las topic_words directamente, topic_words deberá convertirlo a un literal de columna:

 from pyspark.sql.functions import array, lit ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list]) df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show() 

Dependiendo de sus datos y requisitos, puede haber soluciones alternativas, más eficientes, que no requieren UDF (explotar + agregar + colapsar) o búsquedas (hashing + operaciones vectoriales).

Lo siguiente funciona bien donde cualquier parámetro externo se puede pasar a la UDF (un código modificado para ayudar a cualquiera)

 topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType()) myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token))