Multiproceso Python – Depuración OSError: No se puede asignar memoria

Estoy enfrentando el siguiente problema. Estoy intentando paralelizar una función que actualiza un archivo, pero no puedo iniciar el Pool() debido a un OSError: [Errno 12] Cannot allocate memory . Comencé a buscar en el servidor, y no es como si estuviera usando uno viejo y débil / fuera de la memoria real. Ver htop : introduzca la descripción de la imagen aquí Además, free -m muestra que tengo un montón de RAM disponible además de los ~ 7GB de memoria de intercambio: introduzca la descripción de la imagen aquí Y los archivos con los que estoy tratando de trabajar tampoco son tan grandes. Pegaré mi código (y el seguimiento de la stack) a continuación, los tamaños son los siguientes:

El dataframe de matriz de predictionmatrix utilizado ocupa ca. geo.geojson MB según pandasdataframe.memory_usage() El archivo geo.geojson es de 2MB

¿Cómo hago para depurar esto? ¿Qué puedo comprobar y cómo? Gracias por cualquier consejo / truco!

Código:

 def parallelUpdateJSON(paramMatch, predictionmatrix, data): for feature in data['features']: currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch] if (len(currentfeature) > 0): feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}}) else: feature['properties'].update({"style": {"opacity": 0}}) def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix): with open('geo.geojson') as f: data = json.load(f) paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) pool = Pool() func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) pool.map(func, data) pool.close() pool.join() with open('output.geojson', 'w') as outfile: json.dump(data, outfile) 

Traza de la stack:

 --------------------------------------------------------------------------- OSError Traceback (most recent call last)  in () ----> 1 writeGeoJSON(6, 15, baseline)  in writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix) 14 print("Start loop") 15 paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) ---> 16 pool = Pool(2) 17 func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) 18 print(predictionmatrix.memory_usage()) /usr/lib/python3.5/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild) 116 from .pool import Pool 117 return Pool(processes, initializer, initargs, maxtasksperchild, --> 118 context=self.get_context()) 119 120 def RawValue(self, typecode_or_type, *args): /usr/lib/python3.5/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context) 166 self._processes = processes 167 self._pool = [] --> 168 self._repopulate_pool() 169 170 self._worker_handler = threading.Thread( /usr/lib/python3.5/multiprocessing/pool.py in _repopulate_pool(self) 231 w.name = w.name.replace('Process', 'PoolWorker') 232 w.daemon = True --> 233 w.start() 234 util.debug('added worker') 235 /usr/lib/python3.5/multiprocessing/process.py in start(self) 103 'daemonic processes are not allowed to have children' 104 _cleanup() --> 105 self._popen = self._Popen(self) 106 self._sentinel = self._popen.sentinel 107 _children.add(self) /usr/lib/python3.5/multiprocessing/context.py in _Popen(process_obj) 265 def _Popen(process_obj): 266 from .popen_fork import Popen --> 267 return Popen(process_obj) 268 269 class SpawnProcess(process.BaseProcess): /usr/lib/python3.5/multiprocessing/popen_fork.py in __init__(self, process_obj) 18 sys.stderr.flush() 19 self.returncode = None ---> 20 self._launch(process_obj) 21 22 def duplicate_for_child(self, fd): /usr/lib/python3.5/multiprocessing/popen_fork.py in _launch(self, process_obj) 65 code = 1 66 parent_r, child_w = os.pipe() ---> 67 self.pid = os.fork() 68 if self.pid == 0: 69 try: OSError: [Errno 12] Cannot allocate memory 

ACTUALIZAR

De acuerdo con la solución de @ robyschek, he actualizado mi código para:

 global g_predictionmatrix def worker_init(predictionmatrix): global g_predictionmatrix g_predictionmatrix = predictionmatrix def parallelUpdateJSON(paramMatch, data_item): for feature in data_item['features']: currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch] if (len(currentfeature) > 0): feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}}) else: feature['properties'].update({"style": {"opacity": 0}}) def use_the_pool(data, paramMatch, predictionmatrix): pool = Pool(initializer=worker_init, initargs=(predictionmatrix,)) func = partial(parallelUpdateJSON, paramMatch) pool.map(func, data) pool.close() pool.join() def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix): with open('geo.geojson') as f: data = json.load(f) paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) use_the_pool(data, paramMatch, predictionmatrix) with open('trentino-grid.geojson', 'w') as outfile: json.dump(data, outfile) 

Y sigo teniendo el mismo error. Además, de acuerdo con la documentación , map() debería dividir mis data en partes, por lo que no creo que deba replicar mis tiempos de 80 MB de rownum. Aunque puedo estar equivocado … 🙂 Además, he notado que si uso una entrada más pequeña (~ 11MB en lugar de 80MB) no recibo el error. Así que supongo que estoy tratando de usar demasiada memoria, pero no puedo imaginar cómo va de 80 MB a algo que no pueden manejar 16 GB de RAM.

Tuvimos esto un par de veces. Según el administrador de mi sistema, hay “un error” en Unix, que generará el mismo error si se queda sin memoria, o si su proceso alcanza el límite máximo de descriptor de archivos.

Tuvimos una fuga de descriptor de archivo y el error fue [Errno 12] No se puede asignar memoria # 012OSError.

Por lo tanto, debe revisar su script y verificar si el problema no es la creación de demasiados FD en su lugar.

Cuando se utiliza un multiprocessing.Pool , la forma predeterminada de iniciar los procesos es la fork . El problema con la fork es que todo el proceso está duplicado. ( ver detalles aquí ). Por lo tanto, si su proceso principal ya está utilizando una gran cantidad de memoria, esta memoria se duplicará, llegando a este MemoryError . Por ejemplo, si su proceso principal utiliza 2GB de memoria y usted utiliza 8 subprocesos, necesitará 18 2GB de RAM.

Debería intentar usar un método de inicio diferente, como 'forkserver' o 'spawn' :

 from multiprocessing import set_start_method, Pool set_start_method('forkserver') # You can then start your Pool without each process # cloning your entire memory pool = Pool() func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) pool.map(func, data) 

Estos métodos evitan la duplicación del espacio de trabajo de su Process pero pueden ser un poco más lentos para comenzar, ya que necesita volver a cargar los módulos que está utilizando.