Cómo escribir resultados en archivos JSON en gcs en Dataflow / Beam

Estoy usando el Python Beam SDK 0.6.0. Y me gustaría escribir mi salida a JSON en Google Cloud Storage. ¿Cuál es la mejor manera de hacer esto?

Creo que puedo usar WriteToText desde el sumidero de Text IO, pero luego tengo que formatear cada fila por separado, ¿verdad? ¿Cómo guardo mi resultado en archivos JSON válidos que contienen listas de objetos?

Ok, para referencia, resolví esto al escribir mi propia _TextSink sumideros en el _TextSink utilizado por WriteToText en el SDK de beam.

No estoy seguro si esta es la mejor manera de hacerlo, pero hasta ahora funciona bien. Espero que pueda ayudar a alguien más.

 import os import json import apache_beam as beam from apache_beam import coders from apache_beam.io.iobase import Write from apache_beam.transforms import PTransform class _JsonSink(beam.io.FileSink): """A Dataflow sink for writing JSON files.""" def __init__(self, file_path_prefix, file_name_suffix='', num_shards=0, shard_name_template=None, coder=coders.ToStringCoder(), compression_type=beam.io.CompressionTypes.AUTO): super(_JsonSink, self).__init__( file_path_prefix, file_name_suffix=file_name_suffix, num_shards=num_shards, shard_name_template=shard_name_template, coder=coder, mime_type='text/plain', compression_type=compression_type) self.last_rows = dict() def open(self, temp_path): """ Open file and initialize it w opening a list.""" file_handle = super(_JsonSink, self).open(temp_path) file_handle.write('[\n') return file_handle def write_record(self, file_handle, value): """Writes a single encoded record converted to JSON and terminates the line wa comma.""" if self.last_rows.get(file_handle, None) is not None: file_handle.write(self.coder.encode( json.dumps(self.last_rows[file_handle]))) file_handle.write(',\n') self.last_rows[file_handle] = value def close(self, file_handle): """Finalize the JSON list and close the file handle returned from ``open()``. Called after all records are written. """ if file_handle is not None: # Write last row without a comma file_handle.write(self.coder.encode( json.dumps(self.last_rows[file_handle]))) # Close list and then the file file_handle.write('\n]\n') file_handle.close() class WriteToJson(PTransform): """PTransform for writing to JSON files.""" def __init__(self, file_path_prefix, file_name_suffix='', num_shards=0, shard_name_template=None, coder=coders.ToStringCoder(), compression_type=beam.io.CompressionTypes.AUTO): self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards, shard_name_template, coder, compression_type) def expand(self, pcoll): return pcoll | Write(self._sink) 

Usar el receptor es similar a cómo usas el receptor de texto:

 pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json') 

Hacer que cada archivo contenga una lista única con un conjunto de elementos es difícil, porque tendría que agrupar un conjunto de elementos y luego escribirlos juntos en un archivo. Déjame aconsejarte que uses un formato diferente.

Puede considerar el formato de líneas JSON , donde cada línea en un archivo representa un solo elemento JSON.

Transformar tus datos a JSON Lines debería ser bastante fácil. La siguiente transformación debería hacer el truco:

 class WriteToJsonLines(beam.PTransform): def __init__(self, file_name): self._file_name = file_name def expand(self, pcoll): return (pcoll | 'format json' >> beam.Map(json.dumps) | 'write to text' >> beam.WriteToText(self._file_name)) 

Finalmente, si luego desea leer sus archivos de líneas JSON, puede escribir su propio JsonLinesSource o usar el que está en beam_utils .

Aunque este es un año tarde, me gustaría agregar otra forma de escribir un resultado a los archivos json en GCS. Para las tuberías de viga apache 2.x, esta transformación funciona:

.withSuffix (“. json”)

Por ejemplo:

 result.apply("WriteToGCS", TextIO.write().to(bucket) .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP) .withSuffix(".json") .withNumShards(chunks));