Multiprocesamiento de Python: ¿Cómo puedo CONFIANZA redirigir la salida estándar de un proceso secundario?

NÓTESE BIEN. He visto la salida del registro de multiprocesamiento . Proceso : desafortunadamente, no responde a esta pregunta.

Estoy creando un proceso hijo (en windows) a través de multiprocesamiento. Quiero que toda la salida stdout y stderr del proceso hijo se redirija a un archivo de registro, en lugar de aparecer en la consola. La única sugerencia que he visto es que el proceso hijo establezca sys.stdout en un archivo. Sin embargo, esto no redirige de manera efectiva toda la salida de la salida estándar, debido al comportamiento de la redirección de la salida estándar en Windows.

Para ilustrar el problema, cree una DLL de Windows con el siguiente código

#include  extern "C" { __declspec(dllexport) void writeToStdOut() { std::cout << "Writing to STDOUT from test DLL" << std::endl; } } 

Luego, cree y ejecute una secuencia de comandos de python como la siguiente, que importa esta DLL y llama a la función:

 from ctypes import * import sys print print "Writing to STDOUT from python, before redirect" print sys.stdout = open("stdout_redirect_log.txt", "w") print "Writing to STDOUT from python, after redirect" testdll = CDLL("Release/stdout_test.dll") testdll.writeToStdOut() 

Para ver el mismo comportamiento que yo, es probable que la DLL se cree en un tiempo de ejecución de C diferente al que utiliza Python. En mi caso, Python está creado con Visual Studio 2010, pero mi DLL está creado con VS 2005.

El comportamiento que veo es que la consola muestra:

 > stdout_test.py Writing to STDOUT from python, before redirect Writing to STDOUT from test DLL 

Mientras que el archivo stdout_redirect_log.txt termina conteniendo:

 Writing to STDOUT from python, after redirect 

En otras palabras, la configuración de sys.stdout no pudo redirigir la salida de salida estándar generada por la DLL. Esto no es sorprendente dada la naturaleza de las API subyacentes para la redirección de la salida estándar en Windows. He encontrado este problema en el nivel nativo / C ++ anteriormente y nunca encontré una forma de redirigir de manera confiable la salida estándar desde un proceso. Tiene que ser hecho externamente.

Esta es realmente la razón por la que estoy iniciando un proceso secundario: es para poder conectarme externamente a sus tuberías y así garantizar que intercepto toda su salida. Definitivamente puedo hacer esto iniciando el proceso manualmente con pywin32, pero me gustaría mucho poder usar las instalaciones de multiprocesamiento, en particular la capacidad de comunicarse con el proceso hijo a través de un objeto Pipe de multiprocesamiento, para avanzar. actualizaciones La pregunta es si hay alguna forma de usar el multiprocesamiento para sus instalaciones de IPC y de redirigir de manera confiable todas las salidas de stdout y stderr del niño a un archivo.

ACTUALIZACIÓN: Al observar el código fuente de multiprocessing.Processs, tiene un miembro estático, _Popen, que parece que se puede usar para anular la clase utilizada para crear el proceso. Si se establece en Ninguno (predeterminado), utiliza multiprocessing.forking._Popen, pero parece que al decir

 multiprocessing.Process._Popen = MyPopenClass 

Podría anular la creación del proceso. Sin embargo, aunque podría derivar de multiprocessing.forking._Popen, parece que tendría que copiar un montón de cosas internas en mi implementación, lo que suena inestable y no muy a prueba de futuro. Si esa es la única opción, creo que probablemente me gustaría hacer todo esto manualmente con pywin32.

La solución que sugiere es una buena: cree sus procesos manualmente de modo que tenga acceso explícito a sus controladores de archivos stdout / stderr. Luego puede crear un socket para comunicarse con el subproceso y usar multiprocessing.connection a través de ese socket (multiprocessing.Pipe crea el mismo tipo de objeto de conexión, por lo que esto debería proporcionarle la misma funcionalidad de IPC).

Aquí hay un ejemplo de dos archivos.

master.py:

 import multiprocessing.connection import subprocess import socket import sys, os ## Listen for connection from remote process (and find free port number) port = 10000 while True: try: l = multiprocessing.connection.Listener(('localhost', int(port)), authkey="secret") break except socket.error as ex: if ex.errno != 98: raise port += 1 ## if errno==98, then port is not available. proc = subprocess.Popen((sys.executable, "subproc.py", str(port)), stdout=subprocess.PIPE, stderr=subprocess.PIPE) ## open connection for remote process conn = l.accept() conn.send([1, "asd", None]) print(proc.stdout.readline()) 

subproc.py:

 import multiprocessing.connection import subprocess import sys, os, time port = int(sys.argv[1]) conn = multiprocessing.connection.Client(('localhost', port), authkey="secret") while True: try: obj = conn.recv() print("received: %s\n" % str(obj)) sys.stdout.flush() except EOFError: ## connection closed break 

También es posible que desee ver la primera respuesta a esta pregunta para obtener lecturas sin locking del subproceso.

No creo que tenga una mejor opción que redirigir un subproceso a un archivo como mencionó en su comentario.

La forma en que funcionan las consolas stdin / out / err en Windows es que cada proceso cuando nace tiene sus controladores std definidos. Puedes cambiarlos con SetStdHandle . Cuando modificas sys.stdout de python, solo modificas donde python imprime cosas, no donde otras DLL están imprimiendo cosas. Parte del CRT en su DLL es usar GetStdHandle para averiguar dónde imprimir. Si lo desea, puede hacer cualquier canalización que desee en la API de Windows en su DLL o en su script de python con pywin32. Aunque creo que será más sencillo con el subproceso .

Asumo que estoy fuera de lugar y me estoy perdiendo algo, pero para lo que vale la pena aquí es lo que me vino a la mente cuando leí tu pregunta.

Si puede interceptar todos los stdout y stderr (tengo esa impresión de su pregunta), ¿por qué no agregar o envolver esa funcionalidad de captura en cada uno de sus procesos? Luego, envíe lo que se captura a través de una cola a un consumidor que puede hacer lo que quiera con todas las salidas.

En mi situación cambié sys.stdout.write para escribir en un PySide QTextEdit. No pude leer desde sys.stdout y no sabía cómo cambiar sys.stdout para que sea legible. He creado dos tubos. Uno para stdout y el otro para stderr. En el proceso separado redirigir sys.stdout y sys.stderr a la conexión secundaria de la tubería de multiprocesamiento. En el proceso principal, creé dos subprocesos para leer la canalización principal stdout y stderr y redirigir los datos de tubería a sys.stdout y sys.stderr .

 import sys import contextlib import threading import multiprocessing as mp import multiprocessing.queues from queue import Empty import time class PipeProcess(mp.Process): """Process to pipe the output of the sub process and redirect it to this sys.stdout and sys.stderr. Note: The use_queue = True argument will pass data between processes using Queues instead of Pipes. Queues will give you the full output and read all of the data from the Queue. A pipe is more efficient, but may not redirect all of the output back to the main process. """ def __init__(self, group=None, target=None, name=None, args=tuple(), kwargs={}, *_, daemon=None, use_pipe=None, use_queue=None): self.read_out_th = None self.read_err_th = None self.pipe_target = target self.pipe_alive = mp.Event() if use_pipe or (use_pipe is None and not use_queue): # Default self.parent_stdout, self.child_stdout = mp.Pipe(False) self.parent_stderr, self.child_stderr = mp.Pipe(False) else: self.parent_stdout = self.child_stdout = mp.Queue() self.parent_stderr = self.child_stderr = mp.Queue() args = (self.child_stdout, self.child_stderr, target) + tuple(args) target = self.run_pipe_out_target super(PipeProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon) def start(self): """Start the multiprocess and reading thread.""" self.pipe_alive.set() super(PipeProcess, self).start() self.read_out_th = threading.Thread(target=self.read_pipe_out, args=(self.pipe_alive, self.parent_stdout, sys.stdout)) self.read_err_th = threading.Thread(target=self.read_pipe_out, args=(self.pipe_alive, self.parent_stderr, sys.stderr)) self.read_out_th.daemon = True self.read_err_th.daemon = True self.read_out_th.start() self.read_err_th.start() @classmethod def run_pipe_out_target(cls, pipe_stdout, pipe_stderr, pipe_target, *args, **kwargs): """The real multiprocessing target to redirect stdout and stderr to a pipe or queue.""" sys.stdout.write = cls.redirect_write(pipe_stdout) # , sys.__stdout__) # Is redirected in main process sys.stderr.write = cls.redirect_write(pipe_stderr) # , sys.__stderr__) # Is redirected in main process pipe_target(*args, **kwargs) @staticmethod def redirect_write(child, out=None): """Create a function to write out a pipe and write out an additional out.""" if isinstance(child, mp.queues.Queue): send = child.put else: send = child.send_bytes # No need to pickle with child_conn.send(data) def write(data, *args): try: if isinstance(data, str): data = data.encode('utf-8') send(data) if out is not None: out.write(data) except: pass return write @classmethod def read_pipe_out(cls, pipe_alive, pipe_out, out): if isinstance(pipe_out, mp.queues.Queue): # Queue has better functionality to get all of the data def recv(): return pipe_out.get(timeout=0.5) def is_alive(): return pipe_alive.is_set() or pipe_out.qsize() > 0 else: # Pipe is more efficient recv = pipe_out.recv_bytes # No need to unpickle with data = pipe_out.recv() is_alive = pipe_alive.is_set # Loop through reading and redirecting data while is_alive(): try: data = recv() if isinstance(data, bytes): data = data.decode('utf-8') out.write(data) except EOFError: break except Empty: pass except: pass def join(self, *args): # Wait for process to finish (unless a timeout was given) super(PipeProcess, self).join(*args) # Trigger to stop the threads self.pipe_alive.clear() # Pipe must close to prevent blocking and waiting on recv forever if not isinstance(self.parent_stdout, mp.queues.Queue): with contextlib.suppress(): self.parent_stdout.close() with contextlib.suppress(): self.parent_stderr.close() # Close the pipes and threads with contextlib.suppress(): self.read_out_th.join() with contextlib.suppress(): self.read_err_th.join() def run_long_print(): for i in range(1000): print(i) print(i, file=sys.stderr) print('finished') if __name__ == '__main__': # Example test write (My case was a QTextEdit) out = open('stdout.log', 'w') err = open('stderr.log', 'w') # Overwrite the write function and not the actual stdout object to prove this works sys.stdout.write = out.write sys.stderr.write = err.write # Create a process that uses pipes to read multiprocess output back into sys.stdout.write proc = PipeProcess(target=run_long_print, use_queue=True) # If use_pipe=True Pipe may not write out all values # proc.daemon = True # If daemon and use_queue Not all output may be redirected to stdout proc.start() # time.sleep(5) # Not needed unless use_pipe or daemon and all of stdout/stderr is desired # Close the process proc.join() # For some odd reason this blocks forever when use_queue=False # Close the output files for this test out.close() err.close()