Tengo un DataSrame de PySpark con estructura dada por [(‘u1’, 1, [1 ,2, 3]), (‘u1’, 4, [1, 2, 3])].toDF(‘user’, ‘item’, ‘fav_items’) Necesito agregar una columna adicional con 1 o 0 dependiendo de si el “elemento” está en “fav_items” o no. Asi que queria [(‘u1’, 1, [1 ,2, 3], 1), (‘u1’, 4, [1, 2, 3], 0)] […]
Tengo el siguiente dataframe: corr_temp_df [(‘vacationdate’, ‘date’), (‘valueE’, ‘string’), (‘valueD’, ‘string’), (‘valueC’, ‘string’), (‘valueB’, ‘string’), (‘valueA’, ‘string’)] Ahora me gustaría cambiar el tipo de datos de la columna vacationdate a String, de modo que también el dataframe tome este nuevo tipo y sobrescriba los datos del tipo de datos para todas las entradas. Por ejemplo, […]
Estoy leyendo un conjunto de datos como abajo. f = sc.textFile(“s3://test/abc.csv”) Mi archivo contiene más de 50 campos y quiero asignar encabezados de columna para que cada uno de los campos haga referencia más adelante en mi script. ¿Cómo hago eso en PySpark? ¿Es DataFrame la forma de ir aquí? PS – Newbie to Spark.
Estoy escribiendo un trabajo de chispa usando python. Sin embargo, necesito leer un montón de archivos avro. Esta es la solución más cercana que he encontrado en la carpeta de ejemplo de Spark. Sin embargo, debe enviar este script de python utilizando spark-submit. En la línea de comandos de spark-submit, puede especificar la clase de […]
Sigo viendo estas advertencias cuando uso trainImplicit : WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. Y luego el tamaño de la tarea comienza a boost. Intenté llamar a repartition en el RDD de entrada pero las advertencias son las mismas. Todas […]
Estoy trabajando con Spark 1.3.0 usando PySpark y MLlib y necesito guardar y cargar mis modelos. Uso código como este (tomado de la documentación oficial) from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating data = sc.textFile(“data/mllib/als/test.data”) ratings = data.map(lambda l: l.split(‘,’)).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))) rank = 10 numIterations = 20 model = ALS.train(ratings, rank, numIterations) testdata […]
Tengo un dataframe y quiero aplicar una función a cada fila. Esta función depende de otros marcos de datos. Ejemplo simplificado. Tengo tres marcos de datos como a continuación: df = sc.parallelize([ [‘a’, ‘b’, 1], [‘c’, ‘d’, 3] ]).toDF((‘feat1’, ‘feat2’, ‘value’)) df_other_1 = sc.parallelize([ [‘a’, 0, 1, 0.0], [‘a’, 1, 3, 0.1], [‘a’, 3, 10, […]
Estoy usando PyCharm 2018.1 usando Python 3.4 con Spark 2.3 instalado a través de pip en un virtualenv. No hay instalación de hadoop en el host local, por lo que no hay instalación de Spark (por lo tanto no hay SPARK_HOME, HADOOP_HOME, etc.) Cuando bash esto: from pyspark import SparkConf from pyspark import SparkContext conf […]
Estoy usando PySpark para hacer un filtrado colaborativo usando ALS. Mi usuario original y los identificadores de elementos son cadenas, así que utilicé StringIndexer para convertirlos a índices numéricos (el modelo ALS de PySpark nos obliga a hacerlo). Una vez que haya instalado el modelo, puedo obtener las 3 recomendaciones principales para cada usuario como: […]
Estoy usando PySpark. Tengo una columna (‘dt’) en un dataframe (‘canon_evt’) que esta es una marca de tiempo. Estoy tratando de eliminar segundos de un valor DateTime. Originalmente se lee desde parquet como una cadena. Entonces trato de convertirlo a Timestamp a través de canon_evt = canon_evt.withColumn(‘dt’,to_date(canon_evt.dt)) canon_evt= canon_evt.withColumn(‘dt’,canon_evt.dt.astype(‘Timestamp’)) Entonces me gustaría quitar los segundos. […]