Multiprocessing Queue.get () se cuelga

Estoy tratando de implementar multiprocesamiento básico y me he encontrado con un problema. El script de python se adjunta a continuación.

import time, sys, random, threading from multiprocessing import Process from Queue import Queue from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency append_queue = Queue(10) database = FrequencyStore() def add_to_append_queue(_list): append_queue.put(_list) def process_append_queue(): while True: item = append_queue.get() database.append(item) print("Appended to database in %.4f seconds" % database.append_time) append_queue.task_done() return def main(): database.load_db() print("Database loaded in %.4f seconds" % database.load_time) append_queue_process = Process(target=process_append_queue) append_queue_process.daemon = True append_queue_process.start() #t = threading.Thread(target=process_append_queue) #t.daemon = True #t.start() while True: path = raw_input("file: ") if path == "exit": break a = AnalyzeFrequency(path) a.analyze() print("Analyzed file in %.4f seconds" % a._time) add_to_append_queue(a.get_results()) append_queue.join() #append_queue_process.join() database.save_db() print("Database saved in %.4f seconds" % database.save_time) sys.exit(0) if __name__=="__main__": main() 

El AnalyzeFrequency analiza las frecuencias de las palabras en un archivo y get_results() devuelve una lista ordenada de dichas palabras y frecuencias. La lista es muy grande, tal vez 10000 artículos.

Esta lista se pasa luego al método add_to_append_queue que lo agrega a una cola. Process_append_queue toma los elementos uno por uno y agrega las frecuencias a una “base de datos“. Esta operación toma un poco más de tiempo que el análisis real en main() así que estoy tratando de usar un proceso separado para este método. Cuando bash hacer esto con el módulo de subprocesamiento, todo funciona perfectamente bien, sin errores. Cuando bash y uso Proceso, el script se cuelga en item = append_queue.get() .

¿Podría alguien explicar lo que está sucediendo aquí y quizás dirigirme hacia una solución?

Todas las respuestas apreciadas!

    ACTUALIZAR

    El error de pickle fue mi culpa, fue solo un error tipográfico. Ahora estoy usando la clase Queue dentro del multiprocesamiento, pero el método append_queue.get () aún se cuelga. NUEVO CÓDIGO

     import time, sys, random from multiprocessing import Process, Queue from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency append_queue = Queue() database = FrequencyStore() def add_to_append_queue(_list): append_queue.put(_list) def process_append_queue(): while True: database.append(append_queue.get()) print("Appended to database in %.4f seconds" % database.append_time) return def main(): database.load_db() print("Database loaded in %.4f seconds" % database.load_time) append_queue_process = Process(target=process_append_queue) append_queue_process.daemon = True append_queue_process.start() #t = threading.Thread(target=process_append_queue) #t.daemon = True #t.start() while True: path = raw_input("file: ") if path == "exit": break a = AnalyzeFrequency(path) a.analyze() print("Analyzed file in %.4f seconds" % a._time) add_to_append_queue(a.get_results()) #append_queue.join() #append_queue_process.join() print str(append_queue.qsize()) database.save_db() print("Database saved in %.4f seconds" % database.save_time) sys.exit(0) if __name__=="__main__": main() 

    ACTUALIZACIÓN 2

    Este es el código de la base de datos:

     class FrequencyStore: def __init__(self): self.sorter = Sorter() self.db = {} self.load_time = -1 self.save_time = -1 self.append_time = -1 self.sort_time = -1 def load_db(self): start_time = time.time() try: file = open("results.txt", 'r') except: raise IOError self.db = {} for line in file: word, count = line.strip("\n").split("=") self.db[word] = int(count) file.close() self.load_time = time.time() - start_time def save_db(self): start_time = time.time() _db = [] for key in self.db: _db.append([key, self.db[key]]) _db = self.sort(_db) try: file = open("results.txt", 'w') except: raise IOError file.truncate(0) for x in _db: file.write(x[0] + "=" + str(x[1]) + "\n") file.close() self.save_time = time.time() - start_time def create_sorted_db(self): _temp_db = [] for key in self.db: _temp_db.append([key, self.db[key]]) _temp_db = self.sort(_temp_db) _temp_db.reverse() return _temp_db def get_db(self): return self.db def sort(self, _list): start_time = time.time() _list = self.sorter.mergesort(_list) _list.reverse() self.sort_time = time.time() - start_time return _list def append(self, _list): start_time = time.time() for x in _list: if x[0] not in self.db: self.db[x[0]] = x[1] else: self.db[x[0]] += x[1] self.append_time = time.time() - start_time 

    Los comentarios sugieren que estás intentando ejecutar esto en Windows. Como dije en un comentario,

    Si está ejecutando esto en Windows, no puede funcionar: Windows no tiene fork() , por lo que cada proceso obtiene su propia Cola y no tienen nada que ver entre sí. Todo el módulo se importa “desde cero” por cada proceso en Windows. Deberá crear la Cola en main() y pasarla como un argumento a la función de trabajo.

    Aquí está lo que necesita hacer para que sea portátil, aunque eliminé todas las cosas de la base de datos porque es irrelevante para los problemas que ha descrito hasta ahora. También eliminé el violín del daemon , porque esa es una forma perezosa de evitar cerrar las cosas limpiamente y, a menudo, como no, volverá a morderlo más tarde:

     def process_append_queue(append_queue): while True: x = append_queue.get() if x is None: break print("processed %d" % x) print("worker done") def main(): import multiprocessing as mp append_queue = mp.Queue(10) append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,)) append_queue_process.start() for i in range(100): append_queue.put(i) append_queue.put(None) # tell worker we're done append_queue_process.join() if __name__=="__main__": main() 

    La salida es lo “obvio”:

     processed 0 processed 1 processed 2 processed 3 processed 4 ... processed 96 processed 97 processed 98 processed 99 worker done 

    Nota: dado que Windows no (no puede) fork() , es imposible que los procesos de trabajo hereden cualquier objeto Python en Windows. Cada proceso ejecuta todo el progtwig desde su inicio. Es por eso que su progtwig original no pudo funcionar: cada proceso creó su propia Queue , que no tiene ninguna relación con la Queue en el otro proceso. En el enfoque que se muestra arriba, solo el proceso principal crea una Queue , y el proceso principal lo pasa (como argumento) al proceso de trabajo.

    queue.Queue es seguro para subprocesos, pero no funciona en todos los procesos. Sin embargo, esto es bastante fácil de arreglar. En lugar de:

     from multiprocessing import Process from Queue import Queue 

    Usted quiere:

     from multiprocessing import Process, Queue