Comprensión del multiprocesamiento: administración de memoria compartida, lockings y colas en Python

El multiprocesamiento es una herramienta poderosa en Python, y quiero entenderlo más a fondo. Quiero saber cuándo usar los lockings y colas regulares y cuándo usar un administrador de multiprocesamiento para compartirlos entre todos los procesos.

Se me ocurrieron los siguientes escenarios de prueba con cuatro condiciones diferentes para multiprocesamiento:

  1. Usando un pool y NO Manager

  2. Usando un grupo y un administrador

  3. Usando procesos individuales y NO Manager

  4. Utilizando procesos individuales y un gestor.

El trabajo

Todas las condiciones ejecutan una función de trabajo the_job . the_job consiste en una impresión que está asegurada por un locking. Además, la entrada a la función simplemente se pone en una cola (para ver si se puede recuperar de la cola). Esta entrada es simplemente un índice idx del range(10) creado en el script principal llamado start_scenario (que se muestra en la parte inferior).

 def the_job(args): """The job for multiprocessing. Prints some stuff secured by a lock and finally puts the input into a queue. """ idx = args[0] lock = args[1] queue=args[2] lock.acquire() print 'I' print 'was ' print 'here ' print '!!!!' print '1111' print 'einhundertelfzigelf\n' who= ' By run %d \n' % idx print who lock.release() queue.put(idx) 

El éxito de una condición se define como recuperar perfectamente la entrada de la cola, ver la función read_queue en la parte inferior.

Las condiciones

Las condiciones 1 y 2 son bastante autoexplicativas. La condición 1 implica crear un locking y una cola, y pasarlos a un grupo de procesos:

 def scenario_1_pool_no_manager(jobfunc, args, ncores): """Runs a pool of processes WITHOUT a Manager for the lock and queue. FAILS! """ mypool = mp.Pool(ncores) lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) mypool.imap(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue) 

(La función auxiliar make_iterator encuentra al final de esta publicación). Las condiciones 1 fallan con RuntimeError: Lock objects should only be shared between processes through inheritance .

La condición 2 es bastante similar, pero ahora el locking y la cola están bajo la supervisión de un administrador:

 def scenario_2_pool_manager(jobfunc, args, ncores): """Runs a pool of processes WITH a Manager for the lock and queue. SUCCESSFUL! """ mypool = mp.Pool(ncores) lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) mypool.imap(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue) 

En la condición 3, los nuevos procesos se inician manualmente y el locking y la cola se crean sin un administrador:

 def scenario_3_single_processes_no_manager(jobfunc, args, ncores): """Runs an individual process for every task WITHOUT a Manager, SUCCESSFUL! """ lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue) 

La condición 4 es similar, pero ahora usa un administrador:

 def scenario_4_single_processes_manager(jobfunc, args, ncores): """Runs an individual process for every task WITH a Manager, SUCCESSFUL! """ lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue) 

En ambas condiciones, 3 y 4, comienzo un nuevo proceso para cada una de las 10 tareas de the_job con la mayoría de los procesos de ncores que operan al mismo tiempo. Esto se logra con la siguiente función de ayuda:

 def do_job_single_processes(jobfunc, iterator, ncores): """Runs a job function by starting individual processes for every task. At most `ncores` processes operate at the same time :param jobfunc: Job to do :param iterator: Iterator over different parameter settings, contains a lock and a queue :param ncores: Number of processes operating at the same time """ keep_running=True process_dict = {} # Dict containing all subprocees while len(process_dict)>0 or keep_running: terminated_procs_pids = [] # First check if some processes did finish their job for pid, proc in process_dict.iteritems(): # Remember the terminated processes if not proc.is_alive(): terminated_procs_pids.append(pid) # And delete these from the process dict for terminated_proc in terminated_procs_pids: process_dict.pop(terminated_proc) # If we have less active processes than ncores and there is still # a job to do, add another process if len(process_dict) < ncores and keep_running: try: task = iterator.next() proc = mp.Process(target=jobfunc, args=(task,)) proc.start() process_dict[proc.pid]=proc except StopIteration: # All tasks have been started keep_running=False time.sleep(0.1) 

El resultado

Solo la condición 1 falla ( RuntimeError: Lock objects should only be shared between processes through inheritance ) mientras que las otras 3 condiciones son exitosas. Intento envolver mi cabeza en torno a este resultado.

¿Por qué la agrupación necesita compartir un locking y una cola entre todos los procesos pero los procesos individuales de la condición 3 no lo hacen?

Lo que sé es que para las condiciones de la agrupación (1 y 2) todos los datos de los iteradores se pasan a través del decapado, mientras que en las condiciones de un solo proceso (3 y 4) todos los datos de los iteradores se pasan por herencia del proceso principal (I am utilizando Linux ). Supongo que hasta que se cambie la memoria desde un proceso secundario, se accede a la misma memoria que utiliza el proceso parental (copia en escritura). Pero tan pronto como uno diga lock.acquire() , esto debe cambiarse y los procesos secundarios utilizan diferentes lockings colocados en otro lugar de la memoria, ¿no es así? ¿Cómo puede un niño saber que un hermano ha activado un locking que no se comparte a través de un administrador?

Finalmente, algo relacionado es mi pregunta de cuán diferentes son las condiciones 3 y 4. Ambos tienen procesos individuales pero difieren en el uso de un administrador. ¿Se considera que ambos son un código válido ? ¿O debería uno evitar usar un administrador si en realidad no hay necesidad de uno?


Script completo

Para aquellos que simplemente desean copiar y pegar todo para ejecutar el código, aquí está el script completo:

 __author__ = 'Me and myself' import multiprocessing as mp import time def the_job(args): """The job for multiprocessing. Prints some stuff secured by a lock and finally puts the input into a queue. """ idx = args[0] lock = args[1] queue=args[2] lock.acquire() print 'I' print 'was ' print 'here ' print '!!!!' print '1111' print 'einhundertelfzigelf\n' who= ' By run %d \n' % idx print who lock.release() queue.put(idx) def read_queue(queue): """Turns a qeue into a normal python list.""" results = [] while not queue.empty(): result = queue.get() results.append(result) return results def make_iterator(args, lock, queue): """Makes an iterator over args and passes the lock an queue to each element.""" return ((arg, lock, queue) for arg in args) def start_scenario(scenario_number = 1): """Starts one of four multiprocessing scenarios. :param scenario_number: Index of scenario, 1 to 4 """ args = range(10) ncores = 3 if scenario_number==1: result = scenario_1_pool_no_manager(the_job, args, ncores) elif scenario_number==2: result = scenario_2_pool_manager(the_job, args, ncores) elif scenario_number==3: result = scenario_3_single_processes_no_manager(the_job, args, ncores) elif scenario_number==4: result = scenario_4_single_processes_manager(the_job, args, ncores) if result != args: print 'Scenario %d fails: %s != %s' % (scenario_number, args, result) else: print 'Scenario %d successful!' % scenario_number def scenario_1_pool_no_manager(jobfunc, args, ncores): """Runs a pool of processes WITHOUT a Manager for the lock and queue. FAILS! """ mypool = mp.Pool(ncores) lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) mypool.map(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue) def scenario_2_pool_manager(jobfunc, args, ncores): """Runs a pool of processes WITH a Manager for the lock and queue. SUCCESSFUL! """ mypool = mp.Pool(ncores) lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) mypool.map(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue) def scenario_3_single_processes_no_manager(jobfunc, args, ncores): """Runs an individual process for every task WITHOUT a Manager, SUCCESSFUL! """ lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue) def scenario_4_single_processes_manager(jobfunc, args, ncores): """Runs an individual process for every task WITH a Manager, SUCCESSFUL! """ lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue) def do_job_single_processes(jobfunc, iterator, ncores): """Runs a job function by starting individual processes for every task. At most `ncores` processes operate at the same time :param jobfunc: Job to do :param iterator: Iterator over different parameter settings, contains a lock and a queue :param ncores: Number of processes operating at the same time """ keep_running=True process_dict = {} # Dict containing all subprocees while len(process_dict)>0 or keep_running: terminated_procs_pids = [] # First check if some processes did finish their job for pid, proc in process_dict.iteritems(): # Remember the terminated processes if not proc.is_alive(): terminated_procs_pids.append(pid) # And delete these from the process dict for terminated_proc in terminated_procs_pids: process_dict.pop(terminated_proc) # If we have less active processes than ncores and there is still # a job to do, add another process if len(process_dict) < ncores and keep_running: try: task = iterator.next() proc = mp.Process(target=jobfunc, args=(task,)) proc.start() process_dict[proc.pid]=proc except StopIteration: # All tasks have been started keep_running=False time.sleep(0.1) def main(): """Runs 1 out of 4 different multiprocessing scenarios""" start_scenario(1) if __name__ == '__main__': main() 

multiprocessing.Lock se implementa utilizando un objeto Semaphore proporcionado por el sistema operativo. En Linux, el hijo simplemente hereda un identificador del semáforo del padre a través de os.fork . Esto no es una copia del semáforo; en realidad, está heredando el mismo identificador que el padre, de la misma forma en que se pueden heredar los descriptores de archivos. Windows, por otro lado, no es compatible con os.fork , por lo que tiene que encurtir el Lock . Para ello, crea un identificador duplicado para el Semáforo de Windows utilizado internamente por el objeto multiprocessing.Lock , utilizando la API de DuplicateHandle Windows, que establece:

El identificador duplicado se refiere al mismo objeto que el identificador original. Por lo tanto, cualquier cambio en el objeto se refleja a través de ambos controladores

La API DuplicateHandle permite otorgar la propiedad del identificador duplicado al proceso hijo, de modo que el proceso hijo pueda usarlo en realidad después de deshacerlo. Al crear un controlador duplicado que pertenece al niño, puede efectivamente “compartir” el objeto de locking.

Aquí está el objeto semáforo en multiprocessing/synchronize.py

 class SemLock(object): def __init__(self, kind, value, maxvalue): sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) debug('created semlock with handle %s' % sl.handle) self._make_methods() if sys.platform != 'win32': def _after_fork(obj): obj._semlock._after_fork() register_after_fork(self, _after_fork) def _make_methods(self): self.acquire = self._semlock.acquire self.release = self._semlock.release self.__enter__ = self._semlock.__enter__ self.__exit__ = self._semlock.__exit__ def __getstate__(self): # This is called when you try to pickle the `Lock`. assert_spawning(self) sl = self._semlock return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) def __setstate__(self, state): # This is called when unpickling a `Lock` self._semlock = _multiprocessing.SemLock._rebuild(*state) debug('recreated blocker with handle %r' % state[0]) self._make_methods() 

Tenga en cuenta la llamada assert_spawning en __getstate__ , a la que se llama cuando decapamos el objeto. Así es como se implementa:

 # # Check that the current thread is spawning a child process # def assert_spawning(self): if not Popen.thread_is_spawning(): raise RuntimeError( '%s objects should only be shared between processes' ' through inheritance' % type(self).__name__ ) 

Esa función es la que se asegura de que está “heredando” el Lock , llamando a thread_is_spawning . En Linux, ese método simplemente devuelve False :

 @staticmethod def thread_is_spawning(): return False 

Esto se debe a que Linux no necesita encogerse para heredar el Lock , por lo que si __getstate__ se está llamando en Linux, no debemos heredar. En Windows, hay más cosas:

 def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) class Popen(object): ''' Start a subprocess to run the code of a process object ''' _tls = thread._local() def __init__(self, process_obj): ... # send information to child prep_data = get_preparation_data(process_obj._name) to_child = os.fdopen(wfd, 'wb') Popen._tls.process_handle = int(hp) try: dump(prep_data, to_child, HIGHEST_PROTOCOL) dump(process_obj, to_child, HIGHEST_PROTOCOL) finally: del Popen._tls.process_handle to_child.close() @staticmethod def thread_is_spawning(): return getattr(Popen._tls, 'process_handle', None) is not None 

Aquí, thread_is_spawning devuelve True si el objeto Popen._tls tiene un atributo process_handle . Podemos ver que el atributo process_handle se crea en __init__ , luego los datos que queremos heredar se pasan del padre al hijo usando el dump , luego se elimina el atributo. Así que thread_is_spawning solo será True durante __init__ . De acuerdo con este hilo de la lista de correo de python-ideas , esto es en realidad una limitación artificial agregada para simular el mismo comportamiento que os.fork en Linux. En realidad, Windows podría admitir pasar el Lock en cualquier momento, ya que DuplicateHandle se puede ejecutar en cualquier momento.

Todo lo anterior se aplica al objeto Queue porque usa Lock internamente.

Yo diría que heredar objetos de Lock es preferible a usar un Lock Manager Manager.Lock() , porque cuando se usa un Lock Manager.Lock , cada llamada que realice al Lock debe enviarse a través del IPC al proceso del Manager , que será mucho más lento que usar un Lock compartido que vive dentro del proceso de llamada. Sin embargo, ambos enfoques son perfectamente válidos.

Finalmente, es posible pasar un Lock a todos los miembros de un grupo sin usar un Manager , usando los argumentos de la palabra clave initializer / initargs :

 lock = None def initialize_lock(l): global lock lock = l def scenario_1_pool_no_manager(jobfunc, args, ncores): """Runs a pool of processes WITHOUT a Manager for the lock and queue. """ lock = mp.Lock() mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,)) queue = mp.Queue() iterator = make_iterator(args, queue) mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly. mypool.close() mypool.join() return read_queue(queue) 

Esto funciona porque los argumentos pasados ​​a initargs se pasan al método __init__ de los objetos Process que se ejecutan dentro del Pool , por lo que terminan siendo heredados, en lugar de decapados.