¿Cómo paso matrices de números grandes entre los subprocesos de Python sin guardar en el disco?

¿Hay una buena manera de pasar una gran parte de los datos entre dos subprocesos de Python sin usar el disco? Aquí hay un ejemplo animado de lo que espero lograr:

import sys, subprocess, numpy cmdString = """ import sys, numpy done = False while not done: cmd = raw_input() if cmd == 'done': done = True elif cmd == 'data': ##Fake data. In real life, get data from hardware. data = numpy.zeros(1000000, dtype=numpy.uint8) data.dump('data.pkl') sys.stdout.write('data.pkl' + '\\n') sys.stdout.flush()""" proc = subprocess.Popen( #python vs. pythonw on Windows? [sys.executable, '-c %s'%cmdString], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in range(3): proc.stdin.write('data\n') print proc.stdout.readline().rstrip() a = numpy.load('data.pkl') print a.shape proc.stdin.write('done\n') 

Esto crea un subproceso que genera una matriz numpy y guarda la matriz en el disco. El proceso principal luego carga la matriz desde el disco. ¡Funciona!

El problema es que nuestro hardware puede generar datos 10 veces más rápido que el disco puede leer / escribir. ¿Hay una manera de transferir datos de un proceso de python a otro puramente en memoria, tal vez incluso sin hacer una copia de los datos? ¿Puedo hacer algo como pasar por referencia?

Mi primer bash de transferir datos puramente en memoria es bastante malo:

 import sys, subprocess, numpy cmdString = """ import sys, numpy done = False while not done: cmd = raw_input() if cmd == 'done': done = True elif cmd == 'data': ##Fake data. In real life, get data from hardware. data = numpy.zeros(1000000, dtype=numpy.uint8) ##Note that this is NFG if there's a '10' in the array: sys.stdout.write(data.tostring() + '\\n') sys.stdout.flush()""" proc = subprocess.Popen( #python vs. pythonw on Windows? [sys.executable, '-c %s'%cmdString], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in range(3): proc.stdin.write('data\n') a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8) print a.shape proc.stdin.write('done\n') 

Esto es extremadamente lento (mucho más lento que guardar en disco) y muy, muy frágil. ¡Tiene que haber una mejor manera!

No estoy casado con el módulo ‘subproceso’, siempre que el proceso de toma de datos no bloquee la aplicación principal. Intenté brevemente ‘multiprocesamiento’, pero sin éxito hasta ahora.

Antecedentes: tenemos una pieza de hardware que genera hasta ~ 2 GB / s de datos en una serie de buffers de ctypes. El código de Python para manejar estos buffers tiene sus manos llenas solo para lidiar con el flujo de información. Quiero coordinar este flujo de información con varias otras piezas de hardware que se ejecutan simultáneamente en un progtwig “maestro”, sin que los subprocesos se bloqueen entre sí. Mi enfoque actual es reducir los datos un poco en el subproceso antes de guardarlos en el disco, pero sería bueno pasar todo el dinero al proceso “maestro”.

Mientras buscaba en Google para obtener más información sobre el código que Joe Kington publicó, encontré el paquete numpy-sharedmem . A juzgar por este tutorial de números / multiprocesamiento , parece compartir la misma herencia intelectual (¿quizás en gran medida los mismos autores? No estoy seguro).

Usando el módulo sharedmem, puede crear una matriz de memoria de números compartidos (¡genial!), Y usarla con multiprocesamiento como este:

 import sharedmem as shm import numpy as np import multiprocessing as mp def worker(q,arr): done = False while not done: cmd = q.get() if cmd == 'done': done = True elif cmd == 'data': ##Fake data. In real life, get data from hardware. rnd=np.random.randint(100) print('rnd={0}'.format(rnd)) arr[:]=rnd q.task_done() if __name__=='__main__': N=10 arr=shm.zeros(N,dtype=np.uint8) q=mp.JoinableQueue() proc = mp.Process(target=worker, args=[q,arr]) proc.daemon=True proc.start() for i in range(3): q.put('data') # Wait for the computation to finish q.join() print arr.shape print(arr) q.put('done') proc.join() 

Rendimientos streams

 rnd=53 (10,) [53 53 53 53 53 53 53 53 53 53] rnd=15 (10,) [15 15 15 15 15 15 15 15 15 15] rnd=87 (10,) [87 87 87 87 87 87 87 87 87 87] 

Básicamente, solo desea compartir un bloque de memoria entre procesos y verlo como una matriz numpy, ¿verdad?

En ese caso, échale un vistazo a esto (publicado hace un tiempo, no es mi trabajo). Hay un par de implementaciones similares (algunas más flexibles), pero todas utilizan esencialmente este principio.

 # "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing" # Modified and corrected by Nadav Horesh, Mar 2010 # No rights reserved import numpy as N import ctypes import multiprocessing as MP _ctypes_to_numpy = { ctypes.c_char : N.dtype(N.uint8), ctypes.c_wchar : N.dtype(N.int16), ctypes.c_byte : N.dtype(N.int8), ctypes.c_ubyte : N.dtype(N.uint8), ctypes.c_short : N.dtype(N.int16), ctypes.c_ushort : N.dtype(N.uint16), ctypes.c_int : N.dtype(N.int32), ctypes.c_uint : N.dtype(N.uint32), ctypes.c_long : N.dtype(N.int64), ctypes.c_ulong : N.dtype(N.uint64), ctypes.c_float : N.dtype(N.float32), ctypes.c_double : N.dtype(N.float64)} _numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys())) def shmem_as_ndarray(raw_array, shape=None ): address = raw_array._obj._wrapper.get_address() size = len(raw_array) if (shape is None) or (N.asarray(shape).prod() != size): shape = (size,) elif type(shape) is int: shape = (shape,) else: shape = tuple(shape) dtype = _ctypes_to_numpy[raw_array._obj._type_] class Dummy(object): pass d = Dummy() d.__array_interface__ = { 'data' : (address, False), 'typestr' : dtype.str, 'descr' : dtype.descr, 'shape' : shape, 'strides' : None, 'version' : 3} return N.asarray(d) def empty_shared_array(shape, dtype, lock=True): ''' Generate an empty MP shared array given ndarray parameters ''' if type(shape) is not int: shape = N.asarray(shape).prod() try: c_type = _numpy_to_ctypes[dtype] except KeyError: c_type = _numpy_to_ctypes[N.dtype(dtype)] return MP.Array(c_type, shape, lock=lock) def emptylike_shared_array(ndarray, lock=True): 'Generate a empty shared array with size and dtype of a given array' return empty_shared_array(ndarray.size, ndarray.dtype, lock) 

De las otras respuestas, parece que numpy-sharedmem es el camino a seguir.

Sin embargo, si necesita una solución Python pura, o instalar extensiones, cython o similar es una molestia (grande), puede usar el siguiente código, que es una versión simplificada del código de Nadav:

 import numpy, ctypes, multiprocessing _ctypes_to_numpy = { ctypes.c_char : numpy.dtype(numpy.uint8), ctypes.c_wchar : numpy.dtype(numpy.int16), ctypes.c_byte : numpy.dtype(numpy.int8), ctypes.c_ubyte : numpy.dtype(numpy.uint8), ctypes.c_short : numpy.dtype(numpy.int16), ctypes.c_ushort : numpy.dtype(numpy.uint16), ctypes.c_int : numpy.dtype(numpy.int32), ctypes.c_uint : numpy.dtype(numpy.uint32), ctypes.c_long : numpy.dtype(numpy.int64), ctypes.c_ulong : numpy.dtype(numpy.uint64), ctypes.c_float : numpy.dtype(numpy.float32), ctypes.c_double : numpy.dtype(numpy.float64)} _numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys())) def shm_as_ndarray(mp_array, shape = None): '''Given a multiprocessing.Array, returns an ndarray pointing to the same data.''' # support SynchronizedArray: if not hasattr(mp_array, '_type_'): mp_array = mp_array.get_obj() dtype = _ctypes_to_numpy[mp_array._type_] result = numpy.frombuffer(mp_array, dtype) if shape is not None: result = result.reshape(shape) return numpy.asarray(result) def ndarray_to_shm(array, lock = False): '''Generate an 1D multiprocessing.Array containing the data from the passed ndarray. The data will be *copied* into shared memory.''' array1d = array.ravel(order = 'A') try: c_type = _numpy_to_ctypes[array1d.dtype] except KeyError: c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)] result = multiprocessing.Array(c_type, array1d.size, lock = lock) shm_as_ndarray(result)[:] = array1d return result 

Lo usarías así:

  1. Use sa = ndarray_to_shm(a) para convertir ndarray a en un multiprocessing.Array compartido.
  2. Use multiprocessing.Process(target = somefunc, args = (sa, ) (y start , quizás join ) para llamar a somefunc en un proceso separado, pasando la matriz compartida.
  3. En somefunc , use a = shm_as_ndarray(sa) para obtener un ndarray que apunte a los datos compartidos. (En realidad, es posible que desee hacer lo mismo en el proceso original, inmediatamente después de crear sa SA, para que dos ndarrays hagan referencia a los mismos datos).

AFAICS, no es necesario establecer el locking en True, ya que shm_as_ndarray no utilizará el locking de todos modos. Si necesita un locking, debe establecer el locking en Verdadero y llamar a adquirir / release en sa .

Además, si su matriz no es unidimensional, es posible que desee transferir la forma junto con sa (por ejemplo, use args = (sa, a.shape) ).

Esta solución tiene la ventaja de que no necesita paquetes adicionales o módulos de extensión, excepto el multiprocesamiento (que se encuentra en la biblioteca estándar).

Utilice hilos. Pero supongo que vas a tener problemas con la GIL.

En su lugar: elige tu veneno .

Sé por las implementaciones de MPI con las que trabajo, que usan la memoria compartida para las comunicaciones en el nodo. Tendrá que codificar su propia sincronización en ese caso.

2 GB / s suena como que tendrá problemas con la mayoría de los métodos “fáciles”, según sus limitaciones en tiempo real y la memoria principal disponible.

Utilice hilos. Probablemente no tendrás problemas con la GIL.

La GIL solo afecta al código Python, no a las bibliotecas respaldadas por C / Fortran / Cython. La mayoría de las operaciones numéricas y una buena parte de la stack Scientific Python respaldada por C liberan la GIL y pueden funcionar bien en varios núcleos. Esta publicación de blog analiza el GIL y el científico Python en más profundidad.

Editar

Las formas sencillas de utilizar subprocesos incluyen el módulo de threading y multiprocessing.pool.ThreadPool .

Una posibilidad a considerar es usar una unidad de RAM para el almacenamiento temporal de archivos que se compartirán entre procesos . Una unidad de RAM es donde una parte de la RAM se trata como un disco duro lógico, en el que los archivos se pueden escribir / leer como lo haría con una unidad normal, pero a velocidades de lectura / escritura de la RAM.

Este artículo describe el uso del software ImDisk (para MS Win) para crear dicho disco y obtiene velocidades de lectura / escritura de archivos de 6 a 10 Gigabytes / segundo: https://www.tekrevue.com/tip/create-10-gbs-ram -disk-windows /

Un ejemplo en Ubuntu: https://askubuntu.com/questions/152868/how-do-i-make-a-ram-disk#152871

Otro beneficio notable es que los archivos con formatos arbitrarios pueden transmitirse con este método: por ejemplo, Picke, JSON, XML, CSV, HDF5, etc.

Tenga en cuenta que cualquier cosa almacenada en el disco RAM se borra al reiniciar.