Funciones asíncronas de regulación en Python Asyncio

Tengo una list de awaitables que quiero pasar al asyncio.AbstractEventLoop pero necesito acelerar las solicitudes a una API de terceros.

Me gustaría evitar algo que espere para pasar el future al bucle porque mientras tanto, locking mi bucle esperando. ¿Que opciones tengo? Semaphores y ThreadPools limitarán la cantidad de progtwigs que se ejecutan simultáneamente, pero ese no es mi problema. Necesito acelerar mis solicitudes a 100 / seg, pero no importa cuánto tiempo lleve completar la solicitud.

Este es un ejemplo muy conciso (que no funciona) que utiliza la biblioteca estándar, que demuestra el problema. Se supone que esto se acelera a 100 / seg, pero se acelera a 116.651 / seg. ¿Cuál es la mejor manera de regular la progtwigción de una solicitud asíncrona en asyncio ?

Código de trabajo:

 import asyncio from threading import Lock class PTBNL: def __init__(self): self._req_id_seq = 0 self._futures = {} self._results = {} self.token_bucket = TokenBucket() self.token_bucket.set_rate(100) def run(self, *awaitables): loop = asyncio.get_event_loop() if not awaitables: loop.run_forever() elif len(awaitables) == 1: return loop.run_until_complete(*awaitables) else: future = asyncio.gather(*awaitables) return loop.run_until_complete(future) def sleep(self, secs) -> True: self.run(asyncio.sleep(secs)) return True def get_req_id(self) -> int: new_id = self._req_id_seq self._req_id_seq += 1 return new_id def start_req(self, key): loop = asyncio.get_event_loop() future = loop.create_future() self._futures[key] = future return future def end_req(self, key, result=None): future = self._futures.pop(key, None) if future: if result is None: result = self._results.pop(key, []) if not future.done(): future.set_result(result) def req_data(self, req_id, obj): # Do Some Work Here self.req_data_end(req_id) pass def req_data_end(self, req_id): print(req_id, " has ended") self.end_req(req_id) async def req_data_async(self, obj): req_id = self.get_req_id() future = self.start_req(req_id) self.req_data(req_id, obj) await future return future.result() async def req_data_batch_async(self, contracts): futures = [] FLAG = False for contract in contracts: req_id = self.get_req_id() future = self.start_req(req_id) futures.append(future) nap = self.token_bucket.consume(1) if FLAG is False: FLAG = True start = asyncio.get_event_loop().time() asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract) await asyncio.gather(*futures) elapsed = asyncio.get_event_loop().time() - start return futures, len(contracts)/elapsed class TokenBucket: def __init__(self): self.tokens = 0 self.rate = 0 self.last = asyncio.get_event_loop().time() self.lock = Lock() def set_rate(self, rate): with self.lock: self.rate = rate self.tokens = self.rate def consume(self, tokens): with self.lock: if not self.rate: return 0 now = asyncio.get_event_loop().time() lapse = now - self.last self.last = now self.tokens += lapse * self.rate if self.tokens > self.rate: self.tokens = self.rate self.tokens -= tokens if self.tokens >= 0: return 0 else: return -self.tokens / self.rate if __name__ == '__main__': asyncio.get_event_loop().set_debug(True) app = PTBNL() objs = [obj for obj in range(500)] l,t = app.run(app.req_data_batch_async(objs)) print(l) print(t) 

Edición: He agregado un ejemplo simple de TrottleTestApp aquí usando semáforos, pero aún no puedo acelerar la ejecución:

 import asyncio import time class ThrottleTestApp: def __init__(self): self._req_id_seq = 0 self._futures = {} self._results = {} self.sem = asyncio.Semaphore() async def allow_requests(self, sem): """Permit 100 requests per second; call loop.create_task(allow_requests()) at the beginning of the program to start this routine. That call returns a task handle that can be canceled to end this routine. asyncio.Semaphore doesn't give us a great way to get at the value other than accessing sem._value. We do that here, but creating a wrapper that adds a current_value method would make this cleaner""" while True: while sem._value  True: self.run(asyncio.sleep(secs)) return True def get_req_id(self) -> int: new_id = self._req_id_seq self._req_id_seq += 1 return new_id def start_req(self, key): loop = asyncio.get_event_loop() future = loop.create_future() self._futures[key] = future return future def end_req(self, key, result=None): future = self._futures.pop(key, None) if future: if result is None: result = self._results.pop(key, []) if not future.done(): future.set_result(result) def req_data(self, req_id, obj): # This is the method that "does" something self.req_data_end(req_id) pass def req_data_end(self, req_id): print(req_id, " has ended") self.end_req(req_id) async def req_data_batch_async(self, objs): futures = [] FLAG = False for obj in objs: req_id = self.get_req_id() future = self.start_req(req_id) futures.append(future) if FLAG is False: FLAG = True start = time.time() self.do_request(req_id, obj) await asyncio.gather(*futures) elapsed = time.time() - start print("Roughly %s per second" % (len(objs)/elapsed)) return futures if __name__ == '__main__': asyncio.get_event_loop().set_debug(True) app = ThrottleTestApp() objs = [obj for obj in range(10000)] app.run(app.req_data_batch_async(objs)) 

Puedes hacer esto implementando el algoritmo de cubo con fugas :

 import asyncio import contextlib import collections import time from types import TracebackType from typing import Dict, Optional, Type try: # Python 3.7 base = contextlib.AbstractAsyncContextManager _current_task = asyncio.current_task except AttributeError: base = object # type: ignore _current_task = asyncio.Task.current_task # type: ignore class AsyncLeakyBucket(base): """A leaky bucket rate limiter. Allows up to max_rate / time_period acquisitions before blocking. time_period is measured in seconds; the default is 60. """ def __init__( self, max_rate: float, time_period: float = 60, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: self._loop = loop self._max_level = max_rate self._rate_per_sec = max_rate / time_period self._level = 0.0 self._last_check = 0.0 # queue of waiting futures to signal capacity to self._waiters: Dict[asyncio.Task, asyncio.Future] = collections.OrderedDict() def _leak(self) -> None: """Drip out capacity from the bucket.""" if self._level: # drip out enough level for the elapsed time since # we last checked elapsed = time.time() - self._last_check decrement = elapsed * self._rate_per_sec self._level = max(self._level - decrement, 0) self._last_check = time.time() def has_capacity(self, amount: float = 1) -> bool: """Check if there is enough space remaining in the bucket""" self._leak() requested = self._level + amount # if there are tasks waiting for capacity, signal to the first # there there may be some now (they won't wake up until this task # yields with an await) if requested < self._max_level: for fut in self._waiters.values(): if not fut.done(): fut.set_result(True) break return self._level + amount <= self._max_level async def acquire(self, amount: float = 1) -> None: """Acquire space in the bucket. If the bucket is full, block until there is space. """ if amount > self._max_level: raise ValueError("Can't acquire more than the bucket capacity") loop = self._loop or asyncio.get_event_loop() task = _current_task(loop) assert task is not None while not self.has_capacity(amount): # wait for the next drip to have left the bucket # add a future to the _waiters map to be notified # 'early' if capacity has come up fut = loop.create_future() self._waiters[task] = fut try: await asyncio.wait_for( asyncio.shield(fut), 1 / self._rate_per_sec * amount, loop=loop ) except asyncio.TimeoutError: pass fut.cancel() self._waiters.pop(task, None) self._level += amount return None async def __aenter__(self) -> None: await self.acquire() return None async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException], tb: Optional[TracebackType] ) -> None: return None 

Tenga en cuenta que perdemos capacidad del depósito de manera oportunista, no es necesario ejecutar una tarea asíncrona separada solo para bajar el nivel; en cambio, la capacidad se filtra cuando se prueba la capacidad restante suficiente.

Tenga en cuenta que las tareas que esperan la capacidad se guardan en un diccionario ordenado, y cuando puede haber capacidad de sobra nuevamente, la primera tarea que aún está en espera se despierta temprano.

Puedes usar esto como un administrador de contexto; tratando de adquirir el cazo cuando está lleno de bloques hasta que se haya liberado nuevamente la capacidad suficiente:

 bucket = AsyncLeakyBucket(100) # ... async with bucket: # only reached once the bucket is no longer full 

o puede llamar a acquire() directamente:

 await bucket.acquire() # blocks until there is space in the bucket 

o simplemente puedes probar si hay espacio primero:

 if bucket.has_capacity(): # reject a request due to rate limiting 

Tenga en cuenta que puede contar algunas solicitudes como ‘más pesadas’ o ‘más ligeras’ aumentando o disminuyendo la cantidad que ‘gotea’ en el cubo:

 await bucket.acquire(10) if bucket.has_capacity(0.5): 

Tenga cuidado con esto sin embargo; cuando se mezclan gotas grandes y pequeñas, las gotas pequeñas tienden a correr antes que las gotas grandes cuando están en o cerca de la velocidad máxima, porque existe una mayor probabilidad de que haya suficiente capacidad libre para una gota más pequeña antes de que haya espacio para una más grande.

Manifestación:

 >>> import asyncio, time >>> bucket = AsyncLeakyBucket(5, 10) >>> async def task(id): ... await asyncio.sleep(id * 0.01) ... async with bucket: ... print(f'{id:>2d}: Drip! {time.time() - ref:>5.2f}') ... >>> ref = time.time() >>> tasks = [task(i) for i in range(15)] >>> result = asyncio.run(asyncio.wait(tasks)) 0: Drip! 0.00 1: Drip! 0.02 2: Drip! 0.02 3: Drip! 0.03 4: Drip! 0.04 5: Drip! 2.05 6: Drip! 4.06 7: Drip! 6.06 8: Drip! 8.06 9: Drip! 10.07 10: Drip! 12.07 11: Drip! 14.08 12: Drip! 16.08 13: Drip! 18.08 14: Drip! 20.09 

El cubo se llena rápidamente al comienzo en una ráfaga, lo que hace que el rest de las tareas se distribuyan de manera más equitativa; cada 2 segundos se libera suficiente capacidad para que otra tarea sea manejada.

El tamaño máximo de ráfaga es igual al valor de la tasa máxima, en la demostración anterior que se estableció en 5. Si no desea permitir ráfagas, establezca la tasa máxima en 1 y el período de tiempo en el tiempo mínimo entre goteos:

 >>> bucket = AsyncLeakyBucket(1, 1.5) # no bursts, drip every 1.5 seconds >>> async def task(): ... async with bucket: ... print(f'Drip! {time.time() - ref:>5.2f}') ... >>> ref = time.time() >>> tasks = [task() for _ in range(5)] >>> result = asyncio.run(asyncio.wait(tasks)) Drip! 0.00 Drip! 1.50 Drip! 3.01 Drip! 4.51 Drip! 6.02 

Otra solución, mediante el uso de semáforos acotados, por parte de un compañero de trabajo, un mentor y un amigo, es la siguiente:

 import asyncio class AsyncLeakyBucket(object): def __init__(self, max_tasks: float, time_period: float = 60, loop: asyncio.events=None): self._delay_time = time_period / max_tasks self._sem = asyncio.BoundedSemaphore(max_tasks) self._loop = loop or asyncio.get_event_loop() self._loop.create_task(self._leak_sem()) async def _leak_sem(self): """ Background task that leaks semaphore releases based on the desired rate of tasks per time_period """ while True: await asyncio.sleep(self._delay_time) try: self._sem.release() except ValueError: pass async def __aenter__(self) -> None: await self._sem.acquire() async def __aexit__(self, exc_type, exc, tb) -> None: pass 

Se puede seguir utilizando con el mismo código async with bucket que en la respuesta de @Martijn