Pasando un Pipe / Connection como contexto arg a multiproceso Pool.apply_async ()

Quiero usar tuberías para hablar con las instancias de proceso en mi grupo, pero recibo un error:

Sea __p una instancia de Pool ():

(master_pipe, worker_pipe) = Pipe() self.__p.apply_async(_worker_task, (handler_info, context_info, worker_pipe)) 

Cuando ejecuto esto, obtengo el siguiente error [para cada instancia, obviamente]:

  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get task = get() File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get TypeError: Required argument 'handle' (pos 1) not found self.run() File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run return recv() return recv() self._target(*self._args, **self._kwargs) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 102, in worker TypeError: Required argument 'handle' (pos 1) not found TypeError: Required argument 'handle' (pos 1) not found task = get() File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get return recv() TypeError: Required argument 'handle' (pos 1) not found 

El error se refiere específicamente a la instancia de conexión que estoy intentando pasar. Si lo hago “Ninguno”, los trabajadores se bifurcan sin error.

No entiendo esto ya que, como el documento enfatiza a través del ejemplo, puedo pasar fácilmente el mismo argumento a un Proceso (), y hacer que funcione perfectamente:

     from multiprocessing import Pipe, Process def call_me(p): print("Here: %s" % (p)) (master, worker) = Pipe() p = Process(target=call_me, args=(worker,)) p.start() Here:  p.join() 

    Parece que este error ( http://bugs.python.org/issue4892 ) observado en esta discusión: Python 2.6 envía un objeto de conexión a través de Queue / Pipe / etc

    El grupo bifurca procesos secundarios inicialmente con canalizaciones para comunicar tareas / resultados a / desde los procesos secundarios. Se trata de comunicar su objeto de tubería a través de la tubería existente que explota, no en el forking. (el error se produce cuando el proceso hijo intenta con un get () en la abstracción de la cola).

    Parece que el problema surge debido a la forma en que el objeto Pipe es decapado / no seleccionado para la comunicación.

    En el segundo caso que anotó, la tubería se pasa a una instancia de proceso y luego se bifurca, por lo tanto, la diferencia en el comportamiento.

    No puedo imaginar que la comunicación activa con procesos de grupo fuera de la distribución de tareas puras fuera un caso de uso previsto para el grupo de multiprocesamiento. Por lo que respecta al estado / protocolo, eso implicaría que querrías más control sobre el proceso. Eso requeriría más contexto que lo que el objeto general de Pool podría saber.

    Esto es posible de resolver usando los argumentos inicializador e initargs cuando creas el pool y sus procesos. Es cierto que también tiene que haber una variable global involucrada. Sin embargo, si coloca el código de trabajador en un módulo separado, no se verá tan mal. Y es solo global a ese proceso. 🙂

    Un caso típico es que desea que sus procesos de trabajo agreguen cosas a una cola de multiprocesamiento. Como eso tiene que ver con que algo tenga que residir en un determinado lugar de la memoria, el decapado no funcionará. Incluso si hubiera funcionado, simplemente habría copiado datos sobre el hecho de que algún proceso tiene una cola. Que es lo contrario de lo que queremos aquí. Queremos compartir la misma cola.

    Así que aquí hay un ejemplo de código meta:

    El módulo que contiene el código del trabajador, lo llamamos “módulo de trabajador”:

     def worker_init(_the_queue): global the_queue the_queue = _the_queue def do_work(_a_string): # Add something to the queue the_queue.put("the string " + _a_string) 

    Y la creación de la piscina, seguida de tenerla haciendo algo.

     # Import our functions from worker_module import worker_init, do_work # Good idea: Call it MPQueue to not confuse it with the other Queue from multiprocessing import Queue as MPQueue from multiprocessing import Pool the_queue = MPQueue() # Initialize workers, it is only during initialization we can pass the_queue the_pool = Pool(processes= 3, initializer=worker_init, initargs=[the_queue,]) # Do the work the_pool.apply(do_work, ["my string",]) # The string is now on the queue my_string = the_queue.get(True)) 

    Este es un error que se ha corregido en Python 3.

    La solución más sencilla es pasar la cola a través del inicializador del Pool como se sugiere en la otra respuesta.