Pausar dos hilos de Python mientras que un tercero hace cosas (¿con lockings?)

Soy nuevo en la progtwigción concurrente.

Me gustaría ejecutar tres tareas repetidamente. Los dos primeros deben ejecutarse todo el tiempo, el tercero debe ejecutarse aproximadamente cada hora. Las primeras dos tareas pueden ejecutarse en paralelo, pero siempre quiero pausarlas mientras se ejecuta la tercera tarea.

Aquí está el esqueleto de lo que he intentado:

import threading import time flock = threading.Lock() glock = threading.Lock() def f(): while True: with flock: print 'f' time.sleep(1) def g(): while True: with glock: print 'g' time.sleep(1) def h(): while True: with flock: with glock: print 'h' time.sleep(5) threading.Thread(target=f).start() threading.Thread(target=g).start() threading.Thread(target=h).start() 

Yo esperaría que este código imprima una f y g cada segundo, y una h cada cinco segundos. Sin embargo, cuando lo ejecuto, se necesitan alrededor de 12 f y 12 g antes de empezar a ver algunas h. Parece que los dos primeros subprocesos liberan y recuperan constantemente sus lockings mientras que el tercer subproceso se queda fuera del bucle.

  1. ¿Porqué es eso? Cuando el tercer subproceso intenta adquirir un locking retenido actualmente y se libera, ¿no debería la adquisición tener éxito inmediatamente en lugar de que el primer / segundo subproceso lo adquiera nuevamente? Probablemente estoy malinterpretando algo.
  2. ¿Cuál sería una buena manera de lograr lo que quiero?

Nota: mover las time.sleep(1) fuera del bloque flock / glock funciona para este simple ejemplo, pero aparentemente no para mi aplicación real donde los hilos pasan la mayor parte del tiempo haciendo las operaciones reales. Cuando los dos primeros subprocesos duermen un segundo después de cada ejecución del cuerpo del bucle, con el locking liberado, la tercera tarea aún nunca se ejecuta.

¿Qué tal hacerlo con hilos.Eventos :

 import threading import time import logging logger=logging.getLogger(__name__) def f(resume,is_waiting,name): while True: if not resume.is_set(): is_waiting.set() logger.debug('{n} pausing...'.format(n=name)) resume.wait() is_waiting.clear() logger.info(name) time.sleep(1) def h(resume,waiters): while True: logger.debug('halt') resume.clear() for i,w in enumerate(waiters): logger.debug('{i}: wait for worker to pause'.format(i=i)) w.wait() logger.info('h begin') time.sleep(2) logger.info('h end') logger.debug('resume') resume.set() time.sleep(5) logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s %(threadName)s] %(message)s', datefmt='%H:%M:%S') # set means resume; clear means halt resume = threading.Event() resume.set() waiters=[] for name in 'fg': is_waiting=threading.Event() waiters.append(is_waiting) threading.Thread(target=f,args=(resume,is_waiting,name)).start() threading.Thread(target=h,args=(resume,waiters)).start() 

rendimientos

 [07:28:55 Thread-1] f [07:28:55 Thread-2] g [07:28:55 Thread-3] halt [07:28:55 Thread-3] 0: wait for worker to pause [07:28:56 Thread-1] f pausing... [07:28:56 Thread-2] g pausing... [07:28:56 Thread-3] 1: wait for worker to pause [07:28:56 Thread-3] h begin [07:28:58 Thread-3] h end [07:28:58 Thread-3] resume [07:28:58 Thread-1] f [07:28:58 Thread-2] g [07:28:59 Thread-1] f [07:28:59 Thread-2] g [07:29:00 Thread-1] f [07:29:00 Thread-2] g [07:29:01 Thread-1] f [07:29:01 Thread-2] g [07:29:02 Thread-1] f [07:29:02 Thread-2] g [07:29:03 Thread-3] halt 

(En respuesta a una pregunta en los comentarios) Este código intenta medir el tiempo que tarda el subproceso h en adquirir cada locking de los otros subprocesos de trabajo.

Parece mostrar que incluso si h está esperando para adquirir un locking, el otro subproceso de trabajo puede, con una probabilidad bastante alta, liberar y volver a adquirir el locking. No hay prioridad dada a h solo porque ha estado esperando más tiempo.

David Beazley ha presentado en PyCon sobre problemas relacionados con el enhebrado y la GIL. Aquí hay un pdf de las diapositivas . Es una lectura fascinante y puede ayudar a explicar esto también.

 import threading import time import logging logger=logging.getLogger(__name__) def f(lock,n): while True: with lock: logger.info(n) time.sleep(1) def h(locks): while True: t=time.time() for n,lock in enumerate(locks): lock.acquire() t2=time.time() logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t)) t=t2 t2=time.time() logger.info('h {d}'.format(d=t2-t)) t=t2 for lock in locks: lock.release() time.sleep(5) logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s %(threadName)s] %(message)s', datefmt='%H:%M:%S') locks=[] N=5 for n in range(N): lock=threading.Lock() locks.append(lock) t=threading.Thread(target=f,args=(lock,n)) t.start() threading.Thread(target=h,args=(locks,)).start() 

Usando la comunicación para la sincronización:

 #!/usr/bin/env python import threading import time from Queue import Empty, Queue def f(q, c): while True: try: q.get_nowait(); q.get() # get PAUSE signal except Empty: pass # no signal, do our thing else: q.get() # block until RESUME signal print c, time.sleep(1) def h(queues): while True: for q in queues: q.put_nowait(1); q.put(1) # block until PAUSE received print 'h' for q in queues: q.put(1) # put RESUME time.sleep(5) queues = [Queue(1) for _ in range(2)] threading.Thread(target=f, args=(queues[0], 'f')).start() threading.Thread(target=f, args=(queues[1], 'g')).start() threading.Thread(target=h, args=(queues,)).start() 

Puede que no sea óptimo desde un punto de rendimiento tuyo, pero me parece mucho más fácil de seguir.

Salida

 fg fgh fgfggffggfgffgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh fgfgfgfgfgfgfgh 

La forma más sencilla de hacerlo es con 3 procesos de Python. Si está haciendo esto en Linux, entonces el proceso por hora puede enviar una señal para hacer que las otras tareas se detengan, o incluso podría matarlos y luego reiniciarse una vez que se complete la tarea por hora. No hay necesidad de hilos.

Sin embargo, si está decidido a usar subprocesos, intente compartir NINGÚN tipo de datos entre los subprocesos, simplemente envíe mensajes de un lado a otro (también conocido como copia de datos en lugar de compartir datos). El enhebrado es difícil de hacer bien.

Pero, los procesos múltiples te obligan a no compartir nada y, por lo tanto, es mucho más fácil de hacer correctamente. Si usa una biblioteca como 0MQ http://www.zeromq.org para hacer pasar su mensaje, entonces es fácil pasar de un modelo de subprocesamiento a un modelo de proceso múltiple.

¿Qué tal un semáforo inicializado a 2? F y G esperan y señalan una unidad, H espera y señala 2 unidades.