¿Cómo usar asyncio con la librería de locking existente?

Tengo pocas funciones de locking foo , bar y no puedo cambiarlas (algunas bibliotecas internas no las controlo. Habla con uno o más servicios de red). ¿Cómo lo uso como asíncrono ?. Por ejemplo, no quiero hacer lo siguiente.

 results = [] for inp in inps: val = foo(inp) result = bar(val) results.append(result) 

Esto será ineficaz, ya que puedo llamar a foo para la segunda entrada mientras estoy esperando la primera bar misma. ¿Cómo los envuelvo de tal manera que se puedan usar con asyncio (es decir, nuevo async , await syntax)?

Asummos que las funciones son reingresantes. es decir, está bien volver a llamar a foo cuando ya se está procesando un foo anterior.


Actualizar

Ampliando respuesta con decorador reutilizable. Haga clic aquí por ejemplo.

 def run_in_executor(f): @functools.wraps(f) def inner(*args, **kwargs): loop = asyncio.get_running_loop() return loop.run_in_executor(None, functools.partial(f, *args, **kwargs)) return inner 

Hay (más o menos) dos preguntas aquí: primero, cómo ejecutar el código de locking de forma asíncrona, y segundo, cómo ejecutar el código asíncrono en paralelo (asyncio es de un solo hilo, por lo que GIL aún se aplica, por lo que no es realmente concurrente, pero yo divago).

Las tareas paralelas se pueden crear usando asyncio.ensure_future, como se documenta aquí .

Para ejecutar un código síncrono, deberá ejecutar el código de locking en un ejecutor . Ejemplo:

 import concurrent.futures import asyncio import time def blocking(delay): time.sleep(delay) print('Completed.') async def non_blocking(loop, executor): # Run three of the blocking tasks concurrently. asyncio.wait will # automatically wrap these in Tasks. If you want explicit access # to the tasks themselves, use asyncio.ensure_future, or add a # "done, pending = asyncio.wait..." assignment await asyncio.wait( fs={ # Returns after delay=12 seconds loop.run_in_executor(executor, blocking, 12), # Returns after delay=14 seconds loop.run_in_executor(executor, blocking, 14), # Returns after delay=16 seconds loop.run_in_executor(executor, blocking, 16) }, return_when=asyncio.ALL_COMPLETED ) loop = asyncio.get_event_loop() executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) loop.run_until_complete(non_blocking(loop, executor)) 

Si desea progtwigr estas tareas utilizando un bucle for (como en su ejemplo), tiene varias estrategias diferentes, pero el enfoque subyacente es progtwigr las tareas utilizando el bucle for (o comprensión de lista, etc.), esperarlas con asyncio. Espera, y luego recupera los resultados. Ejemplo:

 done, pending = await asyncio.wait( fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps], return_when=asyncio.ALL_COMPLETED ) # Note that any errors raise during the above will be raised here; to # handle errors you will need to call task.exception() and check if it # is not None before calling task.result() results = [task.result() for task in done] 

Ampliando la respuesta aceptada para resolver realmente el problema en cuestión.

Nota: Requiere python 3.7+

 import functools from urllib.request import urlopen import asyncio def legacy_blocking_function(): # You cannot change this function r = urlopen("https://example.com") return r.read().decode() def run_in_executor(f): @functools.wraps(f) def inner(*args, **kwargs): loop = asyncio.get_running_loop() return loop.run_in_executor(None, lambda: f(*args, **kwargs)) return inner @run_in_executor def foo(arg): # Your wrapper for async use resp = legacy_blocking_function() return f"{arg}{len(resp)}" @run_in_executor def bar(arg): # Another wrapper resp = legacy_blocking_function() return f"{len(resp)}{arg}" async def process_input(inp): # Modern async function (coroutine) res = await foo(inp) res = f"XXX{res}XXX" return await bar(res) async def main(): inputs = ["one", "two", "three"] input_tasks = [asyncio.create_task(process_input(inp)) for inp in inputs] print([await t for t in asyncio.as_completed(input_tasks)]) # This doesn't work as expected :( # print([await t for t in asyncio.as_completed([process_input(inp) for inp in input_tasks])]) if __name__ == '__main__': asyncio.run(main()) 

Haga clic aquí para obtener una versión actualizada de este ejemplo y para enviar solicitudes de extracción.