Creación de la estructura de datos de chispa a partir de registro multilínea

Estoy tratando de leer en el archivo de eventos retrosheet en chispa. El archivo de eventos está estructurado como tal.

id,TEX201403310 version,2 info,visteam,PHI info,hometeam,TEX info,site,ARL02 info,date,2014/03/31 info,number,0 info,starttime,1:07PM info,daynight,day info,usedh,true info,umphome,joycj901 info,attendance,49031 start,reveb001,"Ben Revere",0,1,8 start,rollj001,"Jimmy Rollins",0,2,6 start,utlec001,"Chase Utley",0,3,4 start,howar001,"Ryan Howard",0,4,3 start,byrdm001,"Marlon Byrd",0,5,9 id,TEX201404010 version,2 info,visteam,PHI info,hometeam,TEX 

Como puedes ver para cada juego, los eventos retroceden.

He leído el archivo en un RDD, y luego a través de un segundo bucle for agregó una clave para cada iteración, que parece funcionar. Pero esperaba obtener algún comentario sobre si había una forma de limpieza para hacerlo utilizando métodos de chispa.

 logFile = '2014TEX.EVA' event_data = (sc .textFile(logfile) .collect()) idKey = 0 newevent_list = [] for line in event_dataFile: if line.startswith('id'): idKey += 1 newevent_list.append((idKey,line)) else: newevent_list.append((idKey,line)) event_data = sc.parallelize(newevent_list) 

PySpark desde la versión 1.1 es compatible con los formatos de entrada de Hadoop . Puede usar la opción textinputformat.record.delimiter para usar un delimitador de formato personalizado como se muestra a continuación

 from operator import itemgetter retrosheet = sc.newAPIHadoopFile( '/path/to/retrosheet/file', 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', 'org.apache.hadoop.io.LongWritable', 'org.apache.hadoop.io.Text', conf={'textinputformat.record.delimiter': '\nid,'} ) (retrosheet .filter(itemgetter(1)) .values() .filter(lambda x: x) .map(lambda v: ( v if v.startswith('id') else 'id,{0}'.format(v)).splitlines()))