Articles of apache spark sql

Spark dataframe transformar varias filas en columna

Soy un principiante para encender, y quiero transformarme debajo del dataframe de origen (cargar desde un archivo JSON): +–+—–+—–+ |A |count|major| +–+—–+—–+ | a| 1| m1| | a| 1| m2| | a| 2| m3| | a| 3| m4| | b| 4| m1| | b| 1| m2| | b| 2| m3| | c| 3| m1| | […]

Ejemplo de PySpark y difusión unir

Estoy usando Spark 1.3 # Read from text file, parse it and then do some basic filtering to get data1 data1.registerTempTable(‘data1’) # Read from text file, parse it and then do some basic filtering to get data1 data2.registerTempTable(‘data2’) # Perform join data_joined = data1.join(data2, data1.id == data2.id); Mis datos son bastante sesgados y data2 (pocos […]

Pyspark DataFrame UDF en columna de texto

Estoy tratando de hacer un poco de limpieza de texto NLP de algunas columnas Unicode en un DataSrame de PySpark. Lo he probado en Spark 1.3, 1.5 y 1.6 y parece que no puedo hacer que las cosas funcionen para mi vida. También he intentado usar Python 2.7 y Python 3.4. He creado un udf […]

Apache Spark – Asigna el resultado de UDF a múltiples columnas de marcos de datos

Estoy usando pyspark, cargando un archivo csv grande en un dataframe con spark-csv, y como paso de preprocesamiento necesito aplicar una variedad de operaciones a los datos disponibles en una de las columnas (que contiene una cadena json) . Eso devolverá los valores de X, cada uno de los cuales debe almacenarse en su propia […]

Spark – Creando DataFrame nested

Estoy empezando con PySpark y tengo problemas para crear DataFrames con objetos nesteds. Este es mi ejemplo. Tengo usuarios. $ cat user.json {“id”:1,”name”:”UserA”} {“id”:2,”name”:”UserB”} Los usuarios tienen pedidos. $ cat order.json {“id”:1,”price”:202.30,”userid”:1} {“id”:2,”price”:343.99,”userid”:1} {“id”:3,”price”:399.99,”userid”:2} Y me gusta unirme a él para obtener una estructura en la que los pedidos se anidan en los usuarios. $ […]

Cómo realizar una instrucción Switch con Apache Spark Dataframes (Python)

Estoy tratando de realizar una operación en mis datos donde un cierto valor se asignará a una lista de valores predeterminados si coincide con uno de los criterios, o de lo contrario a un valor de caída. Este sería el equivalente de SQL: CASE WHEN user_agent LIKE \’%CanvasAPI%\’ THEN \’api\’ WHEN user_agent LIKE \’%candroid%\’ THEN […]

Uso del objeto Python personalizado en Pyspark UDF

Cuando se ejecuta la siguiente pieza de código PySpark: nlp = NLPFunctions() def parse_ingredients(ingredient_lines): parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0] return list(chain.from_iterable(parsed_ingredients)) udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType())) Recibo el siguiente error: _pickle.PicklingError: Could not serialize object: TypeError: can’t pickle _thread.lock objects Me imagino que esto se debe a que PySpark no puede serializar esta clase personalizada. Pero, ¿cómo puedo […]

cómo agregar la identificación de la fila en los marcos de datos de pySpark

Tengo un archivo csv; que convierto a DataFrame (df) en pyspark; después de alguna transformación; Quiero agregar una columna en df; que debería ser un ID de fila simple (comenzando desde 0 o 1 hasta N). Convertí df en rdd y uso “zipwithindex”. Convertí resultante rdd de nuevo a df. Este enfoque funciona, pero generó […]

Pyspark: Reemplazo de valor en una columna buscando un diccionario

Soy un novato en PySpark. Tengo un df Spark DataFrame que tiene una columna ‘device_type’. Quiero reemplazar todos los valores que se encuentran en “Tableta” o “Teléfono” por “Teléfono”, y reemplazar “PC” por “Escritorio”. En Python puedo hacer lo siguiente, deviceDict = {‘Tablet’:’Mobile’,’Phone’:’Mobile’,’PC’:’Desktop’} df[‘device_type’] = df[‘device_type’].replace(deviceDict,inplace=False) ¿Cómo puedo lograr esto usando PySpark? ¡Gracias!

Cómo agregar cualquier biblioteca nueva como spark-csv en la versión precomstackda de Apache Spark

He construido el Spark-csv y puedo usar el mismo desde el shell de pyspark usando el siguiente comando bin/spark-shell –packages com.databricks:spark-csv_2.10:1.0.3 error al obtener >>> df_cat.save(“k.csv”,”com.databricks.spark.csv”) Traceback (most recent call last): File “”, line 1, in File “/Users/abhishekchoudhary/bigdata/cdh5.2.0/spark-1.3.1/python/pyspark/sql/dataframe.py”, line 209, in save self._jdf.save(source, jmode, joptions) File “/Users/abhishekchoudhary/bigdata/cdh5.2.0/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py”, line 538, in __call__ File “/Users/abhishekchoudhary/bigdata/cdh5.2.0/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py”, line 300, […]