Captura la excepción de un hilo en el hilo de la persona que llama en Python

Soy muy nuevo en Python y en progtwigción multihilo en general. Básicamente, tengo un script que copiará archivos a otra ubicación. Me gustaría que esto se coloque en otro hilo para que pueda generar .... para indicar que el script todavía se está ejecutando.

El problema que tengo es que si los archivos no se pueden copiar, se producirá una excepción. Esto está bien si se ejecuta en el hilo principal; Sin embargo, tener el siguiente código no funciona:

 try: threadClass = TheThread(param1, param2, etc.) threadClass.start() ##### **Exception takes place here** except: print "Caught an exception" 

En la propia clase de subproceso, intenté volver a lanzar la excepción, pero no funciona. He visto a personas de aquí hacer preguntas similares, pero todos parecen estar haciendo algo más específico de lo que estoy tratando de hacer (y no entiendo muy bien las soluciones ofrecidas). He visto a personas mencionar el uso de sys.exc_info() , sin embargo, no sé dónde ni cómo usarlo.

¡Toda ayuda es muy apreciada!

EDITAR: El código para la clase de hilo está abajo:

     class TheThread(threading.Thread): def __init__(self, sourceFolder, destFolder): threading.Thread.__init__(self) self.sourceFolder = sourceFolder self.destFolder = destFolder def run(self): try: shul.copytree(self.sourceFolder, self.destFolder) except: raise 

    El problema es que thread_obj.start() devuelve inmediatamente. El subproceso secundario que generó se ejecuta en su propio contexto, con su propia stack. Cualquier excepción que ocurra allí está en el contexto del subproceso secundario, y está en su propia stack. Una forma en la que puedo pensar en este momento para comunicar esta información al subproceso principal es mediante el uso de algún tipo de paso de mensajes, por lo que podría investigarlo.

    Pruébalo para el tamaño:

     import sys import threading import Queue class ExcThread(threading.Thread): def __init__(self, bucket): threading.Thread.__init__(self) self.bucket = bucket def run(self): try: raise Exception('An error occured here.') except Exception: self.bucket.put(sys.exc_info()) def main(): bucket = Queue.Queue() thread_obj = ExcThread(bucket) thread_obj.start() while True: try: exc = bucket.get(block=False) except Queue.Empty: pass else: exc_type, exc_obj, exc_trace = exc # deal with the exception print exc_type, exc_obj print exc_trace thread_obj.join(0.1) if thread_obj.isAlive(): continue else: break if __name__ == '__main__': main() 

    El módulo concurrent.futures facilita el trabajo en subprocesos (o procesos) separados y maneja las excepciones resultantes:

     import concurrent.futures import shutil def copytree_with_dots(src_path, dst_path): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: # Execute the copy on a separate thread, # creating a future object to track progress. future = executor.submit(shutil.copytree, src_path, dst_path) while future.running(): # Print pretty dots here. pass # Return the value returned by shutil.copytree(), None. # Raise any exceptions raised during the copy process. return future.result() 

    concurrent.futures se incluye con Python 3.2 y está disponible como el módulo de futures puerto futures para versiones anteriores.

    Aunque no es posible capturar directamente una excepción lanzada en un hilo diferente, aquí hay un código para obtener algo muy cercano a esta funcionalidad de manera transparente. Su subproceso secundario debe subclasificar la clase ExThread lugar de threading.Thread y el subproceso primario debe llamar al método child_thread.join_with_exception() lugar de child_thread.join() cuando espera que el hilo termine su trabajo

    Detalles técnicos de esta implementación: cuando el subproceso secundario lanza una excepción, se pasa al principal a través de una Queue y se lanza nuevamente al subproceso principal. Observe que no hay espera ocupada en este enfoque.

     #!/usr/bin/env python import sys import threading import Queue class ExThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.__status_queue = Queue.Queue() def run_with_exception(self): """This method should be overriden.""" raise NotImplementedError def run(self): """This method should NOT be overriden.""" try: self.run_with_exception() except BaseException: self.__status_queue.put(sys.exc_info()) self.__status_queue.put(None) def wait_for_exc_info(self): return self.__status_queue.get() def join_with_exception(self): ex_info = self.wait_for_exc_info() if ex_info is None: return else: raise ex_info[1] class MyException(Exception): pass class MyThread(ExThread): def __init__(self): ExThread.__init__(self) def run_with_exception(self): thread_name = threading.current_thread().name raise MyException("An error in thread '{}'.".format(thread_name)) def main(): t = MyThread() t.start() try: t.join_with_exception() except MyException as ex: thread_name = threading.current_thread().name print "Caught a MyException in thread '{}': {}".format(thread_name, ex) if __name__ == '__main__': main() 

    Si se produce una excepción en un subproceso, la mejor manera es volver a elevarlo en el subproceso de la persona que llama durante la join . Puede obtener información sobre la excepción que se está manejando actualmente utilizando la función sys.exc_info() . Esta información puede almacenarse simplemente como una propiedad del objeto de subproceso hasta que se llame a join , en cuyo momento se puede volver a elevar.

    Tenga en cuenta que un Queue.Queue (como se sugiere en otras respuestas) no es necesario en este caso simple donde el hilo se lanza como máximo 1 excepción y se completa justo después de lanzar una excepción . Evitamos las condiciones de la carrera simplemente esperando que el hilo se complete.

    Por ejemplo, extienda ExcThread (abajo), anulando excRun (en lugar de run ).

    Python 2.x:

     import threading class ExcThread(threading.Thread): def excRun(self): pass def run(self): self.exc = None try: # Possibly throws an exception self.excRun() except: import sys self.exc = sys.exc_info() # Save details of the exception thrown but don't rethrow, # just complete the function def join(self): threading.Thread.join(self) if self.exc: msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1]) new_exc = Exception(msg) raise new_exc.__class__, new_exc, self.exc[2] 

    Python 3.x:

    La forma de 3 argumentos para el raise ha desaparecido en Python 3, así que cambia la última línea a:

     raise new_exc.with_traceback(self.exc[2]) 

    Hay muchas respuestas realmente extrañas y complicadas a esta pregunta. Estoy simplificando esto en exceso, porque me parece suficiente para la mayoría de las cosas.

     from threading import Thread class PropagatingThread(Thread): def run(self): self.exc = None try: if hasattr(self, '_Thread__target'): # Thread uses name mangling prior to Python 3. self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) else: self.ret = self._target(*self._args, **self._kwargs) except BaseException as e: self.exc = e def join(self): super(PropagatingThread, self).join() if self.exc: raise self.exc return self.ret 

    Si está seguro de que solo se ejecutará en una u otra versión de Python, podría reducir el método run() a solo la versión mutilada (si solo se ejecutará en versiones de Python antes del 3). ), o simplemente la versión limpia (si solo se ejecutará en versiones de Python que comienzan con 3).

    Ejemplo de uso:

     def f(*args, **kwargs) print(args) print(kwargs) raise Exception('I suck') t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'}) t.start() t.join() 

    Y verás la excepción planteada en el otro hilo cuando te unes.

    Este fue un pequeño problema desagradable, y me gustaría incluir mi solución. Algunas otras soluciones que encontré (async.io, por ejemplo) parecían prometedoras pero también presentaban un poco de una caja negra. El enfoque de bucle de cola / evento te enlaza con una implementación determinada. Sin embargo, el código fuente de futuros concurrentes es de solo 1000 líneas y es fácil de comprender . Me permitió resolver fácilmente mi problema: crear subprocesos de trabajo ad-hoc sin mucha configuración, y poder detectar excepciones en el subproceso principal.

    Mi solución utiliza la API de futuros simultáneos y la API de subprocesos. Te permite crear un trabajador que te da tanto el hilo como el futuro. De esa manera, puedes unirte al hilo para esperar el resultado:

     worker = Worker(test) thread = worker.start() thread.join() print(worker.future.result()) 

    … o puede dejar que el trabajador simplemente envíe una callback cuando termine:

     worker = Worker(test) thread = worker.start(lambda x: print('callback', x)) 

    … o puedes hacer un bucle hasta que el evento se complete:

     worker = Worker(test) thread = worker.start() while True: print("waiting") if worker.future.done(): exc = worker.future.exception() print('exception?', exc) result = worker.future.result() print('result', result) break time.sleep(0.25) 

    Aquí está el código:

     from concurrent.futures import Future import threading import time class Worker(object): def __init__(self, fn, args=()): self.future = Future() self._fn = fn self._args = args def start(self, cb=None): self._cb = cb self.future.set_running_or_notify_cancel() thread = threading.Thread(target=self.run, args=()) thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process thread.start() return thread def run(self): try: self.future.set_result(self._fn(*self._args)) except BaseException as e: self.future.set_exception(e) if(self._cb): self._cb(self.future.result()) 

    … y la función de prueba:

     def test(*args): print('args are', args) time.sleep(2) raise Exception('foo') 

    Como noobie a Threading, me tomó mucho tiempo entender cómo implementar el código de Mateusz Kobos (arriba). Aquí hay una versión clarificada para ayudar a entender cómo usarla.

     #!/usr/bin/env python import sys import threading import Queue class ExThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.__status_queue = Queue.Queue() def run_with_exception(self): """This method should be overriden.""" raise NotImplementedError def run(self): """This method should NOT be overriden.""" try: self.run_with_exception() except Exception: self.__status_queue.put(sys.exc_info()) self.__status_queue.put(None) def wait_for_exc_info(self): return self.__status_queue.get() def join_with_exception(self): ex_info = self.wait_for_exc_info() if ex_info is None: return else: raise ex_info[1] class MyException(Exception): pass class MyThread(ExThread): def __init__(self): ExThread.__init__(self) # This overrides the "run_with_exception" from class "ExThread" # Note, this is where the actual thread to be run lives. The thread # to be run could also call a method or be passed in as an object def run_with_exception(self): # Code will function until the int print "sleeping 5 seconds" import time for i in 1, 2, 3, 4, 5: print i time.sleep(1) # Thread should break here int("str") # I'm honestly not sure why these appear here? So, I removed them. # Perhaps Mateusz can clarify? # thread_name = threading.current_thread().name # raise MyException("An error in thread '{}'.".format(thread_name)) if __name__ == '__main__': # The code lives in MyThread in this example. So creating the MyThread # object set the code to be run (but does not start it yet) t = MyThread() # This actually starts the thread t.start() print print ("Notice 't.start()' is considered to have completed, although" " the countdown continues in its new thread. So you code " "can tinue into new processing.") # Now that the thread is running, the join allows for monitoring of it try: t.join_with_exception() # should be able to be replace "Exception" with specific error (untested) except Exception, e: print print "Exceptioon was caught and control passed back to the main thread" print "Do some handling here...or raise a custom exception " thread_name = threading.current_thread().name e = ("Caught a MyException in thread: '" + str(thread_name) + "' [" + str(e) + "]") raise Exception(e) # Or custom class of exception, such as MyException 

    concurrent.futures.as_completed

    https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

    La siguiente solución:

    • vuelve al hilo principal inmediatamente cuando se llama a una excepción
    • no requiere clases adicionales definidas por el usuario porque no necesita:
      • una Queue explícita
      • para agregar una excepción en su trabajo

    Fuente:

     #!/usr/bin/env python3 import concurrent.futures import time def func_that_raises(do_raise): for i in range(3): print(i) time.sleep(0.1) if do_raise: raise Exception() for i in range(3): print(i) time.sleep(0.1) with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: futures = [] futures.append(executor.submit(func_that_raises, False)) futures.append(executor.submit(func_that_raises, True)) for future in concurrent.futures.as_completed(futures): print(repr(future.exception())) 

    Salida posible:

     0 0 1 1 2 2 0 Exception() 1 2 None 

    Desafortunadamente, no es posible matar futuros para cancelar los demás, ya que uno falla:

    • concurrent.features ; Python: concurrent.futures ¿Cómo hacerlo cancelable?
    • threading : ¿Hay alguna forma de matar un hilo?
    • C pthreads: Kill Thread en Pthread Library

    Si haces algo como:

     for future in concurrent.futures.as_completed(futures): if future.exception() is not None: raise future.exception() 

    luego el with atrapa, y espera que el segundo hilo termine antes de continuar. Lo siguiente se comporta de manera similar:

     for future in concurrent.futures.as_completed(futures): future.result() 

    desde future.result() vuelve a elevar la excepción si se produce una.

    Si desea salir de todo el proceso de Python, puede salirse con os._exit(0) , pero esto probablemente significa que necesita un refactor.

    Probado en Python 3.6.7, Ubuntu 18.04.

    De manera similar a RickardSjogren’s sin Queue, sys, etc. pero también sin algunos escuchas a las señales: ejecute directamente un controlador de excepciones que corresponde a un bloque de excepción.

     #!/usr/bin/env python3 import threading class ExceptionThread(threading.Thread): def __init__(self, callback=None, *args, **kwargs): """ Redirect exceptions of thread to an exception handler. :param callback: function to handle occured exception :type callback: function(thread, exception) :param args: arguments for threading.Thread() :type args: tuple :param kwargs: keyword arguments for threading.Thread() :type kwargs: dict """ self._callback = callback super().__init__(*args, **kwargs) def run(self): try: if self._target: self._target(*self._args, **self._kwargs) except BaseException as e: if self._callback is None: raise e else: self._callback(self, e) finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. del self._target, self._args, self._kwargs, self._callback 

    Solo self._callback y el bloque de excepción en run () son adicionales a los subprocesos normales.

    Sé que llego un poco tarde a la fiesta aquí, pero tenía un problema muy similar, pero incluía el uso de tkinter como GUI, y mainloop hacía imposible usar cualquiera de las soluciones que dependen de .join (). Por lo tanto, adapté la solución dada en la EDICIÓN de la pregunta original, pero la hice más general para que sea más fácil de entender para los demás.

    Aquí está la nueva clase de hilo en acción:

     import threading import traceback import logging class ExceptionThread(threading.Thread): def __init__(self, *args, **kwargs): threading.Thread.__init__(self, *args, **kwargs) def run(self): try: if self._target: self._target(*self._args, **self._kwargs) except Exception: logging.error(traceback.format_exc()) def test_function_1(input): raise IndexError(input) if __name__ == "__main__": input = 'useful' t1 = ExceptionThread(target=test_function_1, args=[input]) t1.start() 

    Por supuesto, siempre puede hacer que maneje la excepción de alguna otra manera desde el registro, como imprimirla o enviarla a la consola.

    Esto le permite utilizar la clase ExceptionThread exactamente como lo haría con la clase Thread, sin modificaciones especiales.

    Un método que me gusta se basa en el patrón de observador . Defino una clase de señal que mi hilo usa para emitir excepciones a los oyentes. También se puede utilizar para devolver valores de subprocesos. Ejemplo:

     import threading class Signal: def __init__(self): self._subscribers = list() def emit(self, *args, **kwargs): for func in self._subscribers: func(*args, **kwargs) def connect(self, func): self._subscribers.append(func) def disconnect(self, func): try: self._subscribers.remove(func) except ValueError: raise ValueError('Function {0} not removed from {1}'.format(func, self)) class WorkerThread(threading.Thread): def __init__(self, *args, **kwargs): super(WorkerThread, self).__init__(*args, **kwargs) self.Exception = Signal() self.Result = Signal() def run(self): if self._Thread__target is not None: try: self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) except Exception as e: self.Exception.emit(e) else: self.Result.emit(self._return_value) if __name__ == '__main__': import time def handle_exception(exc): print exc.message def handle_result(res): print res def a(): time.sleep(1) raise IOError('a failed') def b(): time.sleep(2) return 'b returns' t = WorkerThread(target=a) t2 = WorkerThread(target=b) t.Exception.connect(handle_exception) t2.Result.connect(handle_result) t.start() t2.start() print 'Threads started' t.join() t2.join() print 'Done' 

    No tengo suficiente experiencia trabajando con subprocesos para afirmar que este es un método completamente seguro. Pero me ha funcionado y me gusta la flexibilidad.

    No es una buena práctica usar excepciones al desnudo, ya que por lo general atrapa más de lo que negocia.

    Sugeriría modificar la except para capturar ÚNICAMENTE la excepción que desea manejar. No creo que elevarlo tenga el efecto deseado, porque cuando se TheThread instancia de TheThread en el try externo, si se genera una excepción, la asignación nunca se realizará.

    En su lugar, es posible que desee solo alertarlo y seguir adelante, como por ejemplo:

     def run(self): try: shul.copytree(self.sourceFolder, self.destFolder) except OSError, err: print err 

    Luego, cuando se captura esa excepción, puedes manejarla allí. Luego, cuando el try externo TheThread una excepción de TheThread , sabrás que no será el que ya manejaste y te ayudará a aislar el flujo de tu proceso.

    Una forma sencilla de capturar la excepción del hilo y comunicarse con el método de la persona que llama podría ser pasando un diccionario o una lista al método de worker .

    Ejemplo (pasando el diccionario al método de trabajo):

     import threading def my_method(throw_me): raise Exception(throw_me) def worker(shared_obj, *args, **kwargs): try: shared_obj['target'](*args, **kwargs) except Exception as err: shared_obj['err'] = err shared_obj = {'err':'', 'target': my_method} throw_me = "Test" th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={}) th.start() th.join() if shared_obj['err']: print(">>%s" % shared_obj['err']) 

    Envolver hilo con almacenamiento de excepción.

     import threading import sys class ExcThread(threading.Thread): def __init__(self, target, args = None): self.args = args if args else [] self.target = target self.exc = None threading.Thread.__init__(self) def run(self): try: self.target(*self.args) raise Exception('An error occured here.') except Exception: self.exc=sys.exc_info() def main(): def hello(name): print(!"Hello, {name}!") thread_obj = ExcThread(target=hello, args=("Jack")) thread_obj.start() thread_obj.join() exc = thread_obj.exc if exc: exc_type, exc_obj, exc_trace = exc print(exc_type, ':',exc_obj, ":", exc_trace) main()