这是过去一直是困扰着我的一个模块,本月底交易系统的更新完成之后,回测也正式完成初始功能的开发,现在我就简单梳理一下过程。

1
2
3
4
5
6
7
版本:V2
开发时间:2023/6/26 - 2023/10/23
1. 增加自定义指标模块
2. 增加震荡、趋势仓位管理
3. 增加保存K线以及价格数据
4. 增加完整的回测模块
5. 可完整脱离TV自运行

1.回测功能的作用

要相信一个策略是否能盈利,最起码要告诉我它过去是盈利的。

这个就是回测功能,之前一直只使用TradingView的交易策略,就是因为有一个功能完善的回测的功能。

2.数据的获取

之前,回测功能运行逻辑一直还没在脑子里形成概念,主要是因为没有可用的实时数据,根据币安的api文档,我发现其实可以利用获取K线数据的方法,获取过去短时间内的价格数据。

把这个获取K线的时间周期interval改为‘1S’,就可以获取过去一段时间里,每秒交易对的价格数据。

1
f'https://api.binance.com/api/v3/klines?symbol={symbol}&interval={interval}&limit=1'

然而,币安api只能获取最多1000条的历史K线数据,于是我决定从当前开始,实时收录好需要的交易对每秒的历史数据(包括历史K线数据)了。

然后我开一个每秒循环的线程,获取价格的api改为获取当前价格的api,从10月13日开始收录一些自己想要的交易对的价格数据。

1
url = f'https://api.binance.com/api/v3/ticker/price?symbol={symbol}'

这是数据包

有了价格数据包,用同样的方法,自然也可以获取到K线数据包

3.遍历数据

有了数据后,流程突然就闪现在脑海里,我通过遍历价格数据,然后根据时间戳是不是就可以获取对应的K线数据,再传入指标,通过指标和实时价格,那不就可以模拟出过去的数据了!

说干就干,遍历时间数据包,然后改造一下策略代码,将时间和对应价格的数据传进来

1
2
3
4
5
6
7
8
9
10
price_data = pd.read_csv('./data/'+symbol+'-1s.csv')

for row in price_data.itertuples(index=False):
# 获取到文档的时间戳和开盘价
timestamp = row.timestamp
last_price = row.open
#原时间,用于给打印附加时间
current_datetime = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')

update_price_retest(last_price,str(current_datetime))

然后就是K线数据的获取

模拟实际的运行状态,需要在对应的时间节点获取对应K线的数据传入指标,得到指标结果。

1
2
3
if  current_time.hour in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23] and current_time.minute == 0 and current_time.second >= 0 and current_time.second <= 3 and timestamp_test - nonelast_update_time >= 5:
nonelast_update_time = timestamp_test
current_datetime = str(current_datetime)

在遍历价格的循环内,我通过如上方法,每当时间第一次走到整时时会触发一次,然后在触发的方法内,在获取1H 4H 1D的K线数据,以1H为例

1
2
3
4
5
6
7
8
9
10
11
12
 # 1h的判断
if current_time.hour in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23]:
current_datetime_test = current_datetime_test.replace(second=0) #先把秒置为0
current_datetime_1h = current_datetime_test
current_datetime_1h -= datetime.timedelta(hours=1) #再减1时,用于拿到上个收线K
regex_pattern = f'^{current_datetime_1h}'
filtered_data_1h = kline_data_1h.loc[kline_data_1h[0].str.contains(regex_pattern)]
filtered_data_1h.loc[:, 'cumsum'] = filtered_data_1h.index.to_series().cumsum()
response_pd_1h = kline_data_1h.loc[0:filtered_data_1h['cumsum'].max()].copy()
response_pd_1h.columns = response_pd_1h.iloc[0]
response_pd_1h = response_pd_1h[1:].reset_index(drop=True)
buildIndicator_1h(response_pd_1h,current_datetime,logger_1h)

代码最终获取的response_pd_1h 即为每个整时状态下,需要获取的K线数据,过程中也是遇到很多坑,包括时差问题,数据问题。

在方法中,通过传入的pd数据,就可根据指标计算出需要的结果,再传入到策略中

1
update_direction_retest(trend_KDJ,str(datetime))

至此,回测需要的两个最重要的数据,价格和K线数据,就完成了获取,完善好这个,整个回测功能突然就感觉差不多了。

4.其他

4.1策略需要对应增加一种测试状态

4.2时间限制

1
2
3
end_timestamp   = '2024-10-11 2:40:41'
start_datetime = datetime.datetime.strptime(start_timestamp, '%Y-%m-%d %H:%M:%S')
end_datetime = datetime.datetime.strptime(end_timestamp, '%Y-%m-%d %H:%M:%S')

通过条件限制,就会只在对应的时间范围内测试

1
f start_datetime <= current_datetime <= end_datetime:

4.3测试结果分析

我目前是同步会测试将近100组测试参数,同时测试两个交易对的情况下,回测一个月的数据,将近需要30分钟。

通过结果可以很清晰地知道不同的测试组过去一段时间内的运行情况,当然,拿到好的测试参数也不用高兴太早,考虑滑点问题,任何回测数据也是仅供参数。

5.后续的升级方向

目前回测功能是完备的,但是整体还是灵活性不够,包括参数的更改之类的,整体目前也有2300+行代码,太多了。

后续的改进方向,首先是优化代码,优化参数的输入方式,提升整体的效率;

然后是需要得到更多的回测数据,例如盈亏本比,盈利因子,回测率,夏普比例。