Multiproceso de Python apply_async “assert left> 0” AssertionError

Estoy tratando de cargar archivos numpy asincrónicamente en un Pool:

self.pool = Pool(2, maxtasksperchild = 1) ... nextPackage = self.pool.apply_async(loadPackages, (...)) for fi in np.arange(len(files)): packages = nextPackage.get(timeout=30) # preload the next package asynchronously. It will be available # by the time it is required. nextPackage = self.pool.apply_async(loadPackages, (...)) 

El método “loadPackages”:

 def loadPackages(... (2 strings & 2 ints) ...): print("This isn't printed!') packages = { "TRUE": np.load(gzip.GzipFile(path1, "r")), "FALSE": np.load(gzip.GzipFile(path2, "r")) } return packages 

Incluso antes de que se cargue el primer “paquete”, se produce el siguiente error:

Excepción en el hilo Thread-8: Traceback (última llamada más reciente):
Archivo “C: \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ threading.py”, línea 914, en _bootstrap_inner self.run () Archivo “C: \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ threading .py “, línea 862, en run self._target (* self._args, ** self._kwargs) Archivo” C: \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ pool.py “, línea 463 , en _handle_results task = get () Archivo “C: \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ connection.py”, línea 250, en recv buf = self._recv_bytes () Archivo “C: \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ connection.py “, línea 318, en _recv_bytes devuelve self._get_more_data (ov, maxsize) Archivo” C: \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ connection.py “, línea 337, en _get_more_data assert left> 0 AssertionError

Superviso los recursos de cerca: la memoria no es un problema, todavía me queda mucho cuando ocurre el error. Los archivos descomprimidos son simples arreglos numéricos multidimensionales. Individualmente, usar un Pool con un método más simple funciona, y cargar el archivo como funciona. Sólo en combinación falla. (Todo esto sucede en un generador de keras personalizado. Dudo que esto ayude, pero quién sabe.) Python 3.5.

¿Cuál podría ser la causa de este problema? ¿Cómo se puede interpretar este error?

¡Gracias por tu ayuda!

Hay un error en el código central de Python C que evita que las respuestas de datos de más de 2GB regresen correctamente al hilo principal. necesita dividir los datos en partes más pequeñas como se sugiere en la respuesta anterior o no usar el multiprocesamiento para esta función

Informé de este error a la lista de errores de Python ( https://bugs.python.org/issue34563 ) y creé una PR ( https://github.com/python/cpython/pull/9027 ) para solucionarlo, pero probablemente lo hará tómate un tiempo para liberarlo

Si está interesado, puede encontrar más detalles sobre las causas del error en la descripción del error en el enlace que publiqué.

Creo que he encontrado una solución al recuperar datos en trozos pequeños. En mi caso era una lista de listas.

Tuve:

 for i in range(0, NUMBER_OF_THREADS): print('MAIN: Getting data from process ' + str(i) + ' proxy...') X_train.extend(ListasX[i]._getvalue()) Y_train.extend(ListasY[i]._getvalue()) ListasX[i] = None ListasY[i] = None gc.collect() 

Cambiado a:

 CHUNK_SIZE = 1024 for i in range(0, NUMBER_OF_THREADS): print('MAIN: Getting data from process ' + str(i) + ' proxy...') for k in range(0, len(ListasX[i]), CHUNK_SIZE): X_train.extend(ListasX[i][k:k+CHUNK_SIZE]) Y_train.extend(ListasY[i][k:k+CHUNK_SIZE]) ListasX[i] = None ListasY[i] = None gc.collect() 

Y ahora parece funcionar, posiblemente al serializar menos datos a la vez. Entonces, tal vez si puede segmentar sus datos en porciones más pequeñas puede superar el problema. ¡Buena suerte!