
Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/9944436/

Date: 2020-09-13 15:41:36  Source: igfitidea

Converting OHLC stock data into a different timeframe with python and pandas

Tags: python, stock, pandas

Asked by kgr

Could someone please point me in the right direction with respect to OHLC data timeframe conversion with Pandas? What I'm trying to do is build a DataFrame with data for a higher timeframe, given data with a lower timeframe.

For example, given I have the following one-minute (M1) data:

                       Open    High     Low   Close  Volume
Date                                                       
1999-01-04 10:22:00  1.1801  1.1819  1.1801  1.1817       4
1999-01-04 10:23:00  1.1817  1.1818  1.1804  1.1814      18
1999-01-04 10:24:00  1.1817  1.1817  1.1802  1.1806      12
1999-01-04 10:25:00  1.1807  1.1815  1.1795  1.1808      26
1999-01-04 10:26:00  1.1803  1.1806  1.1790  1.1806       4
1999-01-04 10:27:00  1.1801  1.1801  1.1779  1.1786      23
1999-01-04 10:28:00  1.1795  1.1801  1.1776  1.1788      28
1999-01-04 10:29:00  1.1793  1.1795  1.1782  1.1789      10
1999-01-04 10:31:00  1.1780  1.1792  1.1776  1.1792      12
1999-01-04 10:32:00  1.1788  1.1792  1.1788  1.1791       4

which has Open, High, Low, Close (OHLC) and Volume values for every minute. I would like to build a set of 5-minute readings (M5) which would look like this:

                       Open    High     Low   Close  Volume
Date                                                       
1999-01-04 10:25:00  1.1807  1.1815  1.1776  1.1789      91
1999-01-04 10:30:00  1.1780  1.1792  1.1776  1.1791      16

So the workflow is:

  • Open is the Open of the first row in the time window
  • High is the highest High in the time window
  • Low is the lowest Low
  • Close is the last Close
  • Volume is simply the sum of Volumes
There are a few issues, though:

  • the data has gaps (note there is no 10:30:00 row)
  • the 5-minute intervals have to start at a round time, e.g. M5 starts at 10:25:00, not 10:22:00
  • the first, incomplete set can be omitted like in this example, or included (so we could have a 10:20:00 5-minute entry)
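On recent pandas versions all three points are handled by resample() directly; a sketch on the sample above (the default bin edges already start on round 5-minute marks, and the missing 10:30:00 minute is simply absorbed into its bin):

```python
import pandas as pd

# The M1 sample from the question, re-typed by hand.
idx = pd.to_datetime([
    "1999-01-04 10:22", "1999-01-04 10:23", "1999-01-04 10:24",
    "1999-01-04 10:25", "1999-01-04 10:26", "1999-01-04 10:27",
    "1999-01-04 10:28", "1999-01-04 10:29", "1999-01-04 10:31",
    "1999-01-04 10:32",
])
m1 = pd.DataFrame({
    "Open":   [1.1801, 1.1817, 1.1817, 1.1807, 1.1803, 1.1801, 1.1795, 1.1793, 1.1780, 1.1788],
    "High":   [1.1819, 1.1818, 1.1817, 1.1815, 1.1806, 1.1801, 1.1801, 1.1795, 1.1792, 1.1792],
    "Low":    [1.1801, 1.1804, 1.1802, 1.1795, 1.1790, 1.1779, 1.1776, 1.1782, 1.1776, 1.1788],
    "Close":  [1.1817, 1.1814, 1.1806, 1.1808, 1.1806, 1.1786, 1.1788, 1.1789, 1.1792, 1.1791],
    "Volume": [4, 18, 12, 26, 4, 23, 28, 10, 12, 4],
}, index=idx)

# Bins are labelled by their left (round) edge; each column gets the
# aggregation matching the workflow described above.
m5 = m1.resample("5min").agg({"Open": "first", "High": "max", "Low": "min",
                              "Close": "last", "Volume": "sum"})
```

This yields three rows (10:20, 10:25, 10:30); the first, incomplete 10:20:00 bin can then be dropped with ordinary slicing if it is unwanted.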
The Pandas documentation on up/down sampling gives an example, but it uses the mean as the value of the up-sampled rows, which won't work here. I have tried using groupby and agg, but to no avail. Getting the highest High and the lowest Low might not be so hard, but I have no idea how to get the first Open and the last Close.

What I tried is something along the lines of:

grouped = slice.groupby( dr5minute.asof ).agg( 
    { 'Low': lambda x : x.min()[ 'Low' ], 'High': lambda x : x.max()[ 'High' ] } 
)

but it results in following error, which I don't understand:

In [27]: grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <module>()
----> 1 grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in agg(self, func, *args, **kwargs)
    242         See docstring for aggregate
    243         """
--> 244         return self.aggregate(func, *args, **kwargs)
    245 
    246     def _iterate_slices(self):

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, arg, *args, **kwargs)
   1153                     colg = SeriesGroupBy(obj[col], column=col,
   1154                                          grouper=self.grouper)
-> 1155                     result[col] = colg.aggregate(func)
   1156 
   1157             result = DataFrame(result)

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, func_or_funcs, *args, **kwargs)
    906                 return self._python_agg_general(func_or_funcs, *args, **kwargs)
    907             except Exception:
--> 908                 result = self._aggregate_named(func_or_funcs, *args, **kwargs)
    909 
    910             index = Index(sorted(result), name=self.grouper.names[0])

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in _aggregate_named(self, func, *args, **kwargs)
    976             grp = self.get_group(name)
    977             grp.name = name
--> 978             output = func(grp, *args, **kwargs)
    979             if isinstance(output, np.ndarray):
    980                 raise Exception('Must produce aggregated value')

/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <lambda>(x)
----> 1 grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )

IndexError: invalid index to scalar variable.

So any help on doing that would be greatly appreciated. If the path I chose is not going to work, please suggest another relatively efficient approach (I have millions of rows). Some resources on using Pandas for financial processing would also be nice.

Accepted answer by Garrett

Your approach is sound, but it fails because each function in the dict-of-functions passed to agg() receives a Series object reflecting the column matched by the key value. Therefore, it is not necessary to filter on the column label again. With this, and assuming groupby preserves order, you can slice the Series to extract the first/last element of the Open/Close columns. (Note: the groupby documentation does not claim to preserve the order of the original data series, but it seems to in practice.)

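To make that concrete, a tiny toy example (hypothetical values): each aggregation function receives the matching column already narrowed to a Series, so x.min() is a scalar, and indexing into it again (x.min()['Low']) is exactly what raised the IndexError in the question:

```python
import pandas as pd

df = pd.DataFrame({"Low": [1.0, 2.0], "High": [3.0, 4.0]},
                  index=["a", "a"])

# Each lambda sees only its own column as a Series, so no further
# column selection is needed (or possible) inside it.
grouped = df.groupby(level=0).agg({"Low": lambda s: s.min(),
                                   "High": lambda s: s.max()})
```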
In [50]: df.groupby(dr5minute.asof).agg({'Low': lambda s: s.min(), 
                                         'High': lambda s: s.max(),
                                         'Open': lambda s: s[0],
                                         'Close': lambda s: s[-1],
                                         'Volume': lambda s: s.sum()})
Out[50]: 
                      Close    High     Low    Open  Volume
key_0                                                      
1999-01-04 10:20:00  1.1806  1.1819  1.1801  1.1801      34
1999-01-04 10:25:00  1.1789  1.1815  1.1776  1.1807      91
1999-01-04 10:30:00  1.1791  1.1792  1.1776  1.1780      16

For reference, here is a table to summarize the expected input and output types of an aggregation function based on the groupby object type and how the aggregation function(s) is/are passed to agg().

                  agg() method     agg func    agg func          agg()
                  input type       accepts     returns           result
GroupBy Object
SeriesGroupBy     function         Series      value             Series
                  dict-of-funcs    Series      value             DataFrame, columns match dict keys
                  list-of-funcs    Series      value             DataFrame, columns match func names
DataFrameGroupBy  function         DataFrame   Series/dict/ary   DataFrame, columns match original DataFrame
                  dict-of-funcs    Series      value             DataFrame, columns match dict keys, where dict keys must be columns in original DataFrame
                  list-of-funcs    Series      value             DataFrame, MultiIndex columns (original cols x func names)

From the above table, if aggregation requires access to more than one column, the only option is to pass a single function to a DataFrameGroupBy object. Therefore, an alternate way to accomplish the original task is to define a function like the following:

def ohlcsum(df):
    # df.sort() was removed in later pandas versions; sort by the time index
    df = df.sort_index()
    return {
       'Open': df['Open'].iloc[0],
       'High': df['High'].max(),
       'Low': df['Low'].min(),
       'Close': df['Close'].iloc[-1],
       'Volume': df['Volume'].sum()
      }

and apply agg() with it:

In [30]: df.groupby(dr5minute.asof).agg(ohlcsum)
Out[30]: 
                       Open    High     Low   Close  Volume
key_0                                                      
1999-01-04 10:20:00  1.1801  1.1819  1.1801  1.1806      34
1999-01-04 10:25:00  1.1807  1.1815  1.1776  1.1789      91
1999-01-04 10:30:00  1.1780  1.1792  1.1776  1.1791      16

Though pandas may offer some cleaner built-in magic in the future, hopefully this explains how to work with today's agg() capabilities.

虽然 Pandas 将来可能会提供一些更简洁的内置魔法,但希望这能解释如何使用今天的 agg() 功能。

Answer by Andrea

With a more recent version of Pandas, there is a resample method that is very fast and useful for accomplishing the same task:

ohlc_dict = {
    'Open': 'first',
    'High': 'max',
    'Low': 'min',
    'Close': 'last',
    'Volume': 'sum',
}

# Note: resample(..., how=...) was removed in pandas 0.18+;
# on current versions pass the dict to .agg() instead:
df.resample('5min', closed='left', label='left').agg(ohlc_dict)
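On current pandas the how= keyword is gone, so the dict goes to .agg(); a runnable sketch of the same pattern on a tiny hypothetical frame:

```python
import pandas as pd

idx = pd.to_datetime(["1999-01-04 10:22", "1999-01-04 10:23",
                      "1999-01-04 10:26"])
bars = pd.DataFrame({"Open": [1.0, 1.1, 1.2], "High": [1.3, 1.4, 1.25],
                     "Low": [0.9, 1.0, 1.1], "Close": [1.1, 1.2, 1.15],
                     "Volume": [5, 7, 3]}, index=idx)

ohlc_dict = {"Open": "first", "High": "max", "Low": "min",
             "Close": "last", "Volume": "sum"}

# resample(..., how=...) was removed in pandas 0.18+; .agg() replaces it.
m5 = bars.resample("5min", closed="left", label="left").agg(ohlc_dict)
```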

Answer by Yundong Cai

bitfinex_klines_pd = bitfinex_klines_pd.resample('4h').agg({
    'open': lambda s: s.iloc[0],
    'high': lambda s: s.max(),
    'low': lambda s: s.min(),
    'close': lambda s: s.iloc[-1],
    'volume': lambda s: s.sum()
})

Answer by Poopy McFartnoise

Within my main() function I'm receiving streaming bid/ask data. I then do the following:

df = pd.DataFrame([])

for msg_type, msg in response.parts():
    if msg_type == "pricing.Price":
        sd = StreamingData(datetime.now(),instrument_string(msg),
                           mid_string(msg),account_api,account_id,
                           's','5min',balance)
        df = df.append(sd.df())
        sd.resample(df)

I created a StreamingData() class which takes the provided input (I also created some functions to break the bid/ask data up into individual components: bid, ask, mid, instrument, etc.).

The beauty of this is that all you have to do is change the 's' and '5min' to whatever timeframes you want. Set them to 'm' and 'D' to get daily prices by the minute.

This is what my StreamingData() looks like:

class StreamingData(object):
    def __init__(self, time, instrument, mid, api, _id, xsec, xmin, balance):
        self.time = time
        self.instrument = instrument
        self.mid = mid
        self.api = api
        self._id = _id
        self.xsec = xsec
        self.xmin = xmin
        self.balance = balance
        self.data = self.resample(self.df())

    def df(self):
        df1 = pd.DataFrame({'Time': [self.time]})
        df2 = pd.DataFrame({'Mid': [float(self.mid)]})
        df3 = pd.concat([df1, df2], axis=1, join='inner')
        df = df3.set_index(['Time'])
        df.index = pd.to_datetime(df.index, unit='s')
        return df

    def resample(self, df):
        xx = df.to_period(freq=self.xsec)
        openCol = xx.resample(self.xmin).first()
        highCol = xx.resample(self.xmin).max()
        lowCol = xx.resample(self.xmin).min()
        closeCol = xx.resample(self.xmin).last()
        self.data = pd.concat([openCol, highCol, lowCol, closeCol],
                              axis=1, join='inner')
        self.data['Open'] = openCol.round(5)
        self.data['High'] = highCol.round(5)
        self.data['Low'] = lowCol.round(5)
        self.data['Close'] = closeCol.round(5)
        return self.data

So it takes in the data from StreamingData(), creates a time-indexed dataframe within df(), appends it, then sends it through to resample(). The prices I calculate are based off of: mid = (bid+ask)/2

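As a side note, pandas also ships a built-in OHLC aggregator that pairs naturally with this mid-price idea; a minimal sketch with hypothetical bid/ask ticks (not the streaming API above):

```python
import pandas as pd

ticks = pd.to_datetime(["2020-01-01 00:00:10", "2020-01-01 00:00:40",
                        "2020-01-01 00:01:20"])
bid = pd.Series([1.1800, 1.1802, 1.1805], index=ticks)
ask = bid + 0.0002

mid = (bid + ask) / 2              # mid = (bid + ask) / 2, as above

# Series.resample(...).ohlc() produces open/high/low/close columns directly.
ohlc = mid.resample("1min").ohlc()
```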