¿Por qué la comunicación a través de la memoria compartida es mucho más lenta que a través de las colas?

Estoy usando Python 2.7.5 en un Apple MacBook Pro de cosecha reciente que tiene cuatro hardware y ocho CPU lógicas; Es decir, la utilidad sysctl da:

$ sysctl hw.physicalcpu hw.physicalcpu: 4 $ sysctl hw.logicalcpu hw.logicalcpu: 8 

Necesito realizar un procesamiento bastante complicado en una gran lista o matriz 1-D, y luego guardar el resultado como una salida intermedia que se usará nuevamente en un punto posterior en un cálculo posterior dentro de mi aplicación. La estructura de mi problema se presta de forma bastante natural a la paralelización, así que pensé que intentaría usar el módulo de multiprocesamiento de Python para subdividir la matriz 1D en varias partes (ya sea 4 piezas u 8 piezas, todavía no estoy seguro de cuál) realice los cálculos en paralelo y luego vuelva a ensamblar la salida resultante en su formato final. Estoy tratando de decidir si usar multiprocessing.Queue() (colas de mensajes) o multiprocessing.Array() (memoria compartida) como mi mecanismo preferido para comunicar los cálculos resultantes de los procesos secundarios al proceso principal principal, y He estado experimentando con un par de modelos “de juguete” para asegurarme de que entiendo cómo funciona realmente el módulo de multiprocesamiento. Sin embargo, me he encontrado con un resultado bastante inesperado: al crear dos soluciones esencialmente equivalentes al mismo problema, la versión que utiliza la memoria compartida para la comunicación entre procesos parece requerir mucho más tiempo de ejecución (¡como 30 veces más!) Que la versión que usa el mensaje colas A continuación, he incluido dos versiones diferentes de código fuente de muestra para un problema de “juguete” que genera una larga secuencia de números aleatorios utilizando procesos paralelos, y comunica el resultado aglomerado a un proceso principal de dos maneras diferentes: primero utilizando colas de mensajes , y la segunda vez utilizando memoria compartida.

Aquí está la versión que usa colas de mensajes:

 import random import multiprocessing import datetime def genRandom(count, id, q): print("Now starting process {0}".format(id)) output = [] # Generate a list of random numbers, of length "count" for i in xrange(count): output.append(random.random()) # Write the output to a queue, to be read by the calling process q.put(output) if __name__ == "__main__": # Number of random numbers to be generated by each process size = 1000000 # Number of processes to create -- the total size of all of the random # numbers generated will ultimately be (procs * size) procs = 4 # Create a list of jobs and queues jobs = [] outqs = [] for i in xrange(0, procs): q = multiprocessing.Queue() p = multiprocessing.Process(target=genRandom, args=(size, i, q)) jobs.append(p) outqs.append(q) # Start time of the parallel processing and communications section tstart = datetime.datetime.now() # Start the processes (ie calculate the random number lists) for j in jobs: j.start() # Read out the data from the queues data = [] for q in outqs: data.extend(q.get()) # Ensure all of the processes have finished for j in jobs: j.join() # End time of the parallel processing and communications section tstop = datetime.datetime.now() tdelta = datetime.timedelta.total_seconds(tstop - tstart) msg = "{0} random numbers generated in {1} seconds" print(msg.format(len(data), tdelta)) 

Cuando lo ejecuto, obtengo un resultado que normalmente se ve así:

 $ python multiproc_queue.py Now starting process 0 Now starting process 1 Now starting process 2 Now starting process 3 4000000 random numbers generated in 0.514805 seconds 

Ahora, aquí está el segmento de código equivalente, pero refactorizado solo ligeramente para que use memoria compartida en lugar de colas:

 import random import multiprocessing import datetime def genRandom(count, id, d): print("Now starting process {0}".format(id)) # Generate a list of random numbers, of length "count", and write them # directly to a segment of an array in shared memory for i in xrange(count*id, count*(id+1)): d[i] = random.random() if __name__ == "__main__": # Number of random numbers to be generated by each process size = 1000000 # Number of processes to create -- the total size of all of the random # numbers generated will ultimately be (procs * size) procs = 4 # Create a list of jobs and a block of shared memory jobs = [] data = multiprocessing.Array('d', size*procs) for i in xrange(0, procs): p = multiprocessing.Process(target=genRandom, args=(size, i, data)) jobs.append(p) # Start time of the parallel processing and communications section tstart = datetime.datetime.now() # Start the processes (ie calculate the random number lists) for j in jobs: j.start() # Ensure all of the processes have finished for j in jobs: j.join() # End time of the parallel processing and communications section tstop = datetime.datetime.now() tdelta = datetime.timedelta.total_seconds(tstop - tstart) msg = "{0} random numbers generated in {1} seconds" print(msg.format(len(data), tdelta)) 

Sin embargo, cuando ejecuto la versión de memoria compartida, el resultado típico se parece más a esto:

 $ python multiproc_shmem.py Now starting process 0 Now starting process 1 Now starting process 2 Now starting process 3 4000000 random numbers generated in 15.839607 seconds 

Mi pregunta: ¿por qué hay una diferencia tan grande en las velocidades de ejecución (aproximadamente 0,5 segundos frente a 15 segundos, un factor de 30X) entre las dos versiones de mi código? Y, en particular, ¿cómo puedo modificar la versión de memoria compartida para que funcione más rápido?

Esto se debe a que multiprocessing.Array utiliza un locking de forma predeterminada para evitar que múltiples procesos accedan a él a la vez:

multiprocessing.Array (typecode_or_type, size_or_initializer, *, lock = True)

Si el locking es Verdadero (el valor predeterminado), se creará un nuevo objeto de locking para sincronizar el acceso al valor. Si lock es un objeto Lock o RLock, se utilizará para sincronizar el acceso al valor. Si el locking es Falso, el acceso al objeto devuelto no estará protegido automáticamente por un locking, por lo que no será necesariamente “seguro para el proceso”.

Esto significa que en realidad no está escribiendo simultáneamente en la matriz, solo un proceso puede acceder a ella a la vez. Dado que los trabajadores de su ejemplo no están haciendo nada más que escrituras de matrices, la espera constante en este locking perjudica gravemente el rendimiento. Si usa lock=False cuando crea la matriz, el rendimiento es mucho mejor:

Con lock=True :

 Now starting process 0 Now starting process 1 Now starting process 2 Now starting process 3 4000000 random numbers generated in 4.811205 seconds 

Con lock=False :

 Now starting process 0 Now starting process 3 Now starting process 1 Now starting process 2 4000000 random numbers generated in 0.192473 seconds 

Tenga en cuenta que usar lock=False significa que necesita proteger manualmente el acceso a la Array cuando esté haciendo algo que no sea seguro para el proceso. Su ejemplo es tener procesos de escritura en partes únicas, así que está bien. Pero si intentaba leerlo mientras hacía eso, o tenía diferentes procesos de escritura en partes superpuestas, tendría que adquirir un locking manualmente.