Análisis de registros multilínea en Scala

Aquí está mi RDD [String]

M1 module1 PIP a ZA PIP b ZB PIP c Y n4 M2 module2 PIP a I n4 PIP b OD PIP c O n5 

y así. Básicamente, necesito un RDD de clave (que contenga la segunda palabra en la línea 1) y los valores de las siguientes líneas PIP que se pueden iterar.

He intentado lo siguiente

 val usgPairRDD = usgRDD.map(x => (x.split("\\n")(0), x)) 

Pero esto me da la siguiente salida.

 (,) (M1 module1,M1 module1) (PIP a ZA,PIP a ZA) (PIP b ZB,PIP b ZB) (PIP c Y n4,PIP c Y n4) (,) (M2 module2,M2 module2) (PIP a I n4,PIP a I n4) (PIP b OD,PIP b OD) (PIP c O n5,PIP c O n5) 

En cambio, me gustaría que la salida fuera

 module1, (PIP a ZA, PIP b ZB, PIP b ZB) module2, (PIP a I n4,PIP b OD, PIP c O n5) 

¿Qué estoy haciendo mal? Soy bastante nuevo en las API de Spark. Gracias

Hola @ cero323

 usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%")) 

rendimientos …

 %%%%%%%%% M1 module1%%%%%%%%% PIP a ZA%%%%%%%%% PIP b ZB%%%%%%%%% PIP c Y n4%%%%%%%%% %%%%%%%%% M2 module2%%%%%%%%% PIP a I n4%%%%%%%%% PIP b OD%%%%%%%%% PIP c O n5%%%%%%%%% 

y así

Hola @ zero323 y @Daniel Darabos Mi entrada es un conjunto muy muy grande de muchos archivos (que abarcan TBs). Aquí está la muestra ..

 BIN n4 BIN n5 BIN D BIN E PIT AIA PIT BIB PIT CIC PIT DOD PIT EOE DEF M1 module1 PIP a ZA PIP b ZB PIP c Y n4 DEF M2 module2 PIP a I n4 PIP b OD PIP c O n5 

Necesito todos los BINS, PIT y DEF (incluidas las líneas PIP a continuación) en 3 RDDS diferentes. Aquí es cómo estoy haciendo esto actualmente (de la discusión, siento que usgRDD a continuación se calcula incorrectamente)

 val binRDD = levelfileRDD.filter(line => line.contains("BIN")) val pitRDD = levelfileRDD.filter(line => line.contains("PIT")) val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim)) 

Necesito 3 tipos (en este momento) de RDD porque necesito realizar la validación más adelante. Por ejemplo, “n4” en “DEF M2 module2” solo puede existir si n4 es un elemento BIN. De los RDDs, espero derivar relaciones usando API GraphX ​​(obviamente no he llegado a este punto). Sería ideal si cada usgPairRDD (computado desde usgRDD o de otro modo) imprima lo siguiente

 module1, (a ZA, b ZB, c Y n4) %%%%%%% module2, (a I n4, b OD, c O n5) %%%%%%% 

Espero que tenga sentido. Disculpas a los dioses de la chispa, si no lo soy.

Por defecto, Spark crea un único elemento por línea. Significa que, en su caso, todos los registros se reparten entre múltiples elementos que, según lo declarado por Daniel Darabos en los comentarios, pueden ser procesados ​​por diferentes trabajadores.

Como parece que sus datos son relativamente regulares y están separados por una línea vacía, debería poder usar newAPIHadoopFile con un delimitador personalizado:

 import org.apache.spark.rdd.RDD import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.io.{LongWritable, Text} val path: String = ??? val conf = new org.apache.hadoop.mapreduce.Job().getConfiguration conf.set("textinputformat.record.delimiter", "\n\n") val usgRDD = sc.newAPIHadoopFile( path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) .map{ case (_, v) => v.toString } val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("\n") match { case Array(x, xs @ _*) => (x, xs) })