Python Asyncio, cómo crear y cancelar tareas desde otro hilo.

Tengo una aplicación multi-hilo de python. Quiero ejecutar un bucle asyncio en un subproceso y publicar calbacks y coroutines desde otro subproceso. Debería ser fácil, pero no puedo moverme la cabeza con las cosas de asyncio .

Llegué a la siguiente solución que hace la mitad de lo que quiero, no dude en comentar sobre cualquier cosa:

import asyncio from threading import Thread class B(Thread): def __init__(self): Thread.__init__(self) self.loop = None def run(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) #why do I need that?? self.loop.run_forever() def stop(self): self.loop.call_soon_threadsafe(self.loop.stop) def add_task(self, coro): """this method should return a task object, that I can cancel, not a handle""" f = functools.partial(self.loop.create_task, coro) return self.loop.call_soon_threadsafe(f) def cancel_task(self, xx): #no idea @asyncio.coroutine def test(): while True: print("running") yield from asyncio.sleep(1) b.start() time.sleep(1) #need to wait for loop to start t = b.add_task(test()) time.sleep(10) #here the program runs fine but how can I cancel the task? b.stop() 

Así que iniciar y detener el bucle funciona bien. Pensé en crear una tarea usando create_task, pero ese método no es seguro para subprocesos, así que lo envolví en call_soon_threadsafe. Pero me gustaría poder obtener el objeto de tarea para poder cancelar la tarea. Podría hacer una cosa complicada usando Futuro y Condición, pero debe haber una forma más simple, ¿no es así?

Creo que es posible que deba hacer que su método add_task consciente de si se está llamando desde un subproceso distinto al del bucle de eventos. De esa manera, si se llama desde el mismo hilo, puede llamar directamente a asyncio.async , de lo contrario, puede hacer un trabajo adicional para pasar la tarea del hilo del bucle al hilo de llamada. Aquí hay un ejemplo:

 import time import asyncio import functools from threading import Thread, current_thread, Event from concurrent.futures import Future class B(Thread): def __init__(self, start_event): Thread.__init__(self) self.loop = None self.tid = None self.event = start_event def run(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.tid = current_thread() self.loop.call_soon(self.event.set) self.loop.run_forever() def stop(self): self.loop.call_soon_threadsafe(self.loop.stop) def add_task(self, coro): """this method should return a task object, that I can cancel, not a handle""" def _async_add(func, fut): try: ret = func() fut.set_result(ret) except Exception as e: fut.set_exception(e) f = functools.partial(asyncio.async, coro, loop=self.loop) if current_thread() == self.tid: return f() # We can call directly if we're not going between threads. else: # We're in a non-event loop thread so we use a Future # to get the task from the event loop thread once # it's ready. fut = Future() self.loop.call_soon_threadsafe(_async_add, f, fut) return fut.result() def cancel_task(self, task): self.loop.call_soon_threadsafe(task.cancel) @asyncio.coroutine def test(): while True: print("running") yield from asyncio.sleep(1) event = Event() b = B(event) b.start() event.wait() # Let the loop's thread signal us, rather than sleeping t = b.add_task(test()) # This is a real task time.sleep(10) b.stop() 

Primero, guardamos el ID de hilo del bucle de eventos en el método de run , para poder averiguar si las llamadas a add_task provienen de otros hilos más adelante. Si se llama a add_task desde un subproceso de bucle que no es de evento, usamos call_soon_threadsafe para llamar a una función que progtwigrá la rutina y luego usaremos un concurrent.futures.Future para devolver la tarea al subproceso de llamada, que espera el resultado del Future .

Una nota sobre la cancelación de una tarea: Cuando llame a cancel en una Task , se generará un CancelledError en la próxima vez que se ejecute el bucle de eventos. Esto significa que la rutina que se está ajustando a la tarea se cancelará debido a la excepción la próxima vez que scope un punto de rendimiento, a menos que la rutina detecte el error CancelledError y evite que se aborte. También tenga en cuenta que esto solo funciona si la función que se está envolviendo es en realidad una coroutina interrumpible; un asyncio.Future devuelto por BaseEventLoop.run_in_executor , por ejemplo, no puede ser cancelado realmente, porque en realidad está envuelto alrededor de un concurrent.futures.Future , y esos no pueden ser cancelados una vez que su función subyacente realmente comienza a ejecutarse. En esos casos, asyncio.Future dirá que se canceló, pero la función que se ejecuta en el ejecutor continuará ejecutándose.

Edición: Se actualizó el primer ejemplo para usar concurrent.futures.Future , en lugar de una queue.Queue , según la sugerencia de Andrew Svetlov.

Nota: asyncio.async está en desuso ya que la versión 3.4.4 usa asyncio.ensure_future en asyncio.ensure_future lugar.

Tu haces todo bien. Para detener tareas, hacer método.

 class B(Thread): # ... def cancel(self, task): self.loop.call_soon_threadsafe(task.cancel) 

Por cierto, tiene que configurar un bucle de eventos para el subproceso creado explícitamente por

 self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) 

porque asyncio crea un bucle de eventos implícito solo para el hilo principal.

solo como referencia aquí, el código que finalmente implementé basado en la ayuda que recibí en este sitio, es más sencillo ya que no necesitaba todas las funciones. ¡gracias de nuevo!

 import asyncio from threading import Thread from concurrent.futures import Future import functools class B(Thread): def __init__(self): Thread.__init__(self) self.loop = None def run(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.loop.run_forever() def stop(self): self.loop.call_soon_threadsafe(self.loop.stop) def _add_task(self, future, coro): task = self.loop.create_task(coro) future.set_result(task) def add_task(self, coro): future = Future() p = functools.partial(self._add_task, future, coro) self.loop.call_soon_threadsafe(p) return future.result() #block until result is available def cancel(self, task): self.loop.call_soon_threadsafe(task.cancel) 

Desde la versión 3.4.4, asyncio proporciona una función llamada run_coroutine_threadsafe para enviar un objeto de rutina de un hilo a un bucle de eventos. Devuelve un concurrent.futures.Future para acceder al resultado o cancelar la tarea.

Usando tu ejemplo:

 @asyncio.coroutine def test(loop): try: while True: print("Running") yield from asyncio.sleep(1, loop=loop) except asyncio.CancelledError: print("Cancelled") loop.stop() raise loop = asyncio.new_event_loop() thread = threading.Thread(target=loop.run_forever) future = asyncio.run_coroutine_threadsafe(test(loop), loop) thread.start() time.sleep(5) future.cancel() thread.join()