Detenga la interrupción del teclado para evitar que el trabajador de multiprocesamiento de Python trabaje en la cola

De varias publicaciones encontradas en stackoverflow creé este código.

Guión

Quiero tener una cola de multiprocesamiento en la que varios trabajadores “escuchen”

En caso de una interrupción del teclado, el proceso principal ya no debe poner nuevos elementos en la cola y, con la ayuda de los objetos centinela, el trabajador debe detenerse con gracia.

Problema

Mi problema con la versión actual donde la uso

signal.signal(signal.SIGINT, signal.SIG_IGN) 

Ignorar Ctrl + C es que el proceso principal también lo ignora.

Algunas ideas ? ¿Necesito usar el grupo de trabajadores de multiprocesamiento? Algunos ejemplos indican que podría tener que hacerlo. ¿Puedo seguir usando la cola?

 from multiprocessing import Pool, Process,Queue import time import signal # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process class Worker(Process): def __init__(self, queue,ident): super(Worker, self).__init__() # Ignore Signals signal.signal(signal.SIGINT, signal.SIG_IGN) self.queue= queue self.idstr= str(ident) print "Ident" + self.idstr def run(self): print 'Worker started' # do some initialization here print 'Computing things!' for data in iter( self.queue.get, None ): print "#" + self.idstr + " : " + str(data) time.sleep(5) print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize()) print "Worker Done" #### Main #### request_queue = Queue(10) for i in range(4): Worker( request_queue,i ).start() try: for data in range(1000000): request_queue.put( data ) #print "Queue Size: " + str(request_queue.qsize()) # Sentinel objects to allow clean shutdown: 1 per worker. for i in range(4): request_queue.put( None ) except KeyboardInterrupt: print "Caught KeyboardInterrupt, terminating workers" while request_queue.empty()==False: request_queue.get() request_queue.put( None ) 

Según su solución (que es buena), agregué una capa adicional de protección en caso de que el código principal no responda y el usuario cancele dos veces:

 global STOP import os, signal def signal_handler(sig, frame): global STOP if STOP: signal.signal(signal.SIGINT, signal.SIG_IGN) os.kill(os.getpid(), signal.SIGTERM) STOP = True signal.signal(signal.SIGINT, signal_handler) 

Creo que encontré una solución. Aún así, no me gusta que obtenga el SIGINT 1 una vez de main y 4 veces del Worker, pero tal vez tenga que vivir con eso.

  1. Especifiqué un manejador de señal para la señal de interrupción.
  2. Después de recibir el primer Sig INT, ignoro más la señal SIG Int.
  3. Cambio la bandera de parada a VERDADERO
  4. Rompo el bucle de inserción de cola
  5. Llamo a la función de parada que borra la cola e inserta los centinelas de parada

     from multiprocessing import Pool, Process,Queue import time import signal # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process # Stop Flag for loop stop = False # Define SIGINT def signal_handler(sig, frame): print 'You pressed Ctrl+C!' global stop stop = True # Ignore more Ctrl+C signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal_handler) def stopSentinel(request_queue): print "CTRL Stop Queue and insert None" # Empty Existing Queue while request_queue.empty()==False: request_queue.get() # Put One None for each Worker for i in range(4): request_queue.put( None ) class Worker(Process): def __init__(self, queue,ident): super(Worker, self).__init__() self.queue= queue self.idstr= str(ident) print "Ident" + self.idstr def run(self): print 'Worker started' # do some initialization here print 'Computing things!' for data in iter( self.queue.get, None ): print "#" + self.idstr + " : " + str(data) time.sleep(5) print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize()) print "Worker Done" #### Main ##### request_queue = Queue(10) for i in range(4): Worker( request_queue,i ).start() #### Fill Queue with Data #### for data in range(1000000): request_queue.put( data ) #print "Queue Size: " + str(request_queue.qsize()) # Sentinel objects to allow clean shutdown: 1 per worker. # Check for Stop print "Check Breakout" if stop == True: print "Stop Break" break if stop == True: stopSentinel(request_queue) else: print "Normal Stop" for i in range(4): request_queue.put( None )