Interoperando con Django / Celery desde Java

Nuestra empresa tiene un sitio web basado en Python y algunos nodos de trabajadores basados ​​en Python que se comunican a través de Django / Celery y RabbitMQ. Tengo una aplicación basada en Java que necesita enviar tareas a los trabajadores basados ​​en Celery. Puedo enviar trabajos a RabbitMQ desde Java muy bien, pero los trabajadores de Celery nunca están recogiendo los trabajos. Al observar las capturas de paquetes de ambos tipos de presentaciones de trabajos, hay diferencias, pero no puedo entender cómo explicarlas porque muchas de ellas son binarias y no puedo encontrar documentación sobre la deencoding. ¿Alguien aquí tiene alguna referencia o experiencia con tener Java / RabbitMQ y Celery trabajando juntos?

Encontré la solución. La biblioteca de Java para RabbitMQ se refiere a intercambios / colas / routekeys. En Celery, el nombre de la cola se está asignando al intercambio al que se hace referencia en la biblioteca de Java. Por defecto, la cola para el apio es simplemente “apio”. Si la configuración de Django define una cola llamada “myqueue” con la siguiente syntax:

CELERY_ROUTES = { 'mypackage.myclass.runworker' : {'queue':'myqueue'}, } 

Entonces el código basado en Java necesita hacer algo como lo siguiente:

  ConnectionFactory factory = new ConnectionFactory(); Connection connection = null ; try { connection = factory.newConnection(mqHost, mqPort); } catch (IOException ioe) { log.error("Unable to create new MQ connection from factory.", ioe) ; } Channel channel = null ; try { channel = connection.createChannel(); } catch (IOException ioe) { log.error("Unable to create new channel for MQ connection.", ioe) ; } try { channel.queueDeclare("celery", false, false, false, true, null); } catch (IOException ioe) { log.error("Unable to declare queue for MQ channel.", ioe) ; } try { channel.exchangeDeclare("myqueue", "direct") ; } catch (IOException ioe) { log.error("Unable to declare exchange for MQ channel.", ioe) ; } try { channel.queueBind("celery", "myqueue", "myqueue") ; } catch (IOException ioe) { log.error("Unable to bind queue for channel.", ioe) ; } // Generate the message body as a string here. try { channel.basicPublish(mqExchange, mqRouteKey, new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null), messageBody.getBytes("ASCII")); } catch (IOException ioe) { log.error("IOException encountered while trying to publish task via MQ.", ioe) ; } 

Resulta que es solo una diferencia en terminología.