multiprocesamiento: ¿Cómo comparto un dictado entre múltiples procesos?

Un progtwig que crea varios procesos que funcionan en una cola capaz de unirse, Q , y puede eventualmente manipular un diccionario global D para almacenar resultados. (por lo que cada proceso secundario puede usar D para almacenar su resultado y también ver qué resultados están produciendo los otros procesos secundarios)

Si imprimo el diccionario D en un proceso secundario, veo las modificaciones que se han realizado en él (es decir, en D). Pero después de que el proceso principal se une a Q, si imprimo D, ¡es un dict vacío!

Entiendo que es un problema de sincronización / locking. ¿Alguien puede decirme qué está pasando aquí y cómo puedo sincronizar el acceso a D?

Una respuesta general implica el uso de un objeto Manager . Adaptado de los documentos:

 from multiprocessing import Process, Manager def f(d): d[1] += '1' d['2'] += 2 if __name__ == '__main__': manager = Manager() d = manager.dict() d[1] = '1' d['2'] = 2 p1 = Process(target=f, args=(d,)) p2 = Process(target=f, args=(d,)) p1.start() p2.start() p1.join() p2.join() print d 

Salida:

 $ python mul.py {1: '111', '2': 6} 

El multiprocesamiento no es como el enhebrado. Cada proceso hijo obtendrá una copia de la memoria del proceso principal. En general, el estado se comparte a través de la comunicación (tuberías / sockets), señales o memoria compartida.

El multiprocesamiento hace que algunas abstracciones estén disponibles para su caso de uso: estado compartido que se trata como local mediante el uso de servidores proxy o memoria compartida: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Secciones relevantes:

Me gustaría compartir mi propio trabajo que es más rápido que el dictado de Manager y es más simple y más estable que la biblioteca pyshmht que usa mucha memoria y no funciona para Mac OS. Aunque mi dictado solo funciona para cuerdas simples y es inmutable actualmente. Uso la implementación del sondeo lineal y almaceno pares de claves y valores en un bloque de memoria separado después de la tabla.

 from mmap import mmap import struct from timeit import default_timer from multiprocessing import Manager from pyshmht import HashTable class shared_immutable_dict: def __init__(self, a): self.hs = 1 << (len(a) * 3).bit_length() kvp = self.hs * 4 ht = [0xffffffff] * self.hs kvl = [] for k, v in a.iteritems(): h = self.hash(k) while ht[h] != 0xffffffff: h = (h + 1) & (self.hs - 1) ht[h] = kvp kvp += self.kvlen(k) + self.kvlen(v) kvl.append(k) kvl.append(v) self.m = mmap(-1, kvp) for p in ht: self.m.write(uint_format.pack(p)) for x in kvl: if len(x) <= 0x7f: self.m.write_byte(chr(len(x))) else: self.m.write(uint_format.pack(0x80000000 + len(x))) self.m.write(x) def hash(self, k): h = hash(k) h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1) return h def get(self, k, d=None): h = self.hash(k) while True: x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0] if x == 0xffffffff: return d self.m.seek(x) if k == self.read_kv(): return self.read_kv() h = (h + 1) & (self.hs - 1) def read_kv(self): sz = ord(self.m.read_byte()) if sz & 0x80: sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000 return self.m.read(sz) def kvlen(self, k): return len(k) + (1 if len(k) <= 0x7f else 4) def __contains__(self, k): return self.get(k, None) is not None def close(self): self.m.close() uint_format = struct.Struct('>I') def uget(a, k, d=None): return to_unicode(a.get(to_str(k), d)) def uin(a, k): return to_str(k) in a def to_unicode(s): return s.decode('utf-8') if isinstance(s, str) else s def to_str(s): return s.encode('utf-8') if isinstance(s, unicode) else s def mmap_test(): n = 1000000 d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time)) def manager_test(): n = 100000 d = Manager().dict({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time)) def shm_test(): n = 1000000 d = HashTable('tmp', n) d.update({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time)) if __name__ == '__main__': mmap_test() manager_test() shm_test() 

En mi portátil los resultados de rendimiento son:

 mmap speed: 247288 gets per sec manager speed: 33792 gets per sec shm speed: 691332 gets per sec 

Ejemplo de uso simple:

 ht = shared_immutable_dict({'a': '1', 'b': '2'}) print ht.get('a') 

Tal vez pueda probar pyshmht , compartiendo la extensión de tabla hash basada en memoria para Python.

darse cuenta

  1. No está completamente probado, solo para su referencia.

  2. Actualmente carece de mecanismos de locking / sem para multiprocesamiento.

Esto no funciona como se esperaba al menos en Python 3.7.2 usando osx 10.14.4

Dict no está sincronizado y su contenido es reescrito por otros procesos. Sin embargo, multiprocessing.Manager().list() funciona como se esperaba.