许多数据帧上的高效 Python Pandas Stock Beta 计算
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/39501277/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Efficient Python Pandas Stock Beta Calculation on Many Dataframes
提问by cwse
I have many (4000+) CSVs of stock data (Date, Open, High, Low, Close) which I import into individual Pandas dataframes to perform analysis. I am new to python and want to calculate a rolling 12month beta for each stock, I found a post to calculate rolling beta (Python pandas calculate rolling stock beta using rolling apply to groupby object in vectorized fashion) however when used in my code below takes over 2.5 hours! Considering I can run the exact same calculations in SQL tables in under 3 minutes this is too slow.
我有很多(4000+)个 CSV 的股票数据(日期、开盘价、最高价、最低价、收盘价),我将它们导入到单独的 Pandas 数据框中以执行分析。我是 Python 新手,想为每只股票计算滚动 12 个月的 beta,我找到了一篇计算滚动 beta 的帖子(Python Pandas使用滚动应用到 groupby 对象以矢量化方式计算滚动股票 beta)但是在我下面的代码中使用时需要超过2.5小时!考虑到我可以在 3 分钟内在 SQL 表中运行完全相同的计算,这太慢了。
How can I improve the performance of my below code to match that of SQL? I understand Pandas/python has that capability. My current method loops over each row which I know slows performance but I am unaware of any aggregate way to perform a rolling window beta calculation on a dataframe.
如何提高以下代码的性能以匹配 SQL 的性能?我知道 Pandas/python 有这种能力。我当前的方法循环遍历每一行,我知道这会降低性能,但我不知道在数据帧上执行滚动窗口 beta 计算的任何聚合方式。
Note: the first 2 steps of loading the CSVs into individual dataframes and calculating daily returns only takes ~20seconds. All my CSV dataframes are stored in the dictionary called 'FilesLoaded' with names such as 'XAO'.
注意:将 CSV 加载到单个数据帧和计算每日收益的前 2 个步骤只需要大约 20 秒。我所有的 CSV 数据帧都存储在名为“FilesLoaded”的字典中,名称为“XAO”。
Your help would be much appreciated! Thank you :)
您的帮助将不胜感激!谢谢 :)
import pandas as pd, numpy as np
import datetime
import ntpath
pd.set_option('precision',10) #Set the Decimal Point precision to DISPLAY
start_time=datetime.datetime.now()
MarketIndex = 'XAO'
period = 250
MinBetaPeriod = period
# ***********************************************************************************************
# CALC RETURNS
# ***********************************************************************************************
for File in FilesLoaded:
FilesLoaded[File]['Return'] = FilesLoaded[File]['Close'].pct_change()
# ***********************************************************************************************
# CALC BETA
# ***********************************************************************************************
def calc_beta(df):
np_array = df.values
m = np_array[:,0] # market returns are column zero from numpy array
s = np_array[:,1] # stock returns are column one from numpy array
covariance = np.cov(s,m) # Calculate covariance between stock and market
beta = covariance[0,1]/covariance[1,1]
return beta
#Build Custom "Rolling_Apply" function
def rolling_apply(df, period, func, min_periods=None):
if min_periods is None:
min_periods = period
result = pd.Series(np.nan, index=df.index)
for i in range(1, len(df)+1):
sub_df = df.iloc[max(i-period, 0):i,:]
if len(sub_df) >= min_periods:
idx = sub_df.index[-1]
result[idx] = func(sub_df)
return result
#Create empty BETA dataframe with same index as RETURNS dataframe
df_join = pd.DataFrame(index=FilesLoaded[MarketIndex].index)
df_join['market'] = FilesLoaded[MarketIndex]['Return']
df_join['stock'] = np.nan
for File in FilesLoaded:
df_join['stock'].update(FilesLoaded[File]['Return'])
df_join = df_join.replace(np.inf, np.nan) #get rid of infinite values "inf" (SQL won't take "Inf")
df_join = df_join.replace(-np.inf, np.nan)#get rid of infinite values "inf" (SQL won't take "Inf")
df_join = df_join.fillna(0) #get rid of the NaNs in the return data
FilesLoaded[File]['Beta'] = rolling_apply(df_join[['market','stock']], period, calc_beta, min_periods = MinBetaPeriod)
# ***********************************************************************************************
# CLEAN-UP
# ***********************************************************************************************
print('Run-time: {0}'.format(datetime.datetime.now() - start_time))
采纳答案by piRSquared
Generate Random Stock Data
20 Years of Monthly Data for 4,000 Stocks
生成随机股票数据
4,000 只股票的 20 年月度数据
dates = pd.date_range('1995-12-31', periods=480, freq='M', name='Date')
stoks = pd.Index(['s{:04d}'.format(i) for i in range(4000)])
df = pd.DataFrame(np.random.rand(480, 4000), dates, stoks)
df.iloc[:5, :5]
Roll Function
Returns groupby object ready to apply custom functions
See Source
滚动函数
返回准备应用自定义函数的 groupby 对象
请参阅源代码
def roll(df, w):
# stack df.values w-times shifted once at each stack
roll_array = np.dstack([df.values[i:i+w, :] for i in range(len(df.index) - w + 1)]).T
# roll_array is now a 3-D array and can be read into
# a pandas panel object
panel = pd.Panel(roll_array,
items=df.index[w-1:],
major_axis=df.columns,
minor_axis=pd.Index(range(w), name='roll'))
# convert to dataframe and pivot + groupby
# is now ready for any action normally performed
# on a groupby object
return panel.to_frame().unstack().T.groupby(level=0)
Beta Function
Use closed form solution of OLS regression
Assume column 0 is market
See Source
Beta 函数
使用 OLS 回归的封闭形式解
假设第 0 列是市场
查看来源
def beta(df):
# first column is the market
X = df.values[:, [0]]
# prepend a column of ones for the intercept
X = np.concatenate([np.ones_like(X), X], axis=1)
# matrix algebra
b = np.linalg.pinv(X.T.dot(X)).dot(X.T).dot(df.values[:, 1:])
return pd.Series(b[1], df.columns[1:], name='Beta')
Demonstration
示范
rdf = roll(df, 12)
betas = rdf.apply(beta)
Timing
定时
Validation
Compare calculations with OP
验证
比较计算与 OP
def calc_beta(df):
np_array = df.values
m = np_array[:,0] # market returns are column zero from numpy array
s = np_array[:,1] # stock returns are column one from numpy array
covariance = np.cov(s,m) # Calculate covariance between stock and market
beta = covariance[0,1]/covariance[1,1]
return beta
print(calc_beta(df.iloc[:12, :2]))
-0.311757542437
print(beta(df.iloc[:12, :2]))
s0001 -0.311758
Name: Beta, dtype: float64
Note the first cell
Is the same value as validated calculations above
注意第一个单元格
与上面验证的计算值相同
betas = rdf.apply(beta)
betas.iloc[:5, :5]
Response to comment
Full working example with simulated multiple dataframes
回复评论
完整的工作示例,模拟多个数据帧
num_sec_dfs = 4000
cols = ['Open', 'High', 'Low', 'Close']
dfs = {'s{:04d}'.format(i): pd.DataFrame(np.random.rand(480, 4), dates, cols) for i in range(num_sec_dfs)}
market = pd.Series(np.random.rand(480), dates, name='Market')
df = pd.concat([market] + [dfs[k].Close.rename(k) for k in dfs.keys()], axis=1).sort_index(1)
betas = roll(df.pct_change().dropna(), 12).apply(beta)
for c, col in betas.iteritems():
dfs[c]['Beta'] = col
dfs['s0001'].head(20)
回答by piRSquared
Using a generator to improve memory efficiency
使用生成器提高内存效率
Simulated data
模拟数据
m, n = 480, 10000
dates = pd.date_range('1995-12-31', periods=m, freq='M', name='Date')
stocks = pd.Index(['s{:04d}'.format(i) for i in range(n)])
df = pd.DataFrame(np.random.rand(m, n), dates, stocks)
market = pd.Series(np.random.rand(m), dates, name='Market')
df = pd.concat([df, market], axis=1)
Beta Calculation
贝塔计算
def beta(df, market=None):
# If the market values are not passed,
# I'll assume they are located in a column
# named 'Market'. If not, this will fail.
if market is None:
market = df['Market']
df = df.drop('Market', axis=1)
X = market.values.reshape(-1, 1)
X = np.concatenate([np.ones_like(X), X], axis=1)
b = np.linalg.pinv(X.T.dot(X)).dot(X.T).dot(df.values)
return pd.Series(b[1], df.columns, name=df.index[-1])
roll function
This returns a generator and will be far more memory efficient
roll 函数
这将返回一个生成器,并且内存效率更高
def roll(df, w):
for i in range(df.shape[0] - w + 1):
yield pd.DataFrame(df.values[i:i+w, :], df.index[i:i+w], df.columns)
Putting it all together
把这一切放在一起
betas = pd.concat([beta(sdf) for sdf in roll(df.pct_change().dropna(), 12)], axis=1).T
Validation
验证
OP beta calc
OP beta 计算
def calc_beta(df):
np_array = df.values
m = np_array[:,0] # market returns are column zero from numpy array
s = np_array[:,1] # stock returns are column one from numpy array
covariance = np.cov(s,m) # Calculate covariance between stock and market
beta = covariance[0,1]/covariance[1,1]
return beta
Experiment setup
实验设置
m, n = 12, 2
dates = pd.date_range('1995-12-31', periods=m, freq='M', name='Date')
cols = ['Open', 'High', 'Low', 'Close']
dfs = {'s{:04d}'.format(i): pd.DataFrame(np.random.rand(m, 4), dates, cols) for i in range(n)}
market = pd.Series(np.random.rand(m), dates, name='Market')
df = pd.concat([market] + [dfs[k].Close.rename(k) for k in dfs.keys()], axis=1).sort_index(1)
betas = pd.concat([beta(sdf) for sdf in roll(df.pct_change().dropna(), 12)], axis=1).T
for c, col in betas.iteritems():
dfs[c]['Beta'] = col
dfs['s0000'].head(20)
calc_beta(df[['Market', 's0000']])
0.0020118230147777435
NOTE:
The calculations are the same
注:
计算方法相同
回答by mcguip
While efficient subdivision of the input data set into rolling windows is important to the optimization of the overall calculations, the performance of the beta calculation itself can also be significantly improved.
虽然将输入数据集有效细分为滚动窗口对于优化整体计算很重要,但 beta 计算本身的性能也可以显着提高。
The following optimizes only the subdivision of the data set into rolling windows:
以下仅优化将数据集细分为滚动窗口:
def numpy_betas(x_name, window, returns_data, intercept=True):
if intercept:
ones = numpy.ones(window)
def lstsq_beta(window_data):
x_data = numpy.vstack([window_data[x_name], ones]).T if intercept else window_data[[x_name]]
beta_arr, residuals, rank, s = numpy.linalg.lstsq(x_data, window_data)
return beta_arr[0]
indices = [int(x) for x in numpy.arange(0, returns_data.shape[0] - window + 1, 1)]
return DataFrame(
data=[lstsq_beta(returns_data.iloc[i:(i + window)]) for i in indices]
, columns=list(returns_data.columns)
, index=returns_data.index[window - 1::1]
)
The following also optimizes the beta calculation itself:
下面还优化了 beta 计算本身:
def custom_betas(x_name, window, returns_data):
window_inv = 1.0 / window
x_sum = returns_data[x_name].rolling(window, min_periods=window).sum()
y_sum = returns_data.rolling(window, min_periods=window).sum()
xy_sum = returns_data.mul(returns_data[x_name], axis=0).rolling(window, min_periods=window).sum()
xx_sum = numpy.square(returns_data[x_name]).rolling(window, min_periods=window).sum()
xy_cov = xy_sum - window_inv * y_sum.mul(x_sum, axis=0)
x_var = xx_sum - window_inv * numpy.square(x_sum)
betas = xy_cov.divide(x_var, axis=0)[window - 1:]
betas.columns.name = None
return betas
Comparing the performance of the two different calculations, you can see that as the window used in the beta calculation increases, the second method dramatically outperforms the first:
比较两种不同计算的性能,可以看到随着 beta 计算中使用的窗口的增加,第二种方法显着优于第一种:
Comparing the performance to that of @piRSquared's implementation, the custom method takes roughly 350 millis to evaluate compared to over 2 seconds.
将性能与@piRSquared 的实现进行比较,自定义方法大约需要 350 毫秒来评估,而评估时间超过 2 秒。
回答by hkiran
Further optimizing on @piRSquared's implementation for both speed and memory. the code is also simplified for clarity.
进一步优化@piRSquared 的速度和内存实现。为了清楚起见,还简化了代码。
from numpy import nan, ndarray, ones_like, vstack, random
from numpy.lib.stride_tricks import as_strided
from numpy.linalg import pinv
from pandas import DataFrame, date_range
def calc_beta(s: ndarray, m: ndarray):
x = vstack((ones_like(m), m))
b = pinv(x.dot(x.T)).dot(x).dot(s)
return b[1]
def rolling_calc_beta(s_df: DataFrame, m_df: DataFrame, period: int):
result = ndarray(shape=s_df.shape, dtype=float)
l, w = s_df.shape
ls, ws = s_df.values.strides
result[0:period - 1, :] = nan
s_arr = as_strided(s_df.values, shape=(l - period + 1, period, w), strides=(ls, ls, ws))
m_arr = as_strided(m_df.values, shape=(l - period + 1, period), strides=(ls, ls))
for row in range(period, l):
result[row, :] = calc_beta(s_arr[row - period, :], m_arr[row - period])
return DataFrame(data=result, index=s_df.index, columns=s_df.columns)
if __name__ == '__main__':
num_sec_dfs, num_periods = 4000, 480
dates = date_range('1995-12-31', periods=num_periods, freq='M', name='Date')
stocks = DataFrame(data=random.rand(num_periods, num_sec_dfs), index=dates,
columns=['s{:04d}'.format(i) for i in
range(num_sec_dfs)]).pct_change()
market = DataFrame(data=random.rand(num_periods), index=dates, columns=
['Market']).pct_change()
betas = rolling_calc_beta(stocks, market, 12)
%timeit betas = rolling_calc_beta(stocks, market, 12)
%timeit betas = rolling_calc_beta(股票,市场,12)
335 ms ± 2.69 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
每个循环 335 ms ± 2.69 ms(7 次运行的平均值 ± 标准偏差,每次 1 次循环)
回答by user3396549
but these would be blockish when you require beta calculations across the dates(m) for multiple stocks(n) resulting (m x n) number of calculations.
但是当您需要跨日期(m)的多个股票(n)的 beta 计算结果(mxn)计算数量时,这些将是块状的。
Some relief could be taken by running each date or stock on multiple cores, but then you will end up having huge hardware.
可以通过在多个内核上运行每个日期或库存来缓解一些压力,但最终您将拥有庞大的硬件。
The major time requirement for the solutions available is finding the variance and co-variance and also NaNshould be avoided in (Index and stock) data for a correct calculation as per pandas==0.23.0.
可用解决方案的主要时间要求是找到方差和协方差,并且在(指数和库存)数据中应避免NaN,以便按照 pandas==0.23.0 进行正确计算。
Thus running again would result stupid move unless the calculations are cached.
因此,除非计算被缓存,否则再次运行将导致愚蠢的移动。
numpy variance and co-variance version also happens to miss-calculate the beta if NaNare not dropped.
如果不删除NaN,numpy 方差和协方差版本也会碰巧计算错误。
A Cython implementation is must for huge set of data.
Cython 实现对于大量数据是必须的。