El multiprocesamiento funciona en Ubuntu, no en Windows

Estoy tratando de usar este ejemplo como plantilla para un sistema de colas en mi aplicación cherrypy.

Pude convertirlo de Python 2 a Python 3 (cambiar from Queue import Empty a from queue import Empty ) y ejecutarlo en Ubuntu. Pero cuando lo ejecuto en Windows me sale el siguiente error:

 F:\workspace\test>python test.py Traceback (most recent call last): File "test.py", line 112, in  broker.start() File "C:\Anaconda3\lib\multiprocessing\process.py", line 105, in start self._popen = self._Popen(self) File "C:\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "C:\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen return Popen(process_obj) File "C:\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ reduction.dump(process_obj, to_child) File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump ForkingPickler(file, protocol).dump(obj) TypeError: cannot serialize '_io.TextIOWrapper' object F:\workspace\test>Traceback (most recent call last): File "", line 1, in  File "C:\Anaconda3\lib\multiprocessing\spawn.py", line 100, in spawn_main new_handle = steal_handle(parent_pid, pipe_handle) File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 81, in steal_handle _winapi.PROCESS_DUP_HANDLE, False, source_pid) OSError: [WinError 87] The parameter is incorrect 

Aquí está el código completo:

 # from http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html import sys import logging from logging import handlers from cherrypy.process import wspbus class MyBus(wspbus.Bus): def __init__(self, name=""): wspbus.Bus.__init__(self) self.open_logger(name) self.subscribe("log", self._log) def exit(self): wspbus.Bus.exit(self) self.close_logger() def open_logger(self, name=""): logger = logging.getLogger(name) logger.setLevel(logging.INFO) h = logging.StreamHandler(sys.stdout) h.setLevel(logging.INFO) h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s")) logger.addHandler(h) self.logger = logger def close_logger(self): for handler in self.logger.handlers: handler.flush() handler.close() def _log(self, msg="", level=logging.INFO): self.logger.log(level, msg) import random import string from multiprocessing import Process class Bank(object): def __init__(self, queue): self.bus = MyBus(Bank.__name__) self.queue = queue self.bus.subscribe("main", self.randomly_place_order) self.bus.subscribe("exit", self.terminate) def randomly_place_order(self): order = random.sample(['BUY', 'SELL'], 1)[0] code = random.sample(string.ascii_uppercase, 4) amount = random.randint(0, 100) message = "%s %s %d" % (order, ''.join(code), amount) self.bus.log("Placing order: %s" % message) self.queue.put(message) def run(self): self.bus.start() self.bus.block(interval=0.01) def terminate(self): self.bus.unsubscribe("main", self.randomly_place_order) self.bus.unsubscribe("exit", self.terminate) from queue import Empty class Broker(Process): def __init__(self, queue): Process.__init__(self) self.queue = queue self.bus = MyBus(Broker.__name__) self.bus.subscribe("main", self.check) def check(self): try: message = self.queue.get_nowait() except Empty: return if message == "stop": self.bus.unsubscribe("main", self.check) self.bus.exit() elif message.startswith("BUY"): self.buy(*message.split(' ', 2)[1:]) elif message.startswith("SELL"): self.sell(*message.split(' ', 2)[1:]) def run(self): self.bus.start() self.bus.block(interval=0.01) def stop(self): self.queue.put("stop") def buy(self, code, amount): self.bus.log("BUY order placed for %s %s" % (amount, code)) def sell(self, code, amount): self.bus.log("SELL order placed for %s %s" % (amount, code)) if __name__ == '__main__': from multiprocessing import Queue queue = Queue() broker = Broker(queue) broker.start() bank = Bank(queue) bank.run() 

El problema es que algunas partes del objeto MyBus no son seleccionables, y usted está guardando una instancia de MyBus en su instancia de Broker . Debido a que Windows no tiene soporte para fork() , cuando llama a broker.start() , todo el estado de broker debe ser decapado y recreado en el proceso hijo que genera multiprocessing para ejecutar broker.run . Funciona en Linux porque Linux soporta fork ; En este caso, no es necesario encurtir nada: el proceso hijo contiene el estado completo del padre tan pronto como se bifurca.

Hay dos formas de resolver este problema. La primera forma, y ​​la más difícil, es hacer que su instancia de broker seleccionable. Para hacer eso, necesitas hacer MyBus seleccionable. El error que está recibiendo en este momento se refiere al atributo logger en MyBus , que no es seleccionable. Ese es fácil de arreglar; simplemente agregue los métodos __getstate__ / __setstate__ a MyBus , que se usan para controlar cómo el objeto es decapado / no encurtido. Si eliminamos el registrador cuando decapamos, y lo volvemos a crear cuando lo eliminamos, solucionaremos el problema:

 class MyBus(wspbus.Bus): ... def __getstate__(self): self_dict = self.__dict__ del self_dict['logger'] return self_dict def __setstate__(self, d): self.__dict__.update(d) self.open_logger() 

Esto funciona, pero luego nos topamos con otro error de decapado:

 Traceback (most recent call last): File "async2.py", line 121, in  broker.start() File "C:\python34\lib\multiprocessing\process.py", line 105, in start self._popen = self._Popen(self) File "C:\python34\lib\multiprocessing\context.py", line 212, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "C:\python34\lib\multiprocessing\context.py", line 313, in _Popen return Popen(process_obj) File "C:\python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ reduction.dump(process_obj, to_child) File "C:\python34\lib\multiprocessing\reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) _pickle.PicklingError: Can't pickle : attribute lookup State on cherrypy.process.wspbus failed 

Resulta que cherrypy.process.wspbus._StateEnum.State , que es un atributo en la clase wspbus.Bus heredada por MyBus , es una clase anidada, y las clases anidadas no pueden ser encurtidas:

 class _StateEnum(object): class State(object): name = None def __repr__(self): return "states.%s" % self.name 

El objeto State (sorpresa) se utiliza para rastrear el estado de la instancia de Bus . Ya que estamos haciendo el decapado antes de poner en marcha el bus, podríamos simplemente eliminar el atributo de state del objeto cuando decapamos, y establecerlo en States.STOPPED cuando deseleccionamos.

 class MyBus(wspbus.Bus): def __init__(self, name=""): wspbus.Bus.__init__(self) self.open_logger(name) self.subscribe("log", self._log) def __getstate__(self): self_dict = self.__dict__ del self_dict['logger'] del self_dict['state'] return self_dict def __setstate__(self, d): self.__dict__.update(d) self.open_logger() self.state = wspbus.states.STOPPED # Initialize to STOPPED 

Con estos cambios, el código funciona como se espera! La única limitación es que solo es seguro MyBus si el autobús no se ha iniciado todavía, lo cual está bien para su caso de uso.

Una vez más, esta es la manera difícil. La forma más sencilla es simplemente eliminar la necesidad de MyBus completo la instancia de MyBus . Solo puede crear la instancia de MyBus en el proceso hijo, en lugar del padre:

 class Broker(Process): def __init__(self, queue): Process.__init__(self) self.queue = queue ... def run(self): self.bus = MyBus(Broker.__name__) # Create the instance here, in the child self.bus.subscribe("main", self.check) self.bus.start() self.bus.block(interval=0.01) 

Siempre que no necesite acceder a broker.bus en el padre, esta es la opción más sencilla.