Ejemplo de PySpark y difusión unir

Estoy usando Spark 1.3

# Read from text file, parse it and then do some basic filtering to get data1 data1.registerTempTable('data1') # Read from text file, parse it and then do some basic filtering to get data1 data2.registerTempTable('data2') # Perform join data_joined = data1.join(data2, data1.id == data2.id); 

Mis datos son bastante sesgados y data2 (pocos KB) << data1 (10s de GB) y el rendimiento es bastante malo. Estaba leyendo acerca de unirse a la transmisión, pero no estoy seguro de cómo puedo hacer lo mismo usando la API de Python.

Spark 1.3 no admite uniones de difusión utilizando DataFrame. En Spark> = 1.5.0 puede usar la función de broadcast para aplicar uniones de difusión:

 from pyspark.sql.functions import broadcast data1.join(broadcast(data2), data1.id == data2.id) 

Para versiones anteriores, la única opción es convertir a RDD y aplicar la misma lógica que en otros idiomas. Aproximadamente algo como esto:

 from pyspark.sql import Row from pyspark.sql.types import StructType # Create a dictionary where keys are join keys # and values are lists of rows data2_bd = sc.broadcast( data2.map(lambda r: (r.id, r)).groupByKey().collectAsMap()) # Define a new row with fields from both DFs output_row = Row(*data1.columns + data2.columns) # And an output schema output_schema = StructType(data1.schema.fields + data2.schema.fields) # Given row x, extract a list of corresponding rows from broadcast # and output a list of merged rows def gen_rows(x): return [output_row(*x + y) for y in data2_bd.value.get(x.id, [])] # flatMap and create a new data frame joined = data1.rdd.flatMap(lambda row: gen_rows(row)).toDF(output_schema) 

Este código está trabajando en la versión de spark-2.0.2-bin-hadoop2.7

 from pyspark.sql import SparkSession from pyspark.sql.functions import broadcast spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate() df2 = spark.read.csv("D:\\trans_mar.txt",sep="^"); df1=spark.read.csv("D:\\trans_feb.txt",sep="^"); print(df1.join(broadcast(df2),df2._c77==df1._c77).take(10))