Articles of apache spark

Devuelva un RDD de takeOrdered, en lugar de una lista

Estoy usando pyspark para hacer un poco de limpieza de datos. Una operación muy común es tomar un subconjunto pequeño de un archivo y exportarlo para su inspección: (self.spark_context.textFile(old_filepath+filename) .takeOrdered(100) .saveAsTextFile(new_filepath+filename)) Mi problema es que takeOrdered está devolviendo una lista en lugar de un RDD, por lo que saveAsTextFile no funciona. AttributeError: ‘list’ object has […]

PySpark – Agregar una nueva columna con un rango por usuario

Tengo este DataSrame de PySpark df = pd.DataFrame(np.array([ [“aa@gmail.com”,2,3], [“aa@gmail.com”,5,5], [“bb@gmail.com”,8,2], [“cc@gmail.com”,9,3] ]), columns=[‘user’,’movie’,’rating’]) sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1) user movie rating aa@gmail.com 2 3 aa@gmail.com 5 5 bb@gmail.com 8 2 cc@gmail.com 9 3 Necesito agregar una nueva columna con un rango por usuario Quiero tener esta salida user movie rating Rank aa@gmail.com 2 3 1 […]

pyspark: guardar schemaRDD como archivo json

Estoy buscando una manera de exportar datos de Apache Spark a otras herramientas en formato JSON. Supongo que debe haber una manera realmente sencilla de hacerlo. Ejemplo: tengo el siguiente archivo JSON ‘jfile.json‘: {“key”:value_a1, “key2”:value_b1}, {“key”:value_a2, “key2”:value_b2}, {…} donde cada línea del archivo es un objeto JSON. Este tipo de archivos se pueden leer fácilmente […]

Implementar un UDF de java y llamarlo desde pyspark

Necesito crear un UDF para usar en pyspark python que usa un objeto java para sus cálculos internos. Si fuera un simple python, haría algo como: def f(x): return 7 fudf = pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType()) y llámalo usando: df = sqlContext.range(0,5) df2 = df.withColumn(“a”,fudf(df.id)).show() Sin embargo, la implementación de la función que necesito está en java y […]

Transformación de estilo pandas de datos agrupados en PySpark DataFrame

Si tenemos un dataframe de Pandas que consta de una columna de categorías y una columna de valores, podemos eliminar la media de cada categoría haciendo lo siguiente: df[“DemeanedValues”] = df.groupby(“Category”)[“Values”].transform(lambda g: g – numpy.mean(g)) Según tengo entendido, los marcos de datos de Spark no ofrecen directamente esta operación de agrupación / transformación (estoy usando […]

¿Cómo unirse en múltiples columnas en Pyspark?

Estoy usando Spark 1.3 y me gustaría unirme en varias columnas usando la interfaz de Python (SparkSQL) Los siguientes trabajos: Primero los registro como tablas temporales. numeric.registerTempTable(“numeric”) Ref.registerTempTable(“Ref”) test = numeric.join(Ref, numeric.ID == Ref.ID, joinType=’inner’) Ahora me gustaría unirme a ellos en base a múltiples columnas. Me sale SyntaxError : syntax no válida con esto: […]

Cómo cambiar las propiedades de SparkContext en la sesión de PySpark interactivo

¿Cómo puedo cambiar spark.driver.maxResultSize en el shell interactivo pyspark? He usado el siguiente código from pyspark import SparkConf, SparkContext conf = (SparkConf() .set(“spark.driver.maxResultSize”, “10g”)) sc.stop() sc=SparkContext(conf) pero me da el error AttributeError: ‘SparkConf’ object has no attribute ‘_get_object_id’

pyspark EOFError después de llamar al mapa

Soy nuevo en spark & ​​pyspark. Estoy leyendo un pequeño archivo csv (~ 40k) en un dataframe. from pyspark.sql import functions as F df = sqlContext.read.format(‘com.databricks.spark.csv’).options(header=’true’, inferschema=’true’).load(‘/tmp/sm.csv’) df = df.withColumn(‘verified’, F.when(df[‘verified’] == ‘Y’, 1).otherwise(0)) df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF() Me sale un error extraño que no ocurre todas las veces, pero ocurre con bastante […]

¿Cómo eliminar un RDD en PySpark con el fin de liberar recursos?

Si tengo un RDD que ya no necesito, ¿cómo lo borro de la memoria? ¿Sería suficiente lo siguiente para hacer esto? del thisRDD ¡Gracias!

Serialización PySpark EOFError

Estoy leyendo un CSV como Spark DataFrame y estoy realizando operaciones de aprendizaje automático en él. Sigo recibiendo un EOFError de serialización de Python, ¿alguna idea de por qué? Pensé que podría ser un problema de memoria, es decir, un archivo que excede la RAM disponible, pero reducir drásticamente el tamaño del DataFrame no impidió […]