¿Cómo puedo agregar un hilo de fondo al matraz?

Estoy ocupado escribiendo un pequeño servidor de juegos para probar el matraz. El juego expone una API a través de REST a los usuarios. Es fácil para los usuarios realizar acciones y consultar datos, sin embargo, me gustaría atender el “mundo del juego” fuera del bucle app.run () para actualizar las entidades del juego, etc. Dado que Flask está implementado de forma tan limpia, me gustaría para ver si hay una forma de flask para hacer esto.

Sus subprocesos adicionales deben iniciarse desde la misma aplicación a la que llama el servidor WSGI.

El siguiente ejemplo crea un subproceso en segundo plano que se ejecuta cada 5 segundos y manipula las estructuras de datos que también están disponibles para las funciones enrutadas de Flask.

import threading import atexit from flask import Flask POOL_TIME = 5 #Seconds # variables that are accessible from anywhere commonDataStruct = {} # lock to control access to variable dataLock = threading.Lock() # thread handler yourThread = threading.Thread() def create_app(): app = Flask(__name__) def interrupt(): global yourThread yourThread.cancel() def doStuff(): global commonDataStruct global yourThread with dataLock: # Do your stuff with commonDataStruct Here # Set the next thread to happen yourThread = threading.Timer(POOL_TIME, doStuff, ()) yourThread.start() def doStuffStart(): # Do initialisation stuff here global yourThread # Create your thread yourThread = threading.Timer(POOL_TIME, doStuff, ()) yourThread.start() # Initiate doStuffStart() # When you kill Flask (SIGTERM), clear the trigger for the next thread atexit.register(interrupt) return app app = create_app() 

Llámalo desde Gunicorn con algo como esto:

 gunicorn -b 0.0.0.0:5000 --log-config log.conf --pid=app.pid myfile:app 

Además de usar hilos puros o la cola de apio (tenga en cuenta que ya no es necesario el matraz-apio), también puede echar un vistazo a flask-apscheduler:

https://github.com/viniciuschiele/flask-apscheduler

Un ejemplo simple copiado de https://github.com/viniciuschiele/flask-apscheduler/blob/master/examples/jobs.py :

 from flask import Flask from flask_apscheduler import APScheduler class Config(object): JOBS = [ { 'id': 'job1', 'func': 'jobs:job1', 'args': (1, 2), 'trigger': 'interval', 'seconds': 10 } ] SCHEDULER_API_ENABLED = True def job1(a, b): print(str(a) + ' ' + str(b)) if __name__ == '__main__': app = Flask(__name__) app.config.from_object(Config()) scheduler = APScheduler() # it is also possible to enable the API directly # scheduler.api_enabled = True scheduler.init_app(app) scheduler.start() app.run()