Python: ¿Cómo puedo ejecutar las funciones de Python en paralelo?

Primero investigué y no pude encontrar una respuesta a mi pregunta. Estoy tratando de ejecutar varias funciones en paralelo en Python.

Tengo algo como esto:

files.py import common #common is a util class that handles all the IO stuff dir1 = 'C:\folder1' dir2 = 'C:\folder2' filename = 'test.txt' addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45] def func1(): c = common.Common() for i in range(len(addFiles)): c.createFiles(addFiles[i], filename, dir1) c.getFiles(dir1) time.sleep(10) c.removeFiles(addFiles[i], dir1) c.getFiles(dir1) def func2(): c = common.Common() for i in range(len(addFiles)): c.createFiles(addFiles[i], filename, dir2) c.getFiles(dir2) time.sleep(10) c.removeFiles(addFiles[i], dir2) c.getFiles(dir2) 

Quiero llamar a func1 y func2 y hacer que se ejecuten al mismo tiempo. Las funciones no interactúan entre sí o en el mismo objeto. Ahora tengo que esperar a que func1 termine antes de que func2 se inicie. ¿Cómo hago algo como a continuación:

 process.py from files import func1, func2 runBothFunc(func1(), func2()) 

Quiero poder crear ambos directorios casi al mismo tiempo porque cada minuto cuento cuántos archivos se están creando. Si el directorio no está allí, se perderá mi tiempo.

Podrías usar threading o multiprocessing .

Debido a las peculiaridades de CPython , es poco probable que la threading logre un verdadero paralelismo. Por esta razón, el multiprocessing es generalmente una mejor apuesta.

Aquí hay un ejemplo completo:

 from multiprocessing import Process def func1(): print 'func1: starting' for i in xrange(10000000): pass print 'func1: finishing' def func2(): print 'func2: starting' for i in xrange(10000000): pass print 'func2: finishing' if __name__ == '__main__': p1 = Process(target=func1) p1.start() p2 = Process(target=func2) p2.start() p1.join() p2.join() 

La mecánica de inicio / unión de procesos secundarios se puede encapsular fácilmente en una función a lo largo de las líneas de su runBothFunc :

 def runInParallel(*fns): proc = [] for fn in fns: p = Process(target=fn) p.start() proc.append(p) for p in proc: p.join() runInParallel(func1, func2) 

Esto se puede hacer con elegancia con Ray , un sistema que le permite paralelizar y distribuir fácilmente su código Python.

Para paralelizar su ejemplo, tendría que definir sus funciones con el decorador @ray.remote y luego invocarlas con .remote .

 import ray ray.init() dir1 = 'C:\\folder1' dir2 = 'C:\\folder2' filename = 'test.txt' addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45] # Define the functions. # You need to pass every global variable used by the function as an argument. # This is needed because each remote function runs in a different process, # and thus it does not have access to the global variables defined in # the current process. @ray.remote def func1(filename, addFiles, dir): # func1() code here... @ray.remote def func2(filename, addFiles, dir): # func2() code here... # Start two tasks in the background and wait for them to finish. ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Si pasa el mismo argumento a ambas funciones y el argumento es grande, una forma más eficiente de hacerlo es usar ray.put() . Esto evita que el gran argumento se serialice dos veces y que se creen dos copias de memoria:

 largeData_id = ray.put(largeData) ray.get([func1(largeData_id), func2(largeData_id)]) 

Si func1() y func2() devuelven resultados, debe volver a escribir el código de la siguiente manera:

 ret_id1 = func1.remote(filename, addFiles, dir1) ret_id2 = func1.remote(filename, addFiles, dir2) ret1, ret2 = ray.get([ret_id1, ret_id2]) 

Hay una serie de ventajas de utilizar Ray sobre el módulo de multiprocesamiento . En particular, el mismo código se ejecutará en una sola máquina, así como en un grupo de máquinas. Para más ventajas de Ray ver este post relacionado .

No hay forma de garantizar que dos funciones se ejecutarán de forma sincronizada, lo que parece ser lo que usted quiere hacer.

Lo mejor que puedes hacer es dividir la función en varios pasos, luego esperar a que ambos terminen en los puntos críticos de sincronización usando Process.join como en las menciones de las respuestas de aix.

Esto es mejor que time.sleep(10) porque no puede garantizar los tiempos exactos. Con la espera explícita, está diciendo que las funciones deben realizarse ejecutando ese paso antes de pasar al siguiente, en lugar de asumir que se realizará dentro de 10 ms, lo que no está garantizado en función de lo que sucede en la máquina.

Si es usuario de Windows y utiliza Python 3, esta publicación lo ayudará a realizar la progtwigción paralela en Python. Cuando ejecute la progtwigción de grupo de la biblioteca de multiprocesamiento habitual, obtendrá un error con respecto a la función principal de su progtwig. Esto se debe a que el hecho de que Windows no tiene funcionalidad fork (). La siguiente publicación está dando una solución al problema mencionado.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Desde que estaba usando el python 3, cambié el progtwig de la siguiente manera:

 from types import FunctionType import marshal def _applicable(*args, **kwargs): name = kwargs['__pw_name'] code = marshal.loads(kwargs['__pw_code']) gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls']) defs = marshal.loads(kwargs['__pw_defs']) clsr = marshal.loads(kwargs['__pw_clsr']) fdct = marshal.loads(kwargs['__pw_fdct']) func = FunctionType(code, gbls, name, defs, clsr) func.fdct = fdct del kwargs['__pw_name'] del kwargs['__pw_code'] del kwargs['__pw_defs'] del kwargs['__pw_clsr'] del kwargs['__pw_fdct'] return func(*args, **kwargs) def make_applicable(f, *args, **kwargs): if not isinstance(f, FunctionType): raise ValueError('argument must be a function') kwargs['__pw_name'] = f.__name__ # edited kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited return _applicable, args, kwargs def _mappable(x): x,name,code,defs,clsr,fdct = x code = marshal.loads(code) gbls = globals() #gbls = marshal.loads(gbls) defs = marshal.loads(defs) clsr = marshal.loads(clsr) fdct = marshal.loads(fdct) func = FunctionType(code, gbls, name, defs, clsr) func.fdct = fdct return func(x) def make_mappable(f, iterable): if not isinstance(f, FunctionType): raise ValueError('argument must be a function') name = f.__name__ # edited code = marshal.dumps(f.__code__) # edited defs = marshal.dumps(f.__defaults__) # edited clsr = marshal.dumps(f.__closure__) # edited fdct = marshal.dumps(f.__dict__) # edited return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable) 

Después de esta función, el código de problema anterior también se cambia un poco así:

 from multiprocessing import Pool from poolable import make_applicable, make_mappable def cube(x): return x**3 if __name__ == "__main__": pool = Pool(processes=2) results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)] print([result.get(timeout=10) for result in results]) 

Y tengo la salida como:

 [1, 8, 27, 64, 125, 216] 

Estoy pensando que esta publicación puede ser útil para algunos de los usuarios de Windows.