Pyspark: Reemplazo de valor en una columna buscando un diccionario

Soy un novato en PySpark.

Tengo un df Spark DataFrame que tiene una columna ‘device_type’.

Quiero reemplazar todos los valores que se encuentran en “Tableta” o “Teléfono” por “Teléfono”, y reemplazar “PC” por “Escritorio”.

En Python puedo hacer lo siguiente,

 deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'} df['device_type'] = df['device_type'].replace(deviceDict,inplace=False) 

¿Cómo puedo lograr esto usando PySpark? ¡Gracias!

Puedes usar na.replace :

 df = spark.createDataFrame([ ('Tablet', ), ('Phone', ), ('PC', ), ('Other', ), (None, ) ], ["device_type"]) df.na.replace(deviceDict, 1).show() 
 +-----------+ |device_type| +-----------+ | Mobile| | Mobile| | Desktop| | Other| | null| +-----------+ 

o literal del mapa:

 from itertools import chain from pyspark.sql.functions import create_map, lit mapping = create_map([lit(x) for x in chain(*deviceDict.items())]) df.select(mapping[df['device_type']].alias('device_type')) 
 +-----------+ |device_type| +-----------+ | Mobile| | Mobile| | Desktop| | null| | null| +-----------+ 

Tenga en cuenta que la última solución convertirá los valores no presentes en la asignación a NULL . Si este no es un comportamiento deseado, puede agregar coalesce :

 from pyspark.sql.functions import coalesce df.select( coalesce(mapping[df['device_type']], df['device_type']).alias('device_type') ) 
 +-----------+ |device_type| +-----------+ | Mobile| | Mobile| | Desktop| | Other| | null| +-----------+ 

Puedes hacer esto usando df.withColumn también:

 from itertools import chain from pyspark.sql.functions import create_map, lit deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'} mapping_expr = create_map([lit(x) for x in chain(*deviceDict.items())]) df = df.withColumn('device_type', mapping_expr[df['dvice_type']]) df.show() 

Aquí hay una pequeña función auxiliar, inspirada en la función R recode , que resume las respuestas anteriores. Como beneficio adicional, agrega la opción para un valor predeterminado.

 from itertools import chain from pyspark.sql.functions import col, create_map, lit, when, isnull from pyspark.sql.column import Column df = spark.createDataFrame([ ('Tablet', ), ('Phone', ), ('PC', ), ('Other', ), (None, ) ], ["device_type"]) deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'} df.show() +-----------+ |device_type| +-----------+ | Tablet| | Phone| | PC| | Other| | null| +-----------+ 

Aquí está la definición de recode .

 def recode(col_name, map_dict, default=None): if not isinstance(col, Column): col_name = col(col_name) mapping_expr = create_map([lit(x) for x in chain(*map_dict.items())]) if default is None: return mapping_expr.getItem(col_name) else: return when(~isnull(mapping_expr.getItem(col_name)), mapping_expr.getItem(col_name)).otherwise(default) 

La creación de una columna sin un valor predeterminado da null / None en todos los valores no coincidentes.

 df.withColumn("device_type", recode('device_type', deviceDict)).show() +-----------+ |device_type| +-----------+ | Mobile| | Mobile| | Desktop| | null| | null| +-----------+ 

Por otro lado, la especificación de un valor por default reemplaza todos los valores no coincidentes con este por defecto.

 df.withColumn("device_type", recode('device_type', deviceDict, default='Other')).show() +-----------+ |device_type| +-----------+ | Mobile| | Mobile| | Desktop| | Other| | Other| +-----------+