PyKafka producer.get_delivery_report lanzando Queue.empty cuando block = false

Actualmente estoy trabajando en una integración de Kafka usando Python, y soy nuevo en Kafka y Python proveniente de un fondo PHP.

He logrado que el productor trabaje, sin embargo, no está procesando cada mensaje lo suficientemente rápido debido a la espera de ack de Kafka.

En la página de GitHub ( https://github.com/Parsely/pykafka ) hay el siguiente ejemplo que debe procesar mensajes de forma asíncrona y aún permitir informes de entrega:

>>> with topic.get_producer(delivery_reports=True) as producer: ... count = 0 ... while True: ... count += 1 ... producer.produce('test msg', partition_key='{}'.format(count)) ... if count % 10**5 == 0: # adjust this or bring lots of RAM ;) ... while True: ... try: ... msg, exc = producer.get_delivery_report(block=False) ... if exc is not None: ... print 'Failed to deliver msg {}: {}'.format( ... msg.partition_key, repr(exc)) ... else: ... print 'Successfully delivered msg {}'.format( ... msg.partition_key) ... except Queue.Empty: ... break 

He modificado el ejemplo, sin embargo, desde las pruebas puedo ver que el primer mensaje se envió con éxito, pero se lanzó una excepción Queue.empty.

Este es mi código modificado:

     from pykafka import KafkaClient import Queue import json client = KafkaClient(hosts='1.1.1.1:9092') topic = client.topics['test'] sync = False # sync = True if sync: with topic.get_sync_producer() as producer: count = 0 while True: count += 1 producer.produce('Test message ' + str(count)) print 'Sent message ' + str(count) else: with topic.get_producer(delivery_reports=True) as producer: count = 0 while True: count += 1 if count >= 100: print 'Processed 100 messages' break producer.produce('Test message ' + str(count)) while True: try: msg, exc = producer.get_delivery_report(block=False) if exc is not None: print 'Failed to deliver msg {}: {}'.format(msg.offset, repr(exc)) else: print 'Successfully delivered msg {}'.format(msg.offset) except Queue.Empty: print 'Queue.empty' break 

    Y la salida:

     /Users/jim/Projects/kafka_test/env/bin/python /Users/jim/Projects/kafka_test/producer.py Queue.empty ... ... x100 Processed 100 messages 

    Al revisar a mi consumidor, puedo ver que los 100 mensajes se enviaron correctamente, pero no puedo decírselo a mi productor.

    ¿Tiene alguna sugerencia sobre cómo puedo mejorar esta implementación, más específicamente cómo puedo boost mi rendimiento y mantener la capacidad de comprobar que el mensaje fue exitoso?

    Encontré un problema de GitHub relacionado con esto: https://github.com/Parsely/pykafka/issues/291

    Arreglé esto bajando los min_queued_messages a 1.

     with topic.get_sync_producer(min_queued_messages=1) as producer: count = 0 while True: count += 1 time_start = time.time() producer.produce('Test message ' + str(count)) time_end = time.time() print 'Sent message %d, %ss duration' % (count, (time_end - time_start))