Uso de Tornado con Pika para el monitoreo asíncrono de la cola

Tengo un servidor AMQP ( RabbitMQ ) del que me gustaría publicar y leer en un servidor web Tornado . Para hacer esto, pensé que usaría una biblioteca de python amqp asíncrona; en particular Pika (una variación de eso que supuestamente es compatible con Tornado).

He escrito un código que parece leer con éxito desde la cola, excepto que al final de la solicitud, obtengo una excepción (el navegador devuelve bien):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1) HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'}) Traceback (most recent call last): File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context yield File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext yield File "/usr/lib/python2.6/contextlib.py", line 113, in nested yield vars File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped callback(*args, **kwargs) File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events self._handle_read() File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read self.on_data_available(chunk) File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available self.channels[frame.channel_number].frame_handler(frame) KeyError: 1 

No estoy completamente seguro de que esté utilizando esta biblioteca correctamente, por lo que podría estar haciendo algo claramente mal. El flujo básico de mi código es:

  1. La solicitud viene en
  2. Cree una conexión a RabbitMQ utilizando TornadoConnection; especifique una callback
  3. En la conexión de callback, cree un canal, declare / vincule mi cola y llame a basic_consume; especifique una callback
  4. En la callback de consumo, cierre el canal y llame a la función de acabado de Tornado.
  5. Ver excepción.

Mis preguntas son algunas:

  1. ¿Es este flujo incluso correcto? No estoy seguro de cuál es el propósito de la callback de la conexión, excepto que no funciona si no la uso.
  2. ¿Debo crear una conexión AMQP por solicitud web? La documentación de RabbitMQ sugiere que no, no debería, sino que debería limitarme a crear solo canales. Pero, ¿dónde crearía la conexión y cómo bash reconectarme si se cae brevemente?
  3. Si estoy creando una conexión AMQP por solicitud web, ¿dónde debería cerrarla? Llamar a amqp.close () en mi callback parece arruinar las cosas aún más.

Intentaré tener un poco de código de ejemplo un poco más tarde, pero los pasos que describí anteriormente explican el lado consumidor de las cosas bastante completamente. También estoy teniendo problemas con el lado de la publicación, pero el consumo de colas es más urgente.

Sería útil ver algún código fuente, pero uso este mismo módulo pika que soporta tornados sin problemas en más de un proyecto de producción.

No desea crear una conexión por solicitud. Cree una clase que envuelva todas sus operaciones de AMQP, y cree una instancia como un singleton en el nivel de aplicación de tornado que se pueda usar en todas las solicitudes (y en los controladores de solicitudes). Hago esto en una función ‘runapp ()’ que hace cosas como esta y luego inicia el tornado principal ioloop.

Aquí hay una clase llamada ‘Eventos’. Es una implementación parcial (específicamente, no defino ‘self.handle_event’ aquí. Eso depende de usted.

 class Event(object): def __init__(self, config): self.host = 'localhost' self.port = '5672' self.vhost = '/' self.user = 'foo' self.exchange = 'myx' self.queue = 'myq' self.recv_routing_key = 'msgs4me' self.passwd = 'bar' self.connected = False self.connect() def connect(self): credentials = pika.PlainCredentials(self.user, self.passwd) parameters = pika.ConnectionParameters(host = self.host, port = self.port, virtual_host = self.vhost, credentials = credentials) srs = pika.connection.SimpleReconnectionStrategy() logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host, self.port)) self.connection = tornado_adapter.TornadoConnection(parameters, wait_for_open = False, reconnection_strategy = srs, callback = self.on_connected) def on_connected(self): # Open the channel logging.debug("Events: Opening a channel") self.channel = self.connection.channel() # Declare our exchange logging.debug("Events: Declaring the %s exchange" % self.exchange) self.channel.exchange_declare(exchange = self.exchange, type = "fanout", auto_delete = False, durable = True) # Declare our queue for this process logging.debug("Events: Declaring the %s queue" % self.queue) self.channel.queue_declare(queue = self.queue, auto_delete = False, exclusive = False, durable = True) # Bind to the exchange self.channel.queue_bind(exchange = self.exchange, queue = self.queue, routing_key = self.recv_routing_key) self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True) # We should be connected if we made it this far self.connected = True 

Y luego puse eso en un archivo llamado ‘events.py’. Mis RequestHandlers y cualquier código de back-end utilizan un módulo ‘common.py’ que envuelve el código que es útil para ambos (mis RequestHandlers no llaman ningún método de módulo amqp directamente – igual para db, cache, etc.), así que defina ‘events = None’ en el nivel de módulo en common.py, y yo instalo el objeto Event de la siguiente manera:

 import events def runapp(config): if myapp.common.events is None: myapp.common.events = myapp.events.Event(config) logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events) http_server = tornado.httpserver.HTTPServer(app, xheaders=config['HTTPServer']['xheaders'], no_keep_alive=config['HTTPServer']['no_keep_alive']) http_server.listen(port) main_loop = tornado.ioloop.IOLoop.instance() logging.debug("MAIN IOLOOP: %s", main_loop) main_loop.start() 

Feliz Año Nuevo

Alguien ha informado de éxito en la fusión de Tornado y Pika aquí . Por lo que puedo decir, no es tan simple como llamar a Pika desde Tornado, ya que ambas bibliotecas quieren tener sus propios bucles de eventos a cargo.