Python, Pairwise ‘distance’, necesita una forma rápida de hacerlo

Para un proyecto paralelo en mi doctorado, me ocupé de la tarea de modelar algún sistema en Python. En cuanto a la eficiencia, mi progtwig encuentra un cuello de botella en el siguiente problema, que expondré en un Ejemplo de trabajo mínimo.

Trato con un gran número de segmentos codificados por sus puntos de inicio y finales 3D, por lo que cada segmento está representado por 6 escalares.

Necesito calcular una distancia mínima entre segmentos de intersegmento. La expresión analítica de la distancia mínima entre dos segmentos se encuentra en esta fuente . Para el MWE:

import numpy as np N_segments = 1000 List_of_segments = np.random.rand(N_segments, 6) Pairwise_minimal_distance_matrix = np.zeros( (N_segments,N_segments) ) for i in range(N_segments): for j in range(i+1,N_segments): p0 = List_of_segments[i,0:3] #beginning point of segment i p1 = List_of_segments[i,3:6] #end point of segment i q0 = List_of_segments[j,0:3] #beginning point of segment j q1 = List_of_segments[j,3:6] #end point of segment j #for readability, some definitions a = np.dot( p1-p0, p1-p0) b = np.dot( p1-p0, q1-q0) c = np.dot( q1-q0, q1-q0) d = np.dot( p1-p0, p0-q0) e = np.dot( q1-q0, p0-q0) s = (b*ec*d)/(a*cb*b) t = (a*eb*d)/(a*cb*b) #the minimal distance between segment i and j Pairwise_minimal_distance_matrix[i,j] = sqrt(sum( (p0+(p1-p0)*s-(q0+(q1-q0)*t))**2)) #minimal distance 

Ahora, me doy cuenta de que esto es extremadamente ineficiente, y por eso estoy aquí. He buscado mucho sobre cómo evitar el bucle, pero tengo un pequeño problema. Aparentemente, este tipo de cálculos se realiza mejor con el cdist de python. Sin embargo, las funciones de distancia personalizadas que puede manejar deben ser funciones binarias. Este es un problema en mi caso, porque mis vectores tienen específicamente una longitud de 6, y tienen que dividirse en sus primeros y últimos 3 componentes. No creo que pueda traducir el cálculo de la distancia en una función binaria.

Cualquier entrada es apreciada.

Puede utilizar las capacidades de vectorización de Numpy para acelerar el cálculo. Mi versión calcula todos los elementos de la matriz de distancia de una vez y luego establece la diagonal y el triángulo inferior en cero.

 def pairwise_distance2(s): # we need this because we're gonna divide by zero old_settings = np.seterr(all="ignore") N = N_segments # just shorter, could also use len(s) # we repeat p0 and p1 along all columns p0 = np.repeat(s[:,0:3].reshape((N, 1, 3)), N, axis=1) p1 = np.repeat(s[:,3:6].reshape((N, 1, 3)), N, axis=1) # and q0, q1 along all rows q0 = np.repeat(s[:,0:3].reshape((1, N, 3)), N, axis=0) q1 = np.repeat(s[:,3:6].reshape((1, N, 3)), N, axis=0) # element-wise dot product over the last dimension, # while keeping the number of dimensions at 3 # (so we can use them together with the p* and q*) a = np.sum((p1 - p0) * (p1 - p0), axis=-1).reshape((N, N, 1)) b = np.sum((p1 - p0) * (q1 - q0), axis=-1).reshape((N, N, 1)) c = np.sum((q1 - q0) * (q1 - q0), axis=-1).reshape((N, N, 1)) d = np.sum((p1 - p0) * (p0 - q0), axis=-1).reshape((N, N, 1)) e = np.sum((q1 - q0) * (p0 - q0), axis=-1).reshape((N, N, 1)) # same as above s = (b*ec*d)/(a*cb*b) t = (a*eb*d)/(a*cb*b) # almost same as above pairwise = np.sqrt(np.sum( (p0 + (p1 - p0) * s - ( q0 + (q1 - q0) * t))**2, axis=-1)) # turn the error reporting back on np.seterr(**old_settings) # set everything at or below the diagonal to 0 pairwise[np.tril_indices(N)] = 0.0 return pairwise 

Ahora vamos a dar una vuelta. Con tu ejemplo, N = 1000 , obtengo un tiempo de

 %timeit pairwise_distance(List_of_segments) 1 loops, best of 3: 10.5 s per loop %timeit pairwise_distance2(List_of_segments) 1 loops, best of 3: 398 ms per loop 

Y por supuesto, los resultados son los mismos:

 (pairwise_distance2(List_of_segments) == pairwise_distance(List_of_segments)).all() 

devuelve True . También estoy bastante seguro de que hay una multiplicación de matrices oculta en algún lugar del algoritmo, por lo que debería haber cierto potencial para una mayor aceleración (y también para la limpieza).

Por cierto: he intentado usar numba primero sin éxito. Aunque no estoy seguro de por qué.

Esto es más que una meta respuesta, al menos para empezar. Es posible que su problema ya esté en “mi progtwig tiene un cuello de botella” y “Me doy cuenta de que esto es extremadamente ineficiente”.

¿Extremadamente ineficiente? ¿En qué medida? ¿Tienes comparación? ¿Es su código demasiado lento para terminar en un tiempo razonable? ¿Qué es un tiempo razonable para ti? ¿Puedes tirar más poder de computación al problema? Igualmente importante: ¿utiliza una infraestructura adecuada para ejecutar su código (numpy / scipy comstackdo con comstackdores de proveedores, posiblemente con soporte OpenMP)?

Luego, si tiene respuestas para todas las preguntas anteriores y necesita optimizar aún más su código, ¿dónde está exactamente el cuello de botella en su código actual? ¿Lo perfilaste? ¿Es el cuerpo del bucle posiblemente mucho más pesado que la evaluación del propio bucle? Si es así, entonces “el bucle” no es su cuello de botella y, en primer lugar, no debe preocuparse por el bucle nested. Optimice el cuerpo al principio, posiblemente al crear representaciones matriciales poco ortodoxas de sus datos para que pueda realizar todos estos cálculos únicos en un paso, por ejemplo, mediante la multiplicación de matrices. Si su problema no se soluciona con operaciones de álgebra lineal eficientes, puede comenzar a escribir una extensión en C, usar Cython o usar PyPy (que recientemente recibió un poco de soporte básico de números). Las posibilidades de optimización son infinitas. Las preguntas realmente son: qué tan cerca de una solución práctica está usted, cuánto necesita optimizar y cuánto esfuerzo está dispuesto a invertir.

Descargo de responsabilidad: También he hecho cosas no canónicas de distancia de pares con scipy / numpy para mi PhD ;-). Para una métrica de distancia en particular, terminé codificando la parte “por pares” en Python simple (es decir, también usé el bucle doblemente nested), pero hice un poco de esfuerzo para lograr que el cuerpo fuera lo más eficiente posible (con una combinación de i) a Representación de la multiplicación de la matriz críptica de mi problema y ii) usando un bottleneck ).

Puedes usarlo así:

 def distance3d (p, q): if (p == q).all (): return 0 p0 = p[0:3] p1 = p[3:6] q0 = q[0:3] q1 = q[3:6] ... # Distance computation using the formula above. print (distance.cdist (List_of_segments, List_of_segments, distance3d)) 

Sin embargo, no parece ser más rápido, ya que ejecuta el mismo bucle internamente.