Método estático no puede decapar – multiprocesamiento – Python

Estoy aplicando algo de paralelización a mi código, en el que uso clases. Sabía que no es posible elegir un método de clase sin ningún otro enfoque diferente del que proporciona Python. He encontrado una solución aquí . En mi código, tengo partes que deberían estar en paralelo, ambas usando clase. Aquí, estoy publicando un código muy simple que solo representa la estructura de la mía (es lo mismo, pero eliminé el contenido de los métodos, que era una gran cantidad de cálculos matemáticos, insignificante para la salida que obtengo). El problema es que puedo descifrar un método (shepard_interpolation), pero con el otro (calcula_orientación_uncertainty) obtuve el error de encurtido. No sé por qué sucede esto, o por qué funciona en parte.

def _pickle_method(method): func_name = method.im_func.__name__ obj = method.im_self cls = method.im_class if func_name.startswith('__') and not func_name.endswith('__'): #deal with mangled names cls_name = cls.__name__.lstrip('_') func_name = '_' + cls_name + func_name print cls return _unpickle_method, (func_name, obj, cls) def _unpickle_method(func_name, obj, cls): for cls in cls.__mro__: try: func = cls.__dict__[func_name] except KeyError: pass else: break return func.__get__(obj, cls) class ImageData(object): def __init__(self, width=60, height=60): self.width = width self.height = height self.data = [] for i in range(width): self.data.append([0] * height) def shepard_interpolation(self, seeds=20): print "ImD - Sucess" import copy_reg import types from itertools import product from multiprocessing import Pool copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) class VariabilityOfGradients(object): def __init__(self): pass @staticmethod def aux(): return "VoG - Sucess" @staticmethod def calculate_orientation_uncertainty(): results = [] pool = Pool() for x, y in product(range(1, 5), range(1, 5)): result = pool.apply_async(VariabilityOfGradients.aux) results.append(result.get()) pool.close() pool.join() if __name__ == '__main__': results = [] pool = Pool() for _ in range(3): result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()]) results.append(result.get()) pool.close() pool.join() VariabilityOfGradients.calculate_orientation_uncertainty() 

Al ejecutar, obtuve el mensaje “PicklingError: No puedo pickle: atributo de búsqueda incorporado en la función fallida”. Y esto es casi lo mismo que se encuentra aquí . La única diferencia que veo es que mis métodos son estáticos.

EDITAR:

Noté que en mi result = pool.apply_async(VariabilityOfGradients.aux()) , cuando llamo a la función como result = pool.apply_async(VariabilityOfGradients.aux()) , es decir, con el paréntesis (en los ejemplos de documentos que nunca vi esto), parece funcionar. Pero, cuando bash obtener el resultado, recibo el mensaje “TypeError: ‘int’ no se puede llamar” …

Cualquier ayuda sería apreciada. Gracias de antemano.

    Podría definir una función simple a nivel de módulo y también un método estático. Esto preserva las características de syntax de llamada, introspección y heredabilidad de un método estático, mientras evita el problema de decapado:

     def aux(): return "VoG - Sucess" class VariabilityOfGradients(object): aux = staticmethod(aux) 

    Por ejemplo,

     import copy_reg import types from itertools import product import multiprocessing as mp def _pickle_method(method): """ Author: Steven Bethard (author of argparse) http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods """ func_name = method.im_func.__name__ obj = method.im_self cls = method.im_class cls_name = '' if func_name.startswith('__') and not func_name.endswith('__'): cls_name = cls.__name__.lstrip('_') if cls_name: func_name = '_' + cls_name + func_name return _unpickle_method, (func_name, obj, cls) def _unpickle_method(func_name, obj, cls): """ Author: Steven Bethard http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods """ for cls in cls.mro(): try: func = cls.__dict__[func_name] except KeyError: pass else: break return func.__get__(obj, cls) copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) class ImageData(object): def __init__(self, width=60, height=60): self.width = width self.height = height self.data = [] for i in range(width): self.data.append([0] * height) def shepard_interpolation(self, seeds=20): print "ImD - Success" def aux(): return "VoG - Sucess" class VariabilityOfGradients(object): aux = staticmethod(aux) @staticmethod def calculate_orientation_uncertainty(): pool = mp.Pool() results = [] for x, y in product(range(1, 5), range(1, 5)): # result = pool.apply_async(aux) # this works too result = pool.apply_async(VariabilityOfGradients.aux, callback=results.append) pool.close() pool.join() print(results) if __name__ == '__main__': results = [] pool = mp.Pool() for _ in range(3): result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()]) results.append(result.get()) pool.close() pool.join() VariabilityOfGradients.calculate_orientation_uncertainty() 

    rendimientos

     ImD - Success ImD - Success ImD - Success ['VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess'] 

    Por cierto, result.get () bloquea el proceso de llamada hasta que se completa la función llamada por pool.apply_async (por ejemplo, ImageData.shepard_interpolation ). Asi que

     for _ in range(3): result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()]) results.append(result.get()) 

    realmente está llamando a ImageData.shepard_interpolation secuencialmente, derrotando el propósito de la agrupación.

    En su lugar podrías usar

     for _ in range(3): pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()], callback=results.append) 

    La función de callback (por ejemplo, results.append ) se llama en un hilo del proceso de llamada cuando se completa la función. Se envía un argumento: el valor de retorno de la función. Por lo tanto, nada bloquea que las tres llamadas pool.apply_async se realicen rápidamente, y el trabajo realizado por las tres llamadas a ImageData.shepard_interpolation se realizará de forma simultánea.

    Alternativamente, podría ser más simple usar pool.map aquí.

     results = pool.map(ImageData.shepard_interpolation, [ImageData()]*3) 

    Si usa una bifurcación de multiprocessing llamada pathos.multiprocesssing , puede usar directamente clases y métodos de clase en las funciones de map de multiprocesamiento. Esto se debe a que se usa dill lugar de pickle o cPickle , y dill puede serializar casi cualquier cosa en python.

    pathos.multiprocessing también proporciona una función de mapa asíncrono … y puede map funciones con múltiples argumentos (por ejemplo, map(math.pow, [1,2,3], [4,5,6]) )

    Ver: ¿Qué pueden hacer juntos el multiprocesamiento y el eneldo?

    y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

     >>> from pathos.multiprocessing import ProcessingPool as Pool >>> >>> p = Pool(4) >>> >>> def add(x,y): ... return x+y ... >>> x = [0,1,2,3] >>> y = [4,5,6,7] >>> >>> p.map(add, x, y) [4, 6, 8, 10] >>> >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> >>> p.map(Test.plus, [t]*4, x, y) [4, 6, 8, 10] >>> >>> p.map(t.plus, x, y) [4, 6, 8, 10] 

    Obtenga el código aquí: https://github.com/uqfoundation/pathos

    pathos también tiene un mapa asincrónico ( amap ), así como imap .

    No estoy seguro de si esto es lo que estás buscando, pero mi uso fue ligeramente diferente. Quería usar un método de una clase dentro de la misma clase que se ejecuta en varios subprocesos.

    Así es como lo implementé:

     from multiprocessing import Pool class Product(object): def __init__(self): self.logger = "test" def f(self, x): print(self.logger) return x*x def multi(self): p = Pool(5) print(p.starmap(Product.f, [(Product(), 1), (Product(), 2), (Product(), 3)])) if __name__ == '__main__': obj = Product() obj.multi()