Leer, comprimir, escribir con multiprocesamiento.

Estoy comprimiendo archivos. Un solo proceso está bien para algunos de ellos, pero estoy comprimiendo miles de ellos y esto puede (y ha tomado) varios días, así que me gustaría acelerarlo con multiprocesamiento. He leído que debería evitar tener varios procesos leyendo archivos al mismo tiempo, y supongo que no debería tener varios procesos escribiendo a la vez. Este es mi método actual que se ejecuta individualmente:

import tarfile, bz2, os def compress(folder): "compresses a folder into a file" bz_file = bz2.BZ2File(folder+'.tbz', 'w') with tarfile.open(mode='w', fileobj = bz_file) as tar: for fn in os.listdir(folder): read each file in the folder and do some pre processing that will make the compressed file much smaller than without tar.addfile( processed file ) bz_file.close() return 

Esto es tomar una carpeta y comprimir todo su contenido en un solo archivo. Esto los hace más fáciles de manejar y más organizados. Si acabo de tirar esto en un grupo, tendré varios procesos de lectura y escritura a la vez, así que quiero evitar eso. Puedo volver a trabajar así que solo un proceso es leer los archivos pero todavía tengo varios que están escribiendo:

 import multiprocessing as mp import tarfile, bz2, os def compress(file_list): folder = file_list[0] bz_file = bz2.BZ2File(folder+'.tbz', 'w') with tarfile.open(mode='w', fileobj = bz_file) as tar: for i in file_list[1:]: preprocess file data tar.addfile(processed data) bz_file.close() return cpu_count = mp.cpu_count() p = mp.Pool(cpu_count) for subfolder in os.listdir(main_folder): read all files in subfolder into memory, place into file_list place file_list into fld_list until fld_list contains cpu_count file lists. then pass to p.map(compress, fld_list) 

Esto todavía tiene una serie de procesos que escriben archivos comprimidos a la vez. El simple hecho de decirle a tarfile qué tipo de compresión usar comienza a escribir en el disco duro. No puedo leer todos los archivos que necesito comprimir en la memoria, ya que no tengo la cantidad de RAM para hacerlo, así que también tiene el problema de que estoy reiniciando Pool.map muchas veces.

¿Cómo puedo leer y escribir archivos en un solo proceso, y al mismo tiempo tener toda la compresión en varios procesos, evitando al mismo tiempo reiniciar el multiprocesamiento? ¿Trabajar varias veces?

En lugar de usar multiprocessing.Pool , uno debe usar multiprocessing.Queue y crear una bandeja de entrada y una bandeja de salida.

Comience un solo proceso para leer los archivos y colocar los datos en la cola de la bandeja de entrada, y ponga un límite al tamaño de la cola para que no termine de llenar su RAM. El ejemplo aquí comprime archivos individuales, pero se puede ajustar para manejar carpetas enteras a la vez.

 def reader(inbox, input_path, num_procs): "process that reads in files to be compressed and puts to inbox" for fn in os.listdir(input_path): path = os.path.join(input_path, fn) # read in each file, put data into inbox fname = os.path.basename(fn) with open(fn, 'r') as src: lines = src.readlines() data = [fname, lines] inbox.put(data) # read in everything, add finished notice for all running processes for i in range(num_procs): inbox.put(None) # when a compressor sees a None, it will stop inbox.close() return 

Pero eso es solo la mitad de la pregunta, la otra parte es comprimir el archivo sin tener que escribirlo en el disco. Le damos un objeto StringIO a la función de compresión en lugar de un archivo abierto; se pasa a tarfile . Una vez comprimido, colocamos el objeto StringIO en la cola de la bandeja de salida.

Excepto que no podemos hacer eso, porque los objetos StringIO no pueden ser decapados, solo los objetos que se pueden recoger pueden entrar en una cola. Sin embargo, la función getvalue de StringIO puede proporcionar el contenido en un formato seleccionable, así que tome el contenido con getvalue, cierre el objeto StringIO y luego coloque el contenido en la bandeja de salida.

 from io import StringIO import tarfile def compressHandler(inbox, outbox): "process that pulls from inbox, compresses and puts to outbox" supplier = iter(inbox.get, None) # stops when gets a None while True: try: data = next(supplier) # grab data from inbox pressed = compress(data) # compress it ou_que.put(pressed) # put into outbox except StopIteration: outbox.put(None) # finished compressing, inform the writer return # and quit def compress(data): "compress file" bz_file = StringIO() fname, lines = dat # see reader def for package order with tarfile.open(mode='w:bz2', fileobj=bz_file) as tar: info = tarfile.TarInfo(fname) # store file name tar.addfile(info, StringIO(''.join(lines))) # compress data = bz_file.getvalue() bz_file.close() return data 

El proceso de escritura luego extrae el contenido de la cola de la bandeja de salida y los escribe en el disco. Esta función necesitará saber cuántos procesos de compresión se iniciaron para que solo se detenga cuando haya escuchado que cada proceso se ha detenido.

 def writer(outbox, output_path, num_procs): "single process that writes compressed files to disk" num_fin = 0 while True: # all compression processes have finished if num_finished >= num_procs: break tardata = outbox.get() # a compression process has finished if tardata == None: num_fin += 1 continue fn, data = tardata name = os.path.join(output_path, fn) + '.tbz' with open(name, 'wb') as dst: dst.write(data) return 

Finalmente, está la configuración para juntarlos a todos.

 import multiprocessing as mp import os def setup(): fld = 'file/path' # multiprocess setup num_procs = mp.cpu_count() # inbox and outbox queues inbox = mp.Queue(4*num_procs) # limit size outbox = mp.Queue() # one process to read reader = mp.Process(target = reader, args = (inbox, fld, num_procs)) reader.start() # n processes to compress compressors = [mp.Process(target = compressHandler, args = (inbox, outbox)) for i in range(num_procs)] for c in compressors: c.start() # one process to write writer = mp.Process(target = writer, args=(outbox, fld, num_procs)) writer.start() writer.join() # wait for it to finish print('done!')