Cómo agregar un tiempo de espera a una función en Python

En el pasado, se han realizado muchos bashs para agregar la funcionalidad de tiempo de espera en Python, de modo que cuando un límite de tiempo especificado caduque, el código de espera podría continuar. Desafortunadamente, las recetas anteriores permitieron que la función en ejecución continuara ejecutando y consumiendo recursos, o bien eliminaron la función utilizando un método de terminación de subproceso específico de la plataforma. El propósito de este wiki es desarrollar una respuesta multiplataforma a este problema que muchos progtwigdores han tenido que abordar para varios proyectos de progtwigción.

#! /usr/bin/env python """Provide way to add timeout specifications to arbitrary functions. There are many ways to add a timeout to a function, but no solution is both cross-platform and capable of terminating the procedure. This module use the multiprocessing module to solve both of those problems.""" ################################################################################ __author__ = 'Stephen "Zero" Chappell ' __date__ = '11 February 2010' __version__ = '$Revision: 3 $' ################################################################################ import inspect import sys import time import multiprocessing ################################################################################ def add_timeout(function, limit=60): """Add a timeout parameter to a function and return it. It is illegal to pass anything other than a function as the first parameter. If the limit is not given, it gets a default value equal to one minute. The function is wrapped and returned to the caller.""" assert inspect.isfunction(function) if limit <= 0: raise ValueError() return _Timeout(function, limit) class NotReadyError(Exception): pass ################################################################################ def _target(queue, function, *args, **kwargs): """Run a function with arguments and return output via a queue. This is a helper function for the Process created in _Timeout. It runs the function with positional arguments and keyword arguments and then returns the function's output by way of a queue. If an exception gets raised, it is returned to _Timeout to be raised by the value property.""" try: queue.put((True, function(*args, **kwargs))) except: queue.put((False, sys.exc_info()[1])) class _Timeout: """Wrap a function and add a timeout (limit) attribute to it. Instances of this class are automatically generated by the add_timeout function defined above. Wrapping a function allows asynchronous calls to be made and termination of execution after a timeout has passed.""" def __init__(self, function, limit): """Initialize instance in preparation for being called.""" self.__limit = limit self.__function = function self.__timeout = time.clock() self.__process = multiprocessing.Process() self.__queue = multiprocessing.Queue() def __call__(self, *args, **kwargs): """Execute the embedded function object asynchronously. The function given to the constructor is transparently called and requires that "ready" be intermittently polled. If and when it is True, the "value" property may then be checked for returned data.""" self.cancel() self.__queue = multiprocessing.Queue(1) args = (self.__queue, self.__function) + args self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs) self.__process.daemon = True self.__process.start() self.__timeout = self.__limit + time.clock() def cancel(self): """Terminate any possible execution of the embedded function.""" if self.__process.is_alive(): self.__process.terminate() @property def ready(self): """Read-only property indicating status of "value" property.""" if self.__queue.full(): return True elif not self.__queue.empty(): return True elif self.__timeout < time.clock(): self.cancel() else: return False @property def value(self): """Read-only property containing data returned from function.""" if self.ready is True: flag, load = self.__queue.get() if flag: return load raise load raise NotReadyError() def __get_limit(self): return self.__limit def __set_limit(self, value): if value <= 0: raise ValueError() self.__limit = value limit = property(__get_limit, __set_limit, doc="Property for controlling the value of the timeout.") 

Edición: este código fue escrito para Python 3.x y no fue diseñado para los métodos de clase como decoración. El módulo de multiprocessing no fue diseñado para modificar instancias de clase a través de los límites del proceso.

El principal problema con su código es el uso excesivo de la prevención de conflictos de espacio de nombres con doble guión bajo en una clase que no está destinada a ser subclasificada en absoluto.

En general, self.__foo es un olor de código que debe ir acompañado de un comentario en la línea de # This is a mixin and we don't want arbitrary subclasses to have a namespace conflict .

Además, la API del cliente de este método se vería así:

 def mymethod(): pass mymethod = add_timeout(mymethod, 15) # start the processing timeout_obj = mymethod() try: # access the property, which is really a function call ret = timeout_obj.value except TimeoutError: # handle a timeout here ret = None 

Esto no es muy pythonico y una mejor api para el cliente sería:

 @timeout(15) def mymethod(): pass try: my_method() except TimeoutError: pass 

Está utilizando @property en su clase para algo que es un acceso de estado que muta, no es una buena idea. Por ejemplo, ¿qué pasaría cuando .value se accede dos veces? Parece que fallaría porque queue.get () devolvería basura porque la cola ya está vacía.

Eliminar @property por completo. No lo use en este contexto, no es adecuado para su caso de uso. Haga un locking de llamada cuando se le llame y devuelva el valor o aumente la excepción. Si realmente debe tener acceso a un valor más adelante, conviértalo en un método como .get () o .value ().

Este código para el objective _ debe reescribirse un poco:

 def _target(queue, function, *args, **kwargs): try: queue.put((True, function(*args, **kwargs))) except: queue.put((False, exc_info())) # get *all* the exec info, don't do exc_info[1] # then later: raise exc_info[0], exc_info[1], exc_info[2] 

De esta manera, el rastro de la stack se conservará correctamente y será visible para el progtwigdor.

Creo que ha hecho un primer bash razonable al escribir una biblioteca útil, me gusta el uso del módulo de procesamiento para lograr los objectives.

Así es como se obtiene la syntax del decorador mencionada por Jerub.

 def timeout(limit=None): if limit is None: limit = DEFAULT_TIMEOUT if limit <= 0: raise TimeoutError() # why not ValueError here? def wrap(function): return _Timeout(function,limit) return wrap @timeout(15) def mymethod(): pass 

La biblioteca de Pebble fue diseñada para ofrecer una implementación multiplataforma capaz de lidiar con una lógica problemática que podría fallar, segfault o ejecutarse indefinidamente .

 from pebble import concurrent @concurrent.process(timeout=10) def function(foo, bar=0): return foo + bar future = function(1, bar=2) try: result = future.result() # blocks until results are ready except Exception as error: print("Function raised %s" % error) print(error.traceback) # traceback of the function except TimeoutError as error: print("Function took longer than %d seconds" % error.args[1]) 

El decorador funciona también con métodos estáticos y de clase. Sin embargo, no recomendaría decorar métodos, ya que es una práctica bastante propensa a errores.