Esperando que una tarea se complete después de KeyboardInterrupt en asyncio

Estoy tratando de entender cómo funciona el asyncio . En mi caso, el cliente realiza una conexión tcp al servidor, envía una cadena de inicio de sesión, si está autenticada, recibe una secuencia de caracteres. Finalmente, en KeyboardInterrupt envía la cadena de logoff al servidor y hapilly se desconecta.

Actualmente estoy atascado en la parte final ya que mi método / tarea de cierre de sesión se destruye antes de que tenga la oportunidad de completarse.

 ^CTask was destroyed but it is pending! source_traceback: Object created at (most recent call last): File "tst.py", line 101, in  client.login() File "tst.py", line 29, in login logoff_tsk = self.loop.create_task(self.logoff()) task: <Task pending coro= cb=[myClass._shutdown()] created at tst.py:29> 

A continuación se muestra el código que produce este error:

 from functools import partial import asyncio class myClass: def __init__(self, *a, **kw): self.transport = None self.protocol = None self.usr = str(kw.get("usr", "")) self.pwd = str(kw.get("pwd", "")) self.host = str(kw.get("host", "")) or "127.0.0.2" self.port = int(kw.get("port", 0)) or 5038 self.loop = asyncio.get_event_loop() self.loop.set_debug(enabled=True) def reactor(self, recv): print("## ~/~ From reactor: {!r}".format(recv.decode())) def login(self): connection_tsk = self.loop.create_task(self.loop.create_connection( partial( myProtocol, reactor_func=self.reactor), host=self.host, port=self.port)) connection_tsk.add_done_callback(self.set_transport_protocol) try: self.loop.run_forever() except KeyboardInterrupt: logoff_tsk = self.loop.create_task(self.logoff()) logoff_tsk.add_done_callback(self._shutdown) def set_transport_protocol(self, fut): print("AmiCtl.set_transport_protocol") transport, protocol = fut.result() self.transport = transport self.protocol = protocol self.loop.call_soon(self._login) def _login(self): login_string = self.cmd("Login") self.loop.create_task(self.transmit(login_string)) @asyncio.coroutine def transmit(self, cmd): if self.transport: print("## ~/~ Sending data: {!r}".format(cmd)) self.transport.write(cmd) @asyncio.coroutine def logoff(self): command = self.cmd("Logoff") yield from asyncio.shield(self.transmit(command)) def _shutdown(self): if self.transport: self.transport.close() self.loop.stop() self.loop.close() print("\n{!r}".format(self.loop)) def cmd(self, action): """ Produce login/logoff string. """ class myProtocol(asyncio.Protocol): def __init__(self, reactor_func=None): self.reactor_func = reactor_func self.loop = asyncio.get_event_loop() def connection_made(self, transport): self.transport = transport peername = transport.get_extra_info("peername") print("## ~/~ Connection made: {peer}".format(peer=peername)) def data_received(self, data): if callable(self.reactor_func): self.reactor_func(data) def connection_lost(self, exc): print("## ~/~ Lost connection to the server!!") self.loop.stop() client = myClass(usr="my_usr", pwd="my_pwd") client.login() 

¿Cómo puedo mejorar / arreglar mi código por favor?

No me referiré a tu código ya que creo que tu pregunta es una pregunta xy. ( http://xyproblem.info ), pero intentaré responder de una manera más genérica. Lo que estoy leyendo es que estás preguntando cómo cerrar correctamente una aplicación asyncio.

He establecido un pequeño ejemplo que creo que con alguna explicación lo pondrá en el camino correcto. Lea el código a continuación, mi esperanza es que se pueda relacionar con su caso de uso y explicaré a continuación.

 import asyncio import signal loop = asyncio.get_event_loop() class Connector: def __init__(self): self.closing = False self.closed = asyncio.Future() task = loop.create_task(self.connection_with_client()) task.add_done_callback(self.closed.set_result) async def connection_with_client(self): while not self.closing: print('Read/write to open connection') await asyncio.sleep(1) print('I will now close connection') await asyncio.sleep(1) conn = Connector() def stop(loop): conn.closing = True print("from here I will wait until connection_with_client has finished") conn.closed.add_done_callback(lambda _: loop.stop()) loop.add_signal_handler(signal.SIGINT, stop, loop) loop.run_forever() loop.close() 

Lo que está pidiendo no es realmente trivial, una de las cosas más difíciles de manejar haciendo asyncio es cerrar correctamente una aplicación asyncio.

Para realizar el cierre administrado, debe tener una rutina en el lugar que maneje el apagado y el futuro, como se ha hecho, cuando se haya asegurado de que todo esté apagado. En mi ejemplo, es task.add_done_callback(self.closed.set_result) pero este futuro podría establecerse de otras maneras.

En segundo lugar, debe agregar un controlador de señal que ejecutará una función no asíncrona (normal) y progtwigr una callback, que activa el ciclo para que se cierre / detenga cuando finalice su ‘cierre futuro’.

Eche un vistazo a mi código y juegue con él hasta que entienda el flujo. No te culpo por preguntar, en mi opinión, una de las cosas más difíciles de hacer asyncio es hacer un seguimiento de las rutinas “sueltas”, que resultarán en un cierre impuro.

Cuando hice este ‘ejercicio’ mi primera vez y necesitaba comprender los principios, revisé el código de https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L93-L95 Si te tomas el tiempo para leer el código. Estos chicos manejan tu problema exacto de una manera muy hermosa. Sé que el código es un poco complejo, pero toma una taza de café y sigue las llamadas de método hasta que tenga sentido.

Espero que esto te ayude, publica más detalles o comentarios si no estoy seguro. Y tómese el tiempo para comprender este tema (las cosas de cierre de asycnio). Cuando domina la gestión de cierre con asyncio, ya no es un asyncio-padawan;)

Puede pensar que hay mucho código para apagar, pero a mi entender, esta es la forma de hacerlo (y más o menos la única forma de aplicaciones más grandes y complejas).

La mejor de las suertes.