Cómo crear una aplicación de Python con dos subprocesos cada uno que tiene una aplicación de autollamada

No he encontrado ninguna solución para mi problema. Necesito crear una aplicación de python con dos subprocesos, cada uno de los cuales está conectado a un enrutador WAMP usando la biblioteca autobahn.

A continuación escribo el código de mi experimento:

wampAddress = 'ws://172.17.3.139:8181/ws' wampRealm = 's4t' from threading import Thread from autobahn.twisted.wamp import ApplicationRunner from autobahn.twisted.wamp import ApplicationSession from twisted.internet.defer import inlineCallbacks class AutobahnMRS(ApplicationSession): @inlineCallbacks def onJoin(self, details): print("Sessio attached [Connect to WAMP Router]") def onMessage(*args): print args try: yield self.subscribe(onMessage, 'test') print ("Subscribed to topic: test") except Exception as e: print("Exception:" +e) class AutobahnIM(ApplicationSession): @inlineCallbacks def onJoin(self, details): print("Sessio attached [Connect to WAMP Router]") try: yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO') print ("Subscribed to topic: test") except Exception as e: print("Exception:" +e) class ManageRemoteSystem: def __init__(self): self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm) def start(self): self.runner.run(AutobahnMRS); class InternalMessages: def __init__(self): self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm) def start(self): self.runner.run(AutobahnIM); #class S4tServer: if __name__ == '__main__': server = ManageRemoteSystem() sendMessage = InternalMessages() thread1 = Thread(target = server.start()) thread1.start() thread1.join() thread2 = Thread(target = sendMessage.start()) thread2.start() thread2.join() 

Cuando ejecuto esta aplicación de Python, solo se inicia el thread1 y, más tarde, cuando finalizo la aplicación (ctrl-c), se muestran los siguientes mensajes de error:

 Sessio attached [Connect to WAMP Router] Subscribed to topic: test ^CTraceback (most recent call last): File "test_pub.py", line 71, in  p2 = multiprocessing.Process(target = server.start()) File "test_pub.py", line 50, in start self.runner.run(AutobahnMRS); File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run reactor.run() File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run self.startRunning(installSignalHandlers=installSignalHandlers) File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning ReactorBase.startRunning(self) File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning raise error.ReactorNotRestartable() twisted.internet.error.ReactorNotRestartable 

Necesito implementar en una aplicación que tenga sus funcionalidades y también debe tener un sistema para comunicarse con un enrutador WAMP con una biblioteca de python autobahn.

En otras palabras, necesito una solución capaz de comunicarme con un enrutador WAMP, pero al mismo tiempo, esta aplicación no debe estar bloqueada con la parte de la autopista (creo que la solución es comenzar dos hilos, un hilo administra algunas funciones). y segundo hilo gestiona la parte de la autopista).

Con el esquema que propuse anteriormente, hay otro problema, la necesidad de enviar un mensaje, en un tema específico en el enrutador WAMP, desde la parte de la aplicación en el “subproceso no autobahn”, esta funcionalidad debe llamarse mediante una función específica Sin bloquear las otras funcionalidades.

Espero haber dado todos los detalles.

Muchas gracias por cualquier respuesta.

——————————–EDITAR—————– —————-

Después de algunas investigaciones he implementado lo que necesito para el protocolo websocket, el código es el siguiente:

 # ----- twisted ---------- class _WebSocketClientProtocol(WebSocketClientProtocol): def __init__(self, factory): self.factory = factory def onOpen(self): #log.debug("Client connected") self.factory.protocol_instance = self self.factory.base_client._connected_event.set() class _WebSocketClientFactory(WebSocketClientFactory): def __init__(self, *args, **kwargs): WebSocketClientFactory.__init__(self, *args, **kwargs) self.protocol_instance = None self.base_client = None def buildProtocol(self, addr): return _WebSocketClientProtocol(self) # ------ end twisted ------- lass BaseWBClient(object): def __init__(self, websocket_settings): #self.settings = websocket_settings # instance to be set by the own factory self.factory = None # this event will be triggered on onOpen() self._connected_event = threading.Event() # queue to hold not yet dispatched messages self._send_queue = Queue.Queue() self._reactor_thread = None def connect(self): log.msg("Connecting to host:port") self.factory = _WebSocketClientFactory( "ws://host:port", debug=True) self.factory.base_client = self c = connectWS(self.factory) self._reactor_thread = threading.Thread(target=reactor.run, args=(False,)) self._reactor_thread.daemon = True self._reactor_thread.start() def send_message(self, body): if not self._check_connection(): return log.msg("Queing send") self._send_queue.put(body) reactor.callFromThread(self._dispatch) def _check_connection(self): if not self._connected_event.wait(timeout=10): log.err("Unable to connect to server") self.close() return False return True def _dispatch(self): log.msg("Dispatching") while True: try: body = self._send_queue.get(block=False) except Queue.Empty: break self.factory.protocol_instance.sendMessage(body) def close(self): reactor.callFromThread(reactor.stop) import time def Ppippo(coda): while True: coda.send_message('YOOOOOOOO') time.sleep(5) if __name__ == '__main__': ws_setting = {'host':'', 'port':} client = BaseWBClient(ws_setting) t1 = threading.Thread(client.connect()) t11 = threading.Thread(Ppippo(client)) t11.start() t1.start() 

El código anterior funciona bien, pero necesito traducirlo para operar con el protocolo WAMP instalado en websocket.

¿Alguien sabe como resuelvo?

La mala noticia es que Autobahn está utilizando el bucle principal Twisted, por lo que no puede ejecutarlo en dos subprocesos a la vez.

La buena noticia es que no es necesario ejecutarlo en dos subprocesos para ejecutar dos cosas, y de todos modos, dos subprocesos serían más complicados.

La API para comenzar con varias aplicaciones es un poco confusa, porque tiene dos objetos ApplicationRunner , y a primera vista parece que la forma en que ejecuta una aplicación en autobahn es llamar a ApplicationRunner.run .

Sin embargo, ApplicationRunner es simplemente una comodidad que envuelve las cosas que configuran la aplicación y las cosas que ejecutan el bucle principal; La verdadera carne de la obra ocurre en WampWebSocketClientFactory .

Para lograr lo que desea, solo tiene que deshacerse de los hilos y ejecutar el bucle principal usted mismo, haciendo que las instancias de ApplicationRunner simplemente configuren sus aplicaciones.

Para lograr esto, deberá cambiar la última parte de su progtwig para hacer esto:

 class ManageRemoteSystem: def __init__(self): self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm) def start(self): # Pass start_reactor=False to all runner.run() calls self.runner.run(AutobahnMRS, start_reactor=False) class InternalMessages: def __init__(self): self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm) def start(self): # Same as above self.runner.run(AutobahnIM, start_reactor=False) if __name__ == '__main__': server = ManageRemoteSystem() sendMessage = InternalMessages() server.start() sendMessage.start() from twisted.internet import reactor reactor.run()