diff --git a/PKG-INFO b/PKG-INFO index a2178b28..b772d575 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: tqsdk -Version: 3.8.3 +Version: 3.8.5 Summary: TianQin SDK Home-page: https://www.shinnytech.com/tqsdk Author: TianQin diff --git a/doc/advanced/gui.rst b/doc/advanced/gui.rst index b7814189..f7fc8634 100644 --- a/doc/advanced/gui.rst +++ b/doc/advanced/gui.rst @@ -15,14 +15,6 @@ :language: python -在两个线程中分别运行Gui和TqSdk -------------------------------------------------- -参见示例程序 multi_thread.py. - -.. literalinclude:: ../../tqsdk/demo/gui/multi_thread.py - :language: python - - 在TqSdk任务中驱动Gui消息循环 ------------------------------------------------- 参见示例程序 loop_integrate.py. diff --git a/doc/conf.py b/doc/conf.py index a29672d8..be082552 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -48,9 +48,9 @@ # built documents. # # The short X.Y version. -version = u'3.8.3' +version = u'3.8.5' # The full version, including alpha/beta/rc tags. -release = u'3.8.3' +release = u'3.8.5' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/doc/dev_framework.rst b/doc/dev_framework.rst new file mode 100644 index 00000000..00d48e81 --- /dev/null +++ b/doc/dev_framework.rst @@ -0,0 +1,103 @@ +.. _dev_framework: + +TqSdk整体结构 +==================================================== + +文件结构 +---------------------------------------------------- + +========================== ========================================= +File Description +========================== ========================================= +api.py TqApi 接口主文件 +tqhelper.py TqApi 辅助代码 +exception.py 异常类型定义 +objs.py 主要业务数据结构定义 +sim.py 本地模拟交易 +backtest.py 回测支持 +lib.py 交易辅助工具 +ta.py 技术指标 +tafunc.py 技术分析函数 +ctpse/* 穿透式监管信息采集模块 +test/* 单元测试用例 +demo/* 示例程序 +========================== ========================================= + + +数据流 +---------------------------------------------------- +TqSdk中以数据流的方式连接各组件。TqChan(本质是一个asyncio.Queue)被用作两个组件间的单向数据流管道,一个组件向 TqChan 中放入数据包,另一个组件从 TqChan 中依次取出数据包。 + +实盘运行时,整个数据流结构如下图: + +.. raw:: html + + Websocket ClientTo OpenTradeGatewayWebsocket Client<br>To OpenTradeGatewayWebsocket ClientTo OpenMdGatewayWebsocket Client<br>To OpenMdGatewayTqAccountTqAccountapi_recv_chanapi_recv_chanapi_send_chanapi_send_chanTqApiTqApitd_recv_chantd_recv_chantd_send_chantd_send_chanmd_recv_chanmd_recv_chanmd_send_chanmd_send_chan + +数据包上行流程(以报单为例): + +#. 用户程序调用 TqApi 中的某些需要发出数据包的功能函数, 以 TqApi.insert_order 为例 +#. TqApi.insert_order 函数生成一个需要发出的数据包, 将此数据包放入 api_send_chan +#. TqAccount 从 api_send_chan 中取出此数据包,根据 aid 字段,决定将此数据包放入 td_send_chan +#. 连接到交易网关的 websocket client 从 td_send_chan 中取出此数据包,通过网络发出 + +数据包下行流程(以接收行情为例): + +#. 连接到行情网关的 websocket client 从网络收到一个数据包,将其放入 md_recv_chan +#. TqAccount 从md_recv_chan中取出此数据包,将它放入 api_recv_chan +#. TqApi 从api_recv_chan中取出此数据包,将数据包中携带的行情数据合并到内存存储区中 + + +基于这样的数据流结构,可以通过简单更换部分组件的方式实现不同工作模式。例如模拟交易时,我们用 TqSim 替换 TqAccount: + +.. raw:: html + + Websocket ClientTo OpenMdGatewayWebsocket Client<br>To OpenMdGatewayTqSimTqSimapi_recv_chanapi_recv_chanapi_send_chanapi_send_chanTqApiTqApimd_recv_chanmd_recv_chanmd_send_chanmd_send_chan + + +策略回测则是这样: + +.. raw:: html + + Websocket ClientTo OpenMdGatewayWebsocket Client<br>To OpenMdGatewayTqSimTqSimapi_recv_chanapi_recv_chanapi_send_chanapi_send_chanTqApiTqApimd_recv_chanmd_recv_chanmd_send_chanmd_send_chanTqBacktestTqBacktestbacktest_recv_chanbacktest_recv_chanbacktest_send_chanbacktest_send_chan + + +内存数据存储与更新 +---------------------------------------------------- +按照 DIFF 协议推荐的客户端最佳实践,TqApi 使用单一变量(TqApi._data)存储所有业务数据, 它的结构如下: + +.. raw:: html + + //quotesquotesSHFE.cu1901SHFE.cu1901last_pricelast_pricevolumevolumeSHFE.cu1902SHFE.cu1902klinesklinestickstickstradetradeuser1user1positionspositionsSHFE.cu1901SHFE.cu1901pos_longpos_longpos_shortpos_short + + +在每次收到数据包时,TqApi都会将数据包内容合并到 TqApi._data 中. 具体的代码流程如下: + +#. websocket client 收到数据包, 放入 TqApi._pending_diffs +#. wait_update 函数发现 TqApi._pending_diffs 有待处理数据包, 中止异步循环以处理此数据包:: + + while not self._wait_timeout and not self._pending_diffs: # 这里发现 self._pending_diffs 非空, 中止 while 循环 + self._run_once() + +#. wait_update 调用 self._merge_diff 函数:: + + for d in self._diffs: + self._merge_diff(self._data, d, self._prototype, False) + +#. TqApi._merge_diff 函数将收到的数据包并入本地存储. + +#. 对于k线之类的序列数据, 后续继续将更新的数据复制到 pandas dataframe 中 + + +异步任务调度 +---------------------------------------------------- +TqApi 在 wait_update 函数中完成所有异步任务的调度执行. 每当用户程序执行 api.wait_update 函数时, 会调度所有 task 运行, 直到收到新数据包或超时 wait_update函数返回, 继续执行后续用户代码 + +.. raw:: html + + 用户代码用户代码api.wait_updateapi.wait_update用户代码用户代码api.wait_updateapi.wait_updatewebsocket client发送 taskwebsocket client<br>发送 taskwebsocket client接收 taskwebsocket client<br>接收 task用户创建的其它task用户创建的其它taskTqSdk创建的tqsdkTqSdk创建的tqsdk用户代码用户代码api.wait_updateapi.wait_update用户代码用户代码api.wait_updateapi.wait_update + + + + + diff --git a/doc/dev_general.rst b/doc/dev_general.rst new file mode 100644 index 00000000..b1d2ff75 --- /dev/null +++ b/doc/dev_general.rst @@ -0,0 +1,66 @@ +.. _dev_general: + +原则与规范 +==================================================== + +TqSdk设计原则 +---------------------------------------------------- + +不预设用户策略模型 +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +我们深刻认识到, 量化交易是一个充满竞争与创新的领域,成功的用户总是在不断构思和尝试全新的理念与模型. 在这方面, 用户比我们知道得更多, 走得也更快. 因此, 我们在设计TqSdk时, 总是尽力避免对用户的模型结构做限定, 而是专注于为用户提供通用性的资源和能力. + +我们的以下设计决策遵循了此原则: + +* 不提供策略类模板, 只以示例程序方式展示各类策略应用 +* 一个策略程序中可以任意获取数据和发出指令 +* 允许用户在一个程序中使用任意多个TqApi实例 + + +保持用户代码简单 +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +我们对TqSdk的一个设计目标是尽量让用户的代码与需求方案保持一致(顺序相同, 篇幅相当). + +我们的以下设计决策遵循了此原则: + +* 不使用多线程, 避免用户处理线程同步问题 +* 不使用回调模型, 避免用户维护状态机状态变量 +* 提供专门的调仓工具 +* 实盘/模拟/回测/复盘 几种不同运行模式切换, 只需要在代码中做单点修改 + + +行为可验证 +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +作为TqSdk库质量管控的关键措施, 我们要求 TqSdk 在运行时可以记录完整的输入信息, 以确保发生问题时可以稳定重现问题和定位原因. 我们做了这些决定: + +* 以数据流衔接库中的各组件 +* TqSdk 的日志文件完整记录收到的全部数据包 +* 专门构建了一个单元测试框架, 可以直接用日志作为测试用例输入 + + +相关知识与技能 +---------------------------------------------------- +TqSdk 的开发涉及以下知识点,您可能需要先学习它们,才能更好的理解和改进TqSdk的代码 + +* 金融相关业务知识 + + TqSdk是用于金融交易领域的专用软件包。我们假定用户和开发者都已经具备相应的基础知识,在TqSdk的文档中不再详加解释。 + +* python asyncio + + TqSdk 的代码大量依赖 python asyncio 机制. asyncio 的编程模型与传统 python 程序差异很大. 我们对于TqSdk的使用者尽量隐藏了 asyncio 相关概念, 允许用户在不了解 asyncio 的情况下实现绝大多数业务需求. 但是对于开发者, 若不了解 asyncio, 在理解 TqSdk 内部代码实现时会非常困难。 + +* Diff协议 + + TqSdk 并不是一个 all-in-one 的包, 它的能力有赖于一系列后台服务的支持. DIFF协议是 TqSdk 与后台服务间通讯的主要协议, 开发者需对 DIFF 有所理解, 才能掌握 TqSdk 的内部实现 + +* Pandas/Numpy + + Pandas/Numpy 是非常优秀的 python 数值计算库. TqSdk 利用这些库完成K线序列数据的存储和操作. + + +代码风格 +---------------------------------------------------- +TqSdk的代码风格遵循 PEP8 规范. + + diff --git a/doc/index.rst b/doc/index.rst index 6ff5f3bf..0ce76957 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -22,7 +22,8 @@ TianQin Python Sdk User Guide tq_trading_unit.rst advanced/index.rst tqsdk_cursor.rst - dev/index.rst + dev_general.rst + dev_framework.rst profession.rst enterprise.rst qa.rst diff --git a/doc/intro.rst b/doc/intro.rst index 4483fab0..2ca5858b 100644 --- a/doc/intro.rst +++ b/doc/intro.rst @@ -36,10 +36,10 @@ TqSdk 是一个由 `信易科技 `_ 发起并贡献 ---------------------------------------------------- .. raw:: html - Open Md Gateway行情网关[Not supported by viewer]Open Trade Gateway交易中继网关[Not supported by viewer]期货公司交易系统CTP / FEMAS / UFX期货公司交易系统<br>CTP / FEMAS / UFX<br>交易所行情系统交易所行情系统<br>DIFF 协议[Not supported by viewer]TqSdk[Not supported by viewer] + Open Md Gateway行情网关[Not supported by viewer]Open Trade Gateway交易中继网关[Not supported by viewer]期货公司交易系统CTP / FEMAS / UFX期货公司交易系统<br>CTP / FEMAS / UFX<br>交易所行情系统交易所行情系统<br>DIFF 协议[Not supported by viewer]TqSdk[Not supported by viewer] -* `行情网关 (Open Md Gateway) `_ 负责提供实时行情和历史数据 -* `交易中继网关 (Open Trade Gateway) `_ 负责连接到期货公司交易系统 +* 行情网关 (Open Md Gateway) 负责提供实时行情和历史数据 +* 交易中继网关 (Open Trade Gateway) 负责连接到期货公司交易系统 * 这两个网关统一以 `Diff协议 `_ 对下方提供服务 * TqSdk按照Diff协议连接到行情网关和交易中继网关, 实现行情和交易功能 @@ -48,10 +48,10 @@ TqSdk 是一个由 `信易科技 `_ 发起并贡献 ---------------------------------------------------- TqSdk 提供的功能可以支持从简单到复杂的各类策略程序. -* 提供当前所有可交易合约从上市开始的 **全部Tick数据和K线数据** -* 支持数十家期货公司的 **实盘交易** -* 支持 **模拟交易** -* 支持 **Tick级和K线级回测**, 支持 **复杂策略回测** +* 提供当前所有可交易合约从上市开始的 :ref:`全部Tick数据和K线数据 ` +* 支持数十家期货公司的 `实盘交易 `_ +* 支持 **模拟交易** :py:class:`~tqsdk.tradeable.otg.tqkq.TqKq` +* 支持 :ref:`Tick级和K线级回测 `, 支持 **复杂策略回测** * 提供近百个 **技术指标函数及源码** * 用户无须建立和维护数据库, 行情和交易数据全在 **内存数据库** , 无访问延迟 * 优化支持 **pandas** 和 **numpy** 库 diff --git a/doc/quickstart.rst b/doc/quickstart.rst index a72046e6..69633b1b 100644 --- a/doc/quickstart.rst +++ b/doc/quickstart.rst @@ -277,21 +277,6 @@ klines是一个pandas.DataFrame对象. 跟 api.get_quote() 一样, api.get_kline -TqSdk AI 助手 -------------------------------------------------- -TqSdk 基于先进的大语言模型和常见天勤问题资料库,提供了新一代的 AI 助手 - -解释函数,编写demo策略,分析代码报错原因,它都有不错的表现 `点击使用 `_ - -.. figure:: images/llm_pic1.png -.. figure:: images/llm_pic2.png -.. figure:: images/llm_pic3.png -.. figure:: images/llm_pic4.png -.. figure:: images/llm_pic5.png -.. figure:: images/llm_pic6.png -.. figure:: images/llm_pic7.png - - TqSdk 学习视频 ------------------------------------------------- TqSdk 提供简单易懂的十分钟上手视频 `供用户学习 `_ diff --git a/doc/usage/shinny_account.rst b/doc/usage/shinny_account.rst index 8834b429..2cd4d8a9 100644 --- a/doc/usage/shinny_account.rst +++ b/doc/usage/shinny_account.rst @@ -19,7 +19,7 @@ 用快期账户来实盘交易 ------------------------------------------------- -对于 TqSdk 免费版,每个快期账户支持最多绑定一个实盘账户,而天勤量化专业版支持一个快期账户绑定任意多个实盘账户 +每个快期账户支持最多绑定三个实盘账户 快期账户会在用户使用实盘账户时自动进行绑定,直到该快期账户没有能绑定实盘账户的名额:: @@ -39,7 +39,7 @@ .. figure:: ../images/user_web_management.png -如需一个快期账户支持更多的实盘账户,请联系工作人员进行批量购买 `天勤量化专业版 `_ +如需一个快期账户支持更多的实盘账户,请联系工作人员进行批量购买 diff --git a/doc/usage/trade.rst b/doc/usage/trade.rst index bd10d472..15505c65 100644 --- a/doc/usage/trade.rst +++ b/doc/usage/trade.rst @@ -11,7 +11,7 @@ from tqsdk import TqApi, TqAuth api = TqApi(auth=TqAuth("快期账户", "账户密码")) -对于 TqSdk 免费版,每个快期账户支持最多绑定一个实盘账户,并且快期账户会在用户第一次使用实盘账户时自动进行绑定(自动绑定功能需要 TqSdk 版本> 1.8.3):: +每个快期账户支持最多绑定3个实盘账户,并且快期账户会在用户第一次使用实盘账户时自动进行绑定(自动绑定功能需要 TqSdk 版本> 1.8.3):: from tqsdk import TqApi, TqAuth api = TqApi(auth=TqAuth("快期账户", "账户密码")) @@ -23,7 +23,7 @@ .. figure:: ../images/user_web_management.png -如果需要让您的快期账户支持更多的实盘账户,可以购买或申请试用我们的 `天勤量化专业版 `_ +如果需要让您的快期账户支持更多的实盘账户,可以在购买我们的 `天勤量化专业版 `_ 后联系工作人员进行额外账户数的购买 设定实盘交易账户 ---------------------------------------------------- diff --git a/doc/version.rst b/doc/version.rst index a73c2c82..edd2f99c 100644 --- a/doc/version.rst +++ b/doc/version.rst @@ -2,6 +2,20 @@ 版本变更 ============================= +3.8.5 (2025/08/07) + +* 修复: websockets 10.0 版本引发的连接失败问题 +* docs: 优化文档 + + +3.8.4 (2025/07/18) + +* 新增: :py:class:`~tqsdk.TqApi.get_trading_status` 支持订阅期权交易状态 +* 修复: :py:class:`~tqsdk.tools.DataDownloader` 计算复权因子错误 +* 修复: 回测模式下,:py:meth:`~tqsdk.TqApi.get_kline_serial` 订阅多合约 K 线时可能出现的越界报错 +* docs: 优化文档 + + 3.8.3 (2025/06/16) * 新增: 多策略历史结算查询,详情参考 :ref:`tq_trading_unit` diff --git a/setup.py b/setup.py index bde1b1ce..920ac6e8 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setuptools.setup( name='tqsdk', - version="3.8.3", + version="3.8.5", description='TianQin SDK', author='TianQin', author_email='tianqincn@gmail.com', @@ -17,7 +17,7 @@ url='https://www.shinnytech.com/tqsdk', packages=setuptools.find_packages(exclude=["tqsdk.test", "tqsdk.test.*"]), python_requires='>=3.7', - install_requires=["websockets>=8.1", "requests", "numpy", "pandas>=1.1.0", "scipy", "simplejson", "aiohttp", + install_requires=["websockets>=10.1", "requests", "numpy", "pandas>=1.1.0", "scipy", "simplejson", "aiohttp", "certifi", "pyjwt", "psutil>=5.9.6", "shinny_structlog", "sgqlc", "filelock", "tqsdk_ctpse", "tqsdk_sm", "packaging"], classifiers=[ diff --git a/tqsdk/__version__.py b/tqsdk/__version__.py index 20d4c102..973d6b31 100644 --- a/tqsdk/__version__.py +++ b/tqsdk/__version__.py @@ -1 +1 @@ -__version__ = '3.8.3' +__version__ = '3.8.5' diff --git a/tqsdk/api.py b/tqsdk/api.py index 0f7c4076..0577cf99 100644 --- a/tqsdk/api.py +++ b/tqsdk/api.py @@ -2022,12 +2022,11 @@ def is_changing(self, obj: Any, key: Union[str, List[str], None] = None) -> bool diffs = self._diffs if self._loop.is_running() else self._sync_diffs if not isinstance(key, list): key = [key] if key else [] - if isinstance(obj, list): - for o in obj: - if self._is_obj_changing(o, diffs=diffs, key=key): - return True - else: - return self._is_obj_changing(obj, diffs=diffs, key=key) + objs = obj if isinstance(obj, list) else [obj] + for o in objs: + if self._is_obj_changing(o, diffs=diffs, key=key): + return True + return False def _is_obj_changing(self, obj: Any, diffs: List[Dict[str, Any]], key: List[str]) -> bool: try: diff --git a/tqsdk/backtest/backtest.py b/tqsdk/backtest/backtest.py index 4e2a956b..8007be5d 100644 --- a/tqsdk/backtest/backtest.py +++ b/tqsdk/backtest/backtest.py @@ -599,6 +599,7 @@ async def _gen_serial(self, ins, dur): for chart_id in self._serials[(ins, dur)]["chart_id_set"]: diff["charts"] = { chart_id: { + "left_id": current_id - 10000 + 1, # left_id 是当前窗口的左端点 "right_id": current_id # api 中处理多合约 kline 需要 right_id 信息 } } diff --git a/tqsdk/connect.py b/tqsdk/connect.py index b557dc62..7b833dec 100644 --- a/tqsdk/connect.py +++ b/tqsdk/connect.py @@ -133,7 +133,7 @@ async def _run(self, api, url, send_chan, recv_chan): "level": "WARNING", "code": 2019112910, "conn_id": self._conn_id, - "content": f"开始与 {url} 的重新建立网络连接", + "content": f"开始与 {url} 重新建立网络连接", "url": url } self._logger.debug("websocket connection connecting") diff --git a/tqsdk/tools/downloader.py b/tqsdk/tools/downloader.py index bc0e7554..cfc1de83 100644 --- a/tqsdk/tools/downloader.py +++ b/tqsdk/tools/downloader.py @@ -189,7 +189,14 @@ async def _ensure_dividend_factor(self, quote, timestamp): if quote.instrument_id not in self._dividend_cache: df = await _get_dividend_factor(self._api, quote, timestamp, self._end_dt_nano, chart_id_prefix="PYSDK_downloader") # 插入结束时间这条记录, 因为可能存在行情时间等于 _end_dt_nano 的行情,因此这里 +1 - df = df.append({"datetime": self._end_dt_nano+1, "factor": 1.0}, ignore_index=True) + new_row = pandas.DataFrame([{ + "datetime": self._end_dt_nano+1, + "stock_dividend": 0.0, + "cash_dividend": 0.0, + "pre_close": float('nan'), + "factor": 1.0 + }]) + df = pandas.concat([df, new_row], ignore_index=True) if self._adj_type == "F": df["factor"] = df["factor"].iloc[::-1].cumprod().iloc[::-1] elif self._adj_type == "B": @@ -198,8 +205,9 @@ async def _ensure_dividend_factor(self, quote, timestamp): df["factor"] = 1.0 / df["factor"].cumprod() # 至此 df 每行的含义为从 datetime 开始应使用 factor 复权 # 该格式并不好用,需要改为截止 datetime 之前(不包含) 应使用 factor 复权 - df["factor"] = df["factor"].shift(1) - df["factor"].iloc[0] = 1.0 + shifted_factor = df["factor"].shift(1) + shifted_factor.iloc[0] = 1.0 + df["factor"] = shifted_factor self._dividend_cache[quote.instrument_id] = { "df": df, "last_dt": 0, diff --git a/tqsdk/tradeable/sim/basesim.py b/tqsdk/tradeable/sim/basesim.py index 75a977c7..72286455 100644 --- a/tqsdk/tradeable/sim/basesim.py +++ b/tqsdk/tradeable/sim/basesim.py @@ -18,7 +18,7 @@ from tqsdk.tradeable.tradeable import Tradeable from tqsdk.tradeable.sim.trade_future import SimTrade from tqsdk.tradeable.sim.trade_stock import SimTradeStock -from tqsdk.utils import _query_for_quote +from tqsdk.utils import _query_for_quote, _forward_chan_handler class BaseSim(Tradeable): @@ -220,7 +220,7 @@ async def _quote_handler(self, symbol, quote_chan, order_chan): quote.update(self._data["quotes"][symbol]) if underlying_quote: underlying_quote.update(self._data["quotes"][underlying_symbol]) - task = self._api.create_task(self._forward_chan_handler(order_chan, quote_chan)) + task = self._api.create_task(_forward_chan_handler(order_chan, quote_chan)) quotes = {symbol: quote} if underlying_quote: quotes[underlying_symbol] = underlying_quote @@ -241,10 +241,6 @@ async def _quote_handler(self, symbol, quote_chan, order_chan): await order_chan.close() await self._api._cancel_task(task) - async def _forward_chan_handler(self, chan_from, chan_to): - async for pack in chan_from: - await chan_to.send(pack) - def _md_recv(self, pack): for d in pack["data"]: self._diffs.append(d) diff --git a/tqsdk/trading_status.py b/tqsdk/trading_status.py index 9a29462b..c5b02cef 100644 --- a/tqsdk/trading_status.py +++ b/tqsdk/trading_status.py @@ -2,11 +2,16 @@ # -*- coding: utf-8 -*- __author__ = 'mayanqiong' +import math from shinny_structlog import ShinnyLoggerAdapter from tqsdk.baseModule import TqModule from tqsdk.channel import TqChan from tqsdk.connect import TqConnect, TsReconnectHandler +from tqsdk.entity import Entity +from tqsdk.objs import Quote +from tqsdk.utils import _query_for_quote +from tqsdk.diff import _get_obj, _merge_diff class TqTradingStatus(TqModule): """ @@ -23,7 +28,82 @@ async def _run(self, api, api_send_chan, api_recv_chan, md_send_chan, md_recv_ch self._md_recv_chan = md_recv_chan self._ts_send_chan = TqChan(self._api, chan_name="send to ts_reconn") self._ts_recv_chan = TqChan(self._api, chan_name="recv from ts_reconn") - await super(TqTradingStatus, self)._run(api, api_send_chan, api_recv_chan, md_send_chan, md_recv_chan, self._ts_send_chan, self._ts_recv_chan) + self._data = Entity() + self._data._instance_entity([]) + self._quote_chan = TqChan(self._api, last_only=True) + self._prototype = { + "quotes": { + "#": Quote(self._api), # 行情的数据原型 + } + } + self._quotes_ready = {} + self._quotes_unready = {} + self._tasks = [self._api.create_task(self._symbol_info_watcher())] + try: + await super(TqTradingStatus, self)._run(api, api_send_chan, api_recv_chan, md_send_chan, md_recv_chan, self._ts_send_chan, self._ts_recv_chan) + finally: + await self._api._cancel_tasks(*self._tasks) + + async def _subscribe_trading_status(self): + """订阅交易状态服务""" + ins_list = set() + for quote in self._quotes_ready.values(): + if quote.ins_class == "OPTION" and quote.underlying_symbol: + ins_list.add(quote.underlying_symbol) + else: + ins_list.add(quote.instrument_id) + + ins_list_str = ",".join(ins_list) + if ins_list_str: + await self._ts_send_chan.send({"aid": "subscribe_trading_status", "ins_list": ins_list_str}) + + def _extend_option_trading_status(self, diffs): + """为期权扩展交易状态信息""" + received_statuses = { + k: v for d in diffs for k, v in d.get('trading_status', {}).items() + } + if not received_statuses: + return + + extended_diffs = [ + {'trading_status': { + quote.instrument_id: { + 'symbol': quote.instrument_id, + 'trade_status': received_statuses[quote.underlying_symbol]['trade_status'] + } + }} + for quote in self._quotes_ready.values() + if quote.ins_class == 'OPTION' and quote.underlying_symbol in received_statuses + ] + diffs.extend(extended_diffs) + + def _normalize_trade_status(self, diffs): + """标准化交易状态,将非交易状态统一为NOTRADING""" + for d in diffs: + for _, ts in d.get('trading_status', {}).items(): + if ts['trade_status'] not in ["AUCTIONORDERING", "CONTINOUS"]: + ts['trade_status'] = "NOTRADING" + + async def _query_symbol_info(self, symbols): + """查询缺少合约信息的quotes""" + for symbol in symbols: + self._quotes_unready[symbol]["_listener"].add(self._quote_chan) + for query_pack in _query_for_quote(list(symbols), self._api._pre20_ins_info.keys()): + await self._md_send_chan.send(query_pack) + + async def _symbol_info_watcher(self): + async for _ in self._quote_chan: + for symbol in await self._unready_to_ready(): + self._quotes_ready[symbol]["_listener"].discard(self._quote_chan) + + async def _unready_to_ready(self): + ready_delta = {symbol for symbol, quote in self._quotes_unready.items() if not math.isnan(quote.price_tick)} + for symbol in ready_delta: + quote = self._quotes_unready.pop(symbol) + self._quotes_ready[symbol] = quote + if ready_delta: + await self._subscribe_trading_status() + return ready_delta async def _handle_recv_data(self, pack, chan): """ @@ -31,13 +111,15 @@ async def _handle_recv_data(self, pack, chan): """ if pack['aid'] == 'rtn_data': if chan == self._md_recv_chan: # 从行情收到的数据包 - self._diffs.extend(pack.get('data', [])) + datas = pack.get("data", []) + self._diffs.extend(datas) + for d in datas: + quotes_diff = d.get("quotes", {}) + _merge_diff(self._data, {"quotes": {k: quotes_diff[k] for k in quotes_diff.keys() & self._quotes_unready.keys()}}, self._prototype, persist=False, reduce_diff=False) elif chan == self._ts_recv_chan: # 从交易状态服务收到的数据包 diffs = pack.get('data', []) - for d in diffs: - for symbol, ts in d.get('trading_status', {}).items(): - if ts['trade_status'] not in ["AUCTIONORDERING", "CONTINOUS"]: - ts['trade_status'] = "NOTRADING" + self._extend_option_trading_status(diffs) + self._normalize_trade_status(diffs) self._diffs.extend(diffs) else: await self._api_recv_chan.send(pack) @@ -48,7 +130,10 @@ async def _handle_req_data(self, pack): if self._init_ts_ws is False: self._init_ts_ws = True self._create_ts_run() - await self._ts_send_chan.send(pack) + unseen = set(pack["ins_list"].split(",")) - self._quotes_ready.keys() - self._quotes_unready.keys() + for symbol in unseen: + self._quotes_unready[symbol] = _get_obj(self._data, ["quotes", symbol], self._prototype["quotes"]["#"]) + await self._query_symbol_info(unseen - await self._unready_to_ready()) else: await self._md_send_chan.send(pack) @@ -60,8 +145,8 @@ def _create_ts_run(self): ws_ts_send_chan._logger_bind(chan_from="ts_reconn", url=ts_url) ws_ts_recv_chan._logger_bind(chan_to="ts_reconn", url=ts_url) conn = TqConnect(logger=ShinnyLoggerAdapter(conn_logger, url=ts_url), conn_id="ts") - self._api.create_task(conn._run(self._api, ts_url, ws_ts_send_chan, ws_ts_recv_chan), _caller_api=True) + self._tasks.append(self._api.create_task(conn._run(self._api, ts_url, ws_ts_send_chan, ws_ts_recv_chan), _caller_api=True)) ts_reconnect = TsReconnectHandler(logger=ShinnyLoggerAdapter(self._logger.getChild("TsReconnect"), url=ts_url)) self._ts_send_chan._logger_bind(chan_from="ts", url=ts_url) self._ts_recv_chan._logger_bind(chan_to="ts", url=ts_url) - self._api.create_task(ts_reconnect._run(self._api, self._ts_send_chan, self._ts_recv_chan, ws_ts_send_chan, ws_ts_recv_chan), _caller_api=True) + self._tasks.append(self._api.create_task(ts_reconnect._run(self._api, self._ts_send_chan, self._ts_recv_chan, ws_ts_send_chan, ws_ts_recv_chan), _caller_api=True)) \ No newline at end of file diff --git a/tqsdk/utils.py b/tqsdk/utils.py index e6959059..e3438c09 100644 --- a/tqsdk/utils.py +++ b/tqsdk/utils.py @@ -167,6 +167,7 @@ async def _get_dividend_factor(api, quote, start_dt_nano, end_dt_nano, chart_id_ # 对每个除权除息矩阵增加 factor 序列,为当日的复权因子 df = get_dividend_df(quote.stock_dividend_ratio, quote.cash_dividend_ratio) df = df[df["datetime"].between(start_dt_nano, end_dt_nano, inclusive="right")] # 只需要一段时间之间的复权因子, 左开右闭 + df = df.reset_index(drop=True) # 重置索引,确保索引为 0, 1, 2... df["pre_close"] = float('nan') # 初始化 pre_close 为 nan for i in range(len(df)): chart_info = { @@ -203,7 +204,7 @@ async def _get_dividend_factor(api, quote, start_dt_nano, end_dt_nano, chart_id_ "view_width": 2 }) df["factor"] = (df["pre_close"] - df["cash_dividend"]) / df["pre_close"] / (1 + df["stock_dividend"]) - df["factor"].fillna(1.0, inplace=True) + df["factor"] = df["factor"].fillna(1.0) return df @@ -228,3 +229,8 @@ def wrapper(self, *args, **kwargs): return wrapper return decorator + + +async def _forward_chan_handler(chan_from, chan_to): + async for pack in chan_from: + await chan_to.send(pack) \ No newline at end of file