Articles of apache spark sql

Cómo ejecutar un archivo .sql en spark usando python

from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext conf = SparkConf().setAppName(“Test”).set(“spark.driver.memory”, “1g”) sc = SparkContext(conf = conf) sqlContext = SQLContext(sc) results = sqlContext.sql(“/home/ubuntu/workload/queryXX.sql”) Cuando ejecuto este comando usando: python test.py me da un error . y4j.protocol.Py4JJavaError: Se produjo un error al llamar a o20.sql. : java.lang.RuntimeException: [1.1] error: “ con ” esperado pero `/ […]

Spark ML Pipeline Causes java.lang.Exception: no se compiló … El código … crece más allá de 64 KB

Utilizando Spark 2.0, estoy tratando de ejecutar un VectorAssembler simple en un pipeline de pyspark ML, de esta manera: feature_assembler = VectorAssembler(inputCols=[‘category_count’, ‘name_count’], \ outputCol=”features”) pipeline = Pipeline(stages=[feature_assembler]) model = pipeline.fit(df_train) model_output = model.transform(df_train) Cuando trato de mirar la salida usando model_output.select(“features”).show(1) Me sale el error Py4JJavaError Traceback (most recent call last) in () 2 […]

Apache Spark lanza NullPointerException cuando encuentra una característica faltante

Tengo un problema extraño con PySpark al indexar la columna de cadenas en las características. Aquí está mi archivo tmp.csv: x0,x1,x2,x3 asd2s,1e1e,1.1,0 asd2s,1e1e,0.1,0 ,1e3e,1.2,0 bd34t,1e1e,5.1,1 asd2s,1e3e,0.2,0 bd34t,1e2e,4.3,1 donde tengo un valor faltante para ‘x0’. Al principio, estoy leyendo las características del archivo csv en DataFrame usando pyspark_csv: https://github.com/seahboonsiew/pyspark-csv y luego indexando x0 con StringIndexer: import […]

Agregar una nueva columna en el Marco de datos derivado de otras columnas (Spark)

Estoy usando Spark 1.3.0 y Python. Tengo un dataframe y deseo agregar una columna adicional que se deriva de otras columnas. Me gusta esto, >>old_df.columns [col_1, col_2, …, col_m] >>new_df.columns [col_1, col_2, …, col_m, col_n] dónde col_n = col_3 – col_4 ¿Cómo hago esto en PySpark?

¿Cómo desenvolver la columna Struct anidada en varias columnas?

Estoy intentando expandir una columna DataFrame con un tipo de struct anidada (ver más abajo) a varias columnas. El esquema de Struct con el que estoy trabajando tiene el aspecto de {“foo”: 3, “bar”: {“baz”: 2}} . Idealmente, me gustaría expandir lo anterior en dos columnas ( “foo” y “bar.baz” ). Sin embargo, cuando intenté […]

¿Cómo puedo cambiar el nombre de las columnas por grupo / partición en un Spark Dataframe?

Tengo algunos datos de sensores que se almacenan en una tabla por nombre de canal, en lugar de ese nombre de sensor (esto es para evitar tener tablas muy anchas debido al hecho de que muchos sensores solo se usan en unos pocos dispositivos: el trabajo para columnas dispersas, I Lo sé, pero yo soy […]

Cómo convertir la lista de diccionarios en Spark DataFrame

Quiero convertir mi lista de diccionarios en DataFrame. Esta es la lista: mylist = [ {“type_activity_id”:1,”type_activity_name”:”xxx”}, {“type_activity_id”:2,”type_activity_name”:”yyy”}, {“type_activity_id”:3,”type_activity_name”:”zzz”} ] Este es mi código: from pyspark.sql.types import StringType df = spark.createDataFrame(mylist, StringType()) df.show(2,False) +—————————————–+ | value| +—————————————–+ |{type_activity_id=1,type_activity_id=xxx}| |{type_activity_id=2,type_activity_id=yyy}| |{type_activity_id=3,type_activity_id=zzz}| +—————————————–+ Supongo que debo proporcionar algunos mapas y tipos para cada columna, pero no sé cómo […]

¿Cómo usar las funciones de la ventana en PySpark usando DataFrames?

Tratando de averiguar cómo usar las funciones de la ventana en PySpark. Aquí hay un ejemplo de lo que me gustaría poder hacer, simplemente contar el número de veces que un usuario tiene un “evento” (en este caso “dt” es una marca de tiempo simulada). from pyspark.sql.window import Window from pyspark.sql.functions import count df = […]

Seleccione el elemento de la matriz del método de división de Spark Dataframes en la misma llamada?

Estoy dividiendo una solicitud HTTP para ver los elementos, y me preguntaba si había una manera de especificar el elemento que me gustaría ver en la misma llamada sin tener que hacer otra operación. Por ejemplo: from pyspark.sql import functions as fn df.select(fn.split(df.http_request, ‘/’).alias(‘http’)) me da un nuevo Dataframe con filas de matrices como esta: […]

Cómo seleccionar y ordenar varias columnas en un dataframe Pyspark después de una unión

Quiero seleccionar varias columnas de la estructura de datos existente (que se crea después de las combinaciones) y me gustaría ordenar los archivos como mi estructura de tabla de destino. Cómo puede hacerse esto ? El acercamiento que he usado está abajo. Aquí puedo seleccionar las columnas necesarias pero no puedo hacer en secuencia. Required […]