Comunicación entre hilos en Python (sin utilizar variables globales)

Digamos si tenemos un hilo principal que inicia dos hilos para los módulos de prueba: “test_a” y “test_b”. Ambos subprocesos del módulo de prueba mantienen su estado ya sea que hayan terminado de realizar la prueba o si se encontraron con algún error, advertencia o si desean actualizar alguna otra información.

Cómo el hilo principal puede acceder a esta información y actuar en consecuencia. Por ejemplo, si “test_a” levantó un indicador de error; ¿Cómo “principal” sabrá y detendrá el rest de las pruebas antes de existir con un error?

Una forma de hacerlo es usar variables globales, pero eso se pone muy feo … Muy pronto.

La solución obvia es compartir algún tipo de variable mutable, pasándola a los objetos / funciones del hilo en el constructor / inicio.

La forma limpia de hacerlo es crear una clase con los atributos de instancia adecuados. Si está usando un threading.Thread Subclase de threading.Thread , en lugar de solo una función de subproceso, generalmente puede usar la subclase como el lugar para pegar esos atributos. Pero lo mostraré con una list solo porque es más corto:

 def test_a_func(thread_state): # ... thread_state[0] = my_error_state # ... def main_thread(): test_states = [None] test_a = threading.Thread(target=test_a_func, args=(test_states,)) test_a.start() 

También puede (y normalmente desea) empaquetar un Lock o una Condition en el objeto de estado mutable, para que pueda sincronizar correctamente entre main_thread y test_a .

(Otra opción es usar una queue.Queue Una queue.Queue , un os.pipe , etc. para pasar la información, pero aún así debes tener esa cola o canalización en el subproceso secundario, lo que haces exactamente de la misma manera que arriba).


Sin embargo, vale la pena considerar si realmente necesitas hacer esto. Si piensa que test_a y test_b son “trabajos”, en lugar de “funciones de subproceso”, puede ejecutar esos trabajos en un grupo y dejar que el grupo maneje los resultados o errores.

Por ejemplo:

 try: with concurrent.futures.ThreadPoolExecutor(workers=2) as executor: tests = [executor.submit(job) for job in (test_a, test_b)] for test in concurrent.futures.as_completed(tests): result = test.result() except Exception as e: # do stuff 

Ahora, si la función test_a genera una excepción, el subproceso principal obtendrá esa excepción y, porque eso significa salir del bloque with , y todos los demás trabajos se cancelan y se desechan, y los subprocesos de trabajo se cierran.

Si está usando 2.5-3.1, no tiene incorporados los valores futuros, pero puede instalar el backport desde PyPI , o puede reescribir cosas en torno a multiprocessing.dummy.Pool . (Es un poco más complicado de esa manera, porque tienes que crear una secuencia de trabajos y llamar a map_async para recuperar un iterador sobre los objetos AsyncResult … pero en realidad eso es bastante simple).