Python asyncio tarea obtuvo mal rendimiento.

Estoy confundido acerca de cómo jugar con el módulo asyncio en Python 3.4. Tengo una API de searching para un motor de búsqueda, y deseo que cada solicitud de búsqueda se ejecute en paralelo o de forma asíncrona, de modo que no tenga que esperar a que una búsqueda finalice para comenzar otra.

Aquí está mi API de búsqueda de alto nivel para construir algunos objetos con los resultados de búsqueda en bruto. El motor de búsqueda en sí está usando algún tipo de mecanismo de asyncio, así que no me molestaré con eso.

 # No asyncio module used here now class search(object): ... self.s = some_search_engine() ... def searching(self, *args, **kwargs): ret = {} # do some raw searching according to args and kwargs and build the wrapped results ... return ret 

Para tratar de sincronizar las solicitudes, escribí el siguiente caso de prueba para probar cómo puedo interactuar mis cosas con el módulo de asyncio .

 # Here is my testing script @asyncio.coroutine def handle(f, *args, **kwargs): r = yield from f(*args, **kwargs) return r s = search() loop = asyncio.get_event_loop() loop.run_until_complete(handle(s.searching, arg1, arg2, ...)) loop.close() 

Al ejecutarse con pytest, devolverá un RuntimeError: Task got bad yield : {results from searching...} , cuando llega a la línea r = yield from ...

También probé de otra manera.

 # same handle as above def handle(..): .... s = search() loop = asyncio.get_event_loop() tasks = [ asyncio.async(handle(s.searching, arg11, arg12, ...)), asyncio.async(handle(s.searching, arg21, arg22, ...)), ... ] loop.run_until_complete(asyncio.wait(tasks)) loop.close() 

Al ejecutar este caso de prueba mediante pytest, pasa, pero surgirá una excepción extraña del motor de búsqueda. Y dice que Future/Task exception was never retrieved .

Cosas que deseo preguntar:

  1. Para mi primer bash, ¿es esa la forma correcta de utilizar el yield from al devolver el resultado real de una llamada de función?
  2. Creo que necesito agregar algo de sueño a mi segundo caso de prueba para esperar a que termine la tarea, pero ¿cómo debo hacer eso? ¿Y cómo puedo hacer que las llamadas a mi función regresen en mi segundo caso de prueba?
  3. ¿Es esa una buena manera de implementar asyncio con un módulo existente, creando un controlador asíncrono para manejar las solicitudes?
  4. Si la respuesta a la pregunta 2 es NO, ¿es necesario que cada cliente llame a la search clase para incluir loop = get_event_loop() este tipo de cosas para asincronizar las solicitudes?

El problema es que no se puede simplemente llamar al código síncrono existente como si fuera un asyncio.coroutine y obtener un comportamiento asíncrono. Cuando se llama el yield from searching(...) , solo se obtendrá un comportamiento asíncrono si la searching sí es en realidad un asyncio.coroutine , o al menos devuelve un asyncio.Future . En este momento, la searching es solo una función síncrona regular, por lo que llamar el yield from searching(...) solo va a yield from searching(...) un error, ya que no devuelve un Future o una rutina.

Para obtener el comportamiento que desea, deberá tener una versión asíncrona de searching además de una versión synchronous (o simplemente eliminar la versión síncrona si no la necesita). Tienes algunas opciones para soportar tanto:

  1. Reescriba la searching como un asyncio.coroutine que usa asyncio compatibles con asyncio para hacer su E / S, en lugar de bloquear la E / S. Esto lo hará funcionar en un contexto de asyncio , pero significa que ya no podrá llamarlo directamente en un contexto síncrono. En su lugar, también deberá proporcionar un método alternativo de searching sincrónica que inicie un bucle de eventos asyncio y llame a return loop.run_until_complete(self.searching(...)) . Vea esta pregunta para más detalles sobre eso.
  2. Mantenga su implementación síncrona de searching y proporcione una API asíncrona alternativa que use BaseEventLoop.run_in_executor para ejecutar su método de searching en un hilo de fondo:

     class search(object): ... self.s = some_search_engine() ... def searching(self, *args, **kwargs): ret = {} ... return ret @asyncio.coroutine def searching_async(self, *args, **kwargs): loop = kwargs.get('loop', asyncio.get_event_loop()) try: del kwargs['loop'] # assuming searching doesn't take loop as an arg except KeyError: pass r = yield from loop.run_in_executor(None, self.searching, *args) # Passing None tells asyncio to use the default ThreadPoolExecutor return r 

    Script de prueba:

     s = search() loop = asyncio.get_event_loop() loop.run_until_complete(s.searching_async(arg1, arg2, ...)) loop.close() 

    De esta manera, puede mantener su código síncrono tal como está, y al menos proporcionar métodos que se puedan usar en el código asyncio sin bloquear el bucle de eventos. No es una solución tan limpia como lo sería si realmente usara E / S asíncrona en su código, pero es mejor que nada.

  3. Proporcione dos versiones de searching completamente separadas, una que use locking de E / S y otra que asyncio compatible con asyncio . Esto proporciona implementaciones ideales para ambos contextos, pero requiere el doble de trabajo.