Cómo acelerar la comunicación con subprocesos.

Estoy usando el subprocess Python 2 con threading para tomar la entrada estándar, procesarlo con los binarios A , B y C y escribir los datos modificados en la salida estándar.

Este script (llamémoslo: A_to_C.py ) es muy lento y me gustaría aprender cómo solucionarlo.

El flujo general es el siguiente:

 A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin)) B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin)) C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE) convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin)) produce_A_thread.start() convert_A_to_B_thread.start() convert_B_to_C_thread.start() produce_A_thread.join() convert_A_to_B_thread.join() convert_B_to_C_thread.join() A_process.wait() B_process.wait() C_process.wait() 

La idea es que la entrada estándar entra en A_to_C.py :

  1. El binario A procesa una parte de la entrada estándar y crea A salida A con la función produceA .
  2. El binario B procesa una parte de la salida estándar de A y crea una salida B través de la función produceB .
  3. El binario C procesa una parte de la salida estándar de B través de la función produceC y escribe la salida C en la salida estándar.

Hice perfiles con cProfile y casi todo el tiempo en este script parece gastarse en la adquisición de lockings de subprocesos.

Por ejemplo, en un trabajo de prueba 417s, 416s ( > 99% del tiempo de ejecución total) se gasta en la adquisición de lockings de subprocesos:

 $ python Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32) [GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import pstats >>> p = pstats.Stats('1.profile') >>> p.sort_stats('cumulative').print_stats(10) Thu Jun 12 22:19:07 2014 1.profile 1755 function calls (1752 primitive calls) in 417.203 CPU seconds Ordered by: cumulative time List reduced from 162 to 10 due to restriction  ncalls tottime percall cumtime percall filename:lineno(function) 1 0.020 0.020 417.203 417.203 A_to_C.py:90() 1 0.000 0.000 417.123 417.123 A_to_C.py:809(main) 6 0.000 0.000 416.424 69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait) 32 416.424 13.013 416.424 13.013 {method 'acquire' of 'thread.lock' objects} 3 0.000 0.000 416.422 138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join) 3 0.000 0.000 0.498 0.166 A_to_C.py:473(which) 37 0.000 0.000 0.498 0.013 A_to_C.py:475(is_exe) 3 0.496 0.165 0.496 0.165 {posix.access} 6 0.000 0.000 0.194 0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call) 3 0.000 0.000 0.191 0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait) 

¿Qué estoy haciendo mal con mi threading.Thread y / o subprocess.Popen acuerdo que está causando este problema?

Sus llamadas a subprocess.Popen () especifican implícitamente el valor predeterminado de bufsize, 0, que fuerza la E / S no almacenada en búfer. Intente agregar un tamaño de búfer razonable (4K, 16K, incluso 1M) y vea si hace alguna diferencia.

Creo que solo estás siendo engañado por la forma en que funciona cProfile. Por ejemplo, aquí hay un script simple que usa dos hilos:

 #!/usr/bin/python import threading import time def f(): time.sleep(10) def main(): t = threading.Thread(target=f) t.start() t.join() 

Si pruebo esto usando cProfile, esto es lo que obtengo:

 >>> import test >>> import cProfile >>> cProfile.run('test.main()') 60 function calls in 10.011 seconds Ordered by: standard name ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 10.011 10.011 :1() 1 0.000 0.000 10.011 10.011 test.py:10(main) 1 0.000 0.000 0.000 0.000 threading.py:1008(daemon) 2 0.000 0.000 0.000 0.000 threading.py:1152(currentThread) 2 0.000 0.000 0.000 0.000 threading.py:241(Condition) 2 0.000 0.000 0.000 0.000 threading.py:259(__init__) 2 0.000 0.000 0.000 0.000 threading.py:293(_release_save) 2 0.000 0.000 0.000 0.000 threading.py:296(_acquire_restre) 2 0.000 0.000 0.000 0.000 threading.py:299(_is_owned) 2 0.000 0.000 10.011 5.005 threading.py:308(wait) 1 0.000 0.000 0.000 0.000 threading.py:541(Event) 1 0.000 0.000 0.000 0.000 threading.py:560(__init__) 2 0.000 0.000 0.000 0.000 threading.py:569(isSet) 4 0.000 0.000 0.000 0.000 threading.py:58(__init__) 1 0.000 0.000 0.000 0.000 threading.py:602(wait) 1 0.000 0.000 0.000 0.000 threading.py:627(_newname) 5 0.000 0.000 0.000 0.000 threading.py:63(_note) 1 0.000 0.000 0.000 0.000 threading.py:656(__init__) 1 0.000 0.000 0.000 0.000 threading.py:709(_set_daemon) 1 0.000 0.000 0.000 0.000 threading.py:726(start) 1 0.000 0.000 10.010 10.010 threading.py:911(join) 10 10.010 1.001 10.010 1.001 {method 'acquire' of 'thread.lock' objects} 2 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects} 4 0.000 0.000 0.000 0.000 {method 'release' of 'thread.lock' objects} 4 0.000 0.000 0.000 0.000 {thread.allocate_lock} 2 0.000 0.000 0.000 0.000 {thread.get_ident} 1 0.000 0.000 0.000 0.000 {thread.start_new_thread} 

Como puede ver, dice que casi todo el tiempo se dedica a adquirir lockings. Por supuesto, sabemos que no es realmente una representación precisa de lo que estaba haciendo el script. Todo el tiempo se gastó en una llamada time.sleep dentro de f() . El tiempo alto de la llamada de acquire se debe a que join estaba esperando a que termine f , lo que significa que tuvo que sentarse y esperar para obtener un locking. Sin embargo, cProfile no muestra ningún tiempo en absoluto en f . Podemos ver claramente lo que realmente sucede porque el código de ejemplo es muy simple, pero en un progtwig más complicado, esta salida es muy engañosa.

Puede obtener resultados más confiables utilizando otra biblioteca de perfiles, como yappi :

 >>> import test >>> import yappi >>> yappi.set_clock_type("wall") >>> yappi.start() >>> test.main() >>> yappi.get_func_stats().print_all() Clock type: wall Ordered by: totaltime, desc name #n tsub ttot tavg :1  2/1 0.000025 10.00801 5.004003 test.py:10 main 1 0.000060 10.00798 10.00798 ..2.7/threading.py:308 _Condition.wait 2 0.000188 10.00746 5.003731 ..thon2.7/threading.py:911 Thread.join 1 0.000039 10.00706 10.00706 ..ython2.7/threading.py:752 Thread.run 1 0.000024 10.00682 10.00682 test.py:6 f 1 0.000013 10.00680 10.00680 ..hon2.7/threading.py:726 Thread.start 1 0.000045 0.000608 0.000608 ..thon2.7/threading.py:602 _Event.wait 1 0.000029 0.000484 0.000484 ..2.7/threading.py:656 Thread.__init__ 1 0.000064 0.000250 0.000250 ..on2.7/threading.py:866 Thread.__stop 1 0.000025 0.000121 0.000121 ..lib/python2.7/threading.py:541 Event 1 0.000011 0.000101 0.000101 ..python2.7/threading.py:241 Condition 2 0.000025 0.000094 0.000047 ..hreading.py:399 _Condition.notifyAll 1 0.000020 0.000090 0.000090 ..2.7/threading.py:560 _Event.__init__ 1 0.000018 0.000090 0.000090 ..thon2.7/encodings/utf_8.py:15 decode 2 0.000031 0.000071 0.000035 ..threading.py:259 _Condition.__init__ 2 0.000064 0.000069 0.000034 ..7/threading.py:372 _Condition.notify 1 0.000034 0.000068 0.000068 ..hreading.py:299 _Condition._is_owned 3 0.000017 0.000040 0.000013 ../threading.py:709 Thread._set_daemon 1 0.000018 0.000035 0.000035 ..ding.py:293 _Condition._release_save 2 0.000019 0.000033 0.000016 ..thon2.7/threading.py:63 Thread._note 7 0.000020 0.000020 0.000003 ..n2.7/threading.py:1152 currentThread 2 0.000015 0.000019 0.000009 ..g.py:296 _Condition._acquire_restre 2 0.000011 0.000017 0.000008 ../python2.7/threading.py:627 _newname 1 0.000014 0.000014 0.000014 ..n2.7/threading.py:58 Thread.__init__ 4 0.000013 0.000013 0.000003 ..threading.py:1008 _MainThread.daemon 1 0.000004 0.000004 0.000004 ..hon2.7/threading.py:569 _Event.isSet 2 0.000003 0.000003 0.000002 

Con yappi , es mucho más fácil ver que el tiempo se gasta en f .

Sospecho que encontrará que, en realidad, la mayor parte del tiempo de su script se dedica a hacer cualquier trabajo que se haga en produceA , produceB y produceC .

TL; DR Si su progtwig se ejecuta más lento de lo esperado, probablemente se deba a los detalles de lo que hacen las funciones intermedias en lugar de a IPC o subprocesos. Pruebe con funciones y procesos simulados (lo más simple posible) para aislar solo la sobrecarga de pasar datos a / desde los subprocesos. En un punto de referencia basado en su código (a continuación), el rendimiento al pasar datos a / desde subprocesos parece ser aproximadamente equivalente al uso de canalizaciones de shell directamente; Python no es particularmente lento en esta tarea.

¿Qué está pasando con el código original?

La forma general del código original es:

 def produceB(from_stream, to_stream): while True: buf = from_stream.read() processed_buf = do_expensive_calculation(buf) to_stream.write(processed_buf) 

En este caso, el cálculo entre lectura y escritura toma aproximadamente 2/3 del tiempo total de CPU de todos los procesos (principal y secundario) combinados: este es el tiempo de CPU, no el tiempo de reloj de pared por cierto.

Creo que esto evita que la E / S se ejecute a toda velocidad. Las lecturas y escrituras, y el cálculo, cada una debe tener su propio hilo, con colas para proporcionar almacenamiento en búfer entre la lectura y el cálculo y entre el cálculo y la escritura (ya que la cantidad de almacenamiento en búfer que proporcionan las tuberías es insuficiente, creo).

A continuación, muestro que si no hay procesamiento entre lectura y escritura (o de manera equivalente: si el procesamiento intermedio se realiza en un subproceso separado), el rendimiento de los subprocesos + subproceso es muy alto. También es posible tener hilos separados para lecturas y escrituras; esto agrega un poco de sobrecarga pero hace que las escrituras no bloqueen las lecturas y viceversa. Tres subprocesos (lectura, escritura y procesamiento) son aún mejores, entonces ninguno de los pasos bloquea los otros (por supuesto, dentro de los límites de los tamaños de cola).

Algunos puntos de referencia

Todos los puntos de referencia a continuación se encuentran en Python 2.7.6 en Ubuntu 14.04LTS 64 bits (Intel i7, Ivy Bridge, quad core). La prueba es transferir aproximadamente 1GB de datos en bloques de 4KB entre dos procesos de dd , y pasar los datos a través de python como intermediario. Los procesos de dd utilizan bloques de tamaño mediano (4KB); la E / S de texto típico sería más pequeña (a menos que esté bien almacenada en el búfer por el intérprete, etc.), la E / S binaria típica sería mucho más grande, por supuesto. Tengo un ejemplo basado exactamente en cómo lo hizo, y otro basado en un enfoque alternativo que probé hace algún tiempo (que resulta ser más lento). Por cierto, gracias por publicar esta pregunta, es útil.

Hilos y locking de E / S.

Primero, vamos a convertir el código original de la pregunta en un ejemplo autocontenido ligeramente más simple. Esto es solo dos procesos que se comunican con un hilo que bombea datos de uno a otro, haciendo lecturas y escrituras de locking.

 import subprocess, threading A_process = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE) B_process = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE) def convert_A_to_B(src, dst): read_size = 8*1024 while True: try: buf = src.read(read_size) if len(buf) == 0: # This is a bit hacky, but seems to reliably happen when the src is closed break dst.write(buf) except ValueError as e: # Reading or writing on a closed fd causes ValueError, not IOError print str(e) break convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, args=(A_process.stdout, B_process.stdin)) convert_A_to_B_thread.start() # Here, watch out for the exact sequence to clean things up convert_A_to_B_thread.join() A_process.wait() B_process.stdin.close() B_process.wait() 

Resultados:

 244140+0 records in 244140+0 records out 999997440 bytes (1.0 GB) copied, 0.638977 s, 1.6 GB/s 244140+0 records in 244140+0 records out 999997440 bytes (1.0 GB) copied, 0.635499 s, 1.6 GB/s real 0m0.678s user 0m0.657s sys 0m1.273s 

¡No está mal! Resulta que el tamaño de lectura ideal en este caso es aproximadamente 8k-16KB, los tamaños mucho más pequeños y mucho más grandes son algo más lentos. Probablemente esto esté relacionado con el tamaño de bloque de 4KB que le pedimos a dd que usemos.

E / S selectas y sin locking

Cuando antes estaba viendo este tipo de problema, me dirigí en la dirección de usar select() , E / S sin locking y un solo hilo. Un ejemplo de eso está en mi pregunta aquí: ¿Cómo leer y escribir desde subprocesos de forma asíncrona? . Eso fue para leer de dos procesos en paralelo, que he extendido a continuación para leer de un proceso y escribir en otro. Las escrituras sin locking están limitadas a PIPE_BUF o menos en tamaño, que es de 4 KB en mi sistema; Para simplificar, las lecturas también son 4KB aunque podrían ser de cualquier tamaño. Esto tiene algunos casos de esquinas raros (y cuelga inexplicable, dependiendo de los detalles) pero en la forma de abajo funciona de manera confiable.

 import subprocess, select, fcntl, os, sys p1 = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE) p2 = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE) def make_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) make_nonblocking(p1.stdout) make_nonblocking(p2.stdin) print "PIPE_BUF = %d" % (select.PIPE_BUF) read_size = select.PIPE_BUF max_buf_len = 1 # For reasons which I have not debugged completely, this hangs sometimes when set > 1 bufs = [] while True: inputready, outputready, exceptready = select.select([ p1.stdout.fileno() ],[ p2.stdin.fileno() ],[]) for fd in inputready: if fd == p1.stdout.fileno(): if len(bufs) < max_buf_len: data = p1.stdout.read(read_size) bufs.append(data) for fd in outputready: if fd == p2.stdin.fileno() and len(bufs) > 0: data = bufs.pop(0) p2.stdin.write(data) p1.poll() # If the first process is done and there is nothing more to write out if p1.returncode != None and len(bufs) == 0: # Again cleanup is tricky. We expect the second process to finish soon after its input is closed p2.stdin.close() p2.wait() p1.wait() break 

Resultados:

 PIPE_BUF = 4096 244140+0 records in 244140+0 records out 999997440 bytes (1.0 GB) copied, 3.13722 s, 319 MB/s 244133+0 records in 244133+0 records out 999968768 bytes (1.0 GB) copied, 3.13599 s, 319 MB/s real 0m3.167s user 0m2.719s sys 0m2.373s 

Sin embargo, esto es significativamente más lento que la versión anterior (incluso si el tamaño de lectura / escritura tiene un tamaño de 4 KB en una comparación de manzanas con manzanas). No estoy seguro de por qué.

PS última adición: Parece que está bien ignorar o superar PIPE_BUF. Esto hace que se lance una excepción IOError gran parte del tiempo desde p2.stdin.write() (errno = 11, temporalmente no disponible), probablemente cuando hay suficiente espacio en la tubería para escribir algo, pero menos del tamaño completo que somos solicitando El mismo código anterior con read_size = 64*1024 , y con esa excepción detectada e ignorada, se ejecuta a 1.4GB / s.

Tubería directamente

Al igual que una línea de base, ¿qué tan rápido es para ejecutar esto utilizando la versión de shell de tuberías (en subproceso)? Echemos un vistazo:

 import subprocess subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd of=/dev/null bs=4k", shell=True) 

Resultados:

 244140+0 records in 244140+0 records out 244140+0 records in 244140+0 records out 999997440 bytes (1.0 GB) copied, 0.425261 s, 2.4 GB/s 999997440 bytes (1.0 GB) copied, 0.423687 s, 2.4 GB/s real 0m0.466s user 0m0.300s sys 0m0.590s 

Esto es notablemente más rápido que el ejemplo de python con hilos. Sin embargo, esto es solo una copia, mientras que la versión de python con subprocesos está haciendo dos (dentro y fuera de python). Modificando el comando a "dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k" lleve el rendimiento a 1.6GB, en línea con el ejemplo de python.

Cómo ejecutar una comparación en un sistema completo

Algunas reflexiones adicionales sobre cómo ejecutar una comparación en un sistema completo. Nuevamente, para simplificar, esto es solo dos procesos, y ambos scripts tienen exactamente la misma función convert_A_to_B() .

Script 1: Pase los datos en python, como arriba

 A_process = subprocess.Popen(["A", ... B_process = subprocess.Popen(["B", ... convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, ... 

Script 2: script de comparación, pasar datos en shell

 convert_A_to_B(sys.stdin, sys.stdout) 

ejecuta esto en el shell con: A | python script_2.py | B A | python script_2.py | B

Esto permite una comparación de manzanas con manzanas en un sistema completo, sin utilizar funciones / procesos simulados.

¿Cómo afecta el tamaño de la lectura del bloque a los resultados?

Para esta prueba, se usa el código del primer ejemplo (con hilos) anterior, y tanto dd como el script de python están configurados para usar las mismas lecturas / escrituras de bloque.

 | Block size | Throughput | |------------|------------| | 1KB | 249MB/s | | 2KB | 416MB/s | | 4KB | 552MB/s | | 8KB | 1.4GB/s | | 16KB | 1.8GB/s | | 32KB | 2.9GB/s | | 64KB | 3.0GB/s | | 128KB | 1.0GB/s | | 256KB | 600MB/s | 

En teoría, debería haber un mejor rendimiento con búferes más grandes (tal vez hasta efectos de caché), pero en la práctica, las tuberías de Linux se ralentizan con búferes muy grandes, incluso cuando se usan tuberías de shell puro.

Desde que habló sobre popen() y pthreads en los comentarios, supongo que está bajo un sistema POSIX (tal vez Linux). Entonces, ¿intentaste usar subprocess32 lugar de la biblioteca de subprocess estándar?

Su uso está fuertemente recomendado por la documentación y puede llevar a alguna mejora.

PD: Creo que mezclar tenedores ( subprocess ) e hilos es una mala idea .

PS2: ¿Por qué python produceA.py | A | python produceB.py | B | python produceC.py | C python produceA.py | A | python produceB.py | B | python produceC.py | C python produceA.py | A | python produceB.py | B | python produceC.py | C no se ajusta a tus necesidades? ¿O su equivalente usando subprocess ?

Este escenario es particularmente adecuado para una tubería, donde el paralelismo es administrado implícitamente por el sistema operativo. Ya que estás buscando una solución de un solo script, aquí estás:

 #! /usr/bin/python2 import sys import subprocess import pipes # Define these as needed def produceA(input, output): output.write(input.read()) def produceB(input, output): output.write(input.read()) def produceC(input, output): output.write(input.read()) # Magic starts here COMMAND = "{me} prepare_A | A - | {me} A_to_B | B - | {me} B_to_C | C -" def bootstrap(input, output): """Prepares and runs the pipeline.""" me = "./{}".format(pipes.quote(__file__)) subprocess.call( COMMAND.format(me=me), stdin=input, stdout=output, shell=True, bufsize=-1 ) if __name__ == '__main__': ACTIONS = { "prepare_A": produceA, "A_to_B": produceB, "B_to_C": produceC } action = ACTIONS[sys.argv[1]] if len(sys.argv) > 1 else bootstrap action(sys.stdin, sys.stdout) 

Esta secuencia de comandos configurará una canalización o ejecución de una de las funciones de produce , dependiendo del comando especificado.

Hazlo ejecutable y ejecútalo sin argumentos:

 ./A_to_C.py < A.txt > C.txt 

Nota: parece que estás usando Python 2.6, por lo que esta solución es para Python 2.x, aunque debería funcionar bien en Python 3.x, excepto que la función de quote se ha movido a shlex desde Python 3.3