¿Cómo ejecutar tareas simultáneamente en asyncio?

Estoy tratando de aprender cómo ejecutar tareas simultáneamente usando el módulo asyncio de Python. En el siguiente código, tengo un “rastreador web” simulado para un ejemplo. Básicamente, estoy tratando de llegar donde hay un máximo de dos solicitudes activas fetch () que ocurren en un momento dado, y quiero que se llame a process () durante el período de suspensión ().

import asyncio class Crawler(): urlq = ['http://www.google.com', 'http://www.yahoo.com', 'http://www.cnn.com', 'http://www.gamespot.com', 'http://www.facebook.com', 'http://www.evergreen.edu'] htmlq = [] MAX_ACTIVE_FETCHES = 2 active_fetches = 0 def __init__(self): pass async def fetch(self, url): self.active_fetches += 1 print("Fetching URL: " + url); await(asyncio.sleep(2)) self.active_fetches -= 1 self.htmlq.append(url) async def crawl(self): while self.active_fetches < self.MAX_ACTIVE_FETCHES: if self.urlq: url = self.urlq.pop() task = asyncio.create_task(self.fetch(url)) await task else: print("URL queue empty") break; def process(self, page): print("processed page: " + page) # main loop c = Crawler() while(c.urlq): asyncio.run(c.crawl()) while c.htmlq: page = c.htmlq.pop() c.process(page) 

Sin embargo, el código anterior descarga las URL una por una (no de dos en dos al mismo tiempo) y no realiza ningún “procesamiento” hasta que se hayan recuperado todas las URL. ¿Cómo puedo hacer que las tareas fetch () se ejecuten al mismo tiempo, y hacer que se llame al proceso () durante la suspensión ()?

Su método de crawl está esperando después de cada tarea individual; Deberías cambiarlo por esto:

 async def crawl(self): tasks = [] while self.active_fetches < self.MAX_ACTIVE_FETCHES: if self.urlq: url = self.urlq.pop() tasks.append(asyncio.create_task(self.fetch(url))) await asyncio.gather(*tasks) 

EDITAR : Aquí hay una versión más limpia con comentarios que se recuperan y procesan todos al mismo tiempo, a la vez que se conserva la capacidad básica de poner un límite a la cantidad máxima de captadores.

 import asyncio class Crawler: def __init__(self, urls, max_workers=2): self.urls = urls # create a queue that only allows a maximum of two items self.fetching = asyncio.Queue() self.max_workers = max_workers async def crawl(self): # DON'T await here; start consuming things out of the queue, and # meanwhile execution of this function continues. We'll start two # coroutines for fetching and two coroutines for processing. all_the_coros = asyncio.gather( *[self._worker(i) for i in range(self.max_workers)]) # place all URLs on the queue for url in self.urls: await self.fetching.put(url) # now put a bunch of `None`'s in the queue as signals to the workers # that there are no more items in the queue. for _ in range(self.max_workers): await self.fetching.put(None) # now make sure everything is done await all_the_coros async def _worker(self, i): while True: url = await self.fetching.get() if url is None: # this coroutine is done; simply return to exit return print(f'Fetch worker {i} is fetching a URL: {url}') page = await self.fetch(url) self.process(page) async def fetch(self, url): print("Fetching URL: " + url); await asyncio.sleep(2) return f"the contents of {url}" def process(self, page): print("processed page: " + page) # main loop c = Crawler(['http://www.google.com', 'http://www.yahoo.com', 'http://www.cnn.com', 'http://www.gamespot.com', 'http://www.facebook.com', 'http://www.evergreen.edu']) asyncio.run(c.crawl()) 

Puedes hacer htmlq an asyncio.Queue() y cambiar htmlq.append a htmlq.push . Entonces tu main puede ser asíncrono, así:

 async def main(): c = Crawler() asyncio.create_task(c.crawl()) while True: page = await c.htmlq.get() if page is None: break c.process(page) 

Su código de nivel superior se reduce a una llamada a asyncio.run(main()) .

Una vez que haya terminado con el rastreo, crawl() puede poner en cola None para notificar a la organización principal que se ha realizado el trabajo.