La mejor manera de obtener el valor máximo en una columna de dataframe Spark

Estoy tratando de descubrir la mejor manera de obtener el mayor valor en una columna de dataframe Spark.

Considere el siguiente ejemplo:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) df.show() 

Lo que crea:

 +---+---+ | A| B| +---+---+ |1.0|4.0| |2.0|5.0| |3.0|6.0| +---+---+ 

Mi objective es encontrar el mayor valor en la columna A (por inspección, esto es 3.0). Usando PySpark, aquí hay cuatro enfoques que puedo pensar:

 # Method 1: Use describe() float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) # Method 2: Use SQL df.registerTempTable("df_table") spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] # Method 3: Use groupby() df.groupby().max('A').collect()[0].asDict()['max(A)'] # Method 4: Convert to RDD df.select("A").rdd.max()[0] 

Cada uno de los anteriores da la respuesta correcta, pero en ausencia de una herramienta de creación de perfiles de Spark, no puedo decir cuál es la mejor.

¿Alguna idea de intuición o empirismo sobre cuál de los métodos anteriores es más eficiente en términos de tiempo de ejecución de Spark o uso de recursos, o si existe un método más directo que los anteriores?

 >df1.show() +-----+--------------------+--------+----------+-----------+ |floor| timestamp| uid| x| y| +-----+--------------------+--------+----------+-----------+ | 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418| | 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393| | 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585| | 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073| >row1 = df1.agg({"x": "max"}).collect()[0] >print row1 Row(max(x)=110.33613) >print row1["max(x)"] 110.33613 

La respuesta es casi la misma que method3. pero parece que el “asDict ()” en method3 puede ser eliminado

El valor máximo para una columna particular de un dataframe se puede lograr usando –

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

Nota: Spark está diseñado para trabajar en Big Data: computación distribuida. El tamaño del ejemplo DataFrame es muy pequeño, por lo que el orden de los ejemplos de la vida real se puede alterar con respecto al pequeño ejemplo.

Más lento: Método_1, porque .describe (“A”) calcula min, max, mean, stddev y count (5 cálculos en toda la columna)

Medio: Método_4, porque .rdd (transformación de DF a RDD) ralentiza el proceso.

Más rápido: Método_3 ~ Método_2 ~ método_5, porque la lógica es muy similar, por lo que el catalizador optimizador de Spark sigue una lógica muy similar con un número mínimo de operaciones (obtenga el máximo de una columna en particular, recopile un dataframe de valor único); (.asDict () agrega un poco de tiempo extra en comparación con 3,2 a 5)

 import pandas as pd import time time_dict = {} dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) #-- For bigger/realistic dataframe just uncomment the following 3 lines #lst = list(np.random.normal(0.0, 100.0, 100000)) #pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst}) #dfff = self.sqlContext.createDataFrame(pdf) tic1 = int(round(time.time() * 1000)) # Method 1: Use describe() max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) tac1 = int(round(time.time() * 1000)) time_dict['m1']= tac1 - tic1 print (max_val) tic2 = int(round(time.time() * 1000)) # Method 2: Use SQL dfff.registerTempTable("df_table") max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] tac2 = int(round(time.time() * 1000)) time_dict['m2']= tac2 - tic2 print (max_val) tic3 = int(round(time.time() * 1000)) # Method 3: Use groupby() max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)'] tac3 = int(round(time.time() * 1000)) time_dict['m3']= tac3 - tic3 print (max_val) tic4 = int(round(time.time() * 1000)) # Method 4: Convert to RDD max_val = dfff.select("A").rdd.max()[0] tac4 = int(round(time.time() * 1000)) time_dict['m4']= tac4 - tic4 print (max_val) tic5 = int(round(time.time() * 1000)) # Method 4: Convert to RDD max_val = dfff.agg({"A": "max"}).collect()[0][0] tac5 = int(round(time.time() * 1000)) time_dict['m5']= tac5 - tic5 print (max_val) print time_dict 

Resultado en un nodo de borde de un clúster en milisegundos (ms):

DF pequeño (ms): {‘m1’: 7096, ‘m2’: 205, ‘m3’: 165, ‘m4’: 211, ‘m5’: 180}

DF más grande (ms): {‘m1’: 10260, ‘m2’: 452, ‘m3’: 465, ‘m4’: 916, ‘m5’: 373}

Otra forma de hacerlo:

 df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX 

En mis datos, tengo estos puntos de referencia:

 df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms Wall time: 3.7 s df.select("A").rdd.max()[0] CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms Wall time: 10.3 s df.agg({"A": "max"}).collect()[0][0] CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms Wall time: 3.75 s 

Todos dan la misma respuesta.

En caso de que algunos se pregunten cómo hacerlo usando Scala (usando Spark 2.0. +), Aquí tienes:

 scala> df.createOrReplaceTempView("TEMP_DF") scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF"). collect()(0).getInt(0) scala> print(myMax) 117 

Creo que la mejor solución será usar head()

Teniendo en cuenta tu ejemplo:

 +---+---+ | A| B| +---+---+ |1.0|4.0| |2.0|5.0| |3.0|6.0| +---+---+ 

Usando el método agg y max de python podemos obtener el siguiente valor:

from pyspark.sql.functions import max df.agg(max(df.A)).head()[0]

Esto devolverá: 3.0

Asegúrese de tener la importación correcta:
from pyspark.sql.functions import max La función max que usamos aquí es la función de biblioteca pySPark sql, no la función max predeterminada de python.

Esta es una forma perezosa de hacer esto, simplemente haciendo estadísticas de cómputo:

 df.write.mode("overwrite").saveAsTable("sampleStats") Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns) spark.sql(Query) df.describe('ColName') 

o

 spark.sql("Select * from sampleStats").describe('ColName') 

o puede abrir una concha de hive y

 describe formatted table sampleStats; 

Verá las estadísticas en las propiedades: mínimo, máximo, distinto, nulos, etc.

 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ val testDataFrame = Seq( (1.0, 4.0), (2.0, 5.0), (3.0, 6.0) ).toDF("A", "B") val (maxA, maxB) = testDataFrame.select(max("A"), max("B")) .as[(Double, Double)] .first() println(maxA, maxB) 

Y el resultado es (3.0,6.0), que es el mismo que el testDataFrame.agg(max($"A"), max($"B")).collect()(0) testDataFrame.agg(max($"A"), max($"B")).collect()(0) testDataFrame.agg(max($"A"), max($"B")).collect()(0) Sin embargo, testDataFrame.agg(max($"A"), max($"B")).collect()(0) devuelve una lista, [3.0,6.0]

En pyspark puedes hacer esto:

 max(df.select('ColumnName').rdd.flatMap(lambda x: x).collect())