Articles of pyspark

Agregar columna a PySpark DataFrame dependiendo de si el valor de la columna está en otra columna

Tengo un DataSrame de PySpark con estructura dada por [(‘u1’, 1, [1 ,2, 3]), (‘u1’, 4, [1, 2, 3])].toDF(‘user’, ‘item’, ‘fav_items’) Necesito agregar una columna adicional con 1 o 0 dependiendo de si el “elemento” está en “fav_items” o no. Asi que queria [(‘u1’, 1, [1 ,2, 3], 1), (‘u1’, 4, [1, 2, 3], 0)] […]

Pyspark cambiando el tipo de columna de fecha a cadena

Tengo el siguiente dataframe: corr_temp_df [(‘vacationdate’, ‘date’), (‘valueE’, ‘string’), (‘valueD’, ‘string’), (‘valueC’, ‘string’), (‘valueB’, ‘string’), (‘valueA’, ‘string’)] Ahora me gustaría cambiar el tipo de datos de la columna vacationdate a String, de modo que también el dataframe tome este nuevo tipo y sobrescriba los datos del tipo de datos para todas las entradas. Por ejemplo, […]

¿Cómo asignar y usar encabezados de columna en Spark?

Estoy leyendo un conjunto de datos como abajo. f = sc.textFile(“s3://test/abc.csv”) Mi archivo contiene más de 50 campos y quiero asignar encabezados de columna para que cada uno de los campos haga referencia más adelante en mi script. ¿Cómo hago eso en PySpark? ¿Es DataFrame la forma de ir aquí? PS – Newbie to Spark.

Cómo leer el archivo avro en PySpark

Estoy escribiendo un trabajo de chispa usando python. Sin embargo, necesito leer un montón de archivos avro. Esta es la solución más cercana que he encontrado en la carpeta de ejemplo de Spark. Sin embargo, debe enviar este script de python utilizando spark-submit. En la línea de comandos de spark-submit, puede especificar la clase de […]

Spark MLlib – advertencia de tren implícita

Sigo viendo estas advertencias cuando uso trainImplicit : WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. Y luego el tamaño de la tarea comienza a boost. Intenté llamar a repartition en el RDD de entrada pero las advertencias son las mismas. Todas […]

¿Cuál es la forma correcta de guardar \ cargar modelos en Spark \ PySpark?

Estoy trabajando con Spark 1.3.0 usando PySpark y MLlib y necesito guardar y cargar mis modelos. Uso código como este (tomado de la documentación oficial) from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating data = sc.textFile(“data/mllib/als/test.data”) ratings = data.map(lambda l: l.split(‘,’)).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))) rank = 10 numIterations = 20 model = ALS.train(ratings, rank, numIterations) testdata […]

¿Cómo pasar DataFrame como entrada a Spark UDF?

Tengo un dataframe y quiero aplicar una función a cada fila. Esta función depende de otros marcos de datos. Ejemplo simplificado. Tengo tres marcos de datos como a continuación: df = sc.parallelize([ [‘a’, ‘b’, 1], [‘c’, ‘d’, 3] ]).toDF((‘feat1’, ‘feat2’, ‘value’)) df_other_1 = sc.parallelize([ [‘a’, 0, 1, 0.0], [‘a’, 1, 3, 0.1], [‘a’, 3, 10, […]

¿Cómo puedo leer desde S3 en pyspark ejecutando en modo local?

Estoy usando PyCharm 2018.1 usando Python 3.4 con Spark 2.3 instalado a través de pip en un virtualenv. No hay instalación de hadoop en el host local, por lo que no hay instalación de Spark (por lo tanto no hay SPARK_HOME, HADOOP_HOME, etc.) Cuando bash esto: from pyspark import SparkConf from pyspark import SparkContext conf […]

PySpark invirtiendo StringIndexer en una matriz anidada

Estoy usando PySpark para hacer un filtrado colaborativo usando ALS. Mi usuario original y los identificadores de elementos son cadenas, así que utilicé StringIndexer para convertirlos a índices numéricos (el modelo ALS de PySpark nos obliga a hacerlo). Una vez que haya instalado el modelo, puedo obtener las 3 recomendaciones principales para cada usuario como: […]

PySpark 1.5 Cómo truncar la marca de tiempo al minuto más cercano en segundos

Estoy usando PySpark. Tengo una columna (‘dt’) en un dataframe (‘canon_evt’) que esta es una marca de tiempo. Estoy tratando de eliminar segundos de un valor DateTime. Originalmente se lee desde parquet como una cadena. Entonces trato de convertirlo a Timestamp a través de canon_evt = canon_evt.withColumn(‘dt’,to_date(canon_evt.dt)) canon_evt= canon_evt.withColumn(‘dt’,canon_evt.dt.astype(‘Timestamp’)) Entonces me gustaría quitar los segundos. […]