¿Cómo calcular la media móvil en Python 3?

Digamos que tengo una lista:

y = ['1', '2', '3', '4','5','6','7','8','9','10'] 

Quiero crear una función que calcule el promedio móvil de n días. Entonces, si n era 5, me gustaría que mi código calcule los primeros 1-5, súmelos y encuentre el promedio, que sería 3.0, luego pase a 2-6, calcule el promedio, que sería 4.0, luego 3 -7, 4-8, 5-9, 6-10.

No quiero calcular los primeros n-1 días, así que a partir del noveno día, contará los días anteriores.

 def moving_average(x:'list of prices', n): for num in range(len(x)+1): print(x[num-n:num]) 

Esto parece imprimir lo que quiero:

 [] [] [] [] [] ['1', '2', '3', '4', '5'] ['2', '3', '4', '5', '6'] ['3', '4', '5', '6', '7'] ['4', '5', '6', '7', '8'] ['5', '6', '7', '8', '9'] ['6', '7', '8', '9', '10'] 

Sin embargo, no sé cómo calcular los números dentro de esas listas. ¿Algunas ideas?

Hay un gran generador de ventanas deslizantes en una versión antigua de los documentos de Python con ejemplos de itertools :

 from itertools import islice def window(seq, n=2): "Returns a sliding window (of width n) over data from the iterable" " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... " it = iter(seq) result = tuple(islice(it, n)) if len(result) == n: yield result for elem in it: result = result[1:] + (elem,) yield result 

Usando eso sus promedios móviles es trivial:

 from __future__ import division # For Python 2 def moving_averages(values, size): for selection in window(values, size): yield sum(selection) / size 

Ejecutar esto contra tu entrada (mapeando las cadenas a números enteros) da:

 >>> y= ['1', '2', '3', '4','5','6','7','8','9','10'] >>> for avg in moving_averages(map(int, y), 5): ... print(avg) ... 3.0 4.0 5.0 6.0 7.0 8.0 

Para devolver None las primeras iteraciones n - 1 para conjuntos “incompletos”, simplemente expanda un poco la función moving_averages :

 def moving_averages(values, size): for _ in range(size - 1): yield None for selection in window(values, size): yield sum(selection) / size 

Si bien me gusta la respuesta de Martijn sobre esto, como George, me preguntaba si no sería más rápido usando una sum acumulada en lugar de aplicar la sum() una y otra vez en la mayoría de los mismos números.

También es interesante la idea de tener valores por defecto como predeterminados durante la fase de incremento. De hecho, puede haber muchos escenarios diferentes que uno podría concebir para promedios móviles. Dividamos el cálculo de promedios en tres fases:

  1. Ramp Up: inicio de iteraciones donde la iteración actual cuenta
  2. Progreso constante: tenemos exactamente el número de elementos de tamaño de ventana disponibles para calcular un average := sum(x[iteration_counter-window_size:iteration_counter])/window_size normal average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
  3. Rampa hacia abajo: al final de los datos de entrada, podríamos devolver otro window_size - 1 “promedio” números.

Aquí hay una función que acepta

  • Iterables arbitrarios (los generadores están bien) como entrada para datos
  • Tamaños de ventana arbitrarios> = 1
  • Parámetros para activar / desactivar la producción de valores durante las fases de Ramp Up / Down
  • Funciones de callback para esas fases para controlar cómo se producen los valores. Esto se puede usar para proporcionar constantemente un valor predeterminado (por ejemplo, None ) o para proporcionar promedios parciales

Aquí está el código:

 from collections import deque def moving_averages(data, size, rampUp=True, rampDown=True): """Slide a window of  elements over  to calc an average First and last  iterations when window is not yet completely filled with data, or the window empties due to exhausted , the average is computed with just the available data (but still divided by ). Set rampUp/rampDown to False in order to not provide any values during those start and end  iterations. Set rampUp/rampDown to functions to provide arbitrary partial average numbers during those phases. The callback will get the currently available input data in a deque. Do not modify that data. """ d = deque() running_sum = 0.0 data = iter(data) # rampUp for count in range(1, size): try: val = next(data) except StopIteration: break running_sum += val d.append(val) #print("up: running sum:" + str(running_sum) + " count: " + str(count) + " deque: " + str(d)) if rampUp: if callable(rampUp): yield rampUp(d) else: yield running_sum / size # steady exhausted_early = True for val in data: exhausted_early = False running_sum += val #print("st: running sum:" + str(running_sum) + " deque: " + str(d)) yield running_sum / size d.append(val) running_sum -= d.popleft() # rampDown if rampDown: if exhausted_early: running_sum -= d.popleft() for (count) in range(min(len(d), size-1), 0, -1): #print("dn: running sum:" + str(running_sum) + " deque: " + str(d)) if callable(rampDown): yield rampDown(d) else: yield running_sum / size running_sum -= d.popleft() 

Parece ser un poco más rápido que la versión de Martijn, que es mucho más elegante, sin embargo. Aquí está el código de prueba:

 print("") print("Timeit") print("-" * 80) from itertools import islice def window(seq, n=2): "Returns a sliding window (of width n) over data from the iterable" " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... " it = iter(seq) result = tuple(islice(it, n)) if len(result) == n: yield result for elem in it: result = result[1:] + (elem,) yield result # Martijn's version: def moving_averages_SO(values, size): for selection in window(values, size): yield sum(selection) / size import timeit problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)] for problem_size in problems: print("{:12s}".format(str(problem_size)), end="") so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size, setup="from __main__ import moving_averages_SO") print("{:12.3f} ".format(min(so)), end="") my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size, setup="from __main__ import moving_averages") print("{:12.3f} ".format(min(my)), end="") print("") 

Y la salida:

 Timeit -------------------------------------------------------------------------------- 10 7.242 7.656 100 5.816 5.500 1000 5.787 5.244 10000 5.782 5.180 100000 5.746 5.137 1000000 5.745 5.198 10000000 5.764 5.186 

La pregunta original ahora se puede resolver con esta llamada de función:

 print(list(moving_averages(range(1,11), 5, rampUp=lambda _: None, rampDown=False))) 

La salida:

 [None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0] 

Usa las funciones de sum y map .

 print(sum(map(int, x[num-n:num]))) 

La función de map en Python 3 es básicamente una versión perezosa de esto:

 [int(i) for i in x[num-n:num]] 

Estoy seguro de que puedes adivinar qué hace la función de sum .

Un enfoque que evita recuadrar sums intermedias.

 list=range(0,12) def runs(v): global runningsum runningsum+=v return(runningsum) runningsum=0 runsumlist=[ runs(v) for v in list ] result = [ (runsumlist[k] - runsumlist[k-5])/5 for k in range(0,len(list)+1)] 

resultado de impresión

 [2,3,4,5,6,7,8,9] 

haga que se ejecute (int (v)) .. luego .. repr (runsumlist [k] – runsumlist [k-5]) / 5) si no desea llevar números a cadenas ..


Alt sin el global:

 list = [float[x] for x in range(0,12)] nave = 5 movingave = sum(list[:nave]/nave) for i in range(len(list)-nave):movingave.append(movingave[-1]+(list[i+nave]-list[i])/nave) print movingave 

Asegúrate de hacer matemáticas flotantes incluso si ingresas valores que son enteros

 [2.0,3.0,4.0,5.0,6.0,7.0,8.0,9,0] 

Hay otra solución que extiende una receta de itertools pairwise() . Puede extender esto a nwise() , que le da la ventana deslizante (y funciona si el iterable es un generador):

 def nwise(iterable, n): ts = it.tee(iterable, n) for c, t in enumerate(ts): next(it.islice(t, c, c), None) return zip(*ts) def moving_averages_nw(iterable, n): yield from (sum(x)/n for x in nwise(iterable, n)) >>> list(moving_averages_nw(range(1, 11), 5)) [3.0, 4.0, 5.0, 6.0, 7.0, 8.0] 

Si bien un costo de instalación relativamente alto para corto iterable , este costo se reduce en cuanto mayor es el impacto del conjunto de datos. Esto usa sum() pero el código es razonablemente elegante:

 Timeit MP cfi ***** -------------------------------------------------------------------------------- 10 4.658 4.959 7.351 100 5.144 4.070 4.234 1000 5.312 4.020 3.977 10000 5.317 4.031 3.966 100000 5.508 4.115 4.087 1000000 5.526 4.263 4.202 10000000 5.632 4.326 4.242