Comunicando el final de la cola

Estoy aprendiendo a usar el módulo de Cola, y estoy un poco confundido acerca de cómo se puede hacer que un subproceso de consumidor de la cola sepa que la cola está completa. Idealmente, me gustaría usar get() desde el subproceso del consumidor y hacer que lance una excepción si la cola ha sido marcada como “hecha”. ¿Hay una mejor manera de comunicar esto que agregando un valor de centinela para marcar el último elemento en la cola?

La cola no tiene inherentemente la idea de estar completa o terminada. Se pueden usar indefinidamente. Para cerrarla cuando haya terminado, de hecho tendrá que poner Ninguno o algún otro valor mágico al final y escribir la lógica para verificarlo, como lo describió. La forma ideal probablemente sería subclasificar el objeto Queue.

Consulte http://en.wikipedia.org/wiki/Queue_(data_structure) para obtener más información sobre la cola en general.

original (la mayoría de esto ha cambiado; consulte las actualizaciones a continuación)

Basándome en algunas de las sugerencias (¡gracias!) De Glenn Maynard y otros, decidí Queue.Queue un descendiente de Queue.Queue que implementa un método de close . Está disponible en la forma de un módulo primitivo (sin empaquetar). Lo limpiaré un poco y lo empaquetaré correctamente cuando tenga un poco más de tiempo. Por ahora, el módulo solo contiene la clase CloseableQueue y la clase de excepción Closed . Estoy planeando expandirlo para incluir también las subclases de Queue.LifoQueue y Queue.PriorityQueue .

Actualmente se encuentra en un estado bastante preliminar, lo que quiere decir que a pesar de que pasa su conjunto de pruebas, todavía no lo he usado para nada. Su experiencia puede ser diferente. Mantendré esta respuesta actualizada con noticias emocionantes.

La clase CloseableQueue difiere un poco de la sugerencia de Glenn en que el cierre de la cola evitará futuros s, pero no impedirá get futuros s hasta que la cola se vacíe. Esto tenía más sentido para mí; parecía que la funcionalidad para borrar la cola se podía agregar como una mezcla separada * que sería ortogonal a la funcionalidad de cierre. Básicamente, con CloseableQueue , al cerrar la cola, se indica que se ha put el último elemento. También hay una opción para hacer esto de forma atómica al pasar last=True para la llamada final. Las llamadas subsiguientes a put , y las llamadas subsiguientes a get una vez que la cola se haya vaciado, así como las llamadas bloqueadas pendientes que coincidan con esas descripciones, generarán la excepción Closed .

Esto es principalmente útil para situaciones en las que un solo productor está generando datos para uno o más consumidores, pero también podría ser útil para un arreglo multi-múltiple donde los consumidores están esperando un artículo o conjunto de artículos en particular. En particular, no proporciona una manera de determinar que todos los productores han terminado la producción. Hacer que eso funcione implicaría la provisión de alguna manera de registrar a los productores ( .open() ?), Así como una manera de indicar que el registro de productores está cerrado.

Sugerencias y / o revisiones de código son muy bienvenidos. No he escrito un montón de código de concurrencia, pero espero que el conjunto de pruebas sea lo suficientemente completo como para que el hecho de que el código se apruebe sea una indicación de la calidad del código, en lugar de la falta del mismo. Pude reutilizar un montón de código del conjunto de pruebas del módulo Queue: el archivo se incluye en este módulo y se usa como base para varias subclases y rutinas, incluida la prueba de regresión. Esto probablemente (con suerte) ayudó a evitar la completa ineptitud en el departamento de pruebas. El código en sí mismo simplemente reemplaza a Queue.get y Queue.put con cambios mínimos, y agrega los métodos de close y closed .

En cierto modo, he evitado intencionalmente usar cualquier fantasía nueva como los gestores de contexto tanto en el propio código como en el conjunto de pruebas en un esfuerzo por mantener el código tan compatible con versiones anteriores como lo es el propio módulo Queue, que es bastante considerable. Probablemente __exit__ métodos __enter__ y __exit__ en algún momento; de lo contrario, la función de cierre de contextlib debería ser aplicable a una instancia de CloseableQueue.

*: Aquí uso el término “mixin” a la ligera. Como las clases del módulo Queue son de estilo antiguo, los mixins tendrían que mezclarse usando las funciones de clase de fábrica; se aplican algunas restricciones; Oferta nula donde lo prohíba Guido.

actualizar

El módulo CloseableQueue ahora proporciona las clases CloseableLifoQueue y CloseablePriorityQueue . También he añadido algunas funciones de conveniencia para apoyar la iteración. Todavía es necesario volver a trabajar como un paquete adecuado. Hay una función de fábrica de clases para permitir una subclasificación conveniente de otras clases derivadas de Queue.Queue .

actualización 2

CloseableQueue ahora está disponible a través de PyPI , por ejemplo, con

 $ easy_install CloseableQueue 

Los comentarios y las críticas son bienvenidos, especialmente por el downvoter anónimo de esta respuesta.

Un centinela es una forma natural de cerrar una cola, pero hay un par de cosas a tener en cuenta.

Primero, recuerde que puede tener más de un consumidor, por lo que debe enviar un centinela una vez por cada consumidor activo y garantizar que cada consumidor solo consumirá un centinela para garantizar que cada consumidor reciba su centinela de apagado.

En segundo lugar, recuerde que la cola define una interfaz y que, cuando sea posible, el código debe comportarse independientemente de la cola subyacente. Es posible que tenga un PriorityQueue o que tenga alguna otra clase que exponga la misma interfaz y devuelva valores en algún otro orden.

Desafortunadamente, es difícil lidiar con ambos. Para lidiar con el caso general de diferentes colas, un consumidor que está cerrando debe continuar consumiendo valores después de recibir su centinela de cierre hasta que la cola esté vacía. Eso significa que puede consumir el centinela de otro hilo. Esta es una debilidad de la interfaz de la cola: debe tener una llamada Queue.shutdown para que todos los consumidores lancen una excepción, pero falta.

Entonces, en la práctica:

  • Si está seguro de que solo está utilizando una Cola normal, simplemente envíe un centinela por hilo.
  • Si puede estar usando un PriorityQueue, asegúrese de que el centinela tenga la prioridad más baja.

Queue es un registro FIFO (primero en entrar, primero en salir), así que recuerde que el consumidor puede ser más rápido que el productor. Cuando los clientes detectan que la cola está vacía, normalmente se realiza una de las siguientes acciones:

  1. Enviar a API: cambiar al siguiente hilo.
  2. Enviar a API: duerme unos ms y luego vuelve a comprobar la cola.
  3. Enviar a API: espere un evento (como nuevo mensaje en cola).

Si no desea que el hilo de los consumidores finalice después de que se complete el trabajo, ponga en cola un valor centinela para finalizar la tarea.

La mejor manera de hacer esto sería que la propia cola notifique a un cliente que ha alcanzado el estado de “hecho”. El cliente puede entonces tomar cualquier acción que sea apropiada.

Lo que has sugerido; revisar la cola para ver si se hace periódicamente, sería altamente indeseable. El sondeo es un antipatterno en la progtwigción multiproceso, siempre debe usar notificaciones.

EDITAR:
Entonces, usted dice que la cola en sí misma sabe que está “hecha” en función de algunos criterios y necesita notificar a los clientes de ese hecho. Creo que estás en lo correcto y la mejor manera de hacerlo es lanzar cuando un cliente llama a get () y la cola está en el estado finalizado. Si su lanzamiento esto anularía la necesidad de un valor de centinela en el lado del cliente. Internamente, la cola puede detectar que está “hecha” de cualquier manera que le plazca, por ejemplo, la cola está vacía, su estado se estableció en etc. Sin embargo, no veo la necesidad de un valor de centinela.