Codificar y ensamblar múltiples características en PySpark

Tengo una clase de Python que estoy usando para cargar y procesar algunos datos en Spark. Entre las varias cosas que debo hacer, estoy generando una lista de variables ficticias derivadas de varias columnas en un dataframe de Spark. Mi problema es que no estoy seguro de cómo definir correctamente una función definida por el usuario para lograr lo que necesito.

Actualmente tengo un método que, cuando se asigna sobre el RDD del dataframe subyacente, resuelve la mitad del problema (recuerde que este es un método en una clase de data_processor más data_processor ):

 def build_feature_arr(self,table): # this dict has keys for all the columns for which I need dummy coding categories = {'gender':['1','2'], ..} # there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file if table == 'users': iter_over = self.config.dyadic_features_to_include elif table == 'activty': iter_over = self.config.user_features_to_include def _build_feature_arr(row): result = [] row = row.asDict() for col in iter_over: column_value = str(row[col]).lower() cats = categories[col] result += [1 if column_value and cat==column_value else 0 for cat in cats] return result return _build_feature_arr 

Esencialmente, lo que hace esto es, para el dataframe especificado, toma los valores de las variables categóricas para las columnas especificadas y devuelve una lista de los valores de estas nuevas variables ficticias. Eso significa el siguiente código:

 data = data_processor(init_args) result = data.user_data.rdd.map(self.build_feature_arr('users')) 

devuelve algo como:

 In [39]: result.take(10) Out[39]: [[1, 0, 0, 0, 1, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 0, 0, 0], [1, 0, 1, 0, 0, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 1, 0, 0], [0, 1, 1, 0, 0, 0], [1, 0, 1, 1, 0, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 0, 0, 1]] 

Esto es exactamente lo que quiero en términos de generar la lista de variables ficticias que quiero, pero aquí está mi pregunta: ¿Cómo puedo (a) crear un UDF con una funcionalidad similar que pueda usar en una consulta Spark SQL (o de alguna otra forma)? , Supongo), o (b) tomar el RDD resultante del mapa descrito anteriormente y agregarlo como una nueva columna al dataframe de datos de usuario?

De cualquier manera, lo que necesito hacer es generar un nuevo dataframe que contenga las columnas de user_data, junto con una nueva columna (llamémosle feature_array ) que contenga la salida de la función anterior (o algo funcionalmente equivalente).

Chispa> = 2.3

Dado que Spark 2.3 OneHotEncoder está en desuso a favor de OneHotEncoderEstimator . Si usa una versión reciente, modifique el código del encoder

 from pyspark.ml.feature import OneHotEncoderEstimator encoder = OneHotEncoderEstimator( inputCols=["gender_numeric"], outputCols=["gender_vector"] ) 

Chispa <2.3

Bueno, puedes escribir un UDF pero ¿por qué lo harías? Ya hay bastantes herramientas diseñadas para manejar esta categoría de tareas:

 from pyspark.sql import Row from pyspark.ml.linalg import DenseVector row = Row("gender", "foo", "bar") df = sc.parallelize([ row("0", 3.0, DenseVector([0, 2.1, 1.0])), row("1", 1.0, DenseVector([0, 1.1, 1.0])), row("1", -1.0, DenseVector([0, 3.4, 0.0])), row("0", -3.0, DenseVector([0, 4.1, 0.0])) ]).toDF() 

En primer lugar StringIndexer .

 from pyspark.ml.feature import StringIndexer indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df) indexed_df = indexer.transform(df) indexed_df.drop("bar").show() ## +------+----+--------------+ ## |gender| foo|gender_numeric| ## +------+----+--------------+ ## | 0| 3.0| 0.0| ## | 1| 1.0| 1.0| ## | 1|-1.0| 1.0| ## | 0|-3.0| 0.0| ## +------+----+--------------+ 

Siguiente OneHotEncoder :

 from pyspark.ml.feature import OneHotEncoder encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector") encoded_df = encoder.transform(indexed_df) encoded_df.drop("bar").show() ## +------+----+--------------+-------------+ ## |gender| foo|gender_numeric|gender_vector| ## +------+----+--------------+-------------+ ## | 0| 3.0| 0.0|(1,[0],[1.0])| ## | 1| 1.0| 1.0| (1,[],[])| ## | 1|-1.0| 1.0| (1,[],[])| ## | 0|-3.0| 0.0|(1,[0],[1.0])| ## +------+----+--------------+-------------+ 

VectorAssembler :

 from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler( inputCols=["gender_vector", "bar", "foo"], outputCol="features") encoded_df_with_indexed_bar = (vector_indexer .fit(encoded_df) .transform(encoded_df)) final_df = assembler.transform(encoded_df) 

Si la bar contenía variables categóricas, podría usar VectorIndexer para establecer los metadatos requeridos:

 from pyspark.ml.feature import VectorIndexer vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed") 

Pero no es el caso aquí.

Finalmente puedes envolver todo eso usando tuberías:

 from pyspark.ml import Pipeline pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler]) model = pipeline.fit(df) transformed = model.transform(df) 

Podría decirse que es un enfoque mucho más robusto y limpio que escribir todo desde cero. Hay algunas advertencias, especialmente cuando necesita una encoding consistente entre diferentes conjuntos de datos. Puede leer más en la documentación oficial de StringIndexer y VectorIndexer .

Otra forma de obtener una salida comparable es RFormula que :

RFormula produce una columna vectorial de características y una columna de cadena doble o cadena de etiqueta. Al igual que cuando se usan las fórmulas en R para la regresión lineal, las columnas de entrada de cadena se codificarán por una sola vez y las columnas numéricas se convertirán a dobles. Si la columna de la etiqueta es de tipo cadena, primero se transformará para duplicarse con StringIndexer . Si la columna de etiqueta no existe en el dataframe, la columna de etiqueta de salida se creará a partir de la variable de respuesta especificada en la fórmula.

 from pyspark.ml.feature import RFormula rf = RFormula(formula="~ gender + bar + foo - 1") final_df_rf = rf.fit(df).transform(df) 

Como puede ver, es mucho más conciso, pero más difícil de componer no permite mucha personalización. Sin embargo, el resultado para una tubería simple como esta será idéntico:

 final_df_rf.select("features").show(4, False) ## +----------------------+ ## |features | ## +----------------------+ ## |[1.0,0.0,2.1,1.0,3.0] | ## |[0.0,0.0,1.1,1.0,1.0] | ## |(5,[2,4],[3.4,-1.0]) | ## |[1.0,0.0,4.1,0.0,-3.0]| ## +----------------------+ final_df.select("features").show(4, False) ## +----------------------+ ## |features | ## +----------------------+ ## |[1.0,0.0,2.1,1.0,3.0] | ## |[0.0,0.0,1.1,1.0,1.0] | ## |(5,[2,4],[3.4,-1.0]) | ## |[1.0,0.0,4.1,0.0,-3.0]| ## +----------------------+ 

Con respecto a sus preguntas:

Hacer un UDF con una funcionalidad similar que pueda usar en una consulta Spark SQL (o de alguna otra forma, supongo)

Es solo un UDF como cualquier otro. Asegúrate de usar tipos compatibles y, además, todo debería funcionar bien.

tomar el RDD resultante del mapa descrito anteriormente y agregarlo como una nueva columna al dataframe user_data?

 from pyspark.ml.linalg import VectorUDT from pyspark.sql.types import StructType, StructField schema = StructType([StructField("features", VectorUDT(), True)]) row = Row("features") result.map(lambda x: row(DenseVector(x))).toDF(schema) 

Nota :

Para Spark 1.x, reemplace pyspark.ml.linalg con pyspark.mllib.linalg .