¿Cómo convierto una columna de matriz (es decir, lista) a Vector

Versión corta de la pregunta!

Considere el siguiente fragmento de SparkSession (suponiendo que la spark ya está establecida en algunas SparkSession ):

 from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]), Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), ] df = spark.createDataFrame(source_data) 

Observe que el campo de temperaturas es una lista de flotadores. Me gustaría convertir estas listas de flotadores al Vector tipo MLlib, y me gustaría que esta conversión se exprese mediante la API básica de DataFrame lugar de ir a través de RDD (lo cual es ineficiente porque envía todos los datos desde la JVM a Python). el procesamiento se realiza en Python, no obtenemos los beneficios del optimizador Catalyst de Spark, yada yada). ¿Cómo hago esto? Específicamente:

  1. ¿Hay alguna manera de hacer funcionar un elenco recto? Consulte a continuación para obtener detalles (y un bash fallido de solución). O, ¿hay alguna otra operación que tenga el efecto que buscaba?
  2. ¿Cuál es más eficiente de las dos soluciones alternativas que sugiero a continuación (UDF vs explosión / reensamblado de los elementos en la lista)? ¿O hay otras alternativas casi correctas pero no correctas que sean mejores que cualquiera de ellas?

Un elenco recto no funciona

Esto es lo que esperaría que fuera la solución “adecuada”. Quiero convertir el tipo de una columna de un tipo a otro, por lo que debería usar una conversión. Como un poco de contexto, permítame recordarle la forma normal de convertirlo en otro tipo:

 from pyspark.sql import types df_with_strings = df.select( df["city"], df["temperatures"].cast(types.ArrayType(types.StringType()))), ) 

Ahora, por ejemplo, df_with_strings.collect()[0]["temperatures"][1] es '-7.0' . Pero si lanzo a un vector de ml, entonces las cosas no van tan bien:

 from pyspark.ml.linalg import VectorUDT df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT())) 

Esto da un error:

 pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY, `values`: ARRAY>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; 'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)] +- LogicalRDD [city#0, temperatures#1] " 

¡Ay! Alguna idea de cómo solucionar este problema?

Posibles alternativas

Alternativa 1: Usar VectorAssembler

Hay un Transformer que parece casi ideal para este trabajo: el VectorAssembler . Toma una o más columnas y las concatena en un solo vector. Desafortunadamente, solo toma columnas Vector y Float , no columnas de Array , por lo que el siguiente no funciona:

 from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector") df_fail = assembler.transform(df) 

Da este error:

 pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.' 

El mejor trabajo que se me ocurre es hacer explotar la lista en varias columnas y luego usar el VectorAssembler para recostackrlas de nuevo:

 from pyspark.ml.feature import VectorAssembler TEMPERATURE_COUNT = 3 assembler_exploded = VectorAssembler( inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], outputCol="temperature_vector" ) df_exploded = df.select( df["city"], *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)] ) converted_df = assembler_exploded.transform(df_exploded) final_df = converted_df.select("city", "temperature_vector") 

Esto parece que sería ideal, excepto que TEMPERATURE_COUNT tendrá más de 100 y, a veces, más de 1000. (Otro problema es que el código sería más complicado si no conoce el tamaño de la matriz de antemano, aunque eso es no es el caso de mis datos.) ¿Spark realmente genera un conjunto de datos intermedios con tantas columnas, o simplemente considera esto como un paso intermedio por el que pasan los elementos individuales de forma transitoria (o incluso optimiza este paso de distancia por completo cuando ve eso)? el único uso de estas columnas es ser ensamblado en un vector)?

Alternativa 2: utilizar un UDF

Una alternativa bastante más simple es usar un UDF para hacer la conversión. Esto me permite express de manera bastante directa lo que quiero hacer en una línea de código, y no requiere hacer un conjunto de datos con un número loco de columnas. Pero todos los datos deben intercambiarse entre Python y la JVM, y Python debe manejar cada número individual (lo que es notoriamente lento para iterar sobre elementos de datos individuales). Aquí es cómo se ve:

 from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT()) df_with_vectors = df.select( df["city"], list_to_vector_udf(df["temperatures"]).alias("temperatures") ) 

Comentarios ignorantes

Las secciones restantes de esta pregunta incomprensible son algunas cosas adicionales que se me ocurrieron al intentar encontrar una respuesta. Es probable que la mayoría de las personas que las leen puedan saltearlas

No es una solución: usa Vector para empezar

En este ejemplo trivial, es posible crear los datos utilizando el tipo de vector para comenzar, pero, por supuesto, mis datos no son realmente una lista de Python con la que estoy paralelizando, sino que se están leyendo desde una fuente de datos. Pero para el registro, aquí está cómo se vería:

 from pyspark.ml.linalg import Vectors from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])), Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])), ] df = spark.createDataFrame(source_data) 

Solución ineficiente: usar map()

Una posibilidad es usar el método RDD map() para transformar la lista en un Vector . Esto es similar a la idea UDF, excepto que es aún peor porque el costo de la serialización, etc., se incurre en todos los campos de cada fila, no solo en el que se está operando. Para el registro, esto es cómo se vería esa solución:

 df_with_vectors = df.rdd.map(lambda row: Row( city=row["city"], temperatures=Vectors.dense(row["temperatures"]) )).toDF() 

Falló el bash de una solución para el lanzamiento

En su desesperación, noté que Vector está representado internamente por una estructura con cuatro campos, pero el uso de un molde tradicional de ese tipo de estructura tampoco funciona. Aquí hay una ilustración (donde construí la estructura usando un udf pero el udf no es la parte importante):

 from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType()) df_almost_vector = df.select( df["city"], list_to_almost_vector_udf(df["temperatures"]).alias("temperatures") ) df_with_vectors = df_almost_vector.select( df_almost_vector["city"], df_almost_vector["temperatures"].cast(VectorUDT()) ) 

Esto da el error:

 pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY, `values`: ARRAY>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; 'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)] +- Project [city#0, (temperatures#1) AS temperatures#5] +- LogicalRDD [city#0, temperatures#1] " 

Personalmente iría con Python UDF y no me molestaría en nada más:

  • Vectors no son tipos de SQL nativos, por lo que habrá una sobrecarga de rendimiento de una forma u otra. En particular, este proceso requiere dos pasos donde los datos se convierten primero de tipo externo a fila , y luego de fila a representación interna usando RowEncoder genérico .
  • Cualquier Pipeline ML descendente será mucho más costosa que una simple conversión. Además, requiere un proceso que es opuesto al descrito anteriormente.

Pero si realmente quieres otras opciones aquí estás:

  • Scala UDF con envoltura de Python:

    Instale sbt siguiendo las instrucciones en el sitio del proyecto.

    Crear paquete Scala con la siguiente estructura:

     . ├── build.sbt └── udfs.scala 

    Edite build.sbt (ajuste para reflejar las versiones de Scala y Spark):

     scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.1.0", "org.apache.spark" %% "spark-mllib" % "2.1.0" ) 

    Editar udfs.scala :

     package com.example.spark.udfs import org.apache.spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) } 

    Paquete:

     sbt package 

    e incluir (o equivalente dependiendo de Scala vers:

     $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar 

    como argumento para --driver-class-path al iniciar el shell / al enviar la aplicación.

    En PySpark definimos una envoltura:

     from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column))) 

    Prueba:

     with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show() 
     +--------+------------------+----------------+ | city| temperatures| vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema() 
     root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- vector: vector (nullable = true) 
  • Vuelque los datos a un formato JSON que refleje el esquema DenseVector y vuelva a leerlo:

     from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show() 
     +--------+------------------+----------------+ | city| temperatures| parsed_vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ 
     with_parsed_vector.printSchema() 
     root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- parsed_vector: vector (nullable = true) 

Tuve un mismo problema como tú y lo hice de esta manera. Esta forma incluye la transformación RDD, por lo que no es crítico para el rendimiento, pero funciona.

 from pyspark.sql import Row from pyspark.ml.linalg import Vectors source_data = [ Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]), Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), ] df = spark.createDataFrame(source_data) city_rdd = df.rdd.map(lambda row:row[0]) temp_rdd = df.rdd.map(lambda row:row[1]) new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures']) new_df 

el resultado es,

 DataFrame[city: string, temperatures: vector]