El asyncio de Python funciona sincrónicamente.

Estoy tratando de aprovechar la nueva biblioteca de asyncio de Python para enviar solicitudes HTTP asíncronas. Quiero esperar unos pocos milisegundos (la variable de timeout ) antes de enviar cada solicitud, pero, por supuesto, enviarlos todos de forma asíncrona y, no esperar una respuesta después de cada solicitud enviada.

Estoy haciendo algo como lo siguiente:

 @asyncio.coroutine def handle_line(self, line, destination): print("Inside! line {} destination {}".format(line, destination)) response = yield from aiohttp.request('POST', destination, data=line, headers=tester.headers) print(response.status) return (yield from response.read()) @asyncio.coroutine def send_data(self, filename, timeout): destination='foo' logging.log(logging.DEBUG, 'sending_data') with open(filename) as log_file: for line in log_file: try: json_event = json.loads(line) except ValueError as e: print("Error parsing json event") time.sleep(timeout) yield from asyncio.async(self.handle_line(json.dumps(json_event), destination)) loop=asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1)) 

La salida que estoy obteniendo (imprimiendo las 200 respuestas) parece que este código se ejecuta de forma síncrona. ¿Qué estoy haciendo mal?

Hay un par de problemas aquí:

  1. Debe usar asyncio.sleep , no time.sleep , porque este último bloqueará el bucle de eventos.

  2. No debe utilizar el yield from después de la asyncio.async(self.handle_line(...)) , ya que eso hará que el script se bloquee hasta que self.handle_line coroutine esté completo, lo que significa que no terminará haciendo cualquier cosa concurrentemente procesa cada línea, espera a que finalice el procesamiento y luego pasa a la siguiente línea. En su lugar, debe ejecutar todas las llamadas asyncio.async sin esperar, guardar los objetos de Task devueltos a una lista, y luego usar asyncio.wait para esperar a que todos se completen una vez que los haya iniciado todos.

Poniendo todo junto:

 @asyncio.coroutine def handle_line(self, line, destination): print("Inside! line {} destination {}".format(line, destination)) response = yield from aiohttp.request('POST', destination, data=line, headers=tester.headers) print(response.status) return (yield from response.read()) @asyncio.coroutine def send_data(self, filename, timeout): destination='foo' logging.log(logging.DEBUG, 'sending_data') tasks = [] with open(filename) as log_file: for line in log_file: try: json_event = json.loads(line) except ValueError as e: print("Error parsing json event") yield from asyncio.sleep(timeout) tasks.append(asyncio.async( self.handle_line(json.dumps(json_event), destination)) yield from asyncio.wait(tasks) asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))