Manejo asíncrono de excepciones en Python.

Tengo el siguiente código usando asyncio y aiohttp para hacer solicitudes HTTP asíncronas.

 import sys import asyncio import aiohttp @asyncio.coroutine def get(url): try: print('GET %s' % url) resp = yield from aiohttp.request('GET', url) except Exception as e: raise Exception("%s has error '%s'" % (url, e)) else: if resp.status >= 400: raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason)) return (yield from resp.text()) @asyncio.coroutine def fill_data(run): url = 'http://www.google.com/%s' % run['name'] run['data'] = yield from get(url) def get_runs(): runs = [ {'name': 'one'}, {'name': 'two'} ] loop = asyncio.get_event_loop() task = asyncio.wait([fill_data(r) for r in runs]) loop.run_until_complete(task) return runs try: get_runs() except Exception as e: print(repr(e)) sys.exit(1) 

Por alguna razón, las excepciones generadas dentro de la función get no se detectan:

 Future/Task exception was never retrieved Traceback (most recent call last): File "site-packages/asyncio/tasks.py", line 236, in _step result = coro.send(value) File "mwe.py", line 25, in fill_data run['data'] = yield from get(url) File "mwe.py", line 17, in get raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason)) Exception: http://www.google.com/two has error '404: Not Found' 

Entonces, ¿cuál es la forma correcta de manejar las excepciones generadas por las couroutines?

asyncio.wait no consume realmente los Futures pasado, solo espera a que se completen, y luego devuelve los objetos Future :

asyncio.wait asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Espere a que se completen los futuros y los objetos de coroutine dados por la secuencia de futuros. Los coroutines serán envueltos en tareas. Devuelve dos conjuntos de Future : (hecho, pendiente).

Hasta que realmente yield from los artículos en la lista done , permanecerán sin consumir. Como su progtwig sale sin consumir los futuros, verá los mensajes de “la excepción nunca se recuperó”.

Para su caso de uso, probablemente tenga más sentido usar asyncio.gather , que en realidad consumirá cada Future , y luego devolverá un Future único que agregue todos sus resultados (o genere la primera Exception lanzada por un futuro en la lista de entrada) .

 def get_runs(): runs = [ {'name': 'one'}, {'name': 'two'} ] loop = asyncio.get_event_loop() tasks = asyncio.gather(*[fill_data(r) for r in runs]) loop.run_until_complete(tasks) return runs 

Salida:

 GET http://www.google.com/two GET http://www.google.com/one Exception("http://www.google.com/one has error '404: Not Found'",) 

Tenga en cuenta que asyncio.gather realidad le permite personalizar su comportamiento cuando uno de los futuros genera una excepción; el comportamiento predeterminado es generar la primera excepción que golpea, pero también puede devolver cada objeto de excepción en la lista de salida:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

Devuelva un futuro agregando resultados de los objetos o futuros de coroutine dados.

Todos los futuros deben compartir el mismo bucle de eventos. Si todas las tareas se realizan con éxito, el resultado futuro devuelto es la lista de resultados (en el orden de la secuencia original, no necesariamente el orden de llegada de los resultados). Si return_exceptions es True , las excepciones en las tareas se tratan de la misma manera que los resultados exitosos, y se reúnen en la lista de resultados; de lo contrario, la primera excepción generada se propagará inmediatamente al futuro devuelto.

Para depurar o “manejar” las excepciones en la callback :

Coroutine que devuelve algún resultado o plantea excepciones.

 @asyncio.coroutine def async_something_entry_point(self): try: return self.real_stuff_which_throw_exceptions() except: raise Exception(some_identifier_here + ' ' + traceback.format_exc()) 

Y callback:

 def callback(self, future: asyncio.Future): exc = future.exception() if exc: # Handle wonderful empty TimeoutError exception if type(exc) == TimeoutError: self.logger(' callback exception TimeoutError') else: self.logger(" callback exception " + str(exc)) # store your result where you want self.result.append( future.result() ) 

asyncio proporciona una API para el manejo personalizado de errores, consulte el documento https://docs.python.org/3/library/asyncio-eventloop.html#error-handling-api