Python NotImplementedError: los objetos de la agrupación no se pueden pasar entre procesos

Estoy tratando de entregar trabajo cuando se agrega una página a la lista de páginas, pero la salida de mi código devuelve un error NotImplemented. Aquí está el código con lo que estoy tratando de hacer:

Código:

from multiprocessing import Pool, current_process import time import random import copy_reg import types import threading class PageControler(object): def __init__(self): self.nProcess = 3 self.pages = [1,2,3,4,5,6,7,8,9,10] self.manageWork() def manageWork(self): self.pool = Pool(processes=self.nProcess) time.sleep(2) work_queue = threading.Thread(target=self.modifyQueue) work_queue.start() #pool.close() #pool.join() def deliverWork(self): if self.pages != []: pag = self.pages.pop() self.pool.apply_async(self.myFun) def modifyQueue(self): t = time.time() while (time.time()-t) < 10: time.sleep(1) self.pages.append(99) print self.pages self.deliverWork() def myFun(self): time.sleep(2) if __name__ == '__main__': def _pickle_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) copy_reg.pickle(types.MethodType, _pickle_method) PageControler() 

Salida:

 NotImplementedError: pool objects cannot be passed between processes or pickled 

¿Es alguna forma de pasar el objeto de la piscina entre los procesos?

Editar:

Estoy usando Python 2.6

Para capturar el método de instancia que está intentando pasar a la PageControler , Python necesita PageControler todo el objeto PageControler , incluidas sus variables de instancia. Una de esas variables de instancia es el objeto Pool sí mismo, y los objetos Pool no pueden ser decapados, de ahí el error. Puede __getstate__ esto implementando __getstate__ en el objeto, y usándolo para eliminar el objeto de la pool de la instancia antes de decapar:

 class PageControler(object): def __init__(self): self.nProcess = 3 self.pages = [1,2,3,4,5,6,7,8,9,10] self.manageWork() def manageWork(self): self.pool = Pool(processes=self.nProcess) time.sleep(2) work_queue = threading.Thread(target=self.modifyQueue) work_queue.start() #pool.close() #pool.join() def deliverWork(self): if self.pages != []: pag = self.pages.pop() self.pool.apply_async(self.myFun) def modifyQueue(self): t = time.time() while (time.time()-t) < 10: time.sleep(1) self.pages.append(99) print self.pages self.deliverWork() def myFun(self): time.sleep(2) def __getstate__(self): self_dict = self.__dict__.copy() del self_dict['pool'] return self_dict def __setstate__(self, state): self.__dict__.update(state) 

__getstate__ siempre se llama antes de decapar un objeto, y le permite especificar exactamente qué partes del estado del objeto deben ser realmente decapadas. Luego, al descomprimir, se __setstate__(state) si está implementado (en nuestro caso), o si no lo está, el dict devuelto por __getstate__ se usará como __dict__ para la instancia no seleccionada. En el ejemplo anterior, establecemos explícitamente __dict__ al __dict__ que __dict__ en __getstate__ , pero podríamos no haber implementado __setstate__ y obtener el mismo efecto.

La respuesta de Dano es un buen enfoque si debe pasar todo el objeto al proceso. En su caso, la función que está pasando a la agrupación no tiene ningún requisito para una referencia a la instancia de clase. Entonces, una alternativa puede ser hacer que la función sea un método estático utilizando el decorador @staticmethod . Si la función requiere referencia a una o dos variables miembro de la clase, estas podrían pasarse como argumentos para las variables de solo lectura, y se actualizarán en una callback si también se requiere una escritura (por supuesto, deberá hacer esto si desea actualizar la instancia de clase local en cualquier caso).

Por ejemplo:

 Class A(object): def __init__(self): self._pool = multiprocessing.Pool(1) self.member_variable = 1 @staticmethod def MyFunc(variable): variable += 1 return variable def Callback(self, return_val): self.member_variable = return_val def CallFuncAsync(self): pool.apply_async(self.MyFunc, (self.member_variable,), callback=self.Callback)