Reiniciar un consumidor Kafka (python) consume todos los mensajes en la cola nuevamente

Estoy usando Kafka 0.8.1 y Kafka python-0.9.0. En mi configuración, tengo 2 kafka brokers de configuración. Cuando ejecuto mi kafka consumer, puedo verlo recuperando mensajes de la cola y haciendo un seguimiento de las compensaciones para ambos corredores. ¡Todo funciona genial!

Mi problema es que cuando reinicio el consumidor, comienza a consumir mensajes desde el principio. Lo que esperaba era que al reiniciar, el consumidor comenzaría a consumir mensajes desde donde se detuvo antes de morir.

Intenté hacer un seguimiento de las compensaciones de mensajes en Redis y luego llamar a consumer.seek antes de leer un mensaje de la cola para asegurarme de que solo estaba recibiendo los mensajes que no había visto antes. Mientras esto funcionaba, antes de implementar esta solución, quería consultar con todos ustedes … quizás haya algo que no entiendo bien sobre Kafka o el cliente de python-Kafka. Parece que el consumidor poder reiniciar la lectura desde donde lo dejó es una funcionalidad bastante básica.

¡Gracias!

Cuídate con la biblioteca kafka-python. Tiene algunos problemas menores.

Si la velocidad no es realmente un problema para su consumidor, puede configurar la confirmación automática en cada mensaje. Debería funcionar.

SimpleConsumer proporciona un método de seek ( https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185 ) que le permite comenzar a consumir mensajes en el punto que desee.

Las llamadas más habituales son:

  • consumer.seek(0, 0) para comenzar a leer desde el principio de la cola.
  • consumer.seek(0, 1) para comenzar a leer desde el desplazamiento actual.
  • consumer.seek(0, 2) para omitir todos los mensajes pendientes y comenzar a leer solo los mensajes nuevos.

El primer argumento es un desplazamiento de esas posiciones. De esa manera, si llama a consumer.seek(5, 0) , omitirá los primeros 5 mensajes de la cola.

Además, no se olvide, el desplazamiento se almacena para grupos de consumidores. Asegúrate de usar el mismo todo el tiempo.

kafka-python almacena las compensaciones con el servidor kafka, no en una conexión de zookeeper separada. Desafortunadamente, las apis del servidor kafka para admitir las compensaciones de confirmación / recuperación no fueron completamente funcionales hasta que apache kafka 0.8.1.1. Si actualiza su servidor kafka, su configuración debería funcionar. También sugeriría actualizar kafka-python a 0.9.4.

[Kafka-python mantenedor]

El consumidor de Kafka es capaz de almacenar compensaciones en Zookeeper . En la API de Java tenemos dos opciones : consumidor de alto nivel, que administra el estado para nosotros y comienza a consumir donde lo dejó después del reinicio, y consumidor de bajo nivel sin estado sin esta superpotencia.

Por lo que entiendo en el código de consumidor de Python ( https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py ), tanto SimpleConsumer como MultiProcessConsumer son estadísticos y hacen un seguimiento de las compensaciones actuales en Zookeeper, por lo que es extraño que tengas este problema de reconsumo.

Asegúrese de tener las mismas ID de grupo de consumidores en los reinicios (¿puede ser que lo configure al azar?) Y verifique las siguientes opciones:

 auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit 

Puede ser que consums <100 mensajes o <5000 ms?