Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ AstrBot 是一个松耦合、异步、支持多消息平台部署、具有易用
## ✨ 近期更新

1. AstrBot 现已支持接入 [MCP](https://modelcontextprotocol.io/) 服务器!
2. 现已集成 [Google Agent Development Kit (ADK)](https://google.github.io/adk-docs/)


## ✨ 主要功能

Expand Down Expand Up @@ -131,6 +133,7 @@ uvx astrbot init
| OpenAI API | ✔ | 文本生成 | 也支持 DeepSeek、Google Gemini、GLM、Kimi、xAI 等兼容 OpenAI API 的服务 |
| Claude API | ✔ | 文本生成 | |
| Google Gemini API | ✔ | 文本生成 | |
| Google Agent SDK | ✔ | Agent SDK | [https://google.github.io/adk-docs/](https://google.github.io/adk-docs/) |
| Dify | ✔ | LLMOps | |
| 阿里云百炼应用 | ✔ | LLMOps | |
| Ollama | ✔ | 模型加载器 | 本地部署 DeepSeek、Llama 等开源语言模型 |
Expand Down
1 change: 1 addition & 0 deletions README_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ See docs: [Source Code Deployment](https://astrbot.app/deploy/astrbot/cli.html)
| OpenAI API | ✔ | Text Generation | Supports all OpenAI API-compatible services including DeepSeek, Google Gemini, GLM, Moonshot, Alibaba Cloud Bailian, Silicon Flow, xAI, etc. |
| Claude API | ✔ | Text Generation | |
| Google Gemini API | ✔ | Text Generation | |
| Google Agent SDK | ✔ | Agent SDK | [Documentation](https://google.github.io/adk-docs/) |
| Dify | ✔ | LLMOps | |
| DashScope (Alibaba Cloud) | ✔ | LLMOps | |
| Ollama | ✔ | Model Loader | Local deployment for open-source LLMs (DeepSeek, Llama, etc.) |
Expand Down
11 changes: 11 additions & 0 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,17 @@
"budget": 0,
},
},
"Google Agent SDK": {
"id": "google_agent_default",
"type": "google_agent_sdk",
"provider_type": "chat_completion",
"enable": False,
"key": [],
"timeout": 120,
"model_config": {
"model": "gemini-1.5-pro",
},
},
"DeepSeek": {
"id": "deepseek_default",
"type": "openai_chat_completion",
Expand Down
4 changes: 4 additions & 0 deletions astrbot/core/provider/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ async def load_provider(self, provider_config: dict):
from .sources.gemini_source import (
ProviderGoogleGenAI as ProviderGoogleGenAI,
)
case "google_agent_sdk":
from .sources.google_agent_sdk_source import (
ProviderGoogleAgentSDK as ProviderGoogleAgentSDK,
)
case "sensevoice_stt_selfhost":
from .sources.sensevoice_selfhosted_source import (
ProviderSenseVoiceSTTSelfHost as ProviderSenseVoiceSTTSelfHost,
Expand Down
123 changes: 123 additions & 0 deletions astrbot/core/provider/sources/google_agent_sdk_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from __future__ import annotations

import asyncio
from typing import Any, AsyncGenerator, List

from astrbot.api.provider import Personality, Provider
from astrbot.core.db import BaseDatabase
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.provider.entities import LLMResponse, ToolCallsResult
from astrbot.core.provider.func_tool_manager import FuncCall
from astrbot.core import logger
from ..register import register_provider_adapter

# Optional dependency guard: the module always imports cleanly; the provider
# itself raises ImportError at construction time when the SDK is absent.
# NOTE(review): Google's Agent Development Kit is distributed as `google-adk`
# with import path `google.adk.agents` — confirm that a `google-agents`
# distribution exposing `google.agents` is really the intended package.
try:
    from google.agents import Agent
except Exception:  # pragma: no cover - optional dependency
    Agent = None  # type: ignore


@register_provider_adapter(
    "google_agent_sdk", "Google Agent SDK 提供商适配器"
)
class ProviderGoogleAgentSDK(Provider):
    """Provider adapter using Google Agent SDK.

    This is a lightweight integration that forwards prompts to a Google Agent.
    Images and tool calling are not supported yet; such inputs are logged and
    ignored. If the optional dependency is missing, initialization fails with
    ImportError.
    """

    def __init__(
        self,
        provider_config: dict,
        provider_settings: dict,
        db_helper: BaseDatabase,
        persistant_history: bool = True,
        default_persona: Personality | None = None,
    ) -> None:
        """Initialize the adapter and construct the underlying Agent.

        Raises:
            ImportError: when the optional google-agents SDK is not installed.
        """
        super().__init__(
            provider_config,
            provider_settings,
            persistant_history,
            db_helper,
            default_persona,
        )

        if Agent is None:
            raise ImportError(
                "google-agents SDK is required for google_agent_sdk provider"
            )

        # Use the first configured key, if any; the config stores a list.
        self.api_key: str | None = None
        if keys := provider_config.get("key", []):
            self.api_key = keys[0]
        self.set_model(provider_config.get("model_config", {}).get("model", ""))
        # Actual Agent initialization; parameters may vary based on SDK version.
        # TODO: pass additional configuration such as tools when needed.
        self.agent = Agent(api_key=self.api_key, model=self.get_model())

    def get_current_key(self) -> str:
        """Return the active API key, or an empty string when unset."""
        return self.api_key or ""

    def set_key(self, key: str) -> None:
        """Switch to *key* and best-effort propagate it to the live Agent."""
        self.api_key = key
        # The Agent instance might need reconfiguration with the new key;
        # the attribute may not exist on every SDK version, so ignore failures.
        try:
            self.agent.api_key = key  # type: ignore[attr-defined]
        except Exception:  # pragma: no cover - best effort
            pass

    def get_models(self) -> List[str]:  # pragma: no cover - simple return
        """Only the single configured model is known to this adapter."""
        return [self.get_model()]

    async def text_chat(
        self,
        prompt: str,
        session_id: str | None = None,
        image_urls: List[str] | None = None,
        func_tool: FuncCall | None = None,
        contexts: List[dict] | None = None,
        system_prompt: str | None = None,
        tool_calls_result: ToolCallsResult | None = None,
        **kwargs: Any,
    ) -> LLMResponse:
        """Return a chat completion via the Google Agent SDK.

        Unsupported inputs (images, tools) are warned about and ignored.

        Raises:
            RuntimeError: when the underlying SDK call fails.
        """
        if image_urls:
            logger.warning("google_agent_sdk provider does not support images yet")
        # TODO: handle func_tool and tool_calls_result via the ADK Tool API.
        # Until then, warn loudly instead of silently dropping them.
        if func_tool is not None:
            logger.warning(
                "func_tool is provided but currently ignored by "
                "google_agent_sdk provider"
            )
        if tool_calls_result is not None:
            logger.warning(
                "tool_calls_result is provided but currently ignored by "
                "google_agent_sdk provider"
            )
        history = contexts or []
        if system_prompt:
            history = [{"role": "system", "content": system_prompt}, *history]
        try:
            response = await self.agent.chat(prompt, history=history)  # type: ignore[attr-defined]
        except Exception as e:  # pragma: no cover - runtime errors
            # RuntimeError (still an Exception subclass, so existing broad
            # handlers keep working) lets callers target SDK failures.
            raise RuntimeError(f"Google Agent SDK error: {e}") from e

        llm_response = LLMResponse("assistant")
        llm_response.result_chain = MessageChain().message(str(response))
        llm_response.raw_completion = response
        return llm_response

    async def text_chat_stream(
        self,
        prompt: str,
        session_id: str | None = None,
        image_urls: List[str] | None = None,
        func_tool: FuncCall | None = None,
        contexts: List[dict] | None = None,
        system_prompt: str | None = None,
        tool_calls_result: ToolCallsResult | None = None,
        **kwargs: Any,
    ) -> AsyncGenerator[LLMResponse, None]:
        """Yield the full completion as a single chunk.

        NOTE: real incremental streaming is not implemented yet; this wrapper
        delegates to :meth:`text_chat` and yields one complete LLMResponse so
        streaming callers still work.
        """
        yield await self.text_chat(
            prompt,
            session_id=session_id,
            image_urls=image_urls,
            func_tool=func_tool,
            contexts=contexts,
            system_prompt=system_prompt,
            tool_calls_result=tool_calls_result,
            **kwargs,
        )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议: 实施真正的流式传输或调整方法命名

由于 text_chat_stream 一次性产生完整响应,请实施分块流式传输或更新方法名称/文档,以明确不支持实时流式传输。

建议的实施方式:

    async def text_chat_full_response(self, prompt, session_id=None, image_urls=None, func_tool=None, contexts=None, system_prompt=None, tool_calls_result=None, **kwargs):
        """
        一次性返回完整的 LLM 响应。此方法不提供实时或分块流式传输。
        """
        llm_response = await self.text_chat(
            prompt,
            session_id=session_id,
            image_urls=image_urls,
            func_tool=func_tool,
            contexts=contexts,
            system_prompt=system_prompt,
            tool_calls_result=tool_calls_result,
            **kwargs,
        )
        yield llm_response
  • 在整个代码库中,将所有 text_chat_stream 的用法更新为 text_chat_full_response
  • 如果您有引用 text_chat_stream 的文档,请更新它以明确新的方法名称及其非流式传输行为。
Original comment in English

suggestion: Implement true streaming or adjust method naming

Since text_chat_stream yields the full response at once, please either implement chunked streaming or update the method name/documentation to clarify that real-time streaming is not supported.

Suggested implementation:

    async def text_chat_full_response(self, prompt, session_id=None, image_urls=None, func_tool=None, contexts=None, system_prompt=None, tool_calls_result=None, **kwargs):
        """
        Returns the full LLM response at once. This method does not provide real-time or chunked streaming.
        """
        llm_response = await self.text_chat(
            prompt,
            session_id=session_id,
            image_urls=image_urls,
            func_tool=func_tool,
            contexts=contexts,
            system_prompt=system_prompt,
            tool_calls_result=tool_calls_result,
            **kwargs,
        )
        yield llm_response
  • Update all usages of text_chat_stream to text_chat_full_response throughout the codebase.
  • If you have documentation referencing text_chat_stream, update it to clarify the new method name and its non-streaming behavior.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies = [
"faiss-cpu>=1.11.0",
"filelock>=3.18.0",
"google-genai>=1.14.0",
"google-agents>=0.1.0",
"googlesearch-python>=1.3.0",
"lark-oapi>=1.4.15",
"lxml-html-clean>=0.4.2",
Expand Down