asyncio CancelledError and KeyboardInterrupt

Estoy intentando 2 formas de detener la ejecución de un bucle infinito:

  • supervisor_1 : la tarea se cancela programáticamente
  • supervisor_2 : la tarea se detiene con Ctrl + C

Si bien supervisor_2 no arroja ningún error cuando se interrumpe, no puedo obtener que supervisor_1 haya conseguido que la Task was destroyed but it is pending! . ¿Alguna idea de por qué?

Aquí está el código:

 import asyncio import aioredis from functools import partial class Listener: def __init__(self, redis_conn): self.redis_conn = redis_conn async def forever(self, loop_name): counter = 0 try: while True: print('{}: {}'.format(loop_name, counter)) counter += 1 await asyncio.sleep(1) except asyncio.CancelledError: print('Task Cancelled') self.redis_conn.close() await self.redis_conn.wait_closed() async def supervisor_1(redis_conn): redis_conn = await redis_conn l = Listener(redis_conn) task = asyncio.ensure_future( asyncio.gather(l.forever('loop_1'), l.forever('loop_2'))) await asyncio.sleep(2) task.cancel() async def supervisor_2(redis_conn): redis_conn = await redis_conn l = Listener(redis_conn) await asyncio.gather(l.forever('loop_1'), l.forever('loop_2')) if __name__ == '__main__': redis_conn = aioredis.create_pool(('localhost', 5003), db=1) loop = asyncio.get_event_loop() run = partial(supervisor_2, redis_conn=redis_conn) task = asyncio.ensure_future(run()) try: loop.run_until_complete(task) except KeyboardInterrupt: print('Interruped !') task.cancel() loop.run_forever() finally: loop.close() 

@update :

Gracias a @Gerasimov, aquí hay una versión que soluciona el problema, pero de alguna manera todavía genera errores de vez en cuando en KeyboardInterrupt:

 async def supervisor(redis_conn): redis_conn = await redis_conn l = Listener(redis_conn) task = asyncio.ensure_future( asyncio.gather(l.forever('loop_1'), l.forever('loop_2')) ) await asyncio.sleep(10) task.cancel() with suppress(asyncio.CancelledError): await task async def kill_tasks(): pending = asyncio.Task.all_tasks() for task in pending: task.cancel() with suppress(asyncio.CancelledError): await task 

y

 if __name__ == '__main__': redis_conn = aioredis.create_pool(('localhost', 5003), db=1) loop = asyncio.get_event_loop() run = partial(supervisor, redis_conn=redis_conn) task = asyncio.ensure_future(run()) try: loop.run_until_complete(task) except KeyboardInterrupt: print('Interruped !') loop.run_until_complete(kill_tasks()) finally: loop.close() 

task.cancel() sí mismo no finaliza la tarea: solo le dice a la tarea que CancelledError debe aparecer dentro de ella y regresa de inmediato. Debería llamarlo y esperar a que la tarea se cancele realmente (mientras que generará CancelledError ).

Tampoco debes suprimir CancelledError dentro de la tarea.

Lea esta respuesta donde traté de mostrar diferentes formas de trabajar con tareas. Por ejemplo, para cancelar alguna tarea y esperar a que se cancele, puede hacer:

 from contextlib import suppress task = ... # remember, task doesn't suppress CancelledError itself task.cancel() # returns immediately, we should await task raised CancelledError. with suppress(asyncio.CancelledError): await task # or loop.run_until_complete(task) if it happens after event loop stopped # Now when we awaited for CancelledError and handled it, # task is finally over and we can close event loop without warning.