Python multiprocessing.Queue modifica objetos

Tengo una aplicación que implementa algo como una Cadena de Responsabilidad en Python. Hay un proceso que pasa objetos a través de multiprocessing.Queue () a otros procesos que luego realizan acciones en los objetos. También es importante para el último momento modificado del objeto que se pasa para ser rastreado, por lo que la acción se puede tomar solo cuando el objeto fue modificado.

El problema que estoy experimentando es que el atributo _modificado en el objeto parece cambiar aleatoriamente después de extraerlo de la cola. Sin embargo, el atributo _mtime siempre es correcto. El siguiente ejemplo ejecutará y (intencionalmente) modificará aleatoriamente el objeto ficticio, luego lo colocará en la cola para cada uno de los procesos del controlador. Cada controlador imprimirá los valores _modified y _mtime que recibieron en el objeto. Espero que el valor _modificado sea el mismo tanto en el comando_funcio como en las funciones del controlador, sin embargo, ese no suele ser el caso. Si elimino la herencia Object_w_mtime del objeto DummyObject, no veo ninguna diferencia en los objetos enviados y recibidos.

Soy relativamente nuevo en Python. Que yo sepa, lo que debería suceder es que cada vez que se coloca un objeto en una cola, se decapa y luego se envía por un conducto al proceso de recepción que despeja el objeto. ¿Es eso correcto? ¿Hay alguna manera de que la herencia del objeto se estropee cuando el objeto es decapado / no picado?

Probé esto con Python 2.7.2 y 2.6.7 en Ubuntu 11.10, así como con Python 2.7.1 en Ubuntu 11.04. A veces hay que dejar que se ejecute durante aproximadamente un minuto para ver el comportamiento, ya que parece ser aleatorio.

Agarrando pajitas aquí, gracias de antemano.

import multiprocessing import time import traceback import os import random class Object_w_mtime(object): ''' Parent object that tracks the last time an attribute was modified ''' def __setattr__(self,a_name,a_value): if ((a_name not in ('_mtime','_modified')) and (a_value != getattr(self,a_name,None)) ): object.__setattr__(self, '_modified', True) object.__setattr__(self, '_mtime', time.time()) object.__setattr__(self, a_name, a_value) return True #END def def reset(self): self._modified = False #END class class DummyObject(Object_w_mtime): def __init__(self): self.value = 10 def handler(in_queue = None, handler_id = None): print 'PID:' + str(os.getpid()) + ':handler{0}:'.format(handler_id) while True: try: obj = in_queue.get(True,61) print 'handler{} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime except multiprocessing.queues.Empty: break except KeyboardInterrupt: break except Exception as e: print traceback.format_exc() break return True #END def def command_func(next_links = None): print 'PID:' + str(os.getpid()) + ':command_func:' obj = DummyObject() while True: try: # randomly assign a different value to test with a modified and unmodified object obj.value = random.randint(0,1) print '**************** obj.value = {0} ***************'.format(obj.value) print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime for each in next_links: each.put(obj,False) except multiprocessing.queues.Empty: break except KeyboardInterrupt: break except Exception as e: print e print traceback.format_exc() break obj.reset() time.sleep(3) return True #END def if __name__ == '__main__': handler_queues = list() handler_processes = list() # Create a queue and process object for each command handler for handler_id in range(1,4): queue = multiprocessing.Queue() process = multiprocessing.Process(target=handler, args=(queue, handler_id)) handler_queues.append(queue) handler_processes.append(process) try: # spawn handler processes for process in handler_processes: process.start() # Start sending commands to handlers command_func(handler_queues) # exit on keyboard interrupt except KeyboardInterrupt: for process in handler_processes: process.join() except Exception: traceback.print_exc() 

En resumen, modificas obj después de ponerlo en la cola.

Mirando http://svn.python.org/view/python/trunk/Lib/multiprocessing/queues.py?revision=76434&view=markup line 285, put () simplemente coloca el objeto en una cola interna, y si no lo está ya en ejecución, inicia un subproceso en segundo plano para procesar objetos de esa cola. Por lo tanto, hay una carrera entre each.put(obj,False) y obj.reset() en tu código.

Probablemente debería usar solo colas con objetos inmutables (copias de).