¿Cómo crear un DataFrame fuera de filas mientras se conserva el esquema existente?

Si llamo map o mapPartition y mi función recibe filas de PySpark, ¿cuál es la forma natural de crear un PySpark local o Pandas DataFrame? ¿Algo que combine las filas y retenga el esquema?

Actualmente hago algo como:

 def combine(partition): rows = [x for x in partition] dfpart = pd.DataFrame(rows,columns=rows[0].keys()) pandafunc(dfpart) mydf.mapPartition(combine) 

Chispa> = 2.3.0

Desde Spark 2.3.0 es posible usar Pandas Series o DataFrame por partición o grupo. Ver por ejemplo:

  • Aplicación de UDF en GroupedData en PySpark (con un ejemplo de Python en funcionamiento)
  • Detección eficiente de sufijo de cadena

Chispa <2.3.0

¿Cuál es la forma natural de crear un PySpark local?

No existe tal cosa. Las estructuras de datos distribuidos de chispa no se pueden anidar o usted prefiere otra perspectiva, no puede anidar acciones o transformaciones.

o Pandas DataFrame

Es relativamente fácil, pero debes recordar al menos algunas cosas:

  • Pandas y Spark DataFrames no son ni siquiera remotamente equivalentes. Estas son estructuras diferentes, con propiedades diferentes y, en general, no puede reemplazar una con otra.
  • Las particiones pueden estar vacías.
  • Parece que estás pasando diccionarios. Recuerde que el diccionario base de Python no está ordenado (a diferencia de las collections.OrderedDict por ejemplo). Así que pasar columnas puede que no funcione como se espera.
 import pandas as pd rdd = sc.parallelize([ {"x": 1, "y": -1}, {"x": -3, "y": 0}, {"x": -0, "y": 4} ]) def combine(iter): rows = list(iter) return [pd.DataFrame(rows)] if rows else [] rdd.mapPartitions(combine).first() ## xy ## 0 1 -1 

Podrías usar toPandas() ,

 pandasdf = mydf.toPandas() 

Para crear un dataframe de chispa SQL necesita un contexto de sección:

 hc = HiveContext(sparkContext) 

Con HiveContext puede crear un dataframe SQL a través de la función inferSchema:

 sparkSQLdataframe = hc.inferSchema(rows) 

En realidad, es posible convertir filas de Spark en Pandas dentro de ejecutores y finalmente crear Spark DataFrame a partir de esas salidas usando mapPartitions . Mira mi esencia en Github

 # Convert function to use in mapPartitions def rdd_to_pandas(rdd_): # convert rows to dict rows = (row_.asDict() for row_ in rdd_) # create pandas dataframe pdf = pd.DataFrame(rows) # Rows/Pandas DF can be empty depending on patiition logic. # Make sure to check it here, otherwise it will throw untrackable error if len(pdf) > 0: # # Do something with pandas DataFrame # pass return pdf.to_dict(orient='records') # Create Spark DataFrame from resulting RDD rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))