Vectorizar un cálculo de descuento numpy

Un término común en finanzas y aprendizaje de refuerzo es la recompensa acumulada descontada C[i] basada en una serie de tiempo de recompensas crudas R[i] . Dada una matriz R , nos gustaría calcular C[i] satisfaciendo la recurrencia C[i] = R[i] + discount * C[i+1] con C[-1] = R[-1] (y devuelve la matriz completa C ).

Una forma numéricamente estable de calcular esto en python con matrices numpy podría ser:

 import numpy as np def cumulative_discount(rewards, discount): future_cumulative_reward = 0 assert np.issubdtype(rewards.dtype, np.floating), rewards.dtype cumulative_rewards = np.empty_like(rewards) for i in range(len(rewards) - 1, -1, -1): cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward future_cumulative_reward = cumulative_rewards[i] return cumulative_rewards 

Sin embargo, esto se basa en un bucle de python. Dado que este es un cálculo tan común, seguramente existe una solución vectorizada que se basa en algunas otras funciones estándar sin recurrir a la citonización.

Tenga en cuenta que cualquier solución que use algo como np.power(discount, np.arange(len(rewards)) no será estable.

Podría usar scipy.signal.lfilter para resolver la relación de recurrencia:

 def alt(rewards, discount): """ C[i] = R[i] + discount * C[i+1] signal.lfilter(b, a, x, axis=-1, zi=None) a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[nM] - a[1]*y[n-1] - ... - a[N]*y[nN] """ r = rewards[::-1] a = [1, -discount] b = [1] y = signal.lfilter(b, a, x=r) return y[::-1] 

Este script prueba que el resultado es el mismo:

 import scipy.signal as signal import numpy as np def orig(rewards, discount): future_cumulative_reward = 0 cumulative_rewards = np.empty_like(rewards, dtype=np.float64) for i in range(len(rewards) - 1, -1, -1): cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward future_cumulative_reward = cumulative_rewards[i] return cumulative_rewards def alt(rewards, discount): """ C[i] = R[i] + discount * C[i+1] signal.lfilter(b, a, x, axis=-1, zi=None) a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[nM] - a[1]*y[n-1] - ... - a[N]*y[nN] """ r = rewards[::-1] a = [1, -discount] b = [1] y = signal.lfilter(b, a, x=r) return y[::-1] # test that the result is the same np.random.seed(2017) for i in range(100): rewards = np.random.random(10000) discount = 1.01 expected = orig(rewards, discount) result = alt(rewards, discount) if not np.allclose(expected,result): print('FAIL: {}({}, {})'.format('alt', rewards, discount)) break 

El cálculo que describe se conoce como regla de Horner o método de Horner para evaluar polinomios. Se implementa en NumPy polynomial.polyval .

Sin embargo, desea que toda la lista cumulative_rewards recompensas, es decir, todos los pasos intermedios de la regla de Horner. El método NumPy no devuelve esos valores intermedios. Tu función, decorada con el @jit de Numba, podría ser óptima para eso.

Como una posibilidad teórica, polyval que el polyval puede devolver la lista completa si se le da una matriz de coeficientes de Hankel . Esto se vectoriza pero, en última instancia, es menos eficiente que el bucle de Python, porque cada valor de cumulative_reward se calcula desde cero, independientemente de los demás.

 from numpy.polynomial.polynomial import polyval from scipy.linalg import hankel rewards = np.random.uniform(10, 100, size=(100,)) discount = 0.9 print(polyval(discount, hankel(rewards))) 

Esto coincide con la salida de

 print(cumulative_discount(rewards, discount))