¿Cómo funciona realmente el asyncio?

Esta pregunta está motivada por mi otra pregunta: ¿Cómo esperar en cdef?

Hay toneladas de artículos y publicaciones en el blog sobre asyncio , pero todos son muy superficiales. No pude encontrar ninguna información sobre cómo se implementa realmente asyncio y qué hace que la E / S sea asíncrona. Estaba tratando de leer el código fuente, pero son miles de líneas que no son del código C de mayor grado, muchas de las cuales tratan con objetos auxiliares, pero lo más importante es que es difícil establecer una conexión entre la syntax de Python y el código C que se traduciría. dentro.

La propia documentación de Asycnio es aún menos útil. No hay información sobre cómo funciona, solo algunas pautas sobre cómo usarlo, que a veces también son engañosas o están mal escritas.

Estoy familiarizado con la implementación de coroutines de Go, y tenía la esperanza de que Python hiciera lo mismo. Si ese fuera el caso, el código que encontré en la publicación vinculada anteriormente habría funcionado. Como no lo hizo, ahora estoy tratando de averiguar por qué. Mi mejor conjetura hasta ahora es la siguiente, corríjame en el lugar en el que estoy equivocado:

  1. Las definiciones de procedimiento de la forma async def foo(): ... realidad se interpretan como métodos de una clase hereditaria de coroutine .
  2. Tal vez, async def se divide realmente en múltiples métodos por las instrucciones de await , donde el objeto, en el que se llama a estos métodos, es capaz de realizar un seguimiento del progreso realizado a través de la ejecución hasta el momento.
  3. Si lo anterior es cierto, entonces, esencialmente, la ejecución de un coroutine se reduce a métodos de llamada del objeto de coroutine por algún administrador global (¿loop?).
  4. El administrador global es consciente de alguna manera (¿cómo?) Cuando las operaciones de E / S se realizan mediante el código Python (¿solo?) Y puede elegir uno de los métodos actuales pendientes para ejecutar después de que el método de ejecución actual renuncie al control (toque en la await statement).

En otras palabras, aquí está mi bash de “desugaring” de alguna syntax de asyncio en algo más comprensible:

 async def coro(name): print('before', name) await asyncio.sleep() print('after', name) asyncio.gather(coro('first'), coro('second')) # translated from async def coro(name) class Coro(coroutine): def before(self, name): print('before', name) def after(self, name): print('after', name) def __init__(self, name): self.name = name self.parts = self.before, self.after self.pos = 0 def __call__(): self.parts[self.pos](self.name) self.pos += 1 def done(self): return self.pos == len(self.parts) # translated from asyncio.gather() class AsyncIOManager: def gather(*coros): while not every(c.done() for c in coros): coro = random.choice(coros) coro() 

Si mi suposición es correcta, entonces tengo un problema. ¿Cómo ocurre realmente la E / S en este escenario? ¿En un hilo aparte? ¿Se suspende todo el intérprete y la E / S ocurre fuera del intérprete? ¿Qué se entiende exactamente por E / S? Si mi procedimiento de python se llama C open() y, a su vez, envía una interrupción al kernel, y le cede el control, ¿cómo sabe esto el intérprete de Python y puede continuar ejecutando otro código, mientras que el kernel hace el I / real? ¿O y hasta que despierte el procedimiento de Python que envió la interrupción originalmente? ¿Cómo puede el intérprete de Python, en principio, ser consciente de que esto está sucediendo?

¿Cómo funciona el asyncio?

Antes de responder a esta pregunta, necesitamos entender algunos términos básicos, omítalos si ya conoce alguno de ellos.

Generadores

Los generadores son objetos que nos permiten suspender la ejecución de una función python. Los generadores seleccionados por el usuario se implementan utilizando el yield palabras clave Al crear una función normal que contiene la palabra clave de yield , convertimos esa función en un generador:

 >>> def test(): ... yield 1 ... yield 2 ... >>> gen = test() >>> next(gen) 1 >>> next(gen) 2 >>> next(gen) Traceback (most recent call last): File "", line 1, in  StopIteration 

Como puede ver, al llamar a next() en el generador, el intérprete carga el marco de la prueba y devuelve el valor de yield . Si se vuelve a llamar a next() , el marco se carga de nuevo en la stack de intérpretes y continúa obteniendo otro valor.

A la tercera vez que se llama a next() , nuestro generador se terminó y se lanzó StopIteration .

Comunicándose con un generador.

Una característica menos conocida de los generadores, es el hecho de que puede comunicarse con ellos mediante dos métodos: send() y throw() .

 >>> def test(): ... val = yield 1 ... print(val) ... yield 2 ... yield 3 ... >>> gen = test() >>> next(gen) 1 >>> gen.send("abc") abc 2 >>> gen.throw(Exception()) Traceback (most recent call last): File "", line 1, in  File "", line 4, in test Exception 

Al llamar a gen.send() , el valor se pasa como un valor de retorno de la palabra clave de yield .

gen.throw() por otro lado, permite lanzar Excepciones dentro de los generadores, con la excepción planteada en el mismo punto en que se llamó el yield .

Devolviendo valores desde generadores

Al StopIteration un valor de un generador, el valor se coloca dentro de la excepción StopIteration . Posteriormente, podemos recuperar el valor de la excepción y utilizarlo según nuestras necesidades.

 >>> def test(): ... yield 1 ... return "abc" ... >>> gen = test() >>> next(gen) 1 >>> try: ... next(gen) ... except StopIteration as exc: ... print(exc.value) ... abc 

He aquí, una nueva palabra clave: yield from

Python 3.4 vino con la adición de una nueva palabra clave: yield from . Lo que esa palabra clave nos permite hacer es transmitir cualquier next() , send() y throw() en un generador nested más interno. Si el generador interno devuelve un valor, también es el valor de retorno del yield from :

 >>> def inner(): ... print((yield 2)) ... return 3 ... >>> def outer(): ... yield 1 ... val = yield from inner() ... print(val) ... yield 4 ... >>> gen = outer() >>> next(gen) 1 >>> next(gen) 2 >>> gen.send("abc") abc 3 4 

Poniendolo todo junto

Al introducir el yield from la nueva palabra clave en Python 3.4, ahora pudimos crear generadores dentro de generadores que, al igual que un túnel, pasan los datos de un lado a otro desde los generadores más internos a los más externos. Esto ha generado un nuevo significado para los generadores: las coroutinas .

Coroutines son funciones que se pueden detener y reanudar mientras se ejecutan. En Python, se definen utilizando la palabra clave async def . Al igual que los generadores, ellos también usan su propia forma de yield from que está a la await . Antes de que se introdujeran async y await en Python 3.5, creamos coroutines exactamente de la misma manera que se crearon los generadores (con yield from lugar de await ).

 async def inner(): return 1 async def outer(): await inner() 

Al igual que todos los iteradores o generadores que implementan el __iter__() , coroutines implementa __await__() que les permite continuar cada vez await coro se llama a await coro .

Hay un buen diagtwig de secuencia dentro de los documentos de Python que deberías revisar.

En asyncio, aparte de las funciones de coroutine, tenemos 2 objetos importantes: tareas y futuros .

Futuros

Los futuros son objetos que tienen el __await__() implementado, y su trabajo es mantener cierto estado y resultado. El estado puede ser uno de los siguientes:

  1. PENDIENTE: el futuro no tiene ningún resultado o excepción establecida.
  2. CANCELADO – el futuro fue cancelado usando fut.cancel()
  3. FINALIZADO: el futuro se terminó, ya sea por un conjunto de resultados usando fut.set_result() o por un conjunto de excepciones usando fut.set_exception()

El resultado, tal como lo ha adivinado, puede ser un objeto de Python, que se devolverá, o una excepción que puede ser provocada.

Otra característica importante de future objetos future , es que contienen un método llamado add_done_callback() . Este método permite llamar a las funciones tan pronto como se realiza la tarea, ya sea que haya generado una excepción o haya finalizado.

Tareas

Los objetos de tarea son futuros especiales, que se envuelven alrededor de las coroutinas y se comunican con las coroutinas más internas y externas. Cada vez que un coroutine await un futuro, el futuro vuelve a la tarea (al igual que en el yield from ), y la tarea lo recibe.

A continuación, la tarea se une al futuro. Lo hace llamando a add_done_callback() en el futuro. De ahora en adelante, si se hará el futuro, ya sea cancelando, pasando una excepción o pasando un objeto Python como resultado, se llamará la callback de la tarea, y volverá a la existencia.

Asyncio

La pregunta final que debemos responder es: ¿cómo se implementa el IO?

Profundo dentro de asyncio, tenemos un bucle de eventos. Un bucle de eventos de tareas. El trabajo del bucle de eventos es llamar a las tareas cada vez que estén listas y coordinar todo ese esfuerzo en una sola máquina de trabajo.

La parte IO del bucle de eventos se basa en una única función crucial llamada select . Select es una función de locking, implementada por el sistema operativo que se encuentra debajo, que permite esperar en los sockets los datos entrantes o salientes. Al recibir los datos, se activa y devuelve los sockets que recibieron los datos o los sockets que están listos para escribir.

Cuando intenta recibir o enviar datos a través de un socket a través de asyncio, lo que realmente sucede a continuación es que el socket primero se comprueba si tiene algún dato que pueda leerse o enviarse inmediatamente. Si el .send() está lleno, o el .recv() está vacío, el socket se registra en la función de select (simplemente agregándolo a una de las listas, rlist para recv y wlist para send ) y el correspondiente La función await un nuevo objeto future creado, vinculado a ese zócalo.

Cuando todas las tareas disponibles están a la espera de futuros, el bucle de eventos se select y espera. Cuando el uno de los sockets tiene datos entrantes, o se ha agotado el búfer de send , asyncio comprueba el objeto futuro vinculado a ese socket y lo configura en listo.

Ahora toda la magia pasa. El futuro está listo, la tarea que se agregó antes con add_done_callback() vuelve a la vida, y llama a .send() a la rutina que reanuda la más interna (debido a la cadena de await ) y lees los datos recién recibidos de un búfer cercano fueron dertwigdos.

Método de la cadena de nuevo, en caso de recv() :

  1. select.select espera.
  2. Se devuelve un socket listo, con datos.
  3. Los datos del socket se mueven a un búfer.
  4. future.set_result() .
  5. La tarea que se agregó con add_done_callback() ahora está add_done_callback() .
  6. La tarea llama a .send() en la coroutine que va hasta la coroutine más interna y la despierta.
  7. Los datos se leen desde el búfer y se devuelven a nuestro humilde usuario.

En resumen, asyncio utiliza las capacidades del generador, que permiten pausar y reanudar funciones. Utiliza el yield from capacidades que permiten pasar datos desde el generador más interno hasta el más externo. Utiliza todos esos para detener la ejecución de la función mientras espera que se complete la E / S (mediante la función de select del sistema operativo).

¿Y lo mejor de todo? Mientras una de las funciones está en pausa, otra puede ejecutarse e intercalarse con el delicado tejido, que es asíntico.

Hablar de async/await asyncio y asyncio no es lo mismo. La primera es una construcción fundamental de bajo nivel (coroutines), mientras que la última es una biblioteca que utiliza estas construcciones. A la inversa, no hay una única respuesta final.

La siguiente es una descripción general de cómo funcionan las bibliotecas async/await asyncio y asyncioasyncio . Es decir, puede haber otros trucos en la parte superior (hay …) pero son intrascendentes a menos que los construyas tú mismo. La diferencia debe ser despreciable a menos que ya sepa lo suficiente como para no tener que hacer una pregunta de este tipo.

1. Coroutines versus subrutinas en una shell de nuez

Al igual que las subrutinas (funciones, procedimientos, …), las corrutinas (generadores, …) son una abstracción de la stack de llamadas y el puntero de instrucciones: hay una stack de piezas de código en ejecución, y cada una está en una instrucción específica.

La distinción de def contra async def es meramente por claridad. La diferencia real es el return versus el yield . A partir de esto, await o yield from la diferencia de llamadas individuales a stacks completas.

1.1. Subrutinas

Una subrutina representa un nuevo nivel de stack para contener variables locales y un recorrido único de sus instrucciones para llegar a su fin. Considere una subrutina como esta:

 def subfoo(bar): qux = 3 return qux * bar 

Cuando lo ejecutas, eso significa

  1. asignar espacio de stack para bar y qux
  2. Ejecutar recursivamente la primera instrucción y saltar a la siguiente instrucción
  3. Una vez en un return , empuje su valor a la stack que llama
  4. Borrar la stack (1) y el puntero de instrucciones (2).

Notablemente, 4. significa que una subrutina siempre comienza en el mismo estado. Todo lo exclusivo de la función en sí se pierde al finalizar. Una función no se puede reanudar, incluso si hay instrucciones después de la return .

 root -\ : \- subfoo --\ :/--<---return --/ | V 

1.2. Coroutines como subrutinas persistentes.

Una coroutina es como una subrutina, pero puede salir sin destruir su estado. Considera una coroutina como esta:

  def cofoo(bar): qux = yield bar # yield marks a break point return qux 

Cuando lo ejecutas, eso significa

  1. asignar espacio de stack para bar y qux
  2. Ejecutar recursivamente la primera instrucción y saltar a la siguiente instrucción
    1. una vez en un yield , empuje su valor a la stack de llamada pero almacene la stack y el puntero de instrucción
    2. una vez que llama al yield , restaure la stack y el puntero de instrucción y presione los argumentos a qux
  3. Una vez en un return , empuje su valor a la stack que llama
  4. Borrar la stack (1) y el puntero de instrucciones (2).

Tenga en cuenta la adición de 2.1 y 2.2: una coroutina se puede suspender y reanudar en puntos predefinidos. Esto es similar a cómo se suspende una subrutina al llamar a otra subrutina. La diferencia es que la coroutina activa no está estrictamente vinculada a su stack de llamadas. En cambio, una coroutina suspendida es parte de una stack separada y aislada.

 root -\ : \- cofoo --\ :/--<+--yield --/ | : V : 

Esto significa que las coroutinas suspendidas pueden almacenarse libremente o moverse entre stacks. Cualquier stack de llamadas que tenga acceso a una rutina puede decidir reanudarla.

1.3. Atravesando la stack de llamadas

Hasta ahora, nuestra coroutine solo baja la stack de llamadas con yield . Una subrutina puede subir y bajar la stack de llamadas con return y () . Para completar, las rutinas también necesitan un mecanismo para subir la stack de llamadas. Considera una coroutina como esta:

 def wrap(): yield 'before' yield from cofoo() yield 'after' 

Cuando lo ejecutas, eso significa que aún asigna la stack y el puntero de instrucciones como una subrutina. Cuando se suspende, todavía es como almacenar una subrutina.

Sin embargo, el yield from ambos hace. Suspende la stack y el puntero de instrucción de la wrap y ejecuta el cofoo . Tenga en cuenta que la wrap permanece suspendida hasta que el cofoo termine por completo. Cada vez que cofoo suspende o se envía algo, cofoo se conecta directamente a la stack de llamadas.

1.4. Coroutines hasta el fondo.

Según lo establecido, el yield from permite conectar dos ámbitos a través de otro intermedio. Cuando se aplica recursivamente, eso significa que la parte superior de la stack se puede conectar a la parte inferior de la stack.

 root -\ : \-> coro_a -yield-from-> coro_b --\ :/ <-+------------------------yield ---/ | : :\ --+-- coro_a.send----------yield ---\ : coro_b <-/ 

Tenga en cuenta que root y coro_b no se conocen entre sí. Esto hace que las coroutinas sean mucho más limpias que las devoluciones de llamada: las coroutinas aún se basan en una relación 1: 1 como las subrutinas. Coroutines suspende y reanuda toda su stack de ejecución existente hasta un punto de llamada regular.

Cabe destacar que la root podría tener un número arbitrario de coroutines para reanudar. Sin embargo, nunca se puede reanudar más de uno al mismo tiempo. ¡Los coroutines de la misma raíz son concurrentes pero no paralelos!

1.5. Python async y await

Hasta ahora, la explicación ha utilizado explícitamente el yield y el yield from vocabulario de los generadores: la funcionalidad subyacente es la misma. La nueva syntax de Python3.5 async y await existe principalmente para mayor claridad.

 def foo(): # subroutine? return None def foo(): # coroutine? yield from foofoo() # generator? coroutine? async def foo(): # coroutine! await foofoo() # coroutine! return None 

El async for y async with sentencias son necesarios porque usted rompería el yield from/await cadena yield from/await con las afirmaciones simples for y with .

2. Anatomía de un evento simple.

Por sí misma, una coroutine no tiene ningún concepto de ceder el control a otra coroutine. Solo puede ceder el control a la persona que llama en la parte inferior de una stack de coroutine. Esta persona que llama puede cambiar a otra computadora central y ejecutarla.

Este nodo raíz de varias coroutinas es comúnmente un bucle de eventos : en suspensión, una corutina produce un evento en el que quiere reanudarse. A su vez, el bucle de eventos es capaz de esperar eficientemente a que estos eventos ocurran. Esto le permite decidir qué rutina ejecutará a continuación, o cómo esperar antes de reanudar.

Un diseño de este tipo implica que hay un conjunto de eventos predefinidos que el bucle entiende. Varios coroutines se await , hasta que finalmente se await un evento. Este evento puede comunicarse directamente con el bucle de eventos mediante el control.

 loop -\ : \-> coroutine --await--> event --\ :/ <-+----------------------- yield --/ | : | : # loop waits for event to happen | : :\ --+-- send(reply) -------- yield --\ : coroutine <--yield-- event <-/ 

La clave es que la suspensión correcta permite que el bucle de eventos y los eventos se comuniquen directamente. La stack intermedia de coroutine no requiere ningún conocimiento sobre qué bucle lo está ejecutando, ni cómo funcionan los eventos.

2.1.1. Eventos en el tiempo

El evento más simple de manejar es alcanzar un punto en el tiempo. Este es también un bloque fundamental de código roscado: un hilo sleep repetidamente hasta que se cumple una condición. Sin embargo, una ejecución regular bloquea la ejecución por sí misma: queremos que otras coroutinas no se bloqueen. En su lugar, queremos decirle al bucle de eventos cuándo debe reanudar la stack de rutina actual.

2.1.2. Definiendo un evento

Un evento es simplemente un valor que podemos identificar, ya sea a través de una enumeración, un tipo u otra identidad. Podemos definir esto con una clase simple que almacena nuestro tiempo objective. Además de almacenar la información del evento, podemos permitirle await una clase directamente.

 class AsyncSleep: """Event to sleep until a point in time""" def __init__(self, until: float): self.until = until # used whenever someone ``await``s an instance of this Event def __await__(self): # yield this Event to the loop yield self def __repr__(self): return '%s(until=%.1f)' % (self.__class__.__name__, self.until) 

Esta clase solo almacena el evento, no dice cómo manejarlo realmente.

La única característica especial es __await__ : es lo que busca la palabra clave __await__ . En la práctica, es un iterador, pero no está disponible para la maquinaria de iteración normal.

2.2.1. A la espera de un evento

Ahora que tenemos un evento, ¿cómo reactjsn los coroutines? Deberíamos poder express el equivalente de sleep await nuestro evento. Para ver mejor lo que está pasando, esperamos dos veces durante la mitad del tiempo:

 import time async def asleep(duration: float): """await that ``duration`` seconds pass""" await AsyncSleep(time.time() + duration / 2) await AsyncSleep(time.time() + duration / 2) 

Podemos instanciar y ejecutar directamente esta coroutine. Similar a un generador, el uso de coroutine.send ejecuta la coroutine hasta que yield un resultado.

 coroutine = asleep(100) while True: print(coroutine.send(None)) time.sleep(0.1) 

Esto nos da dos eventos de AsyncSleep y luego una StopIteration cuando se realiza la rutina. Tenga en cuenta que el único retraso es de time.sleep en el bucle! Cada AsyncSleep solo almacena un desplazamiento de la hora actual.

2.2.2. Evento + sueño

En este punto, tenemos dos mecanismos separados a nuestra disposición:

  • AsyncSleep Eventos que pueden producirse desde el interior de una coroutine.
  • time.sleep que puede esperar sin impactar a las coroutinas.

En particular, estos dos son ortogonales: ninguno afecta o activa al otro. Como resultado, podemos idear nuestra propia estrategia para sleep para enfrentar el retraso de un AsyncSleep .

2.3. Un bucle de eventos ingenuo.

Si tenemos varias coroutinas, cada una nos puede decir cuándo quiere que nos despierten. Luego podemos esperar hasta que el primero de ellos quiera reanudarse, luego el siguiente, y así sucesivamente. Cabe destacar que en cada punto solo nos importa cuál es el siguiente .

Esto hace que para una progtwigción sencilla:

  1. Clasificar coroutines por su tiempo de despertar deseado
  2. Elige el primero que quiera despertar.
  3. espera hasta este punto en el tiempo
  4. corre esta coroutine
  5. repetir desde 1.

Una implementación trivial no necesita conceptos avanzados. Una list permite ordenar coroutines por fecha. La espera es un tiempo regular. time.sleep . Ejecutar coroutines funciona igual que antes con coroutine.send .

 def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" # store wake-up-time and coroutines waiting = [(0, coroutine) for coroutine in coroutines] while waiting: # 2. pick the first coroutine that wants to wake up until, coroutine = waiting.pop(0) # 3. wait until this point in time time.sleep(max(0.0, until - time.time())) # 4. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0]) 

Por supuesto, esto tiene un amplio margen de mejora. Podemos usar un montón para la cola de espera o una tabla de envío para eventos. También podríamos obtener valores de retorno de StopIteration y asignarlos a la rutina. Sin embargo, el principio fundamental sigue siendo el mismo.

2.4. Espera cooperativa

El evento AsyncSleep y el evento run loop son una implementación completamente funcional de eventos progtwigdos.

 async def sleepy(identifier: str = "coroutine", count=5): for i in range(count): print(identifier, 'step', i + 1, 'at %.2f' % time.time()) await asleep(0.1) run(*(sleepy("coroutine %d" % j) for j in range(5))) 

Esto cambia de manera cooperativa entre cada una de las cinco coroutinas, suspendiendo cada una por 0.1 segundos. Aunque el bucle de eventos es síncrono, sigue ejecutando el trabajo en 0,5 segundos en lugar de 2,5 segundos. Cada coroutine mantiene el estado y actúa independientemente.

3. Bucle de eventos de E / S

Un bucle de eventos que admite la sleep es adecuado para el sondeo . Sin embargo, la espera de E / S en un identificador de archivo se puede hacer de manera más eficiente: el sistema operativo implementa E / S y, por lo tanto, sabe qué controladores están listos. Idealmente, un bucle de eventos debería admitir un evento explícito "listo para E / S".

3.1. La llamada select

Python ya tiene una interfaz para consultar el sistema operativo para leer los identificadores de E / S. Cuando se llama con los controladores para leer o escribir, devuelve los controladores listos para leer o escribir:

 readable, writeable, _ = select.select(rlist, wlist, xlist, timeout) 

Por ejemplo, podemos open un archivo para escribir y esperar a que esté listo:

 write_target = open('/tmp/foo') readable, writeable, _ = select.select([], [write_target], []) 

Una vez que seleccione devoluciones, writeable contiene nuestro archivo abierto.

3.2. Evento básico de E / S

De forma similar a la solicitud de AsyncSleep , necesitamos definir un evento para E / S. Con la lógica de select subyacente, el evento debe referirse a un objeto legible, digamos un archivo open . Además, almacenamos la cantidad de datos para leer.

 class AsyncRead: def __init__(self, file, amount=1): self.file = file self.amount = amount self._buffer = '' def __await__(self): while len(self._buffer) < self.amount: yield self # we only get here if ``read`` should not block self._buffer += self.file.read(1) return self._buffer def __repr__(self): return '%s(file=%s, amount=%d, progress=%d)' % ( self.__class__.__name__, self.file, self.amount, len(self._buffer) ) 

Al igual que con AsyncSleep , principalmente almacenamos los datos necesarios para la llamada al sistema subyacente. Esta vez, __await__ se puede reanudar varias veces, hasta que se haya leído nuestra amount deseada. Además, return el resultado de E / S en lugar de simplemente reanudarlo.

3.3. Aumentar un bucle de eventos con E / S de lectura

La base de nuestro bucle de eventos sigue siendo la run definida anteriormente. Primero, necesitamos rastrear las solicitudes de lectura. Esto ya no es un progtwig ordenado, solo mapeamos las solicitudes de lectura a las rutinarias.

 # new waiting_read = {} # type: Dict[file, coroutine] 

Como select.select toma un parámetro de tiempo de espera, podemos usarlo en lugar de time.sleep .

 # old time.sleep(max(0.0, until - time.time())) # new readable, _, _ = select.select(list(reads), [], []) 

Esto nos da todos los archivos legibles - si hay alguno, ejecutamos la correspondiente coroutine. Si no hay ninguno, hemos esperado el tiempo suficiente para que se ejecute nuestra rutina actual.

 # new - reschedule waiting coroutine, run readable coroutine if readable: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read[readable[0]] 

Finalmente, tenemos que escuchar las solicitudes de lectura.

 # new if isinstance(command, AsyncSleep): ... elif isinstance(command, AsyncRead): ... 

3.4. Poniendo todo junto

Lo anterior fue un poco de simplificación. Necesitamos hacer algunos cambios para no morir de hambre, si es que siempre podemos leer. Necesitamos manejar tener nada que leer o nada que esperar. Sin embargo, el resultado final todavía se ajusta a 30 LOC.

 def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" waiting_read = {} # type: Dict[file, coroutine] waiting = [(0, coroutine) for coroutine in coroutines] while waiting or waiting_read: # 2. wait until the next coroutine may run or read ... try: until, coroutine = waiting.pop(0) except IndexError: until, coroutine = float('inf'), None readable, _, _ = select.select(list(waiting_read), [], []) else: readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time())) # ... and select the appropriate one if readable and time.time() < until: if until and coroutine: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read.pop(readable[0]) # 3. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension ... if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0]) # ... or register reads elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine 

3.5. Cooperativa de E / S

AsyncSleep , AsyncRead y las implementaciones de ejecución ahora son completamente funcionales para dormir y / o leer. Igual que para sleepy , podemos definir un ayudante para probar la lectura:

 async def ready(path, amount=1024*32): print('read', path, 'at', '%d' % time.time()) with open(path, 'rb') as file: result = return await AsyncRead(file, amount) print('done', path, 'at', '%d' % time.time()) print('got', len(result), 'B') run(sleepy('background', 5), ready('/dev/urandom')) 

Al ejecutar esto, podemos ver que nuestra E / S está intercalada con la tarea en espera:

 id background round 1 read /dev/urandom at 1530721148 id background round 2 id background round 3 id background round 4 id background round 5 done /dev/urandom at 1530721148 got 1024 B 

4. E / S sin locking

Si bien I / O en los archivos entiende el concepto, no es realmente adecuado para una biblioteca como asyncio : la llamada de select siempre vuelve para los archivos , y tanto la open como la read pueden bloquearse indefinidamente . Esto bloquea todas las secuencias de un ciclo de eventos, lo cual es malo. Las bibliotecas como aiofiles utilizan subprocesos y sincronización para falsificar E / S sin locking y eventos en el archivo.

Sin embargo, los sockets permiten la E / S sin locking, y su latencia inherente lo hace mucho más crítico. Cuando se usa en un bucle de eventos, la espera de datos y el rebash pueden ajustarse sin bloquear nada.

4.1. Evento de I / O sin locking

Similar a nuestro AsyncRead , podemos definir un evento de suspender y leer para sockets. En lugar de tomar un archivo, tomamos un socket, que no debe ser bloqueado. Además, nuestro __await__ usa socket.recv lugar de file.read .

 class AsyncRecv: def __init__(self, connection, amount=1, read_buffer=1024): assert not connection.getblocking(), 'connection must be non-blocking for async recv' self.connection = connection self.amount = amount self.read_buffer = read_buffer self._buffer = b'' def __await__(self): while len(self._buffer) < self.amount: try: self._buffer += self.connection.recv(self.read_buffer) except BlockingIOError: yield self return self._buffer def __repr__(self): return '%s(file=%s, amount=%d, progress=%d)' % ( self.__class__.__name__, self.connection, self.amount, len(self._buffer) ) 

A diferencia de AsyncRead , __await__ realiza una E / S realmente sin locking. Cuando los datos están disponibles, siempre se lee. Cuando no hay datos disponibles, siempre se suspende. Eso significa que el bucle de eventos solo se bloquea mientras realizamos un trabajo útil.

4.2. Deslocking del bucle de eventos

En lo que respecta al bucle de eventos, nada cambia mucho. El evento a escuchar sigue siendo el mismo que para los archivos: un descriptor de archivos marcado como listo para select .

 # old elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine # new elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine elif isinstance(command, AsyncRecv): waiting_read[command.connection] = coroutine 

En este punto, debería ser obvio que AsyncRead y AsyncRecv son el mismo tipo de evento. Podríamos fácilmente refactorizarlos para que sean un evento con un componente de E / S intercambiable. En efecto, el bucle de eventos, las rutinas y los eventos separan claramente un planificador, un código intermedio arbitrario y la E / S real.

4.3. El lado feo de la E / S sin locking

En principio, lo que debe hacer en este punto es replicar la lógica de read como un recv para AsyncRecv . Sin embargo, esto es mucho más feo ahora: tiene que manejar los rendimientos tempranos cuando las funciones se bloquean dentro del kernel, pero el control de rendimiento es para usted. Por ejemplo, abrir una conexión en lugar de abrir un archivo es mucho más largo:

 # file file = open(path, 'rb') # non-blocking socket connection = socket.socket() connection.setblocking(False) # open without blocking - retry on failure try: connection.connect((url, port)) except BlockingIOError: pass 

En pocas palabras, lo que queda son unas pocas docenas de líneas de manejo de Excepciones. Los eventos y el bucle de eventos ya funcionan en este punto.

 id background round 1 read localhost:25000 at 1530783569 read /dev/urandom at 1530783569 done localhost:25000 at 1530783569 got 32768 B id background round 2 id background round 3 id background round 4 done /dev/urandom at 1530783569 got 4096 B id background round 5 

Apéndice

Código de ejemplo en github

Your coro desugaring is conceptually correct, but slightly incomplete.

await doesn’t suspend unconditionally, but only if it encounters a blocking call. How does it know that a call is blocking? This is decided by the code being awaited. For example, an awaitable implementation of socket read could be desugared to:

 def read(sock, n): # sock must be in non-blocking mode try: return sock.recv(n) except EWOULDBLOCK: event_loop.add_reader(sock.fileno, current_task()) return SUSPEND 

In real asyncio the equivalent code modifies the state of a Future instead of returning magic values, but the concept is the same. When appropriately adapted to a generator-like object, the above code can be await ed.

On the caller side, when your coroutine contains:

 data = await read(sock, 1024) 

It desugars into something close to:

 data = read(sock, 1024) if data is SUSPEND: return SUSPEND self.pos += 1 self.parts[self.pos](...) 

People familiar with generators tend to describe the above in terms of yield from which does the suspension automatically.

The suspension chain continues all the way up to the event loop, which notices that the coroutine is suspended, removes it from the runnable set, and goes on to execute coroutines that are runnable, if any. If no coroutines are runnable, the loop waits in select() until either a file descriptor a coroutine is interested in becomes ready for IO. (The event loop maintains a file-descriptor-to-coroutine mapping.)

In the above example, once select() tells the event loop that sock is readable, it will re-add coro to the runnable set, so it will be continued from the point of suspension.

En otras palabras:

  1. Everything happens in the same thread by default.

  2. The event loop is responsible for scheduling the coroutines and waking them up when whatever they were waiting for (typically an IO call that would normally block, or a timeout) becomes ready.

For insight on coroutine-driving event loops, I recommend this talk by Dave Beazley, where he demonstrates coding an event loop from scratch in front of live audience.

It all boils down to the two main challenges that asyncio is addressing:

  • How to perform multiple I/O in a single thread?
  • How to implement cooperative multitasking?

The answer to the first point has been around for a long while and is called a select loop . In python, it is implemented in the selectors module .

The second question is related to the concept of coroutine , ie functions that can stop their execution and be restred later on. In python, coroutines are implemented using generators and the yield from statement. That’s what is hiding behind the async/await syntax .

More resources in this answer .


EDIT: Addressing your comment about goroutines:

The closest equivalent to a goroutine in asyncio is actually not a coroutine but a task (see the difference in the documentation ). In python, a coroutine (or a generator) knows nothing about the concepts of event loop or I/O. It simply is a function that can stop its execution using yield while keeping its current state, so it can be restred later on. The yield from syntax allows for chaining them in a transparent way.

Now, within an asyncio task, the coroutine at the very bottom of the chain always ends up yielding a future . This future then bubbles up to the event loop, and gets integrated into the inner machinery. When the future is set to done by some other inner callback, the event loop can restre the task by sending the future back into the coroutine chain.


EDIT: Addressing some of the questions in your post:

How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter?

No, nothing happens in a thread. I/O is always managed by the event loop, mostly through file descriptors. However the registration of those file descriptors is usually hidden by high-level coroutines, making the dirty work for you.

What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?

An I/O is any blocking call. In asyncio, all the I/O operations should go through the event loop, because as you said, the event loop has no way to be aware that a blocking call is being performed in some synchronous code. That means you’re not supposed to use a synchronous open within the context of a coroutine. Instead, use a dedicated library such aiofiles which provides an asynchronous version of open .