Python MySQLDB tiempo de espera de consulta

Estoy tratando de imponer un límite de tiempo en las consultas en Python MySQLDB. Tengo una situación en la que no tengo control sobre las consultas, pero tengo que asegurarme de que no se ejecuten en un límite de tiempo establecido. He intentado usar signal.SIGALRM para interrumpir la llamada para ejecutar, pero esto no parece funcionar. La señal se envía, pero no se captura hasta que finaliza la llamada a ejecutar.

Escribí un caso de prueba para probar este comportamiento:

#!/usr/local/bin/python2.6 import time import signal from somewhere import get_dbc class Timeout(Exception): """ Time Exceded """ def _alarm_handler(*args): raise Timeout dbc = get_dbc() signal.signal(signal.SIGALRM, _alarm_handler) signal.alarm(1) try: print "START: ", time.time() dbc.execute("SELECT SLEEP(10)") except Timeout: print "TIMEOUT!", time.time()' 

“SELECT SLEEP (10)” simula una consulta lenta, pero veo el mismo comportamiento con una consulta lenta real.

El resultado:

 START: 1254440686.69 TIMEOUT! 1254440696.69 

Como puede ver, está durmiendo durante 10 segundos y luego obtengo la Excepción de tiempo de espera.

Preguntas:

  1. ¿Por qué no recibo la señal hasta que finalice la ejecución?
  2. ¿Hay otra forma confiable de limitar el tiempo de ejecución de consultas?

La solución basada en trenzas de @ nosklo es elegante y viable, pero si desea evitar la dependencia de lo retorcido, la tarea aún es factible, por ejemplo:

 import multiprocessing def query_with_timeout(dbc, timeout, query, *a, **k): conn1, conn2 = multiprocessing.Pipe(False) subproc = multiprocessing.Process(target=do_query, args=(dbc, query, conn2)+a, kwargs=k) subproc.start() subproc.join(timeout) if conn1.poll(): return conn1.recv() subproc.terminate() raise TimeoutError("Query %r ran for >%r" % (query, timeout)) def do_query(dbc, query, conn, *a, **k): cu = dbc.cursor() cu.execute(query, *a, **k) return cu.fetchall() 

He intentado usar signal.SIGALRM para interrumpir la llamada para ejecutar, pero esto no parece funcionar. La señal se envía, pero no se captura hasta que finaliza la llamada a ejecutar.

La biblioteca mysql maneja las llamadas de sistemas interrumpidas internamente, por lo que no verá los efectos secundarios de SIGALRM hasta después de que se complete la llamada a la API (sin matar el proceso o el subproceso actual)

Puedes probar parcheando MySQL-Python y usar la opción MYSQL_OPT_READ_TIMEOUT (agregada en mysql 5.0.25)

¿Por qué no recibo la señal hasta que finalice la ejecución?

La consulta se ejecuta a través de una función C, que impide que la máquina virtual de Python se ejecute hasta que regrese.

¿Hay otra forma confiable de limitar el tiempo de ejecución de consultas?

Esta es (IMO) una solución realmente fea, pero funciona. Puede ejecutar la consulta en un proceso separado (ya sea a través de fork() o el módulo de multiprocessing ). Ejecute el temporizador de alarma en su proceso principal y, cuando lo reciba, envíe un SIGINT o SIGKILL al proceso secundario. Si usa multiprocessing , puede usar el método Process.terminate() .

Utilice adbapi . Te permite hacer una llamada db de forma asíncrona.

 from twisted.internet import reactor from twisted.enterprise import adbapi def bogusQuery(): return dbpool.runQuery("SELECT SLEEP(10)") def printResult(l): # function that would be called if it didn't time out for item in l: print item def handle_timeout(): # function that will be called when it timeout reactor.stop() dbpool = adbapi.ConnectionPool("MySQLdb", user="me", password="myself", host="localhost", database="async") bogusQuery().addCallback(printResult) reactor.callLater(4, handle_timeout) reactor.run() 

Notas genéricas

Últimamente he experimentado el mismo problema con varias condiciones que tuve que cumplir:

  • la solución debe ser segura para subprocesos
  • múltiples conexiones a la base de datos desde la misma máquina pueden estar activas al mismo tiempo, elimine la conexión / consulta exacta
  • La aplicación contiene conexiones a muchas bases de datos diferentes: controlador portátil para cada host de base de datos

Tuvimos el siguiente diseño de clase ( desafortunadamente no puedo publicar fonts reales ):

 class AbstractModel: pass class FirstDatabaseModel(AbstractModel): pass # Connection to one DB host class SecondDatabaseModel(AbstractModel): pass # Connection to one DB host 

Y creó varios hilos para cada modelo.


Solución Python 3.2

En nuestra aplicación un modelo = una base de datos . Así que he creado una ” conexión de servicio ” para cada modelo (para que podamos ejecutar KILL en conexión paralela). Por lo tanto, si se creó una instancia de FirstDatabaseModel se crearon 2 conexiones de base de datos; Si se crearon 5 instancias solo se usaron 6 conexiones:

 class AbstractModel: _service_connection = None # Formal declaration def __init__(self): ''' Somehow load config and create connection ''' self.config = # ... self.connection = MySQLFromConfig(self.config) self._init_service_connection() # Get connection ID (pseudocode) self.connection_id = self.connection.FetchOneCol('SELECT CONNECTION_ID()') def _init_service_connection(self): ''' Initialize one singleton connection for model ''' cls = type(self) if cls._service_connection is not None: return cls._service_connection = MySQLFromConfig(self.config) 

Ahora necesitamos un asesino:

 def _kill_connection(self): # Add your own mysql data escaping sql = 'KILL CONNECTION {}'.format(self.connection_id) # Do your own connection check and renewal type(self)._service_connection.execute(sql) 

Nota: connection.execute = crear cursor, ejecutar, cerrar cursor.

Y haz que el hilo asesino sea seguro usando el threading.Lock .

 def _init_service_connection(self): ''' Initialize one singleton connection for model ''' cls = type(self) if cls._service_connection is not None: return cls._service_connection = MySQLFromConfig(self.config) cls._service_connection_lock = threading.Lock() def _kill_connection(self): # Add your own mysql data escaping sql = 'KILL CONNECTION {}'.format(self.connection_id) cls = type(self) # Do your own connection check and renewal try: cls._service_connection_lock.acquire() cls._service_connection.execute(sql) finally: cls._service_connection_lock.release() 

Y, finalmente, agregue el método de ejecución cronometrado usando threading.Timer :

 def timed_query(self, sql, timeout=5): kill_query_timer = threading.Timer(timeout, self._kill_connection) kill_query_timer.start() try: self.connection.long_query() finally: kill_query_timer.cancel() 

¿Por qué no recibo la señal hasta que finalice la ejecución?

El proceso que espera la red de E / S se encuentra en un estado ininterrumpible (cosa de UNIX, no relacionada con Python o MySQL). Recibe la señal después de que finalice la llamada al sistema (probablemente como un código de error EINTR , aunque no estoy seguro).

¿Hay otra forma confiable de limitar el tiempo de ejecución de consultas?

Creo que generalmente lo hace una herramienta externa como mkill que monitorea MySQL para consultas de larga ejecución y las mata.