¿Cómo agregar una coroutine a un asyncio loop en ejecución?

¿Cómo se puede agregar una nueva coroutine a un bucle asyncio en ejecución? Es decir. Una que ya está ejecutando un conjunto de coroutines.

Supongo que, como solución alternativa, uno podría esperar a que se completen las coroutines existentes y luego inicializar un nuevo bucle (con la coroutine adicional). Pero hay una manera mejor?

Puedes usar create_task para progtwigr nuevas coroutinas:

 import asyncio async def cor1(): ... async def cor2(): ... async def main(loop): await asyncio.sleep(0) t1 = loop.create_task(cor1()) await cor2() await t1 loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close() 

Para agregar una función a un bucle de evento ya en ejecución, puede usar:

asyncio.ensure_future(my_coro())

En mi caso, estaba usando subprocesos asyncio ( threading ) junto con asyncio y quería agregar una tarea al bucle de eventos que ya se estaba ejecutando. Para cualquier otra persona en la misma situación, asegúrese de indicar explícitamente el bucle de eventos (ya que no existe uno dentro de un Thread ). es decir:

En ámbito global:

 event_loop = asyncio.get_event_loop() 

Luego, más tarde, dentro de tu Thread :

 asyncio.ensure_future(my_coro(), loop=event_loop) 

Su pregunta está muy cerca de “¿Cómo agregar una llamada de función al progtwig en ejecución?”

¿Exactamente cuándo necesitas agregar un nuevo bucle de rutina al evento?

Veamos algunos ejemplos. Aquí el progtwig que inicia el bucle de eventos con dos coroutines paralelamente:

 import asyncio from random import randint async def coro1(): res = randint(0,3) await asyncio.sleep(res) print('coro1 finished with output {}'.format(res)) return res async def main(): await asyncio.gather( coro1(), coro1() ) # here we have two coroutines running parallely if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) 

Salida:

 coro1 finished with output 1 coro1 finished with output 2 [Finished in 2.2s] 

¿Es posible que necesite agregar algunas coroutinas que tomarían los resultados de coro1 y usarlas tan pronto como coro1 listas? En ese caso, simplemente cree una correlación que espere a coro1 y use su valor de retorno:

 import asyncio from random import randint async def coro1(): res = randint(0,3) await asyncio.sleep(res) print('coro1 finished with output {}'.format(res)) return res async def coro2(): res = await coro1() res = res * res await asyncio.sleep(res) print('coro2 finished with output {}'.format(res)) return res async def main(): await asyncio.gather( coro2(), coro2() ) # here we have two coroutines running parallely if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) 

Salida:

 coro1 finished with output 1 coro2 finished with output 1 coro1 finished with output 3 coro2 finished with output 9 [Finished in 12.2s] 

Piense en las coroutinas como en las funciones regulares con una syntax específica. Puede iniciar algunos conjuntos de funciones para que se ejecuten de forma paralela (por asyncio.gather ), puede iniciar la siguiente función una vez que asyncio.gather , puede crear nuevas funciones que llamen a otras.

Ninguna de las respuestas aquí parece responder exactamente a la pregunta. Es posible agregar tareas a un bucle de eventos en ejecución haciendo que una tarea “principal” lo haga por usted. No estoy seguro de cuál es la forma más pythonica de asegurarse de que el padre no termine hasta que sus hijos hayan terminado (asumiendo que es el comportamiento que desea), pero esto sí funciona.

 import asyncio import random async def add_event(n): print('starting ' + str(n)) await asyncio.sleep(n) print('ending ' + str(n)) return n async def main(loop): added_tasks = [] delays = [x for x in range(5)] # shuffle to simulate unknown run times random.shuffle(delays) for n in delays: print('adding ' + str(n)) task = loop.create_task(add_event(n)) added_tasks.append(task) await asyncio.sleep(0) print('done adding tasks') # make a list of tasks that (maybe) haven't completed running_tasks = added_tasks[::] # wait until we see that all tasks have completed while running_tasks: running_tasks = [x for x in running_tasks if not x.done()] await asyncio.sleep(0) print('done running tasks') # extract the results from the tasks and return them results = [x.result() for x in added_tasks] return results loop = asyncio.get_event_loop() results = loop.run_until_complete(main(loop)) loop.close() print(results)