
Conversation

Contributor

@ctrlkk ctrlkk commented Sep 16, 2025

fixes #2768


Motivation

Adds an option to enable a request queue. When it is enabled and multiple requests arrive concurrently in the same conversation, they are queued and processed one at a time, ensuring context continuity and avoiding the confusion caused by concurrent processing.

Modifications

astrbot/core/pipeline/process_stage/method/llm_request.py
Added a lock to control the request queue.
astrbot/core/config/default.py
Added the corresponding configuration entry to manage this feature.
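
A minimal sketch of the kind of entry this adds under platform_settings (only the `request_queue` key name is confirmed by the Sourcery summary below; the constant name and default value here are assumptions):

```python
# Hypothetical sketch of the new option in astrbot/core/config/default.py.
# Only the "request_queue" key is confirmed; DEFAULT_CONFIG and the default
# value False are illustrative assumptions.
DEFAULT_CONFIG = {
    "platform_settings": {
        # When True, concurrent LLM requests within one conversation are
        # queued and handled sequentially instead of in parallel.
        "request_queue": False,
    },
}
```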

Verification Steps

Enable the feature under Configuration File -> Platform Settings -> Other Settings -> Enable request queue.

Screenshots or Test Results

[Screenshots omitted]

Compatibility & Breaking Changes / 兼容性与破坏性变更

  • 这是一个破坏性变更 (Breaking Change)。/ This is a breaking change.
  • 这不是一个破坏性变更。/ This is NOT a breaking change.

Checklist

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.


Summary by Sourcery

Add an optional request queue feature that serializes concurrent LLM requests per conversation to maintain context continuity.

New Features:

  • Introduce a 'request_queue' configuration flag under platform_settings to enable sequential processing of LLM requests.
  • Implement per-session asyncio locks in LLMRequestSubStage to queue and serialize requests, releasing the lock upon completion (see the sketch after this summary).

Enhancements:

  • Extend the default configuration and schema to include the 'request_queue' option.
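
For context, a minimal sketch of the per-conversation serialization pattern described above (illustrative only, using `async with`; the PR's actual acquire/release code appears in the review comments below):

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

# One asyncio.Lock per conversation id: requests that share a conversation
# run one at a time, while different conversations stay concurrent.
conversation_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def handle_llm_request(cid: str, run_llm: Callable[[str], Awaitable[None]]) -> None:
    async with conversation_locks[cid]:
        # Re-read the latest conversation context here, then call the provider.
        await run_llm(cid)
```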

@auto-assign auto-assign bot requested review from Soulter and anka-afk September 16, 2025 02:09

@sourcery-ai sourcery-ai bot left a comment



Hey there - I've reviewed your changes - here's some feedback:

  • Use an async context manager (async with lock) or wrap lock.acquire() in a try/finally block to guarantee the lock is always released even if an exception occurs.
  • Consider cleaning up or bounding the size of user_llm_locks to avoid unbounded growth when many conversation IDs are used.
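
One possible shape of that cleanup (a hypothetical sketch, not code from this PR; `user_llm_locks` follows the PR's module-level defaultdict):

```python
import asyncio
from collections import defaultdict

user_llm_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

def cleanup_idle_locks() -> None:
    # Drop per-conversation locks that are not currently held. In asyncio,
    # waiters only exist while a lock is held, so removing an unlocked entry
    # cannot strand a waiting request; the defaultdict simply recreates the
    # lock the next time that conversation id is used.
    for cid in [c for c, lock in user_llm_locks.items() if not lock.locked()]:
        user_llm_locks.pop(cid, None)
```
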
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Use an async context manager (async with lock) or wrap lock.acquire() in a try/finally block to guarantee the lock is always released even if an exception occurs.
- Consider cleaning up or bounding the size of user_llm_locks to avoid unbounded growth when many conversation IDs are used.

## Individual Comments

### Comment 1
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:347-351` </location>
<code_context>
             conversation = await conv_mgr.get_conversation(umo, cid)
         return conversation

+    def _unlock(self, cid: str):
+        # 释放锁
+        if cid in user_llm_locks and user_llm_locks[cid].locked():
+            user_llm_locks[cid].release()
+            logger.info(f"用户(cid: {cid}) 的请求已完成,锁已释放。")
+
     async def process(
</code_context>

<issue_to_address>
**issue (bug_risk):** Check for race conditions when releasing locks.

Multiple concurrent requests could cause a lock to be released by a request that did not acquire it. To prevent this, consider tracking lock ownership or implementing a queueing mechanism.
</issue_to_address>
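
A hypothetical sketch of the ownership tracking suggested here (the helper names are illustrative and not from the PR):

```python
import asyncio
from collections import defaultdict

user_llm_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
lock_owners: dict[str, asyncio.Task] = {}

async def acquire_conversation_lock(cid: str) -> None:
    await user_llm_locks[cid].acquire()
    lock_owners[cid] = asyncio.current_task()

def release_conversation_lock(cid: str) -> None:
    # Only the task that acquired the lock may release it, so a request that
    # never held the lock cannot accidentally free another request's lock.
    if lock_owners.get(cid) is asyncio.current_task() and user_llm_locks[cid].locked():
        lock_owners.pop(cid, None)
        user_llm_locks[cid].release()
```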

### Comment 2
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:404-414` </location>
<code_context>
             return

+        # 控制请求队列
+        if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
+            cid = req.conversation.cid
+            lock = user_llm_locks[cid]
+            if lock.locked():
+                logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
+            await lock.acquire()
+            # 更新到最新的上下文
+            conversation = await self._get_session_conv(event)
+            req.conversation = conversation
+            req.contexts = json.loads(conversation.history)
+
         # 执行请求 LLM 前事件钩子。
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Consider exception handling for lock acquisition and context update.

If an error occurs during lock acquisition or context update, the lock may not be released, leading to hanging requests. Use a try/finally block to guarantee the lock is released.

```suggestion
        # 控制请求队列
        if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
            cid = req.conversation.cid
            lock = user_llm_locks[cid]
            if lock.locked():
                logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
            await lock.acquire()
            try:
                # 更新到最新的上下文
                conversation = await self._get_session_conv(event)
                req.conversation = conversation
                req.contexts = json.loads(conversation.history)
            except Exception as e:
                logger.error(f"请求队列处理时发生异常: {e}")
                raise
            finally:
                if lock.locked():
                    lock.release()
```
</issue_to_address>

### Comment 3
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:418` </location>
<code_context>
+
         # 执行请求 LLM 前事件钩子。
         if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
+            self._unlock(req.conversation.cid)
             return

</code_context>

<issue_to_address>
**suggestion (bug_risk):** Unlocking before returning may not cover all early returns.

Other early returns or exceptions could prevent the lock from being released. Use a context manager or try/finally to ensure the lock is always released.

Suggested implementation:

```python
            if lock.locked():
                logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
            await lock.acquire()
            try:
                # 更新到最新的上下文
                conversation = await self._get_session_conv(event)
                req.conversation = conversation
                req.contexts = json.loads(conversation.history)

                # 执行请求 LLM 前事件钩子。
                if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
                    return

                # ... 这里继续后续处理逻辑 ...

            finally:
                lock.release()

```

If there are other early returns or exceptions in the rest of the function, they will now be covered by the try/finally block. You may need to move additional logic inside the try block if it is part of the locked section.
</issue_to_address>

### Comment 4
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:291` </location>
<code_context>


+# 用于将并行请求转化为队列的锁
+user_llm_locks = defaultdict(asyncio.Lock)
+
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring lock management by using per-instance locks and 'async with' blocks to simplify request queue handling.

```markdown
You can eliminate the scattered `lock.acquire()`/`lock.release()` calls (and the global `_unlock`) by:

1. Moving your lock‐map into the stage instance.
2. Wrapping your request logic in an `async with lock:` block.
3. Extracting the “real work” into a helper method so you don’t have to repeat early returns.

For example:

```python
from collections import defaultdict

class LLMRequestSubStage(Stage):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # now per‐stage, not global
        self.user_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def process(self, event: AstrMessageEvent, _nested: bool = False):
        # ... your preamble: build req, conversation, etc. ...
        cid = req.conversation.cid

        if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
            lock = self.user_locks[cid]
            async with lock:
                return await self._process_with_llm(event, req)
        else:
            return await self._process_with_llm(event, req)

    async def _process_with_llm(self, event: AstrMessageEvent, req: ProviderRequest):
        # all the code that used to be between acquire()/release()
        # including hooks, max‐length checks, agent runner, history save, webchat, etc.
        if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
            return
        # ... rest of your logic ...
```

Then:

- Remove the old `user_llm_locks = defaultdict(...)` at module level.
- Delete the `_unlock` method and all manual `.release()` calls.
- Everything remains functionally identical, but you no longer have error‐prone manual lock/release paths.
</issue_to_address>

### Comment 5
<location> `astrbot/core/pipeline/process_stage/method/llm_request.py:353` </location>
<code_context>
    async def process(
        self, event: AstrMessageEvent, _nested: bool = False
    ) -> Union[None, AsyncGenerator[None, None]]:
        req: ProviderRequest | None = None

        if not self.ctx.astrbot_config["provider_settings"]["enable"]:
            logger.debug("未启用 LLM 能力,跳过处理。")
            return

        # 检查会话级别的LLM启停状态
        if not SessionServiceManager.should_process_llm_request(event):
            logger.debug(f"会话 {event.unified_msg_origin} 禁用了 LLM,跳过处理。")
            return

        provider = self._select_provider(event)
        if provider is None:
            return

        if event.get_extra("provider_request"):
            req = event.get_extra("provider_request")
            assert isinstance(req, ProviderRequest), (
                "provider_request 必须是 ProviderRequest 类型。"
            )

            if req.conversation:
                req.contexts = json.loads(req.conversation.history)

        else:
            req = ProviderRequest(prompt="", image_urls=[])
            if sel_model := event.get_extra("selected_model"):
                req.model = sel_model
            if self.provider_wake_prefix:
                if not event.message_str.startswith(self.provider_wake_prefix):
                    return
            req.prompt = event.message_str[len(self.provider_wake_prefix) :]
            # func_tool selection 现在已经转移到 packages/astrbot 插件中进行选择。
            # req.func_tool = self.ctx.plugin_manager.context.get_llm_tool_manager()
            for comp in event.message_obj.message:
                if isinstance(comp, Image):
                    image_path = await comp.convert_to_file_path()
                    req.image_urls.append(image_path)

            conversation = await self._get_session_conv(event)
            req.conversation = conversation
            req.contexts = json.loads(conversation.history)

            event.set_extra("provider_request", req)

        if not req.prompt and not req.image_urls:
            return

        # 控制请求队列
        if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
            cid = req.conversation.cid
            lock = user_llm_locks[cid]
            if lock.locked():
                logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
            await lock.acquire()
            # 更新到最新的上下文
            conversation = await self._get_session_conv(event)
            req.conversation = conversation
            req.contexts = json.loads(conversation.history)

        # 执行请求 LLM 前事件钩子。
        if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
            self._unlock(req.conversation.cid)
            return

        if isinstance(req.contexts, str):
            req.contexts = json.loads(req.contexts)

        # max context length
        if (
            self.max_context_length != -1  # -1 为不限制
            and len(req.contexts) // 2 > self.max_context_length
        ):
            logger.debug("上下文长度超过限制,将截断。")
            req.contexts = req.contexts[
                -(self.max_context_length - self.dequeue_context_length + 1) * 2 :
            ]
            # 找到第一个role 为 user 的索引,确保上下文格式正确
            index = next(
                (
                    i
                    for i, item in enumerate(req.contexts)
                    if item.get("role") == "user"
                ),
                None,
            )
            if index is not None and index > 0:
                req.contexts = req.contexts[index:]

        # session_id
        if not req.session_id:
            req.session_id = event.unified_msg_origin

        # fix messages
        req.contexts = self.fix_messages(req.contexts)

        # check provider modalities
        # 如果提供商不支持图像/工具使用,但请求中包含图像/工具列表,则清空。图片转述等的检测和调用发生在这之前,因此这里可以这样处理。
        if req.image_urls:
            provider_cfg = provider.provider_config.get("modalities", ["image"])
            if "image" not in provider_cfg:
                logger.debug(f"用户设置提供商 {provider} 不支持图像,清空图像列表。")
                req.image_urls = []
        if req.func_tool:
            provider_cfg = provider.provider_config.get("modalities", ["tool_use"])
            # 如果模型不支持工具使用,但请求中包含工具列表,则清空。
            if "tool_use" not in provider_cfg:
                logger.debug(
                    f"用户设置提供商 {provider} 不支持工具使用,清空工具列表。"
                )
                req.func_tool = None
        # 插件可用性设置
        if event.plugins_name is not None and req.func_tool:
            new_tool_set = ToolSet()
            for tool in req.func_tool.tools:
                plugin = star_map.get(tool.handler_module_path)
                if not plugin:
                    continue
                if plugin.name in event.plugins_name or plugin.reserved:
                    new_tool_set.add_tool(tool)
            req.func_tool = new_tool_set

        # run agent
        agent_runner = AgentRunner()
        logger.debug(
            f"handle provider[id: {provider.provider_config['id']}] request: {req}"
        )
        astr_agent_ctx = AstrAgentContext(
            provider=provider,
            first_provider_request=req,
            curr_provider_request=req,
            streaming=self.streaming_response,
        )
        await agent_runner.reset(
            provider=provider,
            request=req,
            run_context=AgentContextWrapper(context=astr_agent_ctx, event=event),
            tool_executor=FunctionToolExecutor(),
            agent_hooks=MAIN_AGENT_HOOKS,
            streaming=self.streaming_response,
        )

        if self.streaming_response:
            # 流式响应
            event.set_result(
                MessageEventResult()
                .set_result_content_type(ResultContentType.STREAMING_RESULT)
                .set_async_stream(
                    run_agent(agent_runner, self.max_step, self.show_tool_use)
                )
            )
            yield
            if agent_runner.done():
                if final_llm_resp := agent_runner.get_final_llm_resp():
                    if final_llm_resp.completion_text:
                        chain = (
                            MessageChain().message(final_llm_resp.completion_text).chain
                        )
                    else:
                        chain = final_llm_resp.result_chain.chain
                    event.set_result(
                        MessageEventResult(
                            chain=chain,
                            result_content_type=ResultContentType.STREAMING_FINISH,
                        )
                    )
        else:
            async for _ in run_agent(agent_runner, self.max_step, self.show_tool_use):
                yield

        await self._save_to_history(event, req, agent_runner.get_final_llm_resp())

        # 异步处理 WebChat 特殊情况
        if event.get_platform_name() == "webchat":
            asyncio.create_task(self._handle_webchat(event, req, provider))

        self._unlock(req.conversation.cid)

</code_context>

<issue_to_address>
**issue (code-quality):** Low code quality found in LLMRequestSubStage.process - 10% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))

<br/><details><summary>Explanation</summary>The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

- Reduce the function length by extracting pieces of functionality out into
  their own functions. This is the most important thing you can do - ideally a
  function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
  sits together within the function rather than being scattered.</details>
</issue_to_address>


@anka-afk
Member

This has been implemented in #2775.

@ctrlkk ctrlkk closed this Sep 16, 2025

Development

Successfully merging this pull request may close these issues.

[Feature] Add a request queue option
