Filtro basado en otro RDD en Spark.

Me gustaría mantener solo a los empleados que tienen una ID de departamento referenciada en la segunda tabla.

Employee table LastName DepartmentID Rafferty 31 Jones 33 Heisenberg 33 Robinson 34 Smith 34 Department table DepartmentID 31 33 

He intentado el siguiente código que no funciona:

 employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] department = [31,33] employee = sc.parallelize(employee) department = sc.parallelize(department) employee.filter(lambda e: e[1] in department).collect() Py4JError: An error occurred while calling o344.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist 

¿Algunas ideas? Estoy usando Spark 1.1.0 con Python. Sin embargo, aceptaría una respuesta de Scala o Python.

En este caso, lo que le gustaría lograr es filtrar en cada partición con los datos contenidos en la tabla de departamento: Esta sería la solución básica:

 val dept = deptRdd.collect.toSet val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)} 

Si los datos de su departamento son grandes, una variable de transmisión mejorará el rendimiento al entregar los datos una vez a todos los nodos en lugar de tener que serializarlos con cada tarea

 val deptBC = sc.broadcast(deptRdd.collect.toSet) val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)} 

Aunque el uso de la unión funcionaría, es una solución muy costosa, ya que requerirá una distribución aleatoria de los datos (byKey) para lograr la unión. Dado que el requisito es un filtro simple, el envío de datos a cada partición (como se muestra arriba) proporcionará un rendimiento mucho mejor.

Finalmente implementé una solución usando una unión. Tuve que agregar un valor de 0 al departamento para evitar una excepción de Spark:

 employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] department = [31,33] # invert id and name to get id as the key employee = sc.parallelize(employee).map(lambda e: (e[1],e[0])) # add a 0 value to avoid an exception department = sc.parallelize(department).map(lambda d: (d,0)) employee.join(department).map(lambda e: (e[1][0], e[0])).collect() output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)] 

Filtrado de múltiples valores en múltiples columnas:

En el caso de que esté extrayendo datos de una base de datos (Hive o SQL tipo db para este ejemplo) y necesite filtrar en varias columnas, podría ser más fácil cargar la tabla con el primer filtro, luego iterar sus filtros a través de RDD (múltiples iteraciones pequeñas es la forma recomendada de progtwigción de Spark):

 { import org.apache.spark.sql.hive.HiveContext val hc = new HiveContext(sc) val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)") val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20") val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500") } 

Por supuesto, debe conocer sus datos un poco para filtrar los valores correctos, pero eso es parte del proceso de análisis.

para el mismo ejemplo anterior, me gustaría conservar solo los empleados que contenían o en un ID de departamento al que se hace referencia en la segunda tabla. pero no tiene que ser una operación de unión, lo vería en “contenido” o “en”, quiero decir que 33 está en “334” y 335

 employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]] department = [31,33] employee = sc.parallelize(employee) department = sc.parallelize(department)