¿Cómo crear un generador asíncrono en Python?

Estoy tratando de reescribir este código Python2.7 al nuevo orden mundial asíncrono:

def get_api_results(func, iterable): pool = multiprocessing.Pool(5) for res in pool.map(func, iterable): yield res 

map() bloquea hasta que se hayan calculado todos los resultados, por lo que estoy tratando de volver a escribir esto como una implementación asíncrona que dará resultados tan pronto como estén listos. Al igual que map() , los valores de retorno deben devolverse en el mismo orden que iterable . Intenté esto (necesito requests debido a requisitos de autenticación heredados):

 import requests def get(i): r = requests.get('https://example.com/api/items/%s' % i) return i, r.json() async def get_api_results(): loop = asyncio.get_event_loop() futures = [] for n in range(1, 11): futures.append(loop.run_in_executor(None, get, n)) async for f in futures: k, v = await f yield k, v for r in get_api_results(): print(r) 

pero con Python 3.6 estoy recibiendo:

  File "scratch.py", line 16, in  for r in get_api_results(): TypeError: 'async_generator' object is not iterable 

¿Cómo puedo lograr esto?

Con respecto a su código anterior (2.7), el multiprocesamiento se considera un poderoso reemplazo directo del módulo de subprocesos mucho más simple para procesar simultáneamente tareas intensivas de CPU, donde los subprocesos no funcionan tan bien. Es probable que su código no esté vinculado a la CPU, ya que solo necesita realizar solicitudes HTTP, y los subprocesos podrían haber sido suficientes para resolver su problema.

Sin embargo, en lugar de usar threading directamente, Python 3+ tiene un módulo agradable llamado concurrent.futures que con una API más limpia a través de clases geniales. Este módulo está disponible también para Python 2.7 como un paquete externo .

El siguiente código funciona en python 2 y python 3:

 # For python 2, first run: # # pip install futures # from __future__ import print_function import requests from concurrent import futures URLS = [ 'http://httpbin.org/delay/1', 'http://httpbin.org/delay/3', 'http://httpbin.org/delay/6', 'http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.coooom/', ] def fetch(url): r = requests.get(url) r.raise_for_status() return r.content def fetch_all(urls): with futures.ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(fetch, url): url for url in urls} print("All URLs submitted.") for future in futures.as_completed(future_to_url): url = future_to_url[future] if future.exception() is None: yield url, future.result() else: # print('%r generated an exception: %s' % ( # url, future.exception())) yield url, None for url, s in fetch_all(URLS): status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed" print('{}: {}'.format(url, status)) 

Este código utiliza futures.ThreadPoolExecutor , basado en subprocesos. Mucha de la magia está en as_completed() usada aquí.

Tu código de Python 3.6 anterior, usa run_in_executor() que crea un futures.ProcessPoolExecutor() , ¡¡y realmente no usa IO asíncrono !!

Si realmente desea seguir adelante con asyncio, deberá utilizar un cliente HTTP que admita asyncio, como aiohttp . Aquí hay un código de ejemplo:

 import asyncio import aiohttp async def fetch(session, url): print("Getting {}...".format(url)) async with session.get(url) as resp: text = await resp.text() return "{}: Got {} bytes".format(url, len(text)) async def fetch_all(): async with aiohttp.ClientSession() as session: tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay)) for delay in (1, 1, 2, 3, 3)] for task in asyncio.as_completed(tasks): print(await task) return "Done." loop = asyncio.get_event_loop() resp = loop.run_until_complete(fetch_all()) print(resp) loop.close() 

Como puede ver, asyncio también tiene un as_completed() , que ahora utiliza IO asíncrono real, y utiliza solo un hilo en un proceso.

Pones tu bucle de eventos en otra co-rutina. No hagas eso El bucle de eventos es el “controlador” más externo del código asíncrono y debe ejecutarse de forma síncrona.

Si necesita procesar los resultados obtenidos, escriba más coroutines que lo hagan. Podrían tomar los datos de una cola, o podrían estar conduciendo la búsqueda directamente.

Podría tener una función principal que recupere y procese resultados, por ejemplo:

 async def main(loop): for n in range(1, 11): future = loop.run_in_executor(None, get, n) k, v = await future # do something with the result loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) 

Haría la función get() correctamente asíncrona también usando una biblioteca asíncrona como aiohttp para que no tenga que usar el ejecutor en absoluto.