¿Qué es un buen algoritmo de limitación de velocidad?

Podría usar algún pseudocódigo, o mejor, Python. Estoy intentando implementar una cola de limitación de velocidad para un bot IRC de Python, y funciona parcialmente, pero si alguien activa menos mensajes que el límite (por ejemplo, el límite de velocidad es de 5 mensajes por 8 segundos, y la persona solo activa 4), y el siguiente activador es durante los 8 segundos (p. ej., 16 segundos más tarde), el bot envía el mensaje, pero la cola se llena y el bot espera 8 segundos, aunque no es necesario ya que ha transcurrido el período de 8 segundos.

Aquí, el algoritmo más simple , si solo quiere soltar los mensajes cuando llegan demasiado rápido (en lugar de ponerlos en cola, lo que tiene sentido porque la cola puede ser arbitrariamente grande):

rate = 5.0; // unit: messages per = 8.0; // unit: seconds allowance = rate; // unit: messages last_check = now(); // floating-point, eg usec accuracy. Unit: seconds when (message_received): current = now(); time_passed = current - last_check; last_check = current; allowance += time_passed * (rate / per); if (allowance > rate): allowance = rate; // throttle if (allowance < 1.0): discard_message(); else: forward_message(); allowance -= 1.0; 

No hay estructuras de datos, temporizadores, etc. en esta solución y funciona limpiamente 🙂 Para ver esto, la "asignación" crece a una velocidad de 5/8 unidades por segundo como máximo, es decir, como máximo cinco unidades por ocho segundos. Todos los mensajes que se reenvían deducen una unidad, por lo que no puede enviar más de cinco mensajes por cada ocho segundos.

Tenga en cuenta que la rate debe ser un número entero, es decir, sin una parte decimal que no sea cero, o el algoritmo no funcionará correctamente (la tasa real no será rate/per ). Por ejemplo, rate=0.5; per=1.0; rate=0.5; per=1.0; no funciona porque el allowance nunca crecerá a 1.0. Pero la rate=1.0; per=2.0; rate=1.0; per=2.0; funciona bien.

Use este decorador @RateLimited (ratepersec) antes de su función que se pone en cola.

Básicamente, esto comprueba si han transcurrido 1 / segundos desde la última vez y, si no, espera el rest del tiempo, de lo contrario no espera. Esto efectivamente te limita a la velocidad / seg. El decorador se puede aplicar a cualquier función que desee con una tasa limitada.

En su caso, si desea un máximo de 5 mensajes por 8 segundos, use @RateLimited (0.625) antes de la función sendToQueue.

 import time def RateLimited(maxPerSecond): minInterval = 1.0 / float(maxPerSecond) def decorate(func): lastTimeCalled = [0.0] def rateLimitedFunction(*args,**kargs): elapsed = time.clock() - lastTimeCalled[0] leftToWait = minInterval - elapsed if leftToWait>0: time.sleep(leftToWait) ret = func(*args,**kargs) lastTimeCalled[0] = time.clock() return ret return rateLimitedFunction return decorate @RateLimited(2) # 2 per second at most def PrintNumber(num): print num if __name__ == "__main__": print "This should print 1,2,3... at about 2 per second." for i in range(1,100): PrintNumber(i) 

Un Token Bucket es bastante simple de implementar.

Comience con un cubo con 5 fichas.

Cada 5/8 segundos: si el grupo tiene menos de 5 fichas, agregue una.

Cada vez que desee enviar un mensaje: Si el depósito tiene ≥1 token, saque un token y envíe el mensaje. De lo contrario, espere / suelte el mensaje / lo que sea.

(Obviamente, en el código real, usarías un contador de enteros en lugar de tokens reales y puedes optimizar cada paso de 5/8 al almacenar las marcas de tiempo)


Leyendo la pregunta de nuevo, si el límite de frecuencia se restablece por completo cada 8 segundos, entonces aquí hay una modificación:

Comience con una marca de tiempo, last_send , hace mucho tiempo (por ejemplo, en la época). Además, comienza con el mismo cubo de 5 tokens.

Golpea la regla de cada 5/8 segundos.

Cada vez que envíe un mensaje: primero, verifique si el last_send ≥ 8 segundos atrás. Si es así, llena el cubo (configúralo en 5 fichas). Segundo, si hay tokens en el depósito, envíe el mensaje (de lo contrario, suelte / espere / etc.). En tercer lugar, establezca last_send a ahora.

Eso debería funcionar para ese escenario.


De hecho, he escrito un bot IRC usando una estrategia como esta (el primer enfoque). Está en Perl, no en Python, pero aquí hay un código para ilustrar:

La primera parte aquí trata de agregar fichas al cubo. Puede ver la optimización de agregar tokens en función del tiempo (de la 2ª a la última línea) y luego la última línea sujeta el contenido del grupo al máximo (MESSAGE_BURST)

  my $start_time = time; ... # Bucket handling my $bucket = $conn->{fujiko_limit_bucket}; my $lasttx = $conn->{fujiko_limit_lasttx}; $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL; ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST; 

$ conn es una estructura de datos que se pasa alrededor. Esto se encuentra dentro de un método que se ejecuta de manera rutinaria (se calcula cuándo la próxima vez tendrá algo que hacer y se duerme durante tanto tiempo o hasta que reciba tráfico de red). La siguiente parte del método maneja el envío. Es bastante complicado, porque los mensajes tienen prioridades asociadas con ellos.

  # Queue handling. Start with the ultimate queue. my $queues = $conn->{fujiko_queues}; foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) { # Ultimate is special. We run ultimate no matter what. Even if # it sends the bucket negative. --$bucket; $entry->{code}(@{$entry->{args}}); } $queues->[PRIORITY_ULTIMATE] = []; 

Esa es la primera cola, que se ejecuta sin importar qué. Incluso si nuestra conexión muere por inundación. Se utiliza para cosas extremadamente importantes, como responder al PING del servidor. A continuación, el rest de las colas:

  # Continue to the other queues, in order of priority. QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) { my $queue = $queues->[$pri]; while (scalar(@$queue)) { if ($bucket < 1) { # continue later. $need_more_time = 1; last QRUN; } else { --$bucket; my $entry = shift @$queue; $entry->{code}(@{$entry->{args}}); } } } 

Finalmente, el estado del depósito se guarda de nuevo en la estructura de datos $ conn (en realidad, un poco más adelante en el método; primero calcula qué tan pronto tendrá más trabajo)

  # Save status. $conn->{fujiko_limit_bucket} = $bucket; $conn->{fujiko_limit_lasttx} = $start_time; 

Como puede ver, el código de manejo del cucharón real es muy pequeño: aproximadamente cuatro líneas. El rest del código es el manejo de colas de prioridad. El bot tiene colas de prioridad, por lo que, por ejemplo, alguien que conversa con él no puede evitar que cumpla con sus importantes obligaciones de patear / prohibir.

para bloquear el procesamiento hasta que se pueda enviar el mensaje, por lo que se pueden enviar más mensajes a la cola, la hermosa solución de Antti también se puede modificar de la siguiente manera:

 rate = 5.0; // unit: messages per = 8.0; // unit: seconds allowance = rate; // unit: messages last_check = now(); // floating-point, eg usec accuracy. Unit: seconds when (message_received): current = now(); time_passed = current - last_check; last_check = current; allowance += time_passed * (rate / per); if (allowance > rate): allowance = rate; // throttle if (allowance < 1.0): time.sleep( (1-allowance) * (per/rate)) forward_message(); allowance = 0.0; else: forward_message(); allowance -= 1.0; 

simplemente espera hasta que haya suficiente asignación para enviar el mensaje. para no comenzar con dos veces la velocidad, la asignación también se puede inicializar con 0.

Mantén el tiempo que las últimas cinco líneas fueron enviadas. Mantenga los mensajes en cola hasta que el quinto mensaje más reciente (si existe) haya pasado al menos 8 segundos (con last_five como una matriz de veces):

 now = time.time() if len(last_five) == 0 or (now - last_five[-1]) >= 8.0: last_five.insert(0, now) send_message(msg) if len(last_five) > 5: last_five.pop() 

Una solución es adjuntar una marca de tiempo a cada elemento de la cola y descartar el elemento después de que hayan transcurrido 8 segundos. Puede realizar esta comprobación cada vez que se agregue la cola a.

Esto solo funciona si limita el tamaño de la cola a 5 y descarta cualquier adición mientras la cola esté llena.

Si alguien sigue interesado, uso esta clase simple y llamable junto con un almacenamiento de valor de clave LRU cronometrado para limitar la tasa de solicitud por IP. Utiliza un deque, pero se puede reescribir para que se use con una lista.

 from collections import deque import time class RateLimiter: def __init__(self, maxRate=5, timeUnit=1): self.timeUnit = timeUnit self.deque = deque(maxlen=maxRate) def __call__(self): if self.deque.maxlen == len(self.deque): cTime = time.time() if cTime - self.deque[0] > self.timeUnit: self.deque.append(cTime) return False else: return True self.deque.append(time.time()) return False r = RateLimiter() for i in range(0,100): time.sleep(0.1) print(i, "block" if r() else "pass") 

Solo una implementación en python de un código de respuesta aceptada.

 import time class Object(object): pass def get_throttler(rate, per): scope = Object() scope.allowance = rate scope.last_check = time.time() def throttler(fn): current = time.time() time_passed = current - scope.last_check; scope.last_check = current; scope.allowance = scope.allowance + time_passed * (rate / per) if (scope.allowance > rate): scope.allowance = rate if (scope.allowance < 1): pass else: fn() scope.allowance = scope.allowance - 1 return throttler 

Qué tal esto:

 long check_time = System.currentTimeMillis(); int msgs_sent_count = 0; private boolean isRateLimited(int msgs_per_sec) { if (System.currentTimeMillis() - check_time > 1000) { check_time = System.currentTimeMillis(); msgs_sent_count = 0; } if (msgs_sent_count > (msgs_per_sec - 1)) { return true; } else { msgs_sent_count++; } return false; } 

Necesitaba una variación en Scala. Aquí está:

 case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) { import Thread.sleep private def now = System.currentTimeMillis / 1000.0 private val (calls, sec) = callsPerSecond private var allowance = 1.0 private var last = now def apply(a: A): B = { synchronized { val t = now val delta_t = t - last last = t allowance += delta_t * (calls / sec) if (allowance > calls) allowance = calls if (allowance < 1d) { sleep(((1 - allowance) * (sec / calls) * 1000d).toLong) } allowance -= 1 } f(a) } } 

Aquí es cómo se puede utilizar:

 val f = Limiter((5d, 8d), { _: Unit ⇒ println(System.currentTimeMillis) }) while(true){f(())}