Quiero esperar tanto en un descriptor de archivos como en un mutex, ¿cuál es la forma recomendada de hacer esto?

Me gustaría generar hilos para realizar ciertas tareas y usar una cola segura de hilos para comunicarme con ellos. También me gustaría hacer IO a una variedad de descriptores de archivos mientras estoy esperando.

¿Cuál es la forma recomendada para lograr esto? ¿Tengo que crear un conducto entre subprocesos y escribir en él cuando la cola pasa de no elementos a algunos elementos? ¿No hay una mejor manera?

Y si tengo que crear el conducto entre subprocesos, ¿por qué no hay más bibliotecas que implementan colas compartidas que le permitan crear la cola compartida y el conducto entre subprocesos como una sola entidad?

¿El hecho de que quiera hacer esto implica un defecto de diseño fundamental?

Estoy preguntando esto sobre C ++ y Python. Y estoy ligeramente interesado en una solución multiplataforma, pero principalmente en Linux.

Para un ejemplo más concreto …

Tengo un código que buscará cosas en un árbol de sistema de archivos. Tengo varios canales de comunicación abiertos al mundo exterior a través de sockets. Llegarán solicitudes que pueden (o no) dar como resultado la necesidad de buscar cosas en el árbol del sistema de archivos.

Voy a aislar el código que busca cosas en el árbol del sistema de archivos en uno o más subprocesos. Me gustaría recibir solicitudes que resulten en la necesidad de buscar en el árbol y colocarlas en una cola segura de subprocesos de cosas que deben hacer los subprocesos del buscador. Los resultados se pondrán en una cola de búsquedas completadas.

Me gustaría poder atender todas las solicitudes de no búsqueda rápidamente mientras se realizan las búsquedas. Me gustaría poder actuar en los resultados de búsqueda de manera oportuna.

El servicio de las solicitudes entrantes generalmente implicaría algún tipo de architecture dirigida por eventos que utiliza epoll . La cola de solicitudes de búsqueda de disco y la cola de resultados de resultados implicarían una cola segura para subprocesos que utiliza mutexes o semáforos para implementar la seguridad de subprocesos.

La forma estándar de esperar en una cola vacía es usar una variable de condición. Pero eso no funcionará si necesito atender otras solicitudes mientras estoy esperando. Ya sea que termino de sondear la cola de resultados todo el tiempo (y retrasar los resultados en la mitad del intervalo de sondeo, en promedio), bloqueando y no atendiendo las solicitudes.

Cada vez que uno usa una architecture dirigida por eventos, se requiere que uno tenga un solo mecanismo para informar la finalización del evento. En Linux, si uno está usando archivos, se requiere que uno use algo de la familia de selección o de encuesta, lo que significa que uno está atascado con el uso de una canalización para iniciar todos los eventos relacionados con ningún archivo.

Edición : Linux tiene eventfd y timerfd . Estos pueden agregarse a su lista de epoll y usarse para salir de la epoll_wait cuando se activa desde otro hilo o en un evento de temporizador respectivamente.

Hay otra opción y eso son las señales. Se puede usar fcntl modificar el descriptor de archivo de manera que se emita una señal cuando el descriptor de archivo se active. El manejador de señales puede entonces enviar un mensaje de lista de archivos a cualquier tipo de cola de su elección. Esto puede ser un simple semáforo o una cola mutex / condvar. Ya que uno ya no está utilizando select / poll , uno ya no necesita usar una canalización para poner en cola ninguno de los mensajes basados ​​en archivos.

Advertencia de salud: no he intentado esto y, aunque no puedo ver por qué no funcionará, realmente no conozco las implicaciones de rendimiento del enfoque de signal .

Resolví este problema exacto usando lo que mencionas, pipe () y libevent (que envuelve epoll). El subproceso de trabajo escribe un byte en su FD de canalización cuando su cola de salida va de vacía a no vacía. Eso despierta el hilo principal de IO, que luego puede agarrar la salida del hilo trabajador. Esto funciona muy bien, en realidad es muy simple de codificar.

Tiene la etiqueta de Linux, así que voy a desecharla: POSIX Message Queue hace todo esto, que debería cumplir con su solicitud “incorporada”, si no su deseo multiplataforma menos deseado.

La sincronización segura de subprocesos está integrada. Puede hacer que sus subprocesos de trabajo se bloqueen al leer la cola. Alternativamente, los MQ pueden usar mq_notify () para generar un nuevo hilo (o señalar uno existente) cuando hay un nuevo elemento puesto en la cola. Y como parece que va a utilizar select (), el identificador de MQ (mqd_t) se puede usar como descriptor de archivos con select.

En mi opinión, Duck’s y Twk’s son en realidad mejores respuestas que Doron’s (la seleccionada por el OP). doron sugiere escribir en una cola de mensajes desde el contexto de un manejador de señales, e indica que la cola de mensajes puede ser “cualquier tipo de cola”. Le recomiendo encarecidamente que no haga esto, ya que muchas llamadas de biblioteca / sistema de C no se pueden llamar de forma segura desde un controlador de señales (consulte la sección de seguridad de async-signal-safe ).

En particular, si elige una cola protegida por un mutex, no debe acceder a ella desde un controlador de señales. Considere este escenario: su hilo de consumidor bloquea la cola para leerlo. Inmediatamente después, el kernel envía la señal para notificarle que un descriptor de archivo ahora tiene datos. El manejador de señales se ejecuta en el hilo del consumidor, necesariamente), y trata de poner algo en su cola. Para ello, primero tiene que llevar la cerradura. Pero ya tiene la cerradura, por lo que ahora estás bloqueado.

En mi experiencia, select / poll es la única solución viable para un progtwig basado en eventos en UNIX / Linux. Desearía que hubiera una mejor manera dentro de un progtwig multiproceso, pero necesita algún mecanismo para “despertar” su cadena de consumidores. Todavía tengo que encontrar un método que no implique una llamada al sistema (ya que el subproceso del consumidor está en una cola de espera dentro del kernel durante cualquier llamada de locking, como seleccionar).

EDITAR: Olvidé mencionar una forma específica de Linux para manejar las señales al usar select / poll: signalfd (2) . Obtiene un descriptor de archivo en el que puede seleccionar / sondear, y manejar el código se ejecuta normalmente en lugar de en el contexto de un controlador de señales.

Parece que nadie ha mencionado esta opción todavía:

No ejecute select / poll / etc. en su “hilo principal”. Inicie un subproceso secundario dedicado que realiza la E / S e inserta notificaciones en su cola segura de subprocesos (la misma cola que utilizan los otros subprocesos para comunicarse con el subproceso principal) cuando se completan las operaciones de E / S.

Entonces su hilo principal solo necesita esperar en la cola de notificaciones.

C ++ 11 tiene std :: mutex y std :: condition_variable. Los dos se pueden usar para que una señal de un hilo sea otra cuando se cumple una determinada condición. Me parece que necesitarás construir tu solución a partir de estos primitivos. Si su entorno aún no es compatible con estas características de la biblioteca C ++ 11, puede encontrar otras muy similares en boost. Lo siento, no puedo decir mucho sobre python.

Este es un problema muy común, especialmente cuando está desarrollando un progtwig del lado del servidor de red. La apariencia principal de la mayoría de los progtwigs del lado del servidor de Linux se integrará así:

 epoll_add(serv_sock); while(1){ ret = epoll_wait(); foreach(ret as fd){ req = fd.read(); resp = proc(req); fd.send(resp); } } 

Es un solo hilo (el hilo principal), marco de servidor basado en epoll. El problema es que es de un solo hilo, no de múltiples hilos. Requiere que proc () nunca debe bloquearse o ejecutarse durante un tiempo significativo (digamos 10 ms para casos comunes).

Si proc () alguna vez se ejecuta durante un tiempo prolongado, NECESITAMOS MULTI HILOS, y ejecuta proc () en un hilo separado (el hilo trabajador).

Podemos enviar la tarea al subproceso de trabajo sin bloquear el subproceso principal, utilizando una cola de mensajes basada en mutex, es lo suficientemente rápido.

 epoll_add(serv_sock); while(1){ ret = epoll_wait(); foreach(ret as fd){ req = fd.read(); queue.add_job(req); // fast, non blockable } } 

Entonces necesitamos una forma de obtener el resultado de la tarea de un subproceso de trabajo. ¿Cómo? Si solo revisamos la cola de mensajes directamente, antes o después de epoll_wait ().

 epoll_add(serv_sock); while(1){ ret = epoll_wait(); // may blocks for 10ms resp = queue.check_result(); // fast, non blockable foreach(ret as fd){ req = fd.read(); queue.add_job(req); // fast, non blockable } } 

Sin embargo, la acción de verificación se ejecutará después de que epoll_wait () finalice, y epoll_wait () generalmente se bloquea durante 10 microsegundos (casos comunes) si todos los descriptores de archivo que espera no están activos.

Para un servidor, 10 ms es bastante tiempo! ¿Podemos indicar a epoll_wait () que finalice inmediatamente cuando se genere el resultado de la tarea?

¡Sí! Describiré cómo se hace en uno de mis proyectos de código abierto:

Crea una tubería para todos los hilos de trabajo, y Epoll espera en esa tubería también. Una vez que se genera el resultado de una tarea, el subproceso de trabajo escribe un byte en la canalización, ¡entonces epoll_wait () terminará casi al mismo tiempo! – Linux pipe tiene 5 us a 20 us latencia.


En mi proyecto SSDB (una base de datos NoSQL en el disco compatible con Redis), creo un SelectableQueue para pasar mensajes entre el subproceso principal y los subprocesos de trabajo. Al igual que su nombre, SelectableQueue tiene un descriptor de archivo, que puede ser esperado por epoll.

SelectableQueue: https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

Uso en el hilo principal:

 epoll_add(serv_sock); epoll_add(queue->fd()); while(1){ ret = epoll_wait(); foreach(ret as fd){ if(fd is queue){ sock, resp = queue->pop_result(); sock.send(resp); } if(fd is client_socket){ req = fd.read(); queue->add_task(fd, req); } } } 

Uso en subproceso de trabajo:

 fd, req = queue->pop_task(); resp = proc(req); queue->add_result(fd, resp); 

Una forma de lograr lo que desea hacer es implementar el patrón de observador

Registre su hilo principal como observador con todos sus hilos generados, y pídales que lo notifiquen cuando hayan terminado de hacer lo que debían (o actualizar durante su ejecución con la información que necesita).

Básicamente, desea cambiar su enfoque a un modelo impulsado por eventos.