Asyncio detectando desconexiones se bloquea.

Estoy usando Asyncio en Python 3.4, intentaré explicar lo que estoy haciendo hasta este punto y lo que (creo) está causando el problema.

Por un lado, tengo un marco de conexión UDP con operaciones de locking, tomo los datos que obtengo de este flujo y estoy creando el json que entrego al cliente en formato SSE. Todo esto está funcionando muy bien.

El problema con el que me estoy topando es que no puedo hacer que maneje las desconexiones del cliente correctamente si no hago nada y el cliente se desconecta. Comenzaré a recibir este error:

WARNING [selector_events:613] socket.send() raised exception. 

dado que el bucle aún se está ejecutando, he estado buscando formas de romper de manera limpia el bucle y activar el .close (), pero tengo problemas con los ejemplos que he encontrado y no hay muchos recursos en línea.

El único ejemplo que parece funcionar realmente es intentar leer una línea del cliente y si es una cadena vacía significa que el cliente está desconectado.

  while True: data = (yield from client_reader.readline()) if not data: #client disconnected break 

sin embargo, después de aproximadamente diez mensajes, todos los mensajes al cliente se detienen, creo que esto se debe a que se cuelga en “data = (rendimiento de client_reader.readline ())” después de que se cuelga si cierro el cliente, entonces se apaga correctamente y “End Conexión “se llama. ¿Alguna idea de por qué podría estar colgando? Creo que tengo un buen manejo de Asyncio en este punto, pero este me está desconcertando.

Nota: la ubicación () y el estado () son mis dos llamadas para obtener información del socket UDP. Las he ejecutado exitosamente sin problemas durante muchas horas con este mismo código, menos las líneas de desconexión del cliente.

 clients = {} def accept_client(client_reader, client_writer): task = asyncio.Task(handle_client(client_reader, client_writer)) clients[task] = (client_writer) def client_done(task): del clients[task] client_writer.close() log.info("End Connection") log.info("New Connection") task.add_done_callback(client_done) @asyncio.coroutine def handle_client(client_reader, client_writer): data = {'result':{'status':'Connection Ready'}} yield from postmessage(data,client_writer) while True: data = (yield from client_reader.readline()) if not data: #client disconnected break data = yield from asyncio.wait_for(location(), timeout=1.0) yield from postmessage(data,client_writer) data = yield from asyncio.wait_for(status(), timeout=1.0) yield from postmessage(data,client_writer) @asyncio.coroutine def postmessage(data, client_writer): mimetype=('text/event-stream') response = ('data: {0}\n\n'.format(data).encode('utf-8')) client_writer.write(response) client_writer.drain() 

Actualización: si agrego un tiempo de espera en el “rendimiento de client_reader”, aparece el siguiente error cuando llega al punto de que normalmente se bloquea.

 2014-11-17 03:13:56,214 INFO [try:23] End Connection 2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved future: <Task finished coro= exception=TimeoutError()> Traceback (most recent call last): File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step result = next(coro) File "try.py", line 35, in handle_client timeout=1.0)) File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for raise futures.TimeoutError() concurrent.futures._base.TimeoutError 

Aquí hay una secuencia de comandos de muestra que muestra el error en acción: simplemente ejecútelo en Python 3.4.2 y después de 9 iteraciones se bloqueará la lectura del cliente.

(La secuencia de comandos completa para que pueda ejecutarla y verla usted mismo)

 import asyncio import logging import json import time log = logging.getLogger(__name__) clients = {} def accept_client(client_reader, client_writer): task = asyncio.Task(handle_client(client_reader, client_writer)) clients[task] = (client_writer) def client_done(task): del clients[task] client_writer.close() log.info("End Connection") log.info("New Connection") task.add_done_callback(client_done) @asyncio.coroutine def handle_client(client_reader, client_writer): data = {'result':{'status':'Connection Ready'}} postmessage(data,client_writer) count = 0 while True: data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0)) if not data: #client disconnected break data = yield from asyncio.wait_for(test1(),timeout=1.0) yield from postmessage(data,client_writer) data = yield from asyncio.wait_for(test2(),timeout=1.0) yield from postmessage(data,client_writer) @asyncio.coroutine def postmessage(data, client_writer): mimetype=('text/event-stream') response = ('data: {0}\n\n'.format(data).encode('utf-8')) client_writer.write(response) client_writer.drain() @asyncio.coroutine def test1(): data = {'result':{ 'test1':{ } } } data = json.dumps(data) return data @asyncio.coroutine def test2(): data = {'result':{ 'test2':{ } } } data = json.dumps(data) return data def main(): loop = asyncio.get_event_loop() f = asyncio.start_server(accept_client, host=None, port=2991) loop.run_until_complete(f) loop.run_forever() if __name__ == '__main__': log = logging.getLogger("") formatter = logging.Formatter("%(asctime)s %(levelname)s " + "[%(module)s:%(lineno)d] %(message)s") # log the things log.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter(formatter) log.addHandler(ch) main() 

Otra actualización: encontré que muere porque lee todas las líneas del encabezado del cliente y luego se agota cuando se queda sin líneas. Creo que la respuesta real que busco es cómo detectar las desconexiones de los clientes cuando en realidad no necesita recibir datos del cliente (aparte de la conexión inicial).

Ok, creo que entiendo tu problema. Está leyendo desde el cliente solo para saber si el cliente se ha desconectado, pero una vez que el cliente ha enviado su encabezado, la línea de readline() se bloqueará indefinidamente mientras el cliente todavía está conectado, lo que le impide realizar cualquier trabajo. Usar un tiempo de espera para evitar el locking está bien, solo necesita manejar el TimeoutError , ya que cuando ocurre, puede asumir que el cliente no se ha desconectado:

 from concurrent.futures import TimeoutError @asyncio.coroutine def handle_client(client_reader, client_writer): data = {'result':{'status':'Connection Ready'}} postmessage(data,client_writer) count = 0 while True: try: # See if client has disconnected. data = (yield from asyncio.wait_for(client_reader.readline(),timeout=0.01)) if not data: # Client disconnected break except TimeoutError: pass # Client hasn't disconnected. data = yield from asyncio.wait_for(test1(),timeout=1.0) yield from postmessage(data,client_writer) data = yield from asyncio.wait_for(test2(),timeout=1.0) yield from postmessage(data,client_writer) 

Tenga en cuenta que el tiempo de espera fue muy breve, ya que no queremos bloquear nada, solo queremos saber si la conexión se ha cerrado.

Pero una solución aún mejor sería no verificar explícitamente si la conexión se ha cerrado y, en cambio, manejar cualquier excepción que se obtenga al intentar enviar datos a través del socket cuando se haya cerrado la conexión:

 @asyncio.coroutine def handle_client(client_reader, client_writer): data = {'result':{'status':'Connection Ready'}} postmessage(data,client_writer) count = 0 while True: try: data = yield from asyncio.wait_for(test1(),timeout=1.0) yield from postmessage(data,client_writer) data = yield from asyncio.wait_for(test2(),timeout=1.0) yield from postmessage(data,client_writer) except ConnectionResetError: # And/or whatever other exceptions you see. break 

Leí el código fuente y encontré una manera fácil de hacer esto:

 if client_writer.transport._conn_lost: print('Connection is lost') # break some loop 

Tuve el mismo problema con select () que tampoco puede detectar un socket muerto mientras espero un evento.

Resolví mi problema usando un read () no bloqueante y considerando que el rendimiento de un evento de lectura de select () , asignado a 0 bytes recibidos, es una conexión muerta.

La muerte de un socket, mientras se ejecuta el código en modo kernel (sys call), no genera una señal, ni ninguna excepción. Parece que se le deja un enfoque heurístico, considerando que el rendimiento de una lectura () de cero bytes, debería considerarse como un enlace / socket muerto … No es lo que considero elegante, pero no he podido. Encuentra algo mejor (por el momento).