Skip to content

Conversation

kawayiYokami
Copy link
Contributor

@kawayiYokami kawayiYokami commented Sep 19, 2025

Motivation / 动机

当前,当 LLM API 调用失败时,机器人会将包含堆栈跟踪(stack trace)的详细错误信息直接作为消息发送给用户。这种行为存在两个问题:

  1. 用户体验不佳:终端用户看到内部错误信息会感到困惑和不安。
  2. 暴露内部实现:不必要地暴露了机器人的内部工作细节。

此外,对于网络波动或临时的 API 不可用,当前缺乏自动重试机制,导致单次失败就会中断服务。

本次更改旨在解决以上问题,通过隐藏内部错误为不稳定的 API 调用增加重试功能,来提升机器人的健壮性和用户体验。

Modifications / 改动点

  1. 核心文件修改:

    • astrbot/core/pipeline/process_stage/method/llm_request.py
  2. 功能实现:

    • run_agent 函数增加了三个新参数
      • retry_on_failure: 允许配置请求失败时的自动重试次数。
      • report_error_message: 控制在重试耗尽后是否向用户报告详细错误(默认为 True 以保持旧行为,但可以配置为 False 来隐藏错误)。
      • fallback_response: 当隐藏错误时,可以提供一个对用户友好的备用响应(例如“我暂时遇到了一点问题,请稍后再试”)。
    • LLMRequestSubStage 中加载了上述配置,使其可以通过配置文件进行管理。
    • 实现了 run_agent 内部的重试循环,当 agent_runner.step() 抛出异常时会捕获并执行重试。
    • 修复了一个逻辑缺陷:通过引入 success 标志位,确保了 Metric.upload 只在整个流程最终成功时被调用一次,避免了在重试场景下指标被重复记录的问题。

Verification Steps / 验证步骤

由于此改动主要影响后端逻辑,可以通过单元测试进行验证。我们编写了一套临时的单元测试 (tests/test_llm_request_temp.py),覆盖了以下所有场景,并确保它们都按预期工作:

  1. 场景一:首次成功 - 验证无异常时,函数正常返回结果。
  2. 场景二:重试后成功 - 验证在 step() 第一次失败后,函数能成功重试并返回结果。
  3. 场景三:重试耗尽并报告错误 - 验证在达到最大重试次数后,函数返回了标准的错误信息。
  4. 场景四:重试耗尽并返回备用响应 - 验证在配置了 fallback_response 后,函数返回了指定的备用文本。
  5. 场景五:重试耗尽并静默失败 - 验证在不报告错误且无备用响应时,函数没有返回任何结果。

Screenshots or Test Results / 运行截图或测试结果

以下是最终运行单元测试并通过的日志截图,证明了上述所有验证步骤均已成功执行:

============================ test session starts =============================
platform win32 -- Python 3.10.17, pytest-8.4.1, pluggy-1.6.0 -- ...
...
collecting 5 items

tests/test_llm_request_temp.py::test_run_agent_success_on_first_try PASSED [ 20%]
tests/test_llm_request_temp.py::test_run_agent_retry_and_succeed PASSED [ 40%]
tests/test_llm_request_temp.py::test_run_agent_retry_exhausted_with_error_report PASSED [ 60%]
tests/test_llm_request_temp.py::test_run_agent_retry_exhausted_with_fallback_response PASSED [ 80%]
tests/test_llm_request_temp.py::test_run_agent_retry_exhausted_with_silent_fail PASSED [100%]

============================= 5 passed in 2.43s ==============================

Compatibility & Breaking Changes / 兼容性与破坏性变更

  • 这是一个破坏性变更 (Breaking Change)。/ This is a breaking change.
  • 这不是一个破坏性变更。/ This is NOT a breaking change.

说明: 所有新增参数 (retry_on_failure, report_error_message, fallback_response) 都带有默认值,这些默认值与修改前的行为保持一致(即默认不重试,失败时报告错误)。因此,本次修改对现有代码完全向后兼容,不会造成任何破坏性变更。


Checklist / 检查清单

  • 😊 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。/ If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
  • 👀 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。/ My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到了 requirements.txtpyproject.toml 文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 我的更改没有引入恶意代码。/ My changes do not introduce malicious code.

Sourcery 总结

为 LLM 请求实现可配置的重试和错误处理,以提高健壮性和用户体验。

新功能:

  • 添加 retry_on_failure 参数,以控制 LLM API 调用的自动重试
  • 引入 report_error_messagefallback_response 选项,以自定义面向用户的错误处理

改进:

  • run_agent 包装在重试循环中,并重构其流程以支持重试
  • 使用成功标志,以确保指标只在最终成功时上传一次

文档:

  • 扩展默认配置和 JSON 模式,包含用于重试、错误报告和回退响应的 error_handling 设置

测试:

  • 添加单元测试,涵盖首次成功、重试成功、错误报告、回退响应和静默失败等场景
Original summary in English

Summary by Sourcery

Implement configurable retry and error handling for LLM requests to improve robustness and user experience.

New Features:

  • Add retry_on_failure parameter to control automatic retries for LLM API calls
  • Introduce report_error_message and fallback_response options to customize user-facing error handling

Enhancements:

  • Wrap run_agent in a retry loop and restructure its flow to support retries
  • Use a success flag to ensure metrics are uploaded only once upon final success

Documentation:

  • Extend default configuration and JSON schema with error_handling settings for retry, error reporting, and fallback responses

Tests:

  • Add unit tests covering scenarios for first success, retry success, error reporting, fallback responses, and silent failures

本次提交为 \
un_agent\ 函数引入了更健壮的错误处理和自动重试机制,以提高 LLM 请求的稳定性。

主要改动包括:

- 在 \
un_agent\ 函数和 \LLMRequestSubStage\ 中增加了 \
etry_on_failure\, \
eport_error_message\, \allback_response\ 三个可配置参数。

- 实现了基于 \
etry_on_failure\ 的循环重试逻辑,当 \�gent_runner.step()\ 抛出异常时会自动重试。

- 增加了对重试次数耗尽后的精细化控制:

  - \
eport_error_message=True\: 返回标准的错误信息。

  - \
eport_error_message=False\ 且 \allback_response\ 有值: 返回指定的备用响应。

  - \
eport_error_message=False\ 且 \allback_response\ 为空: 实现静默失败。

- 修复了一个逻辑缺陷,通过引入 \success\ 标志位确保了 \Metric.upload\ 只在整个流程最终成功时被调用一次,避免了在重试场景下指标被重复记录的问题。
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你好 - 我已审阅了你的更改 - 以下是一些反馈:

  • run_agent 中嵌套的重试和步骤循环非常复杂——考虑将重试逻辑提取到专门的辅助函数中,或使用重试工具来提高可读性并减少嵌套。
  • 由于 run_agent 现在接受三个新参数,将它们设为仅关键字参数有助于防止在调用函数时意外的参数错序。
  • 在默认配置 JSON 模式中,使用 "properties" 而不是 "items" 来定义对象字段,以符合标准 JSON 模式约定并确保正确的验证。
给 AI 代理的提示
请处理此代码审查中的评论:

## 总体评论
- `run_agent` 中嵌套的重试和步骤循环非常复杂——考虑将重试逻辑提取到专门的辅助函数中,或使用重试工具来提高可读性并减少嵌套。
- 由于 `run_agent` 现在接受三个新参数,将它们设为仅关键字参数有助于防止在调用函数时意外的参数错序。
- 在默认配置 JSON 模式中,使用 "properties" 而不是 "items" 来定义对象字段,以符合标准 JSON 模式约定并确保正确的验证。

## 单独评论

### 评论 1
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:235` </location>
<code_context>
-        step_idx += 1
+    success = False
+    # Retry loop
+    for attempt in range(retry_on_failure + 1):
+        step_idx = 0
         try:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** 考虑限制过多的重试次数以避免资源耗尽。

强制限制 `retry_on_failure` 的上限或在设置高值时发出警告有助于防止资源耗尽和过多的阻塞。
</issue_to_address>

### 评论 2
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:300-301` </location>
<code_context>
-            return
+            if attempt >= retry_on_failure:
+                # All retries exhausted, handle final error
+                if report_error_message:
+                    astr_event.set_result(
+                        MessageEventResult().message(
+                            f"AstrBot 请求失败。\n错误类型: {type(e).__name__}\n错误信息: {str(e)}\n\n请在控制台查看和分享错误详情。\n"
+                        )
+                    )
+                elif fallback_response:
+                    astr_event.set_result(
+                        MessageEventResult().message(fallback_response)
</code_context>

<issue_to_address>
**suggestion (bug_risk):** 如果同时禁用错误报告和回退,静默失败可能会让用户感到困惑。

如果既没有提供错误报告也没有提供回退响应,用户将不会被告知失败。建议记录警告或发送最小通知,以获得更好的用户体验和可追溯性。

```suggestion
                # If report_error_message is False and fallback_response is empty, send minimal notification and log warning.
                logger.warning(
                    "All retries exhausted and no error message or fallback response provided. Notifying user of failure."
                )
                astr_event.set_result(
                    MessageEventResult().message(
                        "请求失败,未能获取有效回复。请稍后重试或联系管理员。"
                    )
                )
                return
```
</issue_to_address>

### 评论 3
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:285` </location>
<code_context>
-                MessageEventResult().message(
-                    f"AstrBot 请求失败。\n错误类型: {type(e).__name__}\n错误信息: {str(e)}\n\n请在控制台查看和分享错误详情。\n"
-                )
+            logger.error(
+                f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {traceback.format_exc()}"
             )
</code_context>

<issue_to_address>
**🚨 suggestion (security):** 生产环境中,堆栈跟踪日志可能会暴露敏感信息。

考虑对错误日志进行清理或使堆栈跟踪日志可配置,以避免在生产环境中暴露敏感细节。

建议的实现:

```python
            if getattr(settings, "LOG_TRACEBACK", False):
                logger.error(
                    f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {traceback.format_exc()}"
                )
            else:
                logger.error(
                    f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {type(e).__name__}: {str(e)}"
                )

```

1. 确保你有一个 `settings` 对象或模块提供 `LOG_TRACEBACK` 标志。这可以是一个环境变量或一个配置文件条目。
2. 如果你没有 `settings` 对象,你可以使用 `os.environ.get("LOG_TRACEBACK", "false").lower() == "true"` 或类似的逻辑从环境变量中读取。
3. 如果文件中尚未导入 `settings``os`,请确保在文件顶部导入它们。
</issue_to_address>

### 评论 4
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:332-338` </location>
<code_context>
         self.show_tool_use: bool = settings.get("show_tool_use_status", True)

+        # Load error handling settings
+        error_handling_settings = settings.get("error_handling", {})
+        self.retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
+        self.report_error_message = error_handling_settings.get(
+            "report_error_message", True
+        )
+        self.fallback_response = error_handling_settings.get("fallback_response", "")
+
         for bwp in self.bot_wake_prefixs:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** 考虑验证 `error_handling` 设置的类型和值。

直接从配置中加载 `error_handling` 设置而不进行验证,可能会导致 `retry_on_failure` 等值无效时出现运行时错误。请为这些设置添加类型和值检查。

```suggestion
        # Load error handling settings with validation
        error_handling_settings = settings.get("error_handling", {})

        # Validate retry_on_failure: must be a non-negative integer
        retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
        if not isinstance(retry_on_failure, int) or retry_on_failure < 0:
            logger.warning(
                f"Invalid value for retry_on_failure: {retry_on_failure}. Using default 0."
            )
            retry_on_failure = 0
        self.retry_on_failure = retry_on_failure

        # Validate report_error_message: must be a boolean
        report_error_message = error_handling_settings.get("report_error_message", True)
        if not isinstance(report_error_message, bool):
            logger.warning(
                f"Invalid value for report_error_message: {report_error_message}. Using default True."
            )
            report_error_message = True
        self.report_error_message = report_error_message

        # Validate fallback_response: must be a string
        fallback_response = error_handling_settings.get("fallback_response", "")
        if not isinstance(fallback_response, str):
            logger.warning(
                f"Invalid value for fallback_response: {fallback_response}. Using default ''."
            )
            fallback_response = ""
        self.fallback_response = fallback_response
```
</issue_to_address>

### 评论 5
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:223` </location>
<code_context>


 async def run_agent(
-    agent_runner: AgentRunner, max_step: int = 30, show_tool_use: bool = True
+    agent_runner: AgentRunner,
</code_context>

<issue_to_address>
**issue (complexity):** 考虑将 `run_agent` 中嵌套的循环和分支重构为辅助函数,以扁平化控制流并提高可读性。

考虑将内部循环和分支提取到两个辅助函数中:一个用于单次“try”运行,另一个用于分派每个响应。这将把你的嵌套从:

```python
async def run_agent(...):
    for attempt in ...:
        try:
            while ...:
                async for resp in agent_runner.step():
                    if cond1: …
                    elif cond2: …
                    elif cond3: …
                    else: …
                if done: …
```

变成更像这样:

```python
async def run_agent(...):
    for attempt in range(retry_on_failure + 1):
        try:
            success = await _run_once(agent_runner, astr_event, max_step, show_tool_use)
            if success:
                break
        except Exception as e:
            await _handle_retry_error(e, attempt, retry_on_failure)
            if attempt >= retry_on_failure:
                return
```

然后…

```python
async def _run_once(agent_runner, astr_event, max_step, show_tool_use) -> bool:
    step_idx = 0
    while step_idx < max_step:
        step_idx += 1
        async for resp in agent_runner.step():
            if astr_event.is_stopped():
                return False
            out = await _dispatch_resp(resp, agent_runner, astr_event, show_tool_use)
            if out == "break":
                return True
    return agent_runner.done()
```

```python
async def _dispatch_resp(
    resp, agent_runner, astr_event, show_tool_use
) -> Optional[str]:
    # tool_call_result
    if resp.type == "tool_call_result":
        chain = resp.data["chain"]
        if chain.type == "tool_direct_result":
            chain.type = "tool_call_result"
            await astr_event.send(chain)
        return

    # tool_call
    if resp.type == "tool_call":
        if agent_runner.streaming:
            yield MessageChain(chain=[], type="break")
        if show_tool_use or astr_event.get_platform_name() == "webchat":
            resp.data["chain"].type = "tool_call"
            await astr_event.send(resp.data["chain"])
        return

    # non‐streaming vs streaming
    if not agent_runner.streaming:
        content_type = ResultContentType.LLM_RESULT if resp.type == "llm_result" else ResultContentType.GENERAL_RESULT
        astr_event.set_result(
            MessageEventResult(chain=resp.data["chain"].chain, result_content_type=content_type)
        )
        yield
        astr_event.clear_result()
    elif resp.type == "streaming_delta":
        yield resp.data["chain"]
```

最后,将错误报告分解到 `_handle_retry_error(...)` 中,这样 `run_agent` 本身只有 15-20 行,结构扁平且自描述。
</issue_to_address>

### 评论 6
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:223` </location>
<code_context>
async def run_agent(
    agent_runner: AgentRunner,
    max_step: int = 30,
    show_tool_use: bool = True,
    retry_on_failure: int = 0,
    report_error_message: bool = True,
    fallback_response: str = "",
) -> AsyncGenerator[MessageChain, None]:
    step_idx = 0
    astr_event = agent_runner.run_context.event
    success = False
    # Retry loop
    for attempt in range(retry_on_failure + 1):
        step_idx = 0
        try:
            while step_idx < max_step:
                step_idx += 1
                async for resp in agent_runner.step():
                    if astr_event.is_stopped():
                        return
                    if resp.type == "tool_call_result":
                        msg_chain = resp.data["chain"]
                        if msg_chain.type == "tool_direct_result":
                            # tool_direct_result 用于标记 llm tool 需要直接发送给用户的内容
                            resp.data["chain"].type = "tool_call_result"
                            await astr_event.send(resp.data["chain"])\n                            continue
                        # 对于其他情况,暂时先不处理
                        continue
                    elif resp.type == "tool_call":
                        if agent_runner.streaming:
                            # 用来标记流式响应需要分节
                            yield MessageChain(chain=[], type="break")
                        if show_tool_use or astr_event.get_platform_name() == "webchat":
                            resp.data["chain"].type = "tool_call"
                            await astr_event.send(resp.data["chain"])
                        continue

                    if not agent_runner.streaming:
                        content_typ = (\n                            ResultContentType.LLM_RESULT
                            if resp.type == "llm_result"
                            else ResultContentType.GENERAL_RESULT
                        )
                        astr_event.set_result(
                            MessageEventResult(
                                chain=resp.data["chain"].chain,
                                result_content_type=content_typ,
                            )
                        )
                        yield
                        astr_event.clear_result()
                    else:
                        if resp.type == "streaming_delta":
                            yield resp.data["chain"]  # MessageChain
                if agent_runner.done():
                    success = True
                    break
            if success:
                break

        except Exception as e:
            logger.error(
                f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {traceback.format_exc()}"
            )
            if attempt >= retry_on_failure:
                # All retries exhausted, handle final error
                if report_error_message:
                    astr_event.set_result(
                        MessageEventResult().message(
                            f"AstrBot 请求失败。\n错误类型: {type(e).__name__}\n错误信息: {str(e)}\n\n请在控制台查看和分享错误详情。\n"
                        )
                    )
                elif fallback_response:
                    astr_event.set_result(
                        MessageEventResult().message(fallback_response)
                    )
                # If report_error_message is False and fallback_response is empty, do nothing (silent fail).
                return
            # If retries are left, continue to the next attempt
            continue
    if success:
        asyncio.create_task(
            Metric.upload(
                llm_tick=1,
                model_name=agent_runner.provider.get_model(),
                provider_type=agent_runner.provider.meta().type,
            )
        )

</code_context>

<issue_to_address>
**issue (code-quality):** 我们发现了这些问题:

- 将条件简化为类似 switch 的形式 ([`switch`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/switch/))
- `run_agent` 中发现代码质量低 - 9% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))

<br/><details><summary>解释</summary>
此函数的质量得分低于 25% 的质量阈值。
此得分是方法长度、认知复杂度和工作内存的组合。

你如何解决这个问题?

重构此函数以使其更短、更具可读性可能值得。

- 通过将部分功能提取到自己的函数中来减少函数长度。这是你能做的最重要的事情——理想情况下,一个函数应该少于 10 行。
- 减少嵌套,也许通过引入守卫子句来提前返回。
- 确保变量的作用域紧密,以便使用相关概念的代码在函数中坐在一起而不是分散。</details>
</issue_to_address>

Sourcery 对开源免费 - 如果你喜欢我们的评论,请考虑分享它们 ✨
帮助我更有用!请点击每个评论上的 👍 或 👎,我将使用反馈来改进你的评论。
Original comment in English

Hey there - I've reviewed your changes - here's some feedback:

  • The nested retry and step loops in run_agent are quite complex—consider extracting the retry logic into a dedicated helper or using a retry utility to improve readability and reduce nesting.
  • Since run_agent now takes three new parameters, making them keyword-only arguments could help prevent accidental misordering when invoking the function.
  • In the default config JSON schema, use "properties" instead of "items" for defining object fields to adhere to standard JSON Schema conventions and ensure correct validation.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The nested retry and step loops in run_agent are quite complex—consider extracting the retry logic into a dedicated helper or using a retry utility to improve readability and reduce nesting.
- Since run_agent now takes three new parameters, making them keyword-only arguments could help prevent accidental misordering when invoking the function.
- In the default config JSON schema, use "properties" instead of "items" for defining object fields to adhere to standard JSON Schema conventions and ensure correct validation.

## Individual Comments

### Comment 1
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:235` </location>
<code_context>
-        step_idx += 1
+    success = False
+    # Retry loop
+    for attempt in range(retry_on_failure + 1):
+        step_idx = 0
         try:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Consider limiting excessive retry counts to avoid resource exhaustion.

Enforcing an upper limit on retry_on_failure or warning when a high value is set can help prevent resource exhaustion and excessive blocking.
</issue_to_address>

### Comment 2
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:300-301` </location>
<code_context>
-            return
+            if attempt >= retry_on_failure:
+                # All retries exhausted, handle final error
+                if report_error_message:
+                    astr_event.set_result(
+                        MessageEventResult().message(
+                            f"AstrBot 请求失败。\n错误类型: {type(e).__name__}\n错误信息: {str(e)}\n\n请在控制台查看和分享错误详情。\n"
+                        )
+                    )
+                elif fallback_response:
+                    astr_event.set_result(
+                        MessageEventResult().message(fallback_response)
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Silent failure may confuse users if both error reporting and fallback are disabled.

If neither error reporting nor a fallback response is provided, users will not be informed of failures. Logging a warning or sending a minimal notification is recommended for better user experience and traceability.

```suggestion
                # If report_error_message is False and fallback_response is empty, send minimal notification and log warning.
                logger.warning(
                    "All retries exhausted and no error message or fallback response provided. Notifying user of failure."
                )
                astr_event.set_result(
                    MessageEventResult().message(
                        "请求失败,未能获取有效回复。请稍后重试或联系管理员。"
                    )
                )
                return
```
</issue_to_address>

### Comment 3
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:285` </location>
<code_context>
-                MessageEventResult().message(
-                    f"AstrBot 请求失败。\n错误类型: {type(e).__name__}\n错误信息: {str(e)}\n\n请在控制台查看和分享错误详情。\n"
-                )
+            logger.error(
+                f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {traceback.format_exc()}"
             )
</code_context>

<issue_to_address>
**🚨 suggestion (security):** Traceback logging may expose sensitive information in production.

Consider sanitizing error logs or making traceback logging configurable to avoid exposing sensitive details in production.

Suggested implementation:

```python
            if getattr(settings, "LOG_TRACEBACK", False):
                logger.error(
                    f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {traceback.format_exc()}"
                )
            else:
                logger.error(
                    f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {type(e).__name__}: {str(e)}"
                )

```

1. Ensure you have a `settings` object or module that provides the `LOG_TRACEBACK` flag. This could be an environment variable or a config file entry.
2. If you do not have a `settings` object, you can use `os.environ.get("LOG_TRACEBACK", "false").lower() == "true"` or similar logic to read from environment variables.
3. Make sure to import `settings` or `os` at the top of the file if not already present.
</issue_to_address>

### Comment 4
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:332-338` </location>
<code_context>
         self.show_tool_use: bool = settings.get("show_tool_use_status", True)

+        # Load error handling settings
+        error_handling_settings = settings.get("error_handling", {})
+        self.retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
+        self.report_error_message = error_handling_settings.get(
+            "report_error_message", True
+        )
+        self.fallback_response = error_handling_settings.get("fallback_response", "")
+
         for bwp in self.bot_wake_prefixs:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Consider validating error_handling settings for type and value.

Directly loading error_handling settings from the config without validation may lead to runtime errors if values like retry_on_failure are invalid. Please add type and value checks for these settings.

```suggestion
        # Load error handling settings with validation
        error_handling_settings = settings.get("error_handling", {})

        # Validate retry_on_failure: must be a non-negative integer
        retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
        if not isinstance(retry_on_failure, int) or retry_on_failure < 0:
            logger.warning(
                f"Invalid value for retry_on_failure: {retry_on_failure}. Using default 0."
            )
            retry_on_failure = 0
        self.retry_on_failure = retry_on_failure

        # Validate report_error_message: must be a boolean
        report_error_message = error_handling_settings.get("report_error_message", True)
        if not isinstance(report_error_message, bool):
            logger.warning(
                f"Invalid value for report_error_message: {report_error_message}. Using default True."
            )
            report_error_message = True
        self.report_error_message = report_error_message

        # Validate fallback_response: must be a string
        fallback_response = error_handling_settings.get("fallback_response", "")
        if not isinstance(fallback_response, str):
            logger.warning(
                f"Invalid value for fallback_response: {fallback_response}. Using default ''."
            )
            fallback_response = ""
        self.fallback_response = fallback_response
```
</issue_to_address>

### Comment 5
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:223` </location>
<code_context>


 async def run_agent(
-    agent_runner: AgentRunner, max_step: int = 30, show_tool_use: bool = True
+    agent_runner: AgentRunner,
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring the nested loops and branches in run_agent into helper functions to flatten the control flow and improve readability.

Consider pulling the inner loops and branches into two helpers: one for a single “try” run, and one to dispatch each response. This will collapse your nesting from:

```python
async def run_agent(...):
    for attempt in ...:
        try:
            while ...:
                async for resp in agent_runner.step():
                    if cond1: …
                    elif cond2: …
                    elif cond3: …
                    else: …
                if done: …
```

into something more like:

```python
async def run_agent(...):
    for attempt in range(retry_on_failure + 1):
        try:
            success = await _run_once(agent_runner, astr_event, max_step, show_tool_use)
            if success:
                break
        except Exception as e:
            await _handle_retry_error(e, attempt, retry_on_failure)
            if attempt >= retry_on_failure:
                return
```

and then…

```python
async def _run_once(agent_runner, astr_event, max_step, show_tool_use) -> bool:
    step_idx = 0
    while step_idx < max_step:
        step_idx += 1
        async for resp in agent_runner.step():
            if astr_event.is_stopped():
                return False
            out = await _dispatch_resp(resp, agent_runner, astr_event, show_tool_use)
            if out == "break":
                return True
    return agent_runner.done()
```

```python
async def _dispatch_resp(
    resp, agent_runner, astr_event, show_tool_use
) -> Optional[str]:
    # tool_call_result
    if resp.type == "tool_call_result":
        chain = resp.data["chain"]
        if chain.type == "tool_direct_result":
            chain.type = "tool_call_result"
            await astr_event.send(chain)
        return

    # tool_call
    if resp.type == "tool_call":
        if agent_runner.streaming:
            yield MessageChain(chain=[], type="break")
        if show_tool_use or astr_event.get_platform_name() == "webchat":
            resp.data["chain"].type = "tool_call"
            await astr_event.send(resp.data["chain"])
        return

    # non‐streaming vs streaming
    if not agent_runner.streaming:
        content_type = ResultContentType.LLM_RESULT if resp.type == "llm_result" else ResultContentType.GENERAL_RESULT
        astr_event.set_result(
            MessageEventResult(chain=resp.data["chain"].chain, result_content_type=content_type)
        )
        yield
        astr_event.clear_result()
    elif resp.type == "streaming_delta":
        yield resp.data["chain"]
```

Finally factor out the error‐reporting into `_handle_retry_error(...)` so that `run_agent` itself is only 15–20 lines, flat, and self-descriptive.
</issue_to_address>

### Comment 6
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:223` </location>
<code_context>
async def run_agent(
    agent_runner: AgentRunner,
    max_step: int = 30,
    show_tool_use: bool = True,
    retry_on_failure: int = 0,
    report_error_message: bool = True,
    fallback_response: str = "",
) -> AsyncGenerator[MessageChain, None]:
    step_idx = 0
    astr_event = agent_runner.run_context.event
    success = False
    # Retry loop
    for attempt in range(retry_on_failure + 1):
        step_idx = 0
        try:
            while step_idx < max_step:
                step_idx += 1
                async for resp in agent_runner.step():
                    if astr_event.is_stopped():
                        return
                    if resp.type == "tool_call_result":
                        msg_chain = resp.data["chain"]
                        if msg_chain.type == "tool_direct_result":
                            # tool_direct_result 用于标记 llm tool 需要直接发送给用户的内容
                            resp.data["chain"].type = "tool_call_result"
                            await astr_event.send(resp.data["chain"])
                            continue
                        # 对于其他情况,暂时先不处理
                        continue
                    elif resp.type == "tool_call":
                        if agent_runner.streaming:
                            # 用来标记流式响应需要分节
                            yield MessageChain(chain=[], type="break")
                        if show_tool_use or astr_event.get_platform_name() == "webchat":
                            resp.data["chain"].type = "tool_call"
                            await astr_event.send(resp.data["chain"])
                        continue

                    if not agent_runner.streaming:
                        content_typ = (
                            ResultContentType.LLM_RESULT
                            if resp.type == "llm_result"
                            else ResultContentType.GENERAL_RESULT
                        )
                        astr_event.set_result(
                            MessageEventResult(
                                chain=resp.data["chain"].chain,
                                result_content_type=content_typ,
                            )
                        )
                        yield
                        astr_event.clear_result()
                    else:
                        if resp.type == "streaming_delta":
                            yield resp.data["chain"]  # MessageChain
                if agent_runner.done():
                    success = True
                    break
            if success:
                break

        except Exception as e:
            logger.error(
                f"Attempt {attempt + 1}/{retry_on_failure + 1} failed: {traceback.format_exc()}"
            )
            if attempt >= retry_on_failure:
                # All retries exhausted, handle final error
                if report_error_message:
                    astr_event.set_result(
                        MessageEventResult().message(
                            f"AstrBot 请求失败。\n错误类型: {type(e).__name__}\n错误信息: {str(e)}\n\n请在控制台查看和分享错误详情。\n"
                        )
                    )
                elif fallback_response:
                    astr_event.set_result(
                        MessageEventResult().message(fallback_response)
                    )
                # If report_error_message is False and fallback_response is empty, do nothing (silent fail).
                return
            # If retries are left, continue to the next attempt
            continue
    if success:
        asyncio.create_task(
            Metric.upload(
                llm_tick=1,
                model_name=agent_runner.provider.get_model(),
                provider_type=agent_runner.provider.meta().type,
            )
        )

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Simplify conditional into switch-like form ([`switch`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/switch/))
- Low code quality found in run\_agent - 9% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))

<br/><details><summary>Explanation</summary>
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

- Reduce the function length by extracting pieces of functionality out into
  their own functions. This is the most important thing you can do - ideally a
  function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
  sits together within the function rather than being scattered.</details>
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines 332 to 338
# Load error handling settings
error_handling_settings = settings.get("error_handling", {})
self.retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
self.report_error_message = error_handling_settings.get(
"report_error_message", True
)
self.fallback_response = error_handling_settings.get("fallback_response", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 考虑验证 error_handling 设置的类型和值。

直接从配置中加载 error_handling 设置而不进行验证,可能会导致 retry_on_failure 等值无效时出现运行时错误。请为这些设置添加类型和值检查。

Suggested change
# Load error handling settings
error_handling_settings = settings.get("error_handling", {})
self.retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
self.report_error_message = error_handling_settings.get(
"report_error_message", True
)
self.fallback_response = error_handling_settings.get("fallback_response", "")
# Load error handling settings with validation
error_handling_settings = settings.get("error_handling", {})
# Validate retry_on_failure: must be a non-negative integer
retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
if not isinstance(retry_on_failure, int) or retry_on_failure < 0:
logger.warning(
f"Invalid value for retry_on_failure: {retry_on_failure}. Using default 0."
)
retry_on_failure = 0
self.retry_on_failure = retry_on_failure
# Validate report_error_message: must be a boolean
report_error_message = error_handling_settings.get("report_error_message", True)
if not isinstance(report_error_message, bool):
logger.warning(
f"Invalid value for report_error_message: {report_error_message}. Using default True."
)
report_error_message = True
self.report_error_message = report_error_message
# Validate fallback_response: must be a string
fallback_response = error_handling_settings.get("fallback_response", "")
if not isinstance(fallback_response, str):
logger.warning(
f"Invalid value for fallback_response: {fallback_response}. Using default ''."
)
fallback_response = ""
self.fallback_response = fallback_response
Original comment in English

suggestion (bug_risk): Consider validating error_handling settings for type and value.

Directly loading error_handling settings from the config without validation may lead to runtime errors if values like retry_on_failure are invalid. Please add type and value checks for these settings.

Suggested change
# Load error handling settings
error_handling_settings = settings.get("error_handling", {})
self.retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
self.report_error_message = error_handling_settings.get(
"report_error_message", True
)
self.fallback_response = error_handling_settings.get("fallback_response", "")
# Load error handling settings with validation
error_handling_settings = settings.get("error_handling", {})
# Validate retry_on_failure: must be a non-negative integer
retry_on_failure = error_handling_settings.get("retry_on_failure", 0)
if not isinstance(retry_on_failure, int) or retry_on_failure < 0:
logger.warning(
f"Invalid value for retry_on_failure: {retry_on_failure}. Using default 0."
)
retry_on_failure = 0
self.retry_on_failure = retry_on_failure
# Validate report_error_message: must be a boolean
report_error_message = error_handling_settings.get("report_error_message", True)
if not isinstance(report_error_message, bool):
logger.warning(
f"Invalid value for report_error_message: {report_error_message}. Using default True."
)
report_error_message = True
self.report_error_message = report_error_message
# Validate fallback_response: must be a string
fallback_response = error_handling_settings.get("fallback_response", "")
if not isinstance(fallback_response, str):
logger.warning(
f"Invalid value for fallback_response: {fallback_response}. Using default ''."
)
fallback_response = ""
self.fallback_response = fallback_response

MAIN_AGENT_HOOKS = MainAgentHooks()


async def run_agent(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): 我们发现了这些问题:

  • 将条件简化为类似 switch 的形式 (switch)
  • run_agent 中发现代码质量低 - 9% (low-code-quality)


解释
此函数的质量得分低于 25% 的质量阈值。
此得分是方法长度、认知复杂度和工作内存的组合。

你如何解决这个问题?

重构此函数以使其更短、更具可读性可能值得。

  • 通过将部分功能提取到自己的函数中来减少函数长度。这是你能做的最重要的事情——理想情况下,一个函数应该少于 10 行。
  • 减少嵌套,也许通过引入守卫子句来提前返回。
  • 确保变量的作用域紧密,以便使用相关概念的代码在函数中坐在一起而不是分散。
Original comment in English

issue (code-quality): We've found these issues:

  • Simplify conditional into switch-like form (switch)
  • Low code quality found in run_agent - 9% (low-code-quality)


Explanation
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

@anka-afk anka-afk removed their request for review September 19, 2025 12:25
本次提交在原有的重试与错误处理机制基础上,增加了以下改进:

- 新增 \
etry_delay\ 配置项,允许用户设置重试之间的等待时间(秒)。

- 对 \
etry_on_failure\ 和 \
etry_delay\ 配置项增加了值范围验证(0-10),增强了健壮性。

- 在 \
un_agent\ 函数中实现了重试前的异步等待逻辑。
本次提交为 LLM 错误处理机制增加了 \
etry_delay\ 配置项的默认值和管理面板元数据,使其在 AstrBot 的配置系统中完全可用。
@kawayiYokami
Copy link
Contributor Author

新增重试次数和重试间隔

本次提交为 \
un_agent\ 函数新增了全面的单元测试,覆盖了成功执行、重试成功、重试耗尽并报告错误、重试耗尽并返回备用响应以及重试耗尽并静默失败等所有关键场景。这些测试将确保 \
un_agent\ 函数的稳定性和可靠性。
@kawayiYokami kawayiYokami deleted the feature/hide-api-errors branch October 16, 2025 05:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant