¿Cómo multiprocesar, multiprocilar un archivo grande al dividirlo en pequeños fragmentos en función de los valores de una columna en particular?

He escrito un progtwig en Python para un proceso biológico https://codereview.stackexchange.com/questions/186396/solve-the-phase-state-between-two-haplotype-blocks-using-markov-transition-proba .

Si observa ese progtwig, puede ver que el progtwig lleva mucho tiempo computando datos de dos líneas consecutivas (o teclas, valores) a la vez. No estoy poniendo todo el código aquí, pero por simplicidad estoy creando un archivo simulado y un progtwig simulado (que se muestra a continuación) que se comporta de manera similar en el nivel más simple. En este progtwig simulado que estoy calculando, diga la columna len(vals) y vuelva a escribirlo en un archivo de salida.

Dado que el cálculo está vinculado a la CPU / GPU mientras se realiza for (k1, v1) and (k2, v2) .... en el progtwig original (enlace anterior), quiero multiprocesar / enhebrar los análisis de datos leyendo : 1) todo el contenido los datos en la memoria de la manera más rápida posible 2) dividen los datos en trozos por el campo chr único 3) hacen el cálculo 4) escriben de nuevo en un archivo. Entonces, ¿cómo lo haría?

En el archivo simulado dado, el cómputo es demasiado simple para estar vinculado a GPU / CPU, pero solo quiero saber cómo puedo hacerlo si es necesario.

Nota: muchas personas me preguntaron qué es lo que estoy tratando de lograr; estoy tratando de multiprocesar / subprocesar el problema dado. Si pongo aquí mi gran progtwig original, nadie lo va a ver. Por lo tanto, vamos a entrenar este pequeño archivo y el pequeño progtwig de python.

    A continuación se muestra mi código y datos:

     my_data = '''chr\tpos\tidx\tvals 2\t23\t4\tabcd 2\t25\t7\tatg 2\t29\t8\tct 2\t35\t1\txylfz 3\t37\t2\tmnost 3\t39\t3\tpqr 3\t41\t6\trtuv 3\t45\t5\tlfghef 3\t39\t3\tpqr 3\t41\t6\trtu 3\t45\t5\tlfggg 4\t25\t3\tpqrp 4\t32\t6\trtu 4\t38\t5\tlfgh 4\t51\t3\tpqr 4\t57\t6\trtus ''' def manipulate_lines(vals): vals_len = len(vals[3]) return write_to_file(vals[0:3], vals_len) def write_to_file(a, b): print(a,b) to_file = open('write_multiprocessData.txt', 'a') to_file.write('\t'.join(['\t'.join(a), str(b), '\n'])) to_file.close() def main(): to_file = open('write_multiprocessData.txt', 'w') to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n'])) to_file.close() data = my_data.rstrip('\n').split('\n') for lines in data: if lines.startswith('chr'): continue else: lines = lines.split('\t') manipulate_lines(lines) if __name__ == '__main__': main() 

    Un problema para manejar cuando se usan múltiples procesos para manejar datos, es preservar el orden. Python ha ideado una forma bastante agradable de manejar esto, utilizando un multiprocessing.Pool , que puede usarse para map los procesos sobre los datos de entrada. Esto se encargará de devolver los resultados en orden.

    Sin embargo, es posible que el procesamiento aún esté fuera de servicio, por lo tanto, para usarlo correctamente, solo el procesamiento, y no se debe ejecutar ningún acceso de E / S en los subprocesos. Por lo tanto, para usar esto en su caso, se debe realizar una pequeña reescritura de su código, que tenga todas las operaciones de IO en el proceso principal:

     from multiprocessing import Pool from time import sleep from random import randint my_data = '''chr\tpos\tidx\tvals 2\t23\t4\tabcd 2\t25\t7\tatg 2\t29\t8\tct 2\t35\t1\txylfz 3\t37\t2\tmnost 3\t39\t3\tpqr 3\t41\t6\trtuv 3\t45\t5\tlfghef 3\t39\t3\tpqr 3\t41\t6\trtu 3\t45\t5\tlfggg 4\t25\t3\tpqrp 4\t32\t6\trtu 4\t38\t5\tlfgh 4\t51\t3\tpqr 4\t57\t6\trtus ''' def manipulate_lines(vals): sleep(randint(0, 2)) vals_len = len(vals[3]) return vals[0:3], vals_len def write_to_file(a, b): print(a,b) to_file = open('write_multiprocessData.txt', 'a') to_file.write('\t'.join(['\t'.join(a), str(b), '\n'])) to_file.close() def line_generator(data): for line in data: if line.startswith('chr'): continue else: yield line.split('\t') def main(): p = Pool(5) to_file = open('write_multiprocessData.txt', 'w') to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n'])) to_file.close() data = my_data.rstrip('\n').split('\n') lines = line_generator(data) results = p.map(manipulate_lines, lines) for result in results: write_to_file(*result) if __name__ == '__main__': main() 

    Este progtwig no divide la lista después de sus diferentes valores chr , sino que procesa entrada por entrada, directamente de la lista en un máximo de 5 subprocesos (argumento a Pool ).

    Para mostrar que los datos aún están en el orden esperado, agregué un retraso aleatorio de suspensión a la función manipulate_lines . Esto muestra el concepto pero puede que no proporcione una vista correcta de la aceleración, ya que un proceso de suspensión permite que otro se ejecute en paralelo, mientras que un proceso de cómputo pesado usará la CPU durante todo su tiempo de ejecución.

    Como se puede ver, la escritura en el archivo debe realizarse una vez que se devuelve la llamada del map , lo que garantiza que todos los subprocesos hayan finalizado y devuelto sus resultados. Hay bastante sobrecarga para esta comunicación detrás de la escena, por lo que para que esto sea beneficioso, la parte de cómputo debe ser sustancialmente más larga que la fase de escritura, y no debe generar demasiados datos para escribir en el archivo.

    Además, también he roto el bucle for en un generador. Esto es para que la entrada al multiprocessing.Pool esté disponible a pedido. Otra forma sería preprocesar la lista de data y luego pasar esa lista directamente a la Pool . Sin embargo, creo que la solución del generador es más agradable y tiene un consumo máximo de memoria máximo.

    También, un comentario sobre multiproceso vs multiprocesamiento; Siempre que realice operaciones pesadas de cómputo, debe usar el multiprocesamiento, que, al menos en teoría, permite que los procesos se ejecuten en diferentes máquinas. Además, en cPython, la implementación de Python más utilizada, los subprocesos alcanzan otro problema, que es el locking global del intérprete (GIL). Esto significa que solo un subproceso puede ejecutarse a la vez, ya que el intérprete bloquea el acceso a los demás subprocesos. (Hay algunas excepciones, por ejemplo, cuando se usan módulos escritos en C, como numpy. En estos casos, la GIL puede liberarse mientras se realizan cálculos numpy, pero en general este no es el caso). Por lo tanto, los hilos son principalmente para situaciones donde su progtwig está atascado esperando lento, fuera de orden, IO. (Zócalos, entrada de terminal, etc.)

    Solo he usado hilos un par de veces, y no he probado este código a continuación, pero a simple vista, el bucle for es realmente el único lugar que podría beneficiarse de los hilos.

    Aunque dejaré que otras personas decidan.

     import threading my_data = '''chr\tpos\tidx\tvals 2\t23\t4\tabcd 2\t25\t7\tatg 2\t29\t8\tct 2\t35\t1\txylfz 3\t37\t2\tmnost 3\t39\t3\tpqr 3\t41\t6\trtuv 3\t45\t5\tlfghef 3\t39\t3\tpqr 3\t41\t6\trtu 3\t45\t5\tlfggg 4\t25\t3\tpqrp 4\t32\t6\trtu 4\t38\t5\tlfgh 4\t51\t3\tpqr 4\t57\t6\trtus ''' def manipulate_lines(vals): vals_len = len(vals[3]) return write_to_file(vals[0:3], vals_len) def write_to_file(a, b): print(a,b) to_file = open('write_multiprocessData.txt', 'a') to_file.write('\t'.join(['\t'.join(a), str(b), '\n'])) to_file.close() def main(): to_file = open('write_multiprocessData.txt', 'w') to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n'])) to_file.close() data = my_data.rstrip('\n').split('\n') for lines in data: if not lines.startswith('chr'): lines = lines.split('\t') threading.Thread(target = manipulate_lines, args = (lines)).start() if __name__ == '__main__': main()