¿Cómo puedo procesar eficientemente una matriz numpy en bloques similares a la función blkproc (blockproc) de Matlab?

Estoy buscando un buen método para dividir eficientemente una imagen en pequeñas regiones, procesar cada región por separado y luego volver a reunir los resultados de cada proceso en una sola imagen procesada. Matlab tenía una herramienta para esto llamada blkproc (reemplazada por blockproc en las versiones más recientes de Matlab).

En un mundo ideal, la función o clase admitiría la superposición entre las divisiones en la matriz de entrada también. En la ayuda de Matlab, blkproc se define como:

B = blkproc (A, [mn], [mborder nborder], diversión, …)

  • A es tu matriz de entrada,
  • [mn] es el tamaño de bloque
  • [mborder, nborder] es el tamaño de su región de borde (opcional)
  • La diversión es una función para aplicar a cada bloque.

He logrado un acercamiento, pero me parece torpe y apuesto a que hay una manera mucho mejor. A riesgo de mi propia vergüenza, aquí está mi código:

 import numpy as np def segmented_process(M, blk_size=(16,16), overlap=(0,0), fun=None): rows = [] for i in range(0, M.shape[0], blk_size[0]): cols = [] for j in range(0, M.shape[1], blk_size[1]): cols.append(fun(M[i:i+blk_size[0], j:j+blk_size[1]])) rows.append(np.concatenate(cols, axis=1)) return np.concatenate(rows, axis=0) R = np.random.rand(128,128) passthrough = lambda(x):x Rprime = segmented_process(R, blk_size=(16,16), overlap=(0,0), fun=passthrough) np.all(R==Rprime) 

Aquí hay algunos ejemplos de una forma diferente (sin bucle) de trabajar con bloques:

 import numpy as np from numpy.lib.stride_tricks import as_strided as ast A= np.arange(36).reshape(6, 6) print A #[[ 0 1 2 3 4 5] # [ 6 7 8 9 10 11] # ... # [30 31 32 33 34 35]] # 2x2 block view B= ast(A, shape= (3, 3, 2, 2), strides= (48, 8, 24, 4)) print B[1, 1] #[[14 15] # [20 21]] # for preserving original shape B[:, :]= np.dot(B[:, :], np.array([[0, 1], [1, 0]])) print A #[[ 1 0 3 2 5 4] # [ 7 6 9 8 11 10] # ... # [31 30 33 32 35 34]] print B[1, 1] #[[15 14] # [21 20]] # for reducing shape, processing in 3D is enough C= B.reshape(3, 3, -1) print C.sum(-1) #[[ 14 22 30] # [ 62 70 78] # [110 118 126]] 

Por lo tanto, solo tratar de copiar la funcionalidad matlab para numpy no es la mejor manera de proceder. A veces se necesita un pensamiento ‘fuera del sombrero’.

Advertencia :
En general, las implementaciones basadas en trucos de zancada pueden (pero no es necesario) sufrir algunas penalizaciones de rendimiento. Así que prepárate para medir tu rendimiento en todas las formas En cualquier caso, es aconsejable comprobar primero si la funcionalidad necesaria (o lo suficientemente similar, para poder adaptarla fácilmente) se ha implementado en su numpy o scipy .

Actualización :
Ten en cuenta que no hay magic real involucrada aquí con los strides , por lo que te proporcionaré una función simple para obtener una block_view de block_view de cualquier numpy nypy 2D adecuada. Así que, aquí vamos:

 from numpy.lib.stride_tricks import as_strided as ast def block_view(A, block= (3, 3)): """Provide a 2D block view to 2D array. No error checking made. Therefore meaningful (as implemented) only for blocks strictly compatible with the shape of A.""" # simple shape and strides computations may seem at first strange # unless one is able to recognize the 'tuple additions' involved ;-) shape= (A.shape[0]/ block[0], A.shape[1]/ block[1])+ block strides= (block[0]* A.strides[0], block[1]* A.strides[1])+ A.strides return ast(A, shape= shape, strides= strides) if __name__ == '__main__': from numpy import arange A= arange(144).reshape(12, 12) print block_view(A)[0, 0] #[[ 0 1 2] # [12 13 14] # [24 25 26]] print block_view(A, (2, 6))[0, 0] #[[ 0 1 2 3 4 5] # [12 13 14 15 16 17]] print block_view(A, (3, 12))[0, 0] #[[ 0 1 2 3 4 5 6 7 8 9 10 11] # [12 13 14 15 16 17 18 19 20 21 22 23] # [24 25 26 27 28 29 30 31 32 33 34 35]] 

Proceso por cortes / vistas. La concatenación es muy cara.

 for x in xrange(0, 160, 16): for y in xrange(0, 160, 16): view = A[x:x+16, y:y+16] view[:,:] = fun(view) 

Tomé ambos insumos, así como mi enfoque original y comparé los resultados. Como @eat señala correctamente, los resultados dependen de la naturaleza de sus datos de entrada. Sorprendentemente, la concatenación vence el procesamiento de la vista en algunos casos. Cada método tiene un punto dulce. Aquí está mi código de referencia:

 import numpy as np from itertools import product def segment_and_concatenate(M, fun=None, blk_size=(16,16), overlap=(0,0)): # truncate M to a multiple of blk_size M = M[:M.shape[0]-M.shape[0]%blk_size[0], :M.shape[1]-M.shape[1]%blk_size[1]] rows = [] for i in range(0, M.shape[0], blk_size[0]): cols = [] for j in range(0, M.shape[1], blk_size[1]): max_ndx = (min(i+blk_size[0], M.shape[0]), min(j+blk_size[1], M.shape[1])) cols.append(fun(M[i:max_ndx[0], j:max_ndx[1]])) rows.append(np.concatenate(cols, axis=1)) return np.concatenate(rows, axis=0) from numpy.lib.stride_tricks import as_strided def block_view(A, block= (3, 3)): """Provide a 2D block view to 2D array. No error checking made. Therefore meaningful (as implemented) only for blocks strictly compatible with the shape of A.""" # simple shape and strides computations may seem at first strange # unless one is able to recognize the 'tuple additions' involved ;-) shape= (A.shape[0]/ block[0], A.shape[1]/ block[1])+ block strides= (block[0]* A.strides[0], block[1]* A.strides[1])+ A.strides return as_strided(A, shape= shape, strides= strides) def segmented_stride(M, fun, blk_size=(3,3), overlap=(0,0)): # This is some complex function of blk_size and M.shape stride = blk_size output = np.zeros(M.shape) B = block_view(M, block=blk_size) O = block_view(output, block=blk_size) for b,o in zip(B, O): o[:,:] = fun(b); return output def view_process(M, fun=None, blk_size=(16,16), overlap=None): # truncate M to a multiple of blk_size from itertools import product output = np.zeros(M.shape) dz = np.asarray(blk_size) shape = M.shape - (np.mod(np.asarray(M.shape), blk_size)) for indices in product(*[range(0, stop, step) for stop,step in zip(shape, blk_size)]): # Don't overrun the end of the array. #max_ndx = np.min((np.asarray(indices) + dz, M.shape), axis=0) #slices = [slice(s, s + f, None) for s,f in zip(indices, dz)] output[indices[0]:indices[0]+dz[0], indices[1]:indices[1]+dz[1]][:,:] = fun(M[indices[0]:indices[0]+dz[0], indices[1]:indices[1]+dz[1]]) return output if __name__ == "__main__": R = np.random.rand(128,128) squareit = lambda(x):x*2 from timeit import timeit t ={} kn = np.array(list(product((8,16,64,128), (128, 512, 2048, 4096)) ) ) methods = ("segment_and_concatenate", "view_process", "segmented_stride") t = np.zeros((kn.shape[0], len(methods))) for i, (k, N) in enumerate(kn): for j, method in enumerate(methods): t[i,j] = timeit("""Rprime = %s(R, blk_size=(%d,%d), overlap = (0,0), fun = squareit)""" % (method, k, k), setup=""" from segmented_processing import %s import numpy as np R = np.random.rand(%d,%d) squareit = lambda(x):x**2""" % (method, N, N), number=5 ) print "k =", k, "N =", N #, "time:", t[i] print (" Speed up (view vs. concat, stride vs. concat): %0.4f, %0.4f" % ( t[i][0]/t[i][1], t[i][0]/t[i][2])) 

Y aquí están los resultados:

Una comparación de tres métodos para procesar grandes matrices de forma aleatoria en números. Tenga en cuenta que el método de zancada segmentada gana por 3-4x para tamaños de bloques pequeños. Solo en tamaños de bloque grandes (128 x 128) y matrices muy grandes (2048 x 2048 y mayores) el enfoque de procesamiento de la vista gana, y solo en un pequeño porcentaje. ¡Basado en el horneado, parece que @eat obtiene la marca de verificación! Gracias a los dos por los buenos ejemplos!

Poco tarde en el juego, pero esto haría bloques superpuestos. No lo he hecho aquí, pero creo que podría adaptarse fácilmente a los tamaños de paso para cambiar la ventana:

 from numpy.lib.stride_tricks import as_strided def rolling_block(A, block=(3, 3)): shape = (A.shape[0] - block[0] + 1, A.shape[1] - block[1] + 1) + block strides = (A.strides[0], A.strides[1]) + A.strides return as_strided(A, shape=shape, strides=strides) 

Incluso más tarde en el juego. Hay un paquete de procesamiento de imágenes suizas llamado Bob disponible en: https://www.idiap.ch/software/bob/docs/releases/last/sphinx/html/index.html Tiene un comando bob.ip.block de python descrito en: https://www.idiap.ch/software/bob/docs/releases/last/sphinx/html/ip/generated/bob.ip.block.html#bob.ip.block que parece hacer todo lo que el Matlab comando ‘blockproc’ hace. No lo he probado.

También hay un comando interesante bob.ip.DCTFeatures que incorpora las capacidades de ‘bloque’ para extraer o modificar los coeficientes DCT de una imagen.

Encontré este tutorial – ¡El código fuente final proporciona exactamente la funcionalidad deseada! Incluso debería funcionar para cualquier dimensionalidad (no lo probé) http://www.johnvinyard.com/blog/?p=268

Aunque la opción de “aplanar” al final del código fuente parece ser un poco buggy. Sin embargo, muy buena pieza de software!