Media móvil o media stream

¿Hay una función de scipy o una función o módulo numpy para python que calcule la media en ejecución de una matriz 1D dada una ventana específica?

Para una solución corta y rápida que hace todo en un solo bucle, sin dependencias, el código siguiente funciona muy bien.

mylist = [1, 2, 3, 4, 5, 6, 7] N = 3 cumsum, moving_aves = [0], [] for i, x in enumerate(mylist, 1): cumsum.append(cumsum[i-1] + x) if i>=N: moving_ave = (cumsum[i] - cumsum[iN])/N #can do stuff with moving_ave here moving_aves.append(moving_ave) 

UPD: Alleo y jasaarim han propuesto soluciones más eficientes.


Puedes usar np.convolve para eso:

 np.convolve(x, np.ones((N,))/N, mode='valid') 

Explicación

La media stream es un caso de la operación matemática de convolución . Para la media en ejecución, desliza una ventana a lo largo de la entrada y calcula la media de los contenidos de la ventana. Para señales 1D discretas, la convolución es lo mismo, excepto que en lugar de la media, se calcula una combinación lineal arbitraria, es decir, se multiplica cada elemento por un coeficiente correspondiente y se sumn los resultados. Esos coeficientes, uno para cada posición en la ventana, a veces se llaman kernel de convolución. Ahora, la media aritmética de N valores es (x_1 + x_2 + ... + x_N) / N , por lo que el kernel correspondiente es (1/N, 1/N, ..., 1/N) , y eso es exactamente lo que obtenemos utilizando np.ones((N,))/N

Bordes

El argumento de mode de np.convolve especifica cómo manejar los bordes. Elegí el modo valid aquí porque creo que así es como la mayoría de la gente espera que el medio de ejecución funcione, pero es posible que tenga otras prioridades. Aquí hay una gráfica que ilustra la diferencia entre los modos:

 import numpy as np import matplotlib.pyplot as plt modes = ['full', 'same', 'valid'] for m in modes: plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m)); plt.axis([-10, 251, -.1, 1.1]); plt.legend(modes, loc='lower center'); plt.show() 

Ejecución de modos de convolución media

Solucion eficiente

La convolución es mucho mejor que un enfoque directo, pero (supongo) utiliza FFT y, por lo tanto, es bastante lenta. Sin embargo, especialmente para calcular el funcionamiento significa que el siguiente enfoque funciona bien

 def running_mean(x, N): cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) return (cumsum[N:] - cumsum[:-N]) / float(N) 

El código a comprobar

 In[3]: x = numpy.random.random(100000) In[4]: N = 1000 In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid') 10 loops, best of 3: 41.4 ms per loop In[6]: %timeit result2 = running_mean(x, N) 1000 loops, best of 3: 1.04 ms per loop 

Tenga en cuenta que numpy.allclose(result1, result2) es True , dos métodos son equivalentes. A mayor N, mayor diferencia en el tiempo.

Actualización: el siguiente ejemplo muestra la antigua función pandas.rolling_mean que se ha eliminado en las versiones recientes de pandas. Un equivalente moderno de la llamada de función a continuación sería

 In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values Out[8]: array([ 0.49815397, 0.49844183, 0.49840518, ..., 0.49488191, 0.49456679, 0.49427121]) 

Pandas es más adecuado para esto que NumPy o SciPy. Su función rolling_mean hace el trabajo convenientemente. También devuelve una matriz NumPy cuando la entrada es una matriz.

Es difícil superar a rolling_mean en el rendimiento con cualquier implementación Python pura personalizada. Aquí hay un ejemplo de desempeño contra dos de las soluciones propuestas:

 In [1]: import numpy as np In [2]: import pandas as pd In [3]: def running_mean(x, N): ...: cumsum = np.cumsum(np.insert(x, 0, 0)) ...: return (cumsum[N:] - cumsum[:-N]) / N ...: In [4]: x = np.random.random(100000) In [5]: N = 1000 In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid') 10 loops, best of 3: 172 ms per loop In [7]: %timeit running_mean(x, N) 100 loops, best of 3: 6.72 ms per loop In [8]: %timeit pd.rolling_mean(x, N)[N-1:] 100 loops, best of 3: 4.74 ms per loop In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N)) Out[9]: True 

También hay buenas opciones sobre cómo lidiar con los valores de borde.

Puedes calcular una media stream con:

 import numpy as np def runningMean(x, N): y = np.zeros((len(x),)) for ctr in range(len(x)): y[ctr] = np.sum(x[ctr:(ctr+N)]) return y/N 

Pero es lento.

Afortunadamente, numpy incluye una función de convolución que podemos usar para acelerar el proceso. La media stream es equivalente a convolving x con un vector que es N largo, con todos los miembros iguales a 1/N La implementación numpy de convolve incluye el transitorio inicial, por lo que debe eliminar los primeros puntos N-1:

 def runningMeanFast(x, N): return np.convolve(x, np.ones((N,))/N)[(N-1):] 

En mi máquina, la versión rápida es 20-30 veces más rápida, dependiendo de la longitud del vector de entrada y el tamaño de la ventana de promediado.

Tenga en cuenta que convolve incluye un modo 'same' que parece que debería abordar el problema transitorio de inicio, pero lo divide entre el principio y el final.

o módulo para python que calcula

en mis pruebas en Tradewave.net TA-lib siempre gana:

 import talib as ta import numpy as np import pandas as pd import scipy from scipy import signal import time as t PAIR = info.primary_pair PERIOD = 30 def initialize(): storage.reset() storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0]) def cumsum_sma(array, period): ret = np.cumsum(array, dtype=float) ret[period:] = ret[period:] - ret[:-period] return ret[period - 1:] / period def pandas_sma(array, period): return pd.rolling_mean(array, period) def api_sma(array, period): # this method is native to Tradewave and does NOT return an array return (data[PAIR].ma(PERIOD)) def talib_sma(array, period): return ta.MA(array, period) def convolve_sma(array, period): return np.convolve(array, np.ones((period,))/period, mode='valid') def fftconvolve_sma(array, period): return scipy.signal.fftconvolve( array, np.ones((period,))/period, mode='valid') def tick(): close = data[PAIR].warmup_period('close') t1 = t.time() sma_api = api_sma(close, PERIOD) t2 = t.time() sma_cumsum = cumsum_sma(close, PERIOD) t3 = t.time() sma_pandas = pandas_sma(close, PERIOD) t4 = t.time() sma_talib = talib_sma(close, PERIOD) t5 = t.time() sma_convolve = convolve_sma(close, PERIOD) t6 = t.time() sma_fftconvolve = fftconvolve_sma(close, PERIOD) t7 = t.time() storage.elapsed[-1] = storage.elapsed[-1] + t2-t1 storage.elapsed[-2] = storage.elapsed[-2] + t3-t2 storage.elapsed[-3] = storage.elapsed[-3] + t4-t3 storage.elapsed[-4] = storage.elapsed[-4] + t5-t4 storage.elapsed[-5] = storage.elapsed[-5] + t6-t5 storage.elapsed[-6] = storage.elapsed[-6] + t7-t6 plot('sma_api', sma_api) plot('sma_cumsum', sma_cumsum[-5]) plot('sma_pandas', sma_pandas[-10]) plot('sma_talib', sma_talib[-15]) plot('sma_convolve', sma_convolve[-20]) plot('sma_fftconvolve', sma_fftconvolve[-25]) def stop(): log('ticks....: %s' % info.max_ticks) log('api......: %.5f' % storage.elapsed[-1]) log('cumsum...: %.5f' % storage.elapsed[-2]) log('pandas...: %.5f' % storage.elapsed[-3]) log('talib....: %.5f' % storage.elapsed[-4]) log('convolve.: %.5f' % storage.elapsed[-5]) log('fft......: %.5f' % storage.elapsed[-6]) 

resultados:

 [2015-01-31 23:00:00] ticks....: 744 [2015-01-31 23:00:00] api......: 0.16445 [2015-01-31 23:00:00] cumsum...: 0.03189 [2015-01-31 23:00:00] pandas...: 0.03677 [2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner! [2015-01-31 23:00:00] convolve.: 0.04871 [2015-01-31 23:00:00] fft......: 0.22306 

introduzca la descripción de la imagen aquí

Para obtener una solución lista para usar, consulte https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html . Proporciona promedio de ejecución con el tipo de ventana flat . Tenga en cuenta que esto es un poco más sofisticado que el simple método de convolución de hacerlo usted mismo, ya que trata de resolver los problemas al principio y al final de los datos al reflejarlos (lo que puede o no funcionar en su caso). ..).

Para empezar, puedes probar:

 a = np.random.random(100) plt.plot(a) b = smooth(a, window='flat') plt.plot(b) 

Sé que esta es una pregunta antigua, pero aquí hay una solución que no utiliza ninguna estructura de datos ni bibliotecas adicionales. Es lineal en el número de elementos de la lista de entrada y no puedo pensar en ninguna otra forma de hacerlo más eficiente (en realidad, si alguien sabe de una mejor manera de asignar el resultado, hágamelo saber).

NOTA: esto sería mucho más rápido usando una matriz numpy en lugar de una lista, pero quería eliminar todas las dependencias. También sería posible mejorar el rendimiento mediante la ejecución de subprocesos múltiples

La función asume que la lista de entrada es unidimensional, así que tenga cuidado.

 ### Running mean/Moving average def running_mean(l, N): sum = 0 result = list( 0 for x in l) for i in range( 0, N ): sum = sum + l[i] result[i] = sum / (i+1) for i in range( N, len(l) ): sum = sum - l[iN] + l[i] result[i] = sum / N return result 

Si es importante mantener las dimensiones de la entrada (en lugar de restringir la salida al área 'valid' de una convolución), puede usar scipy.ndimage.filters.uniform_filter1d :

 import numpy as np from scipy.ndimage.filters import uniform_filter1d N = 1000 x = np.random.random(100000) y = uniform_filter1d(x, size=N) y.shape == x.shape >>> True 

uniform_filter1d permite múltiples formas de manejar el borde donde 'reflect' es el valor predeterminado, pero en mi caso, prefería 'nearest' .

También es bastante rápido (casi 50 veces más rápido que np.convolve ):

 %timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same') 100 loops, best of 3: 9.28 ms per loop %timeit y2 = uniform_filter1d(x, size=N) 10000 loops, best of 3: 191 µs per loop 

Todavía no he comprobado qué tan rápido es esto, pero podrías intentarlo:

 from collections import deque cache = deque() # keep track of seen values n = 10 # window size A = xrange(100) # some dummy iterable cum_sum = 0 # initialize cumulative sum for t, val in enumerate(A, 1): cache.append(val) cum_sum += val if t < n: avg = cum_sum / float(t) else: # if window is saturated, cum_sum -= cache.popleft() # subtract oldest value avg = cum_sum / float(n) 

Un poco tarde para la fiesta, pero he hecho mi propia pequeña función que NO se envuelve alrededor de los extremos o almohadillas con ceros que luego se usan para encontrar el promedio también. Como tratamiento adicional, es que también vuelve a muestrear la señal en puntos espaciados linealmente. Personaliza el código a voluntad para obtener otras características.

El método es una simple multiplicación de matrices con un núcleo gaussiano normalizado.

 def running_mean(y_in, x_in, N_out=101, sigma=1): ''' Returns running mean as a Bell-curve weighted average at evenly spaced points. Does NOT wrap signal around, or pad with zeros. Arguments: y_in -- y values, the values to be smoothed and re-sampled x_in -- x values for array Keyword arguments: N_out -- NoOf elements in resampled array. sigma -- 'Width' of Bell-curve in units of param x . ''' N_in = size(y_in) # Gaussian kernel x_out = np.linspace(np.min(x_in), np.max(x_in), N_out) x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out) gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2)) # Normalize kernel, such that the sum is one along axis 1 normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in)) gauss_kernel_normalized = gauss_kernel / normalization # Perform running average as a linear operation y_out = gauss_kernel_normalized @ y_in return y_out, x_out 

Un uso simple en una señal sinusoidal con ruido distribuido normal agregado: introduzca la descripción de la imagen aquí

En lugar de adormecer o escéptico, recomendaría pandas para hacer esto más rápidamente:

 df['data'].rolling(3).mean() 

Esto toma la media móvil (MA) de 3 períodos de la columna “datos”. También puede calcular las versiones modificadas, por ejemplo, la que excluye la celda actual (desplazada una atrás) se puede calcular fácilmente de la siguiente manera:

 df['data'].shift(periods=1).rolling(3).mean() 

Otro enfoque para encontrar la media móvil sin usar numpy, panda

 import itertools sample = [2, 6, 10, 8, 11, 10] list(itertools.starmap(lambda a,b: b/a, enumerate(itertools.accumulate(sample), 1))) 

imprimirá [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

Esta pregunta es ahora más antigua que cuando NeXuS escribió sobre esto el mes pasado, PERO me gusta cómo su código trata los casos de borde. Sin embargo, debido a que se trata de un “promedio móvil simple”, sus resultados están por detrás de los datos a los que se aplican. Pensé que tratar los casos de borde de una manera más satisfactoria que los modos valid , same y full de NumPy podría lograrse aplicando un enfoque similar a un método basado en convolution() .

Mi contribución utiliza un promedio de ejecución central para alinear sus resultados con sus datos. Cuando hay muy pocos puntos disponibles para usar la ventana de tamaño completo, los promedios de ejecución se calculan a partir de ventanas sucesivamente más pequeñas en los bordes de la matriz. [En realidad, desde ventanas sucesivamente más grandes, pero eso es un detalle de implementación.]

 import numpy as np def running_mean(l, N): # Also works for the(strictly invalid) cases when N is even. if (N//2)*2 == N: N = N - 1 front = np.zeros(N//2) back = np.zeros(N//2) for i in range(1, (N//2)*2, 2): front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid') for i in range(1, (N//2)*2, 2): back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid') return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]]) 

Es relativamente lento porque utiliza convolve() , y es probable que un verdadero Pythonista lo arregle bastante. Sin embargo, creo que la idea se mantiene.

Hay muchas respuestas arriba sobre el cálculo de una media stream. Mi respuesta agrega dos características adicionales:

  1. ignora los valores de nan
  2. calcula la media para los N valores vecinos, NO incluye el valor del interés en sí mismo

Esta segunda característica es particularmente útil para determinar qué valores difieren de la tendencia general en una cierta cantidad.

Uso numpy.cumsum ya que es el método más eficiente en el tiempo ( vea la respuesta de Alleo arriba ).

 N=10 # number of points to test on each side of point of interest, best if even padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0) n_nan = np.cumsum(np.isnan(padded_x)) cumsum = np.nancumsum(padded_x) window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x) window_n_values = (N - window_n_nan) movavg = (window_sum) / (window_n_values) 

Este código funciona incluso para Ns solamente. Puede ajustarse para los números impares cambiando el np.insert de padded_x y n_nan.

Ejemplo de salida (crudo en negro, movavg en azul): datos sin procesar (negro) y promedio móvil (azul) de 10 puntos alrededor de cada valor, sin incluir ese valor. Los valores de nan son ignorados.

Este código se puede adaptar fácilmente para eliminar todos los valores de promedio móvil calculados a partir de menos de valores de corte = 3 valores no nan.

 window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan cutoff = 3 window_n_values[window_n_values 

datos sin procesar (negro) y promedio móvil (azul) mientras se ignora cualquier ventana con menos de 3 valores no nanométricos

Esta respuesta contiene soluciones que utilizan la biblioteca estándar de Python para tres escenarios diferentes.

Promedio de carrera con itertools.accumulate

Esta es una solución Python 3.2+ con uso eficiente de la memoria que calcula el promedio de ejecución en una iterable de valores aprovechando itertools.accumulate .

 >>> from itertools import accumulate >>> values = range(100) 

Tenga en cuenta que los values pueden ser iterables, incluidos los generadores o cualquier otro objeto que produzca valores sobre la marcha.

Primero, construya perezosamente la sum acumulada de los valores.

 >>> cumu_sum = accumulate(value_stream) 

Luego, enumerate la sum acumulada (comenzando en 1) y construya un generador que produzca la fracción de los valores acumulados y el índice de enumeración actual.

 >>> rolling_avg = (accu/i for i, accu in enumerate(cumu_sum, 1)) 

Puede emitir means = list(rolling_avg) si necesita todos los valores en la memoria a la vez o llame al next incremental.
(Por supuesto, también puede iterar sobre rolling_avg con un bucle for , que llamará a next implícitamente).

 >>> next(rolling_avg) # 0/1 >>> 0.0 >>> next(rolling_avg) # (0 + 1)/2 >>> 0.5 >>> next(rolling_avg) # (0 + 1 + 2)/3 >>> 1.0 

Esta solución se puede escribir como una función de la siguiente manera.

 from itertools import accumulate def rolling_avg(iterable): cumu_sum = accumulate(iterable) yield from (accu/i for i, accu in enumerate(cumu_sum, 1)) 

Una coroutine a la que puedes enviar valores en cualquier momento.

Esta línea consume los valores que usted le envía y mantiene un promedio de los valores vistos hasta ahora.

Es útil cuando no tiene una cantidad de valores, pero adquiera los valores que se promediarán uno por uno en diferentes momentos a lo largo de la vida de su progtwig.

 def rolling_avg_coro(): i = 0 total = 0.0 avg = None while True: next_value = yield avg i += 1 total += next_value avg = total/i 

La coroutina funciona así:

 >>> averager = rolling_avg_coro() # instantiate coroutine >>> next(averager) # get coroutine going (this is called priming) >>> >>> averager.send(5) # 5/1 >>> 5.0 >>> averager.send(3) # (5 + 3)/2 >>> 4.0 >>> print('doing something else...') doing something else... >>> averager.send(13) # (5 + 3 + 13)/3 >>> 7.0 

Cálculo del promedio sobre una ventana deslizante de tamaño N

Esta función de generador toma un iterable y un tamaño de ventana N y produce el promedio sobre los valores actuales dentro de la ventana. Utiliza un deque , que es una estructura de datos similar a una lista, pero optimizada para modificaciones rápidas ( pop , append ) en ambos puntos finales .

 from collections import deque from itertools import islice def sliding_avg(iterable, N): it = iter(iterable) window = deque(islice(it, N)) num_vals = len(window) if num_vals < N: msg = 'window size {} exceeds total number of values {}' raise ValueError(msg.format(N, num_vals)) N = float(N) # force floating point division if using Python 2 s = sum(window) while True: yield s/N try: nxt = next(it) except StopIteration: break s = s - window.popleft() + nxt window.append(nxt) 

Aquí está la función en acción:

 >>> values = range(100) >>> N = 5 >>> window_avg = sliding_avg(values, N) >>> >>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5 >>> 2.0 >>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5 >>> 3.0 >>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5 >>> 4.0 

Aunque hay soluciones para esta pregunta aquí, por favor, eche un vistazo a mi solución. Es muy sencillo y funciona bien.

 import numpy as np dataset = np.asarray([1, 2, 3, 4, 5, 6, 7]) ma = list() window = 3 for t in range(0, len(dataset)): if t+window <= len(dataset): indices = range(t, t+window) ma.append(np.average(np.take(dataset, indices))) else: ma = np.asarray(ma) 

Utilice solo la biblioteca de Python Stadnard (memoria eficiente)

Solo da otra versión del uso de la biblioteca estándar deque solamente. Me sorprende bastante que la mayoría de las respuestas estén usando pandas o numpy .

 def moving_average(iterable, n=3): d = deque(maxlen=n) for i in iterable: d.append(i) if len(d) == n: yield sum(d)/n r = moving_average([40, 30, 50, 46, 39, 44]) assert list(r) == [40.0, 42.0, 45.0, 43.0] 

Actaully encontré otra implementación en documentos de Python

 def moving_average(iterable, n=3): # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0 # http://en.wikipedia.org/wiki/Moving_average it = iter(iterable) d = deque(itertools.islice(it, n-1)) d.appendleft(0) s = sum(d) for elem in it: s += elem - d.popleft() d.append(elem) yield s / n 

Sin embargo, la implementación me parece un poco más compleja de lo que debería ser. Pero debe estar en los documentos estándar de Python por una razón, ¿podría alguien comentar sobre la implementación de la mía y el documento estándar?

Después de leer las otras respuestas, no creo que esto sea lo que se pidió en la pregunta, pero llegué aquí con la necesidad de mantener un promedio de una lista de valores que crecía en tamaño.

Entonces, si desea mantener una lista de valores que está adquiriendo en algún lugar (un sitio, un dispositivo de medición, etc.) y el promedio de los últimos n valores actualizados, puede usar el siguiente código, que minimiza el esfuerzo de agregar nuevos elementos:

 class Running_Average(object): def __init__(self, buffer_size=10): """ Create a new Running_Average object. This object allows the efficient calculation of the average of the last `buffer_size` numbers added to it. Examples -------- >>> a = Running_Average(2) >>> a.add(1) >>> a.get() 1.0 >>> a.add(1) # there are two 1 in buffer >>> a.get() 1.0 >>> a.add(2) # there's a 1 and a 2 in the buffer >>> a.get() 1.5 >>> a.add(2) >>> a.get() # now there's only two 2 in the buffer 2.0 """ self._buffer_size = int(buffer_size) # make sure it's an int self.reset() def add(self, new): """ Add a new number to the buffer, or replaces the oldest one there. """ new = float(new) # make sure it's a float n = len(self._buffer) if n < self.buffer_size: # still have to had numbers to the buffer. self._buffer.append(new) if self._average != self._average: # ~ if isNaN(). self._average = new # no previous numbers, so it's new. else: self._average *= n # so it's only the sum of numbers. self._average += new # add new number. self._average /= (n+1) # divide by new number of numbers. else: # buffer full, replace oldest value. old = self._buffer[self._index] # the previous oldest number. self._buffer[self._index] = new # replace with new one. self._index += 1 # update the index and make sure it's... self._index %= self.buffer_size # ... smaller than buffer_size. self._average -= old/self.buffer_size # remove old one... self._average += new/self.buffer_size # ...and add new one... # ... weighted by the number of elements. def __call__(self): """ Return the moving average value, for the lazy ones who don't want to write .get . """ return self._average def get(self): """ Return the moving average value. """ return self() def reset(self): """ Reset the moving average. If for some reason you don't want to just create a new one. """ self._buffer = [] # could use np.empty(self.buffer_size)... self._index = 0 # and use this to keep track of how many numbers. self._average = float('nan') # could use np.NaN . def get_buffer_size(self): """ Return current buffer_size. """ return self._buffer_size def set_buffer_size(self, buffer_size): """ >>> a = Running_Average(10) >>> for i in range(15): ... a.add(i) ... >>> a() 9.5 >>> a._buffer # should not access this!! [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0] Decreasing buffer size: >>> a.buffer_size = 6 >>> a._buffer # should not access this!! [9.0, 10.0, 11.0, 12.0, 13.0, 14.0] >>> a.buffer_size = 2 >>> a._buffer [13.0, 14.0] Increasing buffer size: >>> a.buffer_size = 5 Warning: no older data available! >>> a._buffer [13.0, 14.0] Keeping buffer size: >>> a = Running_Average(10) >>> for i in range(15): ... a.add(i) ... >>> a() 9.5 >>> a._buffer # should not access this!! [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> a.buffer_size = 10 # reorders buffer! >>> a._buffer [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0] """ buffer_size = int(buffer_size) # order the buffer so index is zero again: new_buffer = self._buffer[self._index:] new_buffer.extend(self._buffer[:self._index]) self._index = 0 if self._buffer_size < buffer_size: print('Warning: no older data available!') # should use Warnings! else: diff = self._buffer_size - buffer_size print(diff) new_buffer = new_buffer[diff:] self._buffer_size = buffer_size self._buffer = new_buffer buffer_size = property(get_buffer_size, set_buffer_size) 

Y puedes probarlo con, por ejemplo:

 def graph_test(N=200): import matplotlib.pyplot as plt values = list(range(N)) values_average_calculator = Running_Average(N/2) values_averages = [] for value in values: values_average_calculator.add(value) values_averages.append(values_average_calculator()) fig, ax = plt.subplots(1, 1) ax.plot(values, label='values') ax.plot(values_averages, label='averages') ax.grid() ax.set_xlim(0, N) ax.set_ylim(0, N) fig.show() 

Lo que da:

Values and their average as a function of values #

There is a comment by mab buried in one of the answers above which has this method. bottleneck has move_mean which is a simple moving average:

 import numpy as np import bottleneck as bn a = np.arange(10) + np.random.random(10) mva = bn.move_mean(a, window=2, min_count=1) 

min_count is a handy parameter that will basically take the moving average up to that point in your array. If you don’t set min_count , it will equal window , and everything up to window points will be nan .

I feel this can be elegantly solved using bottleneck

See basic sample below:

 import numpy as np import bottleneck as bn a = np.random.randint(4, 1000, size=100) mm = bn.move_mean(a, window=5, min_count=1) 
  • “mm” is the moving mean for “a”.

  • “window” is the max number of entries to consider for moving mean.

  • “min_count” is min number of entries to consider for moving mean (eg for first few elements or if the array has nan values).

The good part is Bottleneck helps to deal with nan values and it’s also very efficient.

How about a moving average filter ? It is also a one-liner and has the advantage, that you can easily manipulate the window type if you need something else than the rectangle, ie. a N-long simple moving average of an array a:

 lfilter(np.ones(N)/N, [1], a)[N:] 

And with the triangular window applied:

 lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:] 

Another solution just using a standard library and deque:

 from collections import deque import itertools def moving_average(iterable, n=3): # http://en.wikipedia.org/wiki/Moving_average it = iter(iterable) # create an iterable object from input argument d = deque(itertools.islice(it, n-1)) # create deque object by slicing iterable d.appendleft(0) s = sum(d) for elem in it: s += elem - d.popleft() d.append(elem) yield s / n # example on how to use it for i in moving_average([40, 30, 50, 46, 39, 44]): print(i) # 40.0 # 42.0 # 45.0 # 43.0 

For educational purposes, let me add two more Numpy solutions (which are slower than the cumsum solution):

 import numpy as np from numpy.lib.stride_tricks import as_strided def ra_strides(arr, window): ''' Running average using as_strided''' n = arr.shape[0] - window + 1 arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides) return arr_strided.mean(axis=1) def ra_add(arr, window): ''' Running average using add.reduceat''' n = arr.shape[0] - window + 1 indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2) arr = np.append(arr, 0) return np.add.reduceat(arr, indices )[::2]/window 

Functions used: as_strided , add.reduceat

If you do choose to roll your own, rather than use an existing library, please be conscious of floating point error and try to minimize its effects:

 class SumAccumulator: def __init__(self): self.values = [0] self.count = 0 def add( self, val ): self.values.append( val ) self.count = self.count + 1 i = self.count while i & 0x01: i = i >> 1 v0 = self.values.pop() v1 = self.values.pop() self.values.append( v0 + v1 ) def get_total(self): return sum( reversed(self.values) ) def get_size( self ): return self.count 

If all your values are roughly the same order of magnitude, then this will help to preserve precision by always adding values of roughly similar magnitudes.