Python threading.timer – repite la función cada ‘n’ segundos

Estoy teniendo dificultades con el temporizador de python y agradecería mucho algún consejo o ayuda: D

No tengo mucho conocimiento de cómo funcionan los subprocesos, pero solo quiero disparar una función cada 0.5 segundos y poder iniciar y detener y reiniciar el temporizador.

Sin embargo, sigo recibiendo RuntimeError: threads can only be started once cuando ejecuto threading.timer.start() dos veces. ¿Hay una solución para esto? Intenté aplicar threading.timer.cancel() antes de cada inicio.

Pseudo código:

 t=threading.timer(0.5,function) while True: t.cancel() t.start() 

La mejor manera es iniciar el hilo del temporizador una vez. Dentro de tu hilo de temporizador codificarías lo siguiente

 class MyThread(Thread): def __init__(self, event): Thread.__init__(self) self.stopped = event def run(self): while not self.stopped.wait(0.5): print("my thread") # call a function 

En el código que inició el temporizador, puede set el evento detenido para detener el temporizador.

 stopFlag = Event() thread = MyThread(stopFlag) thread.start() # this will stop the timer stopFlag.set() 

Usando hilos de temporizador-

 from threading import Timer,Thread,Event class perpetualTimer(): def __init__(self,t,hFunction): self.t=t self.hFunction = hFunction self.thread = Timer(self.t,self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t,self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): print 'ipsem lorem' t = perpetualTimer(5,printer) t.start() 

Esto puede ser detenido por t.cancel()

De Equivalente de setInterval en python :

 import threading def setInterval(interval): def decorator(function): def wrapper(*args, **kwargs): stopped = threading.Event() def loop(): # executed in another thread while not stopped.wait(interval): # until stopped function(*args, **kwargs) t = threading.Thread(target=loop) t.daemon = True # stop if the program exits t.start() return stopped return wrapper return decorator 

Uso:

 @setInterval(.5) def function(): "..." stop = function() # start timer, the first call is in .5 seconds stop.set() # stop the loop stop = function() # start new timer # ... stop.set() 

O aquí está la misma funcionalidad pero como una función independiente en lugar de un decorador :

 cancel_future_calls = call_repeatedly(60, print, "Hello, World") # ... cancel_future_calls() 

Aquí es cómo hacerlo sin utilizar hilos .

Con el fin de proporcionar una respuesta correcta utilizando el temporizador como solicitó el OP, mejoraré la respuesta de swapnil jariwala :

 from threading import Timer import time class InfiniteTimer(): """A Timer class that does not stop, unless you want it to.""" def __init__(self, seconds, target): self._should_continue = False self.is_running = False self.seconds = seconds self.target = target self.thread = None def _handle_target(self): self.is_running = True self.target() self.is_running = False self._start_timer() def _start_timer(self): if self._should_continue: # Code could have been running when cancel was called. self.thread = Timer(self.seconds, self._handle_target) self.thread.start() def start(self): if not self._should_continue and not self.is_running: self._should_continue = True self._start_timer() else: print("Timer already started or running, please wait if you're restarting.") def cancel(self): if self.thread is not None: self._should_continue = False # Just in case thread is running and cancel fails. self.thread.cancel() else: print("Timer never started or failed to initialize.") def tick(): print('ipsem lorem') # Example Usage t = InfiniteTimer(0.5, tick) t.start() 

El tiempo de importación es opcional sin el uso del ejemplo.

Mejorando un poco la respuesta de Hans Then , podemos subclasificar la función del temporizador. Lo siguiente se convierte en nuestro código completo de “temporizador de repetición”, y se puede usar como un reemplazo directo para el subprocesamiento. Temporizador con todos los mismos argumentos:

 from threading import Timer class RepeatTimer(Timer): def run(self): while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs) 

Ejemplo de uso:

 def dummyfn(msg="foo"): print(msg) timer = RepeatTimer(1, dummyfn) timer.start() time.sleep(5) timer.cancel() 

produce la siguiente salida:

 foo foo foo foo 

y

 timer = RepeatTimer(1, dummyfn, args=("bar",)) timer.start() time.sleep(5) timer.cancel() 

produce

 bar bar bar bar 

He cambiado algún código en swapnil-jariwala para hacer un pequeño reloj de consola.

 from threading import Timer, Thread, Event from datetime import datetime class PT(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def printer(): tempo = datetime.today() h,m,s = tempo.hour, tempo.minute, tempo.second print(f"{h}:{m}:{s}") t = PT(1, printer) t.start() 

SALIDA

 >>> 11:39:11 11:39:12 11:39:13 11:39:14 11:39:15 11:39:16 ... 

Temporizador con una interfaz gráfica tkinter

Este código pone el reloj temporizador en una pequeña ventana con tkinter

 from threading import Timer, Thread, Event from datetime import datetime import tkinter as tk app = tk.Tk() lab = tk.Label(app, text="Timer will start in a sec") lab.pack() class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): tempo = datetime.today() clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second) try: lab['text'] = clock except RuntimeError: exit() t = perpetualTimer(1, printer) t.start() app.mainloop() 

Un ejemplo de juego de tarjetas (tipo de)

 from threading import Timer, Thread, Event from datetime import datetime class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() x = datetime.today() start = x.second def printer(): global questions, counter, start x = datetime.today() tempo = x.second if tempo - 3 > start: show_ans() #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="") print() print("-" + questions[counter]) counter += 1 if counter == len(answers): counter = 0 def show_ans(): global answers, c2 print("It is {}".format(answers[c2])) c2 += 1 if c2 == len(answers): c2 = 0 questions = ["What is the capital of Italy?", "What is the capital of France?", "What is the capital of England?", "What is the capital of Spain?"] answers = "Rome", "Paris", "London", "Madrid" counter = 0 c2 = 0 print("Get ready to answer") t = perpetualTimer(3, printer) t.start() 

salida:

 Get ready to answer >>> -What is the capital of Italy? It is Rome -What is the capital of France? It is Paris -What is the capital of England? ... 

Tuve que hacer esto para un proyecto. Lo que terminé haciendo fue iniciar un hilo separado para la función

 t = threading.Thread(target =heartbeat, args=(worker,)) t.start() 

**** Heartbeat es mi función, el trabajador es uno de mis argumentos ****

Dentro de mi función de latido del corazón:

 def heartbeat(worker): while True: time.sleep(5) #all of my code 

Entonces, cuando comienzo el hilo, la función repetidamente esperará 5 segundos, ejecutará todo mi código y lo hará indefinidamente. Si quieres matar el proceso solo mata el hilo.

He implementado una clase que funciona como un temporizador.

Dejo el enlace aquí por si alguien lo necesita: https://github.com/ivanhalencp/python/tree/master/xTimer

 from threading import Timer def TaskManager(): #do stuff t = Timer( 1, TaskManager ) t.start() TaskManager() 

Aquí hay una pequeña muestra, ayudará a entender mejor cómo funciona. function taskManager () al final crea una llamada de función retrasada a sí mismo.

Intenta cambiar la variable “dalay” y podrás ver la diferencia

 from threading import Timer, _sleep # ------------------------------------------ DATA = [] dalay = 0.25 # sec counter = 0 allow_run = True FIFO = True def taskManager(): global counter, DATA, delay, allow_run counter += 1 if len(DATA) > 0: if FIFO: print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]") else: print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]") else: print("["+str(counter)+"] no data") if allow_run: #delayed method/function call to it self t = Timer( dalay, taskManager ) t.start() else: print(" END task-manager: disabled") # ------------------------------------------ def main(): DATA.append("data from main(): 0") _sleep(2) DATA.append("data from main(): 1") _sleep(2) # ------------------------------------------ print(" START task-manager:") taskManager() _sleep(2) DATA.append("first data") _sleep(2) DATA.append("second data") print(" START main():") main() print(" END main():") _sleep(2) DATA.append("last data") allow_run = False 

Me gusta la respuesta de right2clicky, especialmente porque no requiere que se rasgue un Hilo y que se cree uno nuevo cada vez que el Temporizador haga tic. Además, es una anulación fácil para crear una clase con una callback de temporizador que se llama periódicamente. Ese es mi caso de uso normal:

 class MyClass(RepeatTimer): def __init__(self, period): super().__init__(period, self.on_timer) def on_timer(self): print("Tick") if __name__ == "__main__": mc = MyClass(1) mc.start() time.sleep(5) mc.cancel()