Proceso frente a subproceso en relación con el uso de Queue () / deque () y la variable de clase para la comunicación y la “píldora venenosa”

Me gustaría crear un subproceso o un proceso que se ejecute para siempre en un bucle While True.

Necesito enviar y recibir datos al trabajador en forma de colas, ya sea un multiprocessing.Queue () o collections.deque (). Prefiero usar collections.deque () ya que es significativamente más rápido.

También necesito poder matar al trabajador con el tiempo (ya que se ejecuta en un bucle de True. Aquí hay un código de prueba que he reunido para intentar comprender las diferencias entre subprocesos, procesos, colas y deque …

import time from multiprocessing import Process, Queue from threading import Thread from collections import deque class ThreadingTest(Thread): def __init__(self, q): super(ThreadingTest, self).__init__() self.q = q self.toRun = False def run(self): print("Started Thread") self.toRun = True while self.toRun: if type(self.q) == type(deque()): if self.q: i = self.q.popleft() print("Thread deque: " + str(i)) elif type(self.q) == type(Queue()): if not self.q.empty(): i = self.q.get_nowait() print("Thread Queue: " + str(i)) def stop(self): print("Trying to stop Thread") self.toRun = False while self.isAlive(): time.sleep(0.1) print("Stopped Thread") class ProcessTest(Process): def __init__(self, q): super(ProcessTest, self).__init__() self.q = q self.toRun = False self.ctr = 0 def run(self): print("Started Process") self.toRun = True while self.toRun: if type(self.q) == type(deque()): if self.q: i = self.q.popleft() print("Process deque: " + str(i)) elif type(self.q) == type(Queue()): if not self.q.empty(): i = self.q.get_nowait() print("Process Queue: " + str(i)) def stop(self): print("Trying to stop Process") self.toRun = False while self.is_alive(): time.sleep(0.1) print("Stopped Process") if __name__ == '__main__': q = Queue() t1 = ProcessTest(q) t1.start() for i in range(10): if type(q) == type(deque()): q.append(i) elif type(q) == type(Queue()): q.put_nowait(i) time.sleep(1) t1.stop() t1.join() if type(q) == type(deque()): print(q) elif type(q) == type(Queue()): while q.qsize() > 0: print(str(q.get_nowait())) 

Como puede ver, t1 puede ser ThreadingTest o ProcessTest. Además, la cola que se le pasa puede ser multiprocessing.Queue o collections.deque.

ThreadingTest funciona con una cola o deque (). También mata a run () correctamente cuando se llama al método stop ().

 Started Thread Thread deque: 0 Thread deque: 1 Thread deque: 2 Thread deque: 3 Thread deque: 4 Thread deque: 5 Thread deque: 6 Thread deque: 7 Thread deque: 8 Thread deque: 9 Trying to stop Thread Stopped Thread deque([]) 

ProcessTest solo puede leer de la cola si es de tipo multiprocessing.Queue. No funciona con las colecciones. Además, no puedo terminar el proceso utilizando stop ().

 Process Queue: 0 Process Queue: 1 Process Queue: 2 Process Queue: 3 Process Queue: 4 Process Queue: 5 Process Queue: 6 Process Queue: 7 Process Queue: 8 Process Queue: 9 Trying to stop Process 

Estoy tratando de averiguar por qué? Además, ¿cuál sería la mejor manera de usar Deque con un proceso? Y, ¿cómo podría tratar de eliminar el proceso utilizando algún tipo de método stop ()?

No se puede usar un collections.deque para pasar datos entre dos instancias de multiprocessing.Process , porque collections.deque no reconoce el proceso. multiprocessing.Queue escribe su contenido en un multiprocessing.Pipe internamente, lo que significa que los datos en él se pueden encolar en un proceso una vez y se pueden recuperar en otro. collections.deque no tiene ese tipo de tuberías, por lo que no funcionará. Cuando escribe en el deque en un proceso, la instancia de deque en el otro proceso no se verá afectada en absoluto; Son instancias completamente separadas.

Un problema similar está sucediendo con su método stop() . Está cambiando el valor de toRun en el proceso principal, pero esto no afectará al niño en absoluto. Son instancias completamente separadas. La mejor manera de terminar con el niño sería enviar un centinela a la Queue . Cuando obtenga el centinela en el niño, salga del bucle infinito:

 def run(self): print("Started Process") self.toRun = True while self.toRun: if type(self.q) == type(deque()): if self.q: i = self.q.popleft() print("Process deque: " + str(i)) elif type(self.q) == type(Queue()): if not self.q.empty(): i = self.q.get_nowait() if i is None: break # Got sentinel, so break print("Process Queue: " + str(i)) def stop(self): print("Trying to stop Process") self.q.put(None) # Send sentinel while self.is_alive(): time.sleep(0.1) print("Stopped Process") 

Editar:

Si realmente necesita semántica de deque entre dos procesos, puede usar un multiprocessing.Manager() para crear un deque compartido en un proceso de Manager , y cada una de sus instancias de Process obtendrá un Proxy :

 import time from multiprocessing import Process from multiprocessing.managers import SyncManager from collections import deque SyncManager.register('deque', deque) def Manager(): m = SyncManager() m.start() return m class ProcessTest(Process): def __init__(self, q): super(ProcessTest, self).__init__() self.q = q self.ctr = 0 def run(self): print("Started Process") self.toRun = True while self.toRun: if self.q._getvalue(): i = self.q.popleft() if i is None: break print("Process deque: " + str(i)) def stop(self): print("Trying to stop Process") self.q.append(None) while self.is_alive(): time.sleep(0.1) print("Stopped Process") if __name__ == '__main__': m = Manager() q = m.deque() t1 = ProcessTest(q) t1.start() for i in range(10): q.append(i) time.sleep(1) t1.stop() t1.join() print(q) 

Tenga en cuenta que esto probablemente no va a ser más rápido que un multiprocessing.Queue . Sin embargo, hay un costo de IPC por cada vez que accede al deque . También es una estructura de datos mucho menos natural para pasar mensajes como eres.