Cálculo de Stock Beta de Python Pandas eficiente en muchos marcos de datos

Tengo muchos (4000+) CSV de datos de stock (Fecha, Abierto, Alto, Bajo, Cerrar) que importo en marcos de datos individuales de Pandas para realizar el análisis. Soy nuevo en Python y quiero calcular una beta rodante de 12 meses para cada stock. Encontré una publicación para calcular la beta rodante (los pandas de Python calculan la beta de material rodante utilizando la aplicación de rodadura al objeto groupby en vectores ). Sin embargo, cuando se utiliza en mi código a continuación. más de 2,5 horas! Teniendo en cuenta que puedo ejecutar los mismos cálculos exactos en tablas SQL en menos de 3 minutos, esto es demasiado lento.

¿Cómo puedo mejorar el rendimiento de mi código de abajo para que coincida con el de SQL? Entiendo que Pandas / python tiene esa capacidad. Mi método actual recorre cada fila, lo que sé que ralentiza el rendimiento, pero no conozco ninguna forma agregada de realizar un cálculo beta de ventana móvil en un dataframe.

Nota: los primeros 2 pasos para cargar los CSV en marcos de datos individuales y calcular los rendimientos diarios solo toman unos 20 segundos. Todos mis marcos de datos CSV se almacenan en el diccionario llamado ‘FilesLoaded’ con nombres como ‘XAO’.

¡Tu ayuda sería muy apreciada! Gracias 🙂

import pandas as pd, numpy as np import datetime import ntpath pd.set_option('precision',10) #Set the Decimal Point precision to DISPLAY start_time=datetime.datetime.now() MarketIndex = 'XAO' period = 250 MinBetaPeriod = period # *********************************************************************************************** # CALC RETURNS # *********************************************************************************************** for File in FilesLoaded: FilesLoaded[File]['Return'] = FilesLoaded[File]['Close'].pct_change() # *********************************************************************************************** # CALC BETA # *********************************************************************************************** def calc_beta(df): np_array = df.values m = np_array[:,0] # market returns are column zero from numpy array s = np_array[:,1] # stock returns are column one from numpy array covariance = np.cov(s,m) # Calculate covariance between stock and market beta = covariance[0,1]/covariance[1,1] return beta #Build Custom "Rolling_Apply" function def rolling_apply(df, period, func, min_periods=None): if min_periods is None: min_periods = period result = pd.Series(np.nan, index=df.index) for i in range(1, len(df)+1): sub_df = df.iloc[max(i-period, 0):i,:] if len(sub_df) >= min_periods: idx = sub_df.index[-1] result[idx] = func(sub_df) return result #Create empty BETA dataframe with same index as RETURNS dataframe df_join = pd.DataFrame(index=FilesLoaded[MarketIndex].index) df_join['market'] = FilesLoaded[MarketIndex]['Return'] df_join['stock'] = np.nan for File in FilesLoaded: df_join['stock'].update(FilesLoaded[File]['Return']) df_join = df_join.replace(np.inf, np.nan) #get rid of infinite values "inf" (SQL won't take "Inf") df_join = df_join.replace(-np.inf, np.nan)#get rid of infinite values "inf" (SQL won't take "Inf") df_join = df_join.fillna(0) #get rid of the NaNs in the return data FilesLoaded[File]['Beta'] = rolling_apply(df_join[['market','stock']], period, calc_beta, min_periods = MinBetaPeriod) # *********************************************************************************************** # CLEAN-UP # *********************************************************************************************** print('Run-time: {0}'.format(datetime.datetime.now() - start_time)) 

Generar datos de stock aleatorios
20 años de datos mensuales para 4,000 acciones

 dates = pd.date_range('1995-12-31', periods=480, freq='M', name='Date') stoks = pd.Index(['s{:04d}'.format(i) for i in range(4000)]) df = pd.DataFrame(np.random.rand(480, 4000), dates, stoks) 

 df.iloc[:5, :5] 

introduzca la descripción de la imagen aquí


Función de rollo
Devuelve el objeto groupby listo para aplicar funciones personalizadas.
Ver Fuente

 def roll(df, w): # stack df.values w-times shifted once at each stack roll_array = np.dstack([df.values[i:i+w, :] for i in range(len(df.index) - w + 1)]).T # roll_array is now a 3-D array and can be read into # a pandas panel object panel = pd.Panel(roll_array, items=df.index[w-1:], major_axis=df.columns, minor_axis=pd.Index(range(w), name='roll')) # convert to dataframe and pivot + groupby # is now ready for any action normally performed # on a groupby object return panel.to_frame().unstack().T.groupby(level=0) 

Función beta
Utilice la solución de forma cerrada de regresión OLS.
Supongamos que la columna 0 es mercado
Ver Fuente

 def beta(df): # first column is the market X = df.values[:, [0]] # prepend a column of ones for the intercept X = np.concatenate([np.ones_like(X), X], axis=1) # matrix algebra b = np.linalg.pinv(XTdot(X)).dot(XT).dot(df.values[:, 1:]) return pd.Series(b[1], df.columns[1:], name='Beta') 

Demostración

 rdf = roll(df, 12) betas = rdf.apply(beta) 

Sincronización

introduzca la descripción de la imagen aquí


Validación
Comparar cálculos con OP

 def calc_beta(df): np_array = df.values m = np_array[:,0] # market returns are column zero from numpy array s = np_array[:,1] # stock returns are column one from numpy array covariance = np.cov(s,m) # Calculate covariance between stock and market beta = covariance[0,1]/covariance[1,1] return beta 

 print(calc_beta(df.iloc[:12, :2])) -0.311757542437 

 print(beta(df.iloc[:12, :2])) s0001 -0.311758 Name: Beta, dtype: float64 

Nota la primera celda
Es el mismo valor que los cálculos validados anteriores.

 betas = rdf.apply(beta) betas.iloc[:5, :5] 

introduzca la descripción de la imagen aquí


Respuesta a comentar
Ejemplo completo de trabajo con múltiples marcos de datos simulados

 num_sec_dfs = 4000 cols = ['Open', 'High', 'Low', 'Close'] dfs = {'s{:04d}'.format(i): pd.DataFrame(np.random.rand(480, 4), dates, cols) for i in range(num_sec_dfs)} market = pd.Series(np.random.rand(480), dates, name='Market') df = pd.concat([market] + [dfs[k].Close.rename(k) for k in dfs.keys()], axis=1).sort_index(1) betas = roll(df.pct_change().dropna(), 12).apply(beta) for c, col in betas.iteritems(): dfs[c]['Beta'] = col dfs['s0001'].head(20) 

introduzca la descripción de la imagen aquí

Usando un generador para mejorar la eficiencia de la memoria

Datos simulados

 m, n = 480, 10000 dates = pd.date_range('1995-12-31', periods=m, freq='M', name='Date') stocks = pd.Index(['s{:04d}'.format(i) for i in range(n)]) df = pd.DataFrame(np.random.rand(m, n), dates, stocks) market = pd.Series(np.random.rand(m), dates, name='Market') df = pd.concat([df, market], axis=1) 

Cálculo Beta

 def beta(df, market=None): # If the market values are not passed, # I'll assume they are located in a column # named 'Market'. If not, this will fail. if market is None: market = df['Market'] df = df.drop('Market', axis=1) X = market.values.reshape(-1, 1) X = np.concatenate([np.ones_like(X), X], axis=1) b = np.linalg.pinv(XTdot(X)).dot(XT).dot(df.values) return pd.Series(b[1], df.columns, name=df.index[-1]) 

función de rollo
Esto devuelve un generador y será mucho más eficiente en memoria

 def roll(df, w): for i in range(df.shape[0] - w + 1): yield pd.DataFrame(df.values[i:i+w, :], df.index[i:i+w], df.columns) 

Poniendolo todo junto

 betas = pd.concat([beta(sdf) for sdf in roll(df.pct_change().dropna(), 12)], axis=1).T 

Validación

OP beta calc

 def calc_beta(df): np_array = df.values m = np_array[:,0] # market returns are column zero from numpy array s = np_array[:,1] # stock returns are column one from numpy array covariance = np.cov(s,m) # Calculate covariance between stock and market beta = covariance[0,1]/covariance[1,1] return beta 

Configuración del experimento

 m, n = 12, 2 dates = pd.date_range('1995-12-31', periods=m, freq='M', name='Date') cols = ['Open', 'High', 'Low', 'Close'] dfs = {'s{:04d}'.format(i): pd.DataFrame(np.random.rand(m, 4), dates, cols) for i in range(n)} market = pd.Series(np.random.rand(m), dates, name='Market') df = pd.concat([market] + [dfs[k].Close.rename(k) for k in dfs.keys()], axis=1).sort_index(1) betas = pd.concat([beta(sdf) for sdf in roll(df.pct_change().dropna(), 12)], axis=1).T for c, col in betas.iteritems(): dfs[c]['Beta'] = col dfs['s0000'].head(20) 

introduzca la descripción de la imagen aquí

 calc_beta(df[['Market', 's0000']]) 0.0020118230147777435 

NOTA:
Los cálculos son los mismos.

Si bien la subdivisión eficiente del conjunto de datos de entrada en ventanas móviles es importante para la optimización de los cálculos generales, el rendimiento del cálculo beta en sí mismo también se puede mejorar significativamente.

Lo siguiente optimiza solo la subdivisión del conjunto de datos en ventanas móviles:

 def numpy_betas(x_name, window, returns_data, intercept=True): if intercept: ones = numpy.ones(window) def lstsq_beta(window_data): x_data = numpy.vstack([window_data[x_name], ones]).T if intercept else window_data[[x_name]] beta_arr, residuals, rank, s = numpy.linalg.lstsq(x_data, window_data) return beta_arr[0] indices = [int(x) for x in numpy.arange(0, returns_data.shape[0] - window + 1, 1)] return DataFrame( data=[lstsq_beta(returns_data.iloc[i:(i + window)]) for i in indices] , columns=list(returns_data.columns) , index=returns_data.index[window - 1::1] ) 

Lo siguiente también optimiza el cálculo beta en sí mismo:

 def custom_betas(x_name, window, returns_data): window_inv = 1.0 / window x_sum = returns_data[x_name].rolling(window, min_periods=window).sum() y_sum = returns_data.rolling(window, min_periods=window).sum() xy_sum = returns_data.mul(returns_data[x_name], axis=0).rolling(window, min_periods=window).sum() xx_sum = numpy.square(returns_data[x_name]).rolling(window, min_periods=window).sum() xy_cov = xy_sum - window_inv * y_sum.mul(x_sum, axis=0) x_var = xx_sum - window_inv * numpy.square(x_sum) betas = xy_cov.divide(x_var, axis=0)[window - 1:] betas.columns.name = None return betas 

Comparando el rendimiento de los dos cálculos diferentes, puede ver que a medida que aumenta la ventana utilizada en el cálculo beta, el segundo método supera dramáticamente al primero: introduzca la descripción de la imagen aquí

Comparando el rendimiento con el de la implementación de @ piRSquared, el método personalizado toma aproximadamente 350 milis para evaluar en comparación con más de 2 segundos.

Optimizando aún más la implementación de @ piRSquared para velocidad y memoria. El código también se simplifica para mayor claridad.

 from numpy import nan, ndarray, ones_like, vstack, random from numpy.lib.stride_tricks import as_strided from numpy.linalg import pinv from pandas import DataFrame, date_range def calc_beta(s: ndarray, m: ndarray): x = vstack((ones_like(m), m)) b = pinv(x.dot(xT)).dot(x).dot(s) return b[1] def rolling_calc_beta(s_df: DataFrame, m_df: DataFrame, period: int): result = ndarray(shape=s_df.shape, dtype=float) l, w = s_df.shape ls, ws = s_df.values.strides result[0:period - 1, :] = nan s_arr = as_strided(s_df.values, shape=(l - period + 1, period, w), strides=(ls, ls, ws)) m_arr = as_strided(m_df.values, shape=(l - period + 1, period), strides=(ls, ls)) for row in range(period, l): result[row, :] = calc_beta(s_arr[row - period, :], m_arr[row - period]) return DataFrame(data=result, index=s_df.index, columns=s_df.columns) if __name__ == '__main__': num_sec_dfs, num_periods = 4000, 480 dates = date_range('1995-12-31', periods=num_periods, freq='M', name='Date') stocks = DataFrame(data=random.rand(num_periods, num_sec_dfs), index=dates, columns=['s{:04d}'.format(i) for i in range(num_sec_dfs)]).pct_change() market = DataFrame(data=random.rand(num_periods), index=dates, columns= ['Market']).pct_change() betas = rolling_calc_beta(stocks, market, 12) 

% timeit betas = rolling_calc_beta (acciones, mercado, 12)

335 ms ± 2.69 ms por bucle (media ± desviación estándar de 7 ejecuciones, 1 bucle cada una)

pero estos serían bloqueados cuando requiera cálculos beta a través de las fechas (m) para múltiples acciones (n) resultantes (mxn) del número de cálculos.

Se podría tomar algo de alivio ejecutando cada fecha o stock en varios núcleos, pero luego terminará teniendo un gran hardware.

El principal requisito de tiempo para las soluciones disponibles es encontrar la varianza y la covarianza y también se debe evitar el NaN en los datos (índice y stock) para un cálculo correcto según pandas == 0.23.0.

Por lo tanto, ejecutar de nuevo resultaría un movimiento estúpido a menos que los cálculos se almacenen en caché.

La varianza numérica y la versión de covarianza también pueden calcular mal la beta si NaN no se descarta.

Una implementación de Cython es imprescindible para un gran conjunto de datos.