@@ -89,7 +89,7 @@ async def _run(self, api, sim_send_chan, sim_recv_chan, md_send_chan, md_recv_ch
89
89
})
90
90
for ins in pack ["ins_list" ].split ("," ):
91
91
await self ._ensure_quote (ins )
92
- await self ._send_diff ()
92
+ await self ._send_diff () # 处理上一次未处理的 peek_message
93
93
elif pack ["aid" ] == "set_chart" :
94
94
if pack ["ins_list" ]:
95
95
# 回测模块中已保证每次将一个行情时间的数据全部发送给api,因此更新行情时 保持与初始化时一样的charts信息(即不作修改)
@@ -111,7 +111,7 @@ async def _run(self, api, sim_send_chan, sim_recv_chan, md_send_chan, md_recv_ch
111
111
pack ["chart_id" ]: None
112
112
}
113
113
})
114
- await self ._send_diff ()
114
+ await self ._send_diff () # 处理上一次未处理的 peek_message
115
115
elif pack ["aid" ] == "peek_message" :
116
116
self .pending_peek = True
117
117
await self ._send_diff ()
@@ -131,7 +131,7 @@ async def _md_handler(self):
131
131
132
132
async def _send_snapshot (self ):
133
133
"""发送初始合约信息"""
134
- async with TqChan (self .api , last_only = True ) as update_chan :
134
+ async with TqChan (self .api , last_only = True ) as update_chan : # 等待与行情服务器连接成功
135
135
self .data ["_listener" ].add (update_chan )
136
136
while self .data .get ("mdhis_more_data" , True ):
137
137
await update_chan .recv ()
@@ -140,7 +140,7 @@ async def _send_snapshot(self):
140
140
for ins , quote in self .data ["quotes" ].items ():
141
141
if not ins .startswith ("_" ):
142
142
quotes [ins ] = {
143
- "open" : None ,
143
+ "open" : None , # 填写None: 删除api中的这个字段
144
144
"close" : None ,
145
145
"settlement" : None ,
146
146
"lower_limit" : None ,
@@ -160,6 +160,7 @@ async def _send_snapshot(self):
160
160
"underlying_symbol" : quote ["underlying_symbol" ],
161
161
"strike_price" : quote ["strike_price" ],
162
162
"change" : None ,
163
+ # todo: 回测添加合约文件字段,trading_time等
163
164
"change_percent" : None ,
164
165
"expired" : None ,
165
166
}
@@ -190,6 +191,9 @@ async def _send_diff(self):
190
191
if quotes_diff and (quote_info ["min_duration" ] != 0 or min_serial [1 ] == 0 ):
191
192
quotes [min_serial [0 ]] = quotes_diff
192
193
await self ._fetch_serial (min_serial )
194
+ if not self .serials :
195
+ self .logger .warning ("回测结束" )
196
+ raise BacktestFinished () from None
193
197
for ins , diff in quotes .items ():
194
198
for d in diff :
195
199
self .diffs .append ({
@@ -209,7 +213,7 @@ async def _send_diff(self):
209
213
210
214
async def _ensure_serial (self , ins , dur ):
211
215
if (ins , dur ) not in self .serials :
212
- quote = self .quotes .setdefault (ins , {
216
+ quote = self .quotes .setdefault (ins , { # 在此处设置 min_duration: 每次生成K线的时候会自动生成quote, 记录某一合约的最小duration
213
217
"min_duration" : dur
214
218
})
215
219
quote ["min_duration" ] = min (quote ["min_duration" ], dur )
@@ -227,9 +231,7 @@ async def _fetch_serial(self, serial):
227
231
try :
228
232
s ["timestamp" ], s ["diff" ], s ["quotes" ] = await s ["generator" ].__anext__ ()
229
233
except StopAsyncIteration :
230
- del self .serials [serial ]
231
- if not self .serials :
232
- raise BacktestFinished () from None
234
+ del self .serials [serial ] # 删除一个行情时间超过结束时间的serial
233
235
234
236
async def _gen_serial (self , ins , dur ):
235
237
"""k线/tick 序列的 async generator, yield 出来的行情数据带有时间戳, 因此 _send_diff 可以据此归并"""
@@ -279,6 +281,8 @@ async def _gen_serial(self, ins, dur):
279
281
chart_info .pop ("focus_datetime" , None )
280
282
chart_info .pop ("focus_position" , None )
281
283
await self .md_send_chan .send (chart_info .copy ())
284
+ # 将订阅的8964长度的窗口中的数据都遍历完后,退出循环,然后再次进入并处理下一窗口数据
285
+ # (因为在处理过5000条数据的同时向服务器订阅从当前id开始的新一窗口的数据,在当前窗口剩下的3000条数据处理完后,下一窗口数据也已经收到)
282
286
if current_id > right_id :
283
287
break
284
288
item = {k : v for k , v in serial ["data" ].get (str (current_id ), {}).items ()}
@@ -324,7 +328,7 @@ async def _gen_serial(self, ins, dur):
324
328
item ["datetime" ])
325
329
if timestamp > self .end_dt : # 超过结束时间
326
330
return
327
- yield timestamp , diff , None
331
+ yield timestamp , diff , None # K线刚生成时的数据都为开盘价
328
332
diff = {
329
333
"klines" : {
330
334
ins : {
@@ -342,7 +346,7 @@ async def _gen_serial(self, ins, dur):
342
346
if timestamp > self .end_dt : # 超过结束时间
343
347
return
344
348
yield timestamp , diff , self ._get_quotes_from_kline (self .data ["quotes" ][ins ], timestamp ,
345
- item )
349
+ item ) # K线结束时生成quote数据
346
350
current_id += 1
347
351
finally :
348
352
# 释放chart资源
0 commit comments