¿Existe un mapa paralelo simple basado en procesos para python?

Estoy buscando un mapa paralelo simple basado en procesos para Python, es decir, una función

parmap(function,[data]) 

eso ejecutaría la función en cada elemento de [datos] en un proceso diferente (bueno, en un núcleo diferente, pero AFAIK, la única manera de ejecutar cosas en diferentes núcleos en Python es iniciar múltiples intérpretes) y devolver una lista de resultados .

Existe algo como esto? Me gustaría algo simple , por lo que un módulo simple sería bueno. Por supuesto, si no existe tal cosa, me conformaré con una gran biblioteca: – /

Parece que lo que necesitas es el método de mapa en multiprocessing.Pool () :

mapa (func, iterable [, chunksize])

 A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks till the result is ready. This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integ 

Por ejemplo, si quisieras mapear esta función:

 def f(x): return x**2 

Para rango (10), puede hacerlo usando la función incorporada map ():

 map(f, range(10)) 

o usando un mapa de método del objeto multiprocessing.Pool () ():

 import multiprocessing pool = multiprocessing.Pool() print pool.map(f, range(10)) 

Para aquellos que buscan Python equivalente de mclapply () de R, aquí está mi implementación. Es una mejora de los siguientes dos ejemplos:

  • ” Paralela el mapa de Pandas () o apply () “, como lo menciona @Rafael Valero.
  • Cómo aplicar el mapa a funciones con múltiples argumentos .

Se puede aplicar a funciones de mapas con argumentos simples o múltiples.

 import numpy as np, pandas as pd from scipy import sparse import functools, multiprocessing from multiprocessing import Pool num_cores = multiprocessing.cpu_count() def parallelize_dataframe(df, func, U=None, V=None): #blockSize = 5000 num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) ) blocks = np.array_split(df, num_partitions) pool = Pool(num_cores) if V is not None and U is not None: # apply func with multiple arguments to dataframe (ie involves multiple columns) df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks)) else: # apply func with one argument to dataframe (ie involves single column) df = pd.concat(pool.map(func, blocks)) pool.close() pool.join() return df def square(x): return x**2 def test_func(data): print("Process working on: ", data.shape) data["squareV"] = data["testV"].apply(square) return data def vecProd(row, U, V): return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) ) def mProd_func(data, U, V): data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 ) return data def generate_simulated_data(): N, D, nnz, K = [302, 184, 5000, 5] I = np.random.choice(N, size=nnz, replace=True) J = np.random.choice(D, size=nnz, replace=True) vals = np.random.sample(nnz) sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D]) # Generate parameters U and V which could be used to reconstruct the matrix Y U = np.random.sample(N*K).reshape([N,K]) V = np.random.sample(D*K).reshape([D,K]) return sparseY, U, V def main(): Y, U, V = generate_simulated_data() # find row, column indices and obvseved values for sparse matrix Y (testI, testJ, testV) = sparse.find(Y) colNames = ["obsI", "obsJ", "testV", "predV", "squareV"] dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float} obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames) obsValDF["obsI"] = testI obsValDF["obsJ"] = testJ obsValDF["testV"] = testV obsValDF = obsValDF.astype(dtype=dtypes) print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape)) # calculate the square of testVals obsValDF = parallelize_dataframe(obsValDF, test_func) # reconstruct prediction of testVals using parameters U and V obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V) print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape)) print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:]) if __name__ == '__main__': main() 

Esto se puede hacer con elegancia con Ray , un sistema que le permite paralelizar y distribuir fácilmente su código Python.

Para paralelizar su ejemplo, tendría que definir su función de mapa con el decorador @ray.remote , y luego invocarlo con .remote . Esto asegurará que cada instancia de la función remota se ejecutará en un proceso diferente.

 import time import ray ray.init() # Define the function you want to apply map on, as remote function. @ray.remote def f(x): # Do some work... time.sleep(1) return x*x # Define a helper parmap(f, list) function. # This function executes a copy of f() on each element in "list". # Each copy of f() runs in a different process. # Note f.remote(x) returns a future of its result (ie, # an identifier of the result) rather than the result itself. def parmap(f, list): return [f.remote(x) for x in list] # Call parmap() on a list consisting of first 5 integers. result_ids = parmap(f, range(1, 6)) # Get the results results = ray.get(result_ids) print(results) 

Esto imprimirá:

 [1, 4, 9, 16, 25] 

y terminará en aproximadamente len(list)/p (redondeado al entero más cercano) donde p es el número de núcleos en su máquina. Suponiendo una máquina con 2 núcleos, nuestro ejemplo se ejecutará en 5/2 redondeado, es decir, en aproximadamente 3 segundos.

Hay una serie de ventajas de utilizar Ray sobre el módulo de multiprocesamiento . En particular, el mismo código se ejecutará en una sola máquina, así como en un grupo de máquinas. Para más ventajas de Ray ver este post relacionado .