Flujo de datos / haz de apache: ¿cómo acceder al nombre de archivo actual al pasar un patrón?

He visto responder esta pregunta antes en el desbordamiento de stack ( https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow ), pero no desde que apache beam ha agregado la funcionalidad de divisor divisible para python. ¿Cómo accedo al nombre de archivo del archivo actual que se está procesando al pasar un patrón de archivo a un grupo de gcs?

Quiero pasar el nombre de archivo a mi función de transformación:

with beam.Pipeline(options=pipeline_options) as p: lines = p | ReadFromText('gs://url to file') data = ( lines | 'Jsonify' >> beam.Map(jsonify) | 'Unnest' >> beam.FlatMap(unnest) | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink( 'project_id:dataset_id.table_name', schema=schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND) ) 

En última instancia, lo que quiero hacer es pasar el nombre de archivo a mi función de transformación cuando transformo cada fila del json (vea esto y luego use el nombre de archivo para hacer una búsqueda en una tabla BQ diferente para obtener un valor). Creo que una vez que logre saber cómo obtener el nombre de archivo, podré averiguar la parte de la entrada lateral para realizar la búsqueda en la tabla bq y obtener el valor único.

Intenté implementar una solución con el caso anteriormente citado. Allí, al igual que en otros enfoques como este , también obtienen una lista de nombres de archivos, pero cargan todo el archivo en un solo elemento que puede no adaptarse bien a archivos grandes. Por lo tanto, busqué en agregar el nombre de archivo a cada registro.

Como entrada usé dos archivos csv:

 $ gsutil cat gs://$BUCKET/countries1.csv id,country 1,sweden 2,spain gsutil cat gs://$BUCKET/countries2.csv id,country 3,italy 4,france 

Usando GCSFileSystem.match podemos acceder a metadata_list para recuperar FileMetadata que contiene la ruta y el tamaño del archivo en bytes. En mi ejemplo:

 [FileMetadata(gs://BUCKET_NAME/countries1.csv, 29), FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)] 

El código es:

 result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])] 

Leeremos cada uno de los archivos correspondientes en una colección de PC diferente. Como no sabemos el número de archivos a priori, debemos crear mediante progtwigción una lista de nombres para cada PCollection (p0, p1, ..., pN-1) p0 (p0, p1, ..., pN-1) y asegurarnos de que tenemos tags únicas para cada paso ('Read file 0', 'Read file 1', etc.) :

 variables = ['p{}'.format(i) for i in range(len(result))] read_labels = ['Read file {}'.format(i) for i in range(len(result))] add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))] 

Luego procedemos a leer cada archivo diferente en su colección de PC correspondiente con ReadFromText y luego llamamos a AddFilenamesFn ParDo para asociar cada registro con el nombre del archivo.

 for i in range(len(result)): globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path) 

donde AddFilenamesFn es:

 class AddFilenamesFn(beam.DoFn): """ParDo to output a dict with filename and row""" def process(self, element, file_path): file_name = file_path.split("/")[-1] yield {'filename':file_name, 'row':element} 

Mi primer enfoque fue usar una función de Mapa directamente, lo que resulta en un código más simple. Sin embargo, el result[i].path se resolvió al final del bucle y cada registro se asignó incorrectamente al último archivo de la lista:

 globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem)) 

Finalmente, aplanamos todos los PCollections en uno:

 merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() 

y verificamos los resultados registrando los elementos:

 INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'} INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'} INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'} INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'} INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'} INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'} 

DirectRunner esto con DirectRunner y DataflowRunner para Python SDK 2.8.0.

Espero que esto aborde el problema principal aquí y que pueda continuar integrando BigQuery en su caso de uso completo ahora. Puede que necesite usar la biblioteca de cliente de Python para eso, escribí un ejemplo de Java similar.

Código completo:

 import argparse, logging from operator import add import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.io import ReadFromText from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem class GCSFileReader: """Helper class to read gcs files""" def __init__(self, gcs): self.gcs = gcs class AddFilenamesFn(beam.DoFn): """ParDo to output a dict with filename and row""" def process(self, element, file_path): file_name = file_path.split("/")[-1] # yield (file_name, element) # use this to return a tuple instead yield {'filename':file_name, 'row':element} # just logging output to visualize results def write_res(element): logging.info(element) return element def run(argv=None): parser = argparse.ArgumentParser() known_args, pipeline_args = parser.parse_known_args(argv) p = beam.Pipeline(options=PipelineOptions(pipeline_args)) gcs = GCSFileSystem(PipelineOptions(pipeline_args)) gcs_reader = GCSFileReader(gcs) # in my case I am looking for files that start with 'countries' BUCKET='BUCKET_NAME' result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])] result = reduce(add, result) # create each input PCollection name and unique step labels variables = ['p{}'.format(i) for i in range(len(result))] read_labels = ['Read file {}'.format(i) for i in range(len(result))] add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))] # load each input file into a separate PCollection and add filename to each row for i in range(len(result)): # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem)) globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path) # flatten all PCollections into a single one merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res) p.run() if __name__ == '__main__': run()