Haciendo peticiones de 1 millón con aiohttp / asyncio – literalmente

Seguí este tutorial: https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html y todo funciona bien cuando hago unas 50 000 solicitudes. Pero necesito hacer 1 mil llamadas a la API y luego tengo un problema con este código:

url = "http://some_url.com/?id={}" tasks = set() sem = asyncio.Semaphore(MAX_SIM_CONNS) for i in range(1, LAST_ID + 1): task = asyncio.ensure_future(bound_fetch(sem, url.format(i))) tasks.add(task) responses = asyncio.gather(*tasks) return await responses 

Debido a que Python necesita crear tareas de 1 millón, básicamente se retrasa y luego imprime el mensaje ” Killed ” en la terminal. ¿Hay alguna forma de usar un generador de un conjunto (o lista) de urls prefabricados? Gracias.

asyncio está ligado a la memoria (como cualquier otro progtwig). No puedes generar más tareas que la memoria pueda contener. Mi conjetura es que has alcanzado un límite de memoria. Compruebe dmesg para más información.

1 millón de RPS no significa que haya 1M tareas. Una tarea puede hacer varias solicitudes en el mismo segundo.

Progtwigr todas las tareas de 1 millón a la vez

Este es el código del que estás hablando. Toma hasta 3 GB de RAM, por lo que es fácil que el sistema operativo lo finalice si tiene poca memoria libre.

 import asyncio from aiohttp import ClientSession MAX_SIM_CONNS = 50 LAST_ID = 10**6 async def fetch(url, session): async with session.get(url) as response: return await response.read() async def bound_fetch(sem, url, session): async with sem: await fetch(url, session) async def fetch_all(): url = "http://localhost:8080/?id={}" tasks = set() async with ClientSession() as session: sem = asyncio.Semaphore(MAX_SIM_CONNS) for i in range(1, LAST_ID + 1): task = asyncio.create_task(bound_fetch(sem, url.format(i), session)) tasks.add(task) return await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(fetch_all()) 

Use la cola para agilizar el trabajo

Esta es mi sugerencia sobre cómo usar asyncio.Queue para pasar las URL a las tareas de los trabajadores. La cola se llena según sea necesario, no hay una lista de direcciones URL prefabricada.

Solo se necesitan 30 MB de RAM 🙂

 import asyncio from aiohttp import ClientSession MAX_SIM_CONNS = 50 LAST_ID = 10**6 async def fetch(url, session): async with session.get(url) as response: return await response.read() async def fetch_worker(url_queue): async with ClientSession() as session: while True: url = await url_queue.get() try: if url is None: # all work is done return response = await fetch(url, session) # ...do something with the response finally: url_queue.task_done() # calling task_done() is necessary for the url_queue.join() to work correctly async def fetch_all(): url = "http://localhost:8080/?id={}" url_queue = asyncio.Queue(maxsize=100) worker_tasks = [] for i in range(MAX_SIM_CONNS): wt = asyncio.create_task(fetch_worker(url_queue)) worker_tasks.append(wt) for i in range(1, LAST_ID + 1): await url_queue.put(url.format(i)) for i in range(MAX_SIM_CONNS): # tell the workers that the work is done await url_queue.put(None) await url_queue.join() await asyncio.gather(*worker_tasks) if __name__ == '__main__': asyncio.run(fetch_all())