Concurrencia limitada con asyncio

Supongamos que tenemos un montón de enlaces para descargar y cada uno de los enlaces puede tomar una cantidad diferente de tiempo para descargar. Y se me permite descargar usando solo 3 conexiones solamente. Ahora, quiero asegurarme de hacer esto eficientemente usando asyncio.

Esto es lo que estoy tratando de lograr: En cualquier momento, intente asegurar que tengo al menos 3 descargas en ejecución.

Connection 1: 1---------7---9--- Connection 2: 2---4----6----- Connection 3: 3-----5---8----- 

Los números representan los enlaces de descarga, mientras que los guiones representan Esperando descarga.

Aquí está el código que estoy usando ahora

 from random import randint import asyncio count = 0 async def download(code, permit_download, no_concurrent, downloading_event): global count downloading_event.set() wait_time = randint(1, 3) print('downloading {} will take {} second(s)'.format(code, wait_time)) await asyncio.sleep(wait_time) # I/O, context will switch to main function print('downloaded {}'.format(code)) count -= 1 if count < no_concurrent and not permit_download.is_set(): permit_download.set() async def main(loop): global count permit_download = asyncio.Event() permit_download.set() downloading_event = asyncio.Event() no_concurrent = 3 i = 0 while i = no_concurrent: permit_download.clear() loop.create_task(download(i, permit_download, no_concurrent, downloading_event)) await downloading_event.wait() # To force context to switch to download function downloading_event.clear() i += 1 else: await permit_download.wait() await asyncio.sleep(9) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close() 

Y la salida es la esperada:

 downloading 0 will take 2 second(s) downloading 1 will take 3 second(s) downloading 2 will take 1 second(s) downloaded 2 downloading 3 will take 2 second(s) downloaded 0 downloading 4 will take 3 second(s) downloaded 1 downloaded 3 downloading 5 will take 2 second(s) downloading 6 will take 2 second(s) downloaded 5 downloaded 6 downloaded 4 downloading 7 will take 1 second(s) downloading 8 will take 1 second(s) downloaded 7 downloaded 8 

Pero aquí están mis preguntas:

  1. En este momento, simplemente estoy esperando por 9 segundos para mantener la función principal en ejecución hasta que las descargas estén completas. ¿Hay una forma eficiente de esperar a que se complete la última descarga antes de salir de la función principal? (Sé que hay asyncio.wait, pero necesitaré almacenar todas las referencias de tareas para que funcione)

  2. ¿Qué es una buena biblioteca que hace este tipo de tareas? Sé que JavaScript tiene muchas bibliotecas asíncronas, pero ¿qué pasa con Python?

Edición: 2. ¿Qué es una buena biblioteca que cuida los patrones asíncronos comunes? (Algo como https://www.npmjs.com/package/async )

Básicamente necesitas un conjunto de tareas de descarga de tamaño fijo. asyncio no viene con esa funcionalidad lista para asyncio , pero es fácil de crear una: simplemente mantenga un conjunto de tareas y no permita que crezca más allá del límite. Aunque la pregunta indica su renuencia a seguir esa ruta, el código resulta mucho más elegante:

 async def download(code): wait_time = randint(1, 3) print('downloading {} will take {} second(s)'.format(code, wait_time)) await asyncio.sleep(wait_time) # I/O, context will switch to main function print('downloaded {}'.format(code)) async def main(loop): no_concurrent = 3 dltasks = set() i = 0 while i < 9: if len(dltasks) >= no_concurrent: # Wait for some download to finish before adding a new one _done, dltasks = await asyncio.wait( dltasks, return_when=asyncio.FIRST_COMPLETED) dltasks.add(loop.create_task(download(i))) i += 1 # Wait for the remaining downloads to finish await asyncio.wait(dltasks) 

Una alternativa es crear un número fijo de corrutinas que realicen la descarga, como un grupo de subprocesos de tamaño fijo, y alimentarlas con un asyncio.Queue . Esto elimina la necesidad de limitar manualmente el número de descargas, que se limitará automáticamente por el número de download() invocaciones download() :

 # download() defined as above async def download_from(q): while True: code = await q.get() if code is None: # pass on the word that we're done, and exit await q.put(None) break await download(code) async def main(loop): q = asyncio.Queue() dltasks = [loop.create_task(download_from(q)) for _ in range(3)] i = 0 while i < 9: await q.put(i) i += 1 # Inform the consumers there is no more work. await q.put(None) await asyncio.wait(dltasks) 

En cuanto a su otra pregunta, la opción obvia sería aiohttp .

Si no me equivoco estás buscando asyncio.Semaphore . Ejemplo de uso:

 import asyncio from random import randint async def download(code): wait_time = randint(1, 3) print('downloading {} will take {} second(s)'.format(code, wait_time)) await asyncio.sleep(wait_time) # I/O, context will switch to main function print('downloaded {}'.format(code)) sem = asyncio.Semaphore(3) async def safe_download(i): async with sem: # semaphore limits num of simultaneous downloads return await download(i) async def main(): tasks = [ asyncio.ensure_future(safe_download(i)) # creating task starts coroutine for i in range(9) ] await asyncio.gather(*tasks) # await moment all downloads done if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close() 

Salida:

 downloading 0 will take 3 second(s) downloading 1 will take 3 second(s) downloading 2 will take 1 second(s) downloaded 2 downloading 3 will take 3 second(s) downloaded 1 downloaded 0 downloading 4 will take 2 second(s) downloading 5 will take 1 second(s) downloaded 5 downloaded 3 downloading 6 will take 3 second(s) downloading 7 will take 1 second(s) downloaded 4 downloading 8 will take 2 second(s) downloaded 7 downloaded 8 downloaded 6 

Ejemplo de descarga asíncrona con aiohttp se puede encontrar aquí .