Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions astrbot/dashboard/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .conversation import ConversationRoute
from .file import FileRoute
from .session_management import SessionManagementRoute
from .command_permission import CommandPermissionRoute


__all__ = [
Expand All @@ -25,4 +26,5 @@
"ConversationRoute",
"FileRoute",
"SessionManagementRoute",
"CommandPermissionRoute",
]
202 changes: 202 additions & 0 deletions astrbot/dashboard/routes/command_permission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import traceback
from typing import List, Dict, Any

from .route import Route, Response, RouteContext
from astrbot.core import logger
from quart import request
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.star.star_handler import star_handlers_registry, StarHandlerMetadata
from astrbot.core.star.filter.command import CommandFilter
from astrbot.core.star.filter.command_group import CommandGroupFilter
from astrbot.core.star.filter.permission import PermissionTypeFilter
from astrbot.core.star.star import star_map
from astrbot.core import DEMO_MODE
from astrbot.core.utils.shared_preferences import SharedPreferences


class CommandPermissionRoute(Route):
def __init__(
self,
context: RouteContext,
core_lifecycle: AstrBotCoreLifecycle,
) -> None:
super().__init__(context)
self.routes = {
"/command_permission/get": ("GET", self.get_command_permissions),
"/command_permission/set": ("POST", self.set_command_permission),
"/command_permission/get_commands": ("GET", self.get_all_commands),
}
self.core_lifecycle = core_lifecycle
self.register_routes()

async def get_command_permissions(self) -> Response:
"""获取所有指令的权限配置"""
try:
sp = SharedPreferences()
alter_cmd_cfg = sp.get("alter_cmd", {})

# 构建权限配置列表
permissions = []

# 遍历所有插件的权限配置
for plugin_name, plugin_config in alter_cmd_cfg.items():
for command_name, config in plugin_config.items():
permission_type = config.get("permission", "member")
permissions.append({
"plugin_name": plugin_name,
"command_name": command_name,
"permission": permission_type,
"id": f"{plugin_name}.{command_name}"
})

return Response().ok({"permissions": permissions}).__dict__
except Exception:
logger.error(f"/api/command_permission/get: {traceback.format_exc()}")
return Response().error("获取指令权限配置失败").__dict__

async def get_all_commands(self) -> Response:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): 在 CommandPermissionRoute.get_all_commands 中发现低代码质量 - 14% (low-code-quality)


解释此函数的质量得分低于 25% 的质量阈值。
此得分是方法长度、认知复杂度和工作内存的组合。

如何解决这个问题?

重构此函数以使其更短、更具可读性可能是有益的。

  • 通过将部分功能提取到自己的函数中来减少函数长度。这是您可以做的最重要的事情 - 理想情况下,函数应少于 10 行。
  • 减少嵌套,也许可以通过引入防御性子句来提前返回。
  • 确保变量的作用域紧密,以便使用相关概念的代码在函数中紧密地放在一起,而不是分散开来。
Original comment in English

issue (code-quality): Low code quality found in CommandPermissionRoute.get_all_commands - 14% (low-code-quality)


ExplanationThe quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

"""获取所有可用的指令列表"""
try:
commands = []

# 遍历所有注册的处理器
for handler in star_handlers_registry:
assert isinstance(handler, StarHandlerMetadata)
plugin = star_map.get(handler.handler_module_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 在生产环境中使用 assert 进行类型检查可能会导致问题。

当 Python 以优化模式运行时,assert 会被移除,因此类型检查可能会被跳过。请使用显式的 isinstance 检查并通过 if 语句处理失败情况。

Suggested change
# 遍历所有注册的处理器
for handler in star_handlers_registry:
assert isinstance(handler, StarHandlerMetadata)
plugin = star_map.get(handler.handler_module_path)
# 遍历所有注册的处理器
+ for handler in star_handlers_registry:
+ if not isinstance(handler, StarHandlerMetadata):
+ logger.warning(f"Handler {handler} is not an instance of StarHandlerMetadata, skipping.")
+ continue
+ plugin = star_map.get(handler.handler_module_path)
Original comment in English

suggestion (bug_risk): Use of assert for type checking may cause issues in production.

Asserts are removed when Python runs with optimizations, so type checks may be skipped. Use an explicit isinstance check and handle failures with an if-statement instead.

Suggested change
# 遍历所有注册的处理器
for handler in star_handlers_registry:
assert isinstance(handler, StarHandlerMetadata)
plugin = star_map.get(handler.handler_module_path)
# 遍历所有注册的处理器
+ for handler in star_handlers_registry:
+ if not isinstance(handler, StarHandlerMetadata):
+ logger.warning(f"Handler {handler} is not an instance of StarHandlerMetadata, skipping.")
+ continue
+ plugin = star_map.get(handler.handler_module_path)


if not plugin:
continue

# 查找指令过滤器
for filter_ in handler.event_filters:
if isinstance(filter_, CommandFilter):
# 检查当前权限配置
sp = SharedPreferences()
alter_cmd_cfg = sp.get("alter_cmd", {})
current_permission = "member" # 默认权限

if (plugin.name in alter_cmd_cfg and
handler.handler_name in alter_cmd_cfg[plugin.name]):
current_permission = alter_cmd_cfg[plugin.name][handler.handler_name].get("permission", "member")

# 检查是否有默认的权限过滤器
has_default_admin = False
for f in handler.event_filters:
if isinstance(f, PermissionTypeFilter):
if f.permission_type.name == "ADMIN":
has_default_admin = True
if current_permission == "member":
current_permission = "admin"
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: 只更新了第一个 PermissionTypeFilter。

目前,event_filters 中只有第一个 PermissionTypeFilter 被更新。请确认这是否是预期行为,或者在必要时更新所有匹配的过滤器。

Original comment in English

question: Only the first PermissionTypeFilter is updated.

Currently, only the first PermissionTypeFilter in event_filters is updated. Please confirm if this is intended, or update all matching filters if necessary.


commands.append({
"command_name": filter_.command_name,
"plugin_name": plugin.name,
"handler_name": handler.handler_name,
"description": getattr(handler, 'description', ''),
"current_permission": current_permission,
"has_default_admin": has_default_admin,
"id": f"{plugin.name}.{handler.handler_name}"
})
elif isinstance(filter_, CommandGroupFilter):
# 处理指令组
sp = SharedPreferences()
alter_cmd_cfg = sp.get("alter_cmd", {})
current_permission = "member"

if (plugin.name in alter_cmd_cfg and
handler.handler_name in alter_cmd_cfg[plugin.name]):
current_permission = alter_cmd_cfg[plugin.name][handler.handler_name].get("permission", "member")

has_default_admin = False
for f in handler.event_filters:
if isinstance(f, PermissionTypeFilter):
if f.permission_type.name == "ADMIN":
has_default_admin = True
if current_permission == "member":
current_permission = "admin"
break

commands.append({
"command_name": filter_.group_name,
"plugin_name": plugin.name,
"handler_name": handler.handler_name,
"description": getattr(handler, 'description', ''),
"current_permission": current_permission,
"has_default_admin": has_default_admin,
"is_group": True,
"id": f"{plugin.name}.{handler.handler_name}"
})

return Response().ok({"commands": commands}).__dict__
except Exception:
logger.error(f"/api/command_permission/get_commands: {traceback.format_exc()}")
return Response().error("获取指令列表失败").__dict__

async def set_command_permission(self) -> Response:
"""设置指令权限"""
if DEMO_MODE:
return Response().error("演示模式下不允许修改配置").__dict__

try:
data = await request.get_json()
plugin_name = data.get("plugin_name")
handler_name = data.get("handler_name")
permission = data.get("permission")

if not all([plugin_name, handler_name, permission]):
return Response().error("参数不完整").__dict__

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: 缺少参数的错误消息可以更具体。

请指定缺少哪些参数以提高错误清晰度和客户端处理能力。

Suggested change
plugin_name = data.get("plugin_name")
handler_name = data.get("handler_name")
permission = data.get("permission")
if not all([plugin_name, handler_name, permission]):
return Response().error("参数不完整").__dict__
plugin_name = data.get("plugin_name")
handler_name = data.get("handler_name")
permission = data.get("permission")
missing_params = []
if not plugin_name:
missing_params.append("plugin_name")
if not handler_name:
missing_params.append("handler_name")
if not permission:
missing_params.append("permission")
if missing_params:
return Response().error(f"参数不完整,缺少: {', '.join(missing_params)}").__dict__
Original comment in English

suggestion: Error message for missing parameters could be more specific.

Specify which parameter(s) are missing to improve error clarity and client-side handling.

Suggested change
plugin_name = data.get("plugin_name")
handler_name = data.get("handler_name")
permission = data.get("permission")
if not all([plugin_name, handler_name, permission]):
return Response().error("参数不完整").__dict__
plugin_name = data.get("plugin_name")
handler_name = data.get("handler_name")
permission = data.get("permission")
missing_params = []
if not plugin_name:
missing_params.append("plugin_name")
if not handler_name:
missing_params.append("handler_name")
if not permission:
missing_params.append("permission")
if missing_params:
return Response().error(f"参数不完整,缺少: {', '.join(missing_params)}").__dict__

if permission not in ["admin", "member"]:
return Response().error("权限类型错误,只能是 admin 或 member").__dict__

Comment on lines 163 to 178
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 权限类型检查区分大小写。

这将拒绝大小写不完全匹配的有效权限。考虑规范化输入或使用不区分大小写的检查。

Suggested change
permission = data.get("permission")
if not all([plugin_name, handler_name, permission]):
return Response().error("参数不完整").__dict__
if permission not in ["admin", "member"]:
return Response().error("权限类型错误,只能是 admin 或 member").__dict__
permission = data.get("permission")
if not all([plugin_name, handler_name, permission]):
return Response().error("参数不完整").__dict__
normalized_permission = permission.lower()
if normalized_permission not in ["admin", "member"]:
return Response().error("权限类型错误,只能是 admin 或 member").__dict__
Original comment in English

suggestion (bug_risk): Permission type check is case-sensitive.

This will reject valid permissions if the case doesn't match exactly. Consider normalizing input or using a case-insensitive check.

Suggested change
permission = data.get("permission")
if not all([plugin_name, handler_name, permission]):
return Response().error("参数不完整").__dict__
if permission not in ["admin", "member"]:
return Response().error("权限类型错误,只能是 admin 或 member").__dict__
permission = data.get("permission")
if not all([plugin_name, handler_name, permission]):
return Response().error("参数不完整").__dict__
normalized_permission = permission.lower()
if normalized_permission not in ["admin", "member"]:
return Response().error("权限类型错误,只能是 admin 或 member").__dict__

# 查找对应的处理器
found_handler = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): 使用内置函数 next 而不是 for 循环 (use-next)

Original comment in English

issue (code-quality): Use the built-in function next instead of a for-loop (use-next)

for handler in star_handlers_registry:
if (handler.handler_module_path in star_map and
star_map[handler.handler_module_path].name == plugin_name and
handler.handler_name == handler_name):
found_handler = handler
break

if not found_handler:
return Response().error("未找到指定的指令处理器").__dict__

# 更新配置
sp = SharedPreferences()
alter_cmd_cfg = sp.get("alter_cmd", {})

if plugin_name not in alter_cmd_cfg:
alter_cmd_cfg[plugin_name] = {}

if handler_name not in alter_cmd_cfg[plugin_name]:
alter_cmd_cfg[plugin_name][handler_name] = {}

alter_cmd_cfg[plugin_name][handler_name]["permission"] = permission
sp.put("alter_cmd", alter_cmd_cfg)

# 动态更新权限过滤器
found_permission_filter = False
for filter_ in found_handler.event_filters:
if isinstance(filter_, PermissionTypeFilter):
from astrbot.core.star.filter.permission import PermissionType
if permission == "admin":
filter_.permission_type = PermissionType.ADMIN
else:
filter_.permission_type = PermissionType.MEMBER
found_permission_filter = True
break

if not found_permission_filter:
# 如果没有权限过滤器,则添加一个
from astrbot.core.star.filter.permission import PermissionType
new_filter = PermissionTypeFilter(
PermissionType.ADMIN if permission == "admin" else PermissionType.MEMBER
)
found_handler.event_filters.insert(0, new_filter)

return Response().ok({"message": f"已将 {handler_name} 权限设置为 {permission}"}).__dict__

except Exception:
logger.error(f"/api/command_permission/set: {traceback.format_exc()}")
return Response().error("设置指令权限失败").__dict__
3 changes: 3 additions & 0 deletions astrbot/dashboard/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def __init__(
self.session_management_route = SessionManagementRoute(
self.context, db, core_lifecycle
)
self.command_permission_route = CommandPermissionRoute(
self.context, core_lifecycle
)

self.app.add_url_rule(
"/api/plug/<path:subpath>",
Expand Down
1 change: 1 addition & 0 deletions dashboard/src/i18n/locales/en-US/core/navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"providers": "Providers",
"toolUse": "MCP Tools",
"config": "Config",
"commandPermission": "Command Permission",
"extension": "Extensions",
"extensionMarketplace": "Extension Market",
"chat": "Chat",
Expand Down
1 change: 1 addition & 0 deletions dashboard/src/i18n/locales/zh-CN/core/navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"providers": "服务提供商",
"toolUse": "MCP",
"config": "配置文件",
"commandPermission": "指令权限管理",
"extension": "插件管理",
"extensionMarketplace": "插件市场",
"chat": "聊天",
Expand Down
5 changes: 5 additions & 0 deletions dashboard/src/layouts/full/vertical-sidebar/sidebarItem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ const sidebarItem: menu[] = [
icon: 'mdi-cog',
to: '/config',
},
{
title: 'core.navigation.commandPermission',
icon: 'mdi-shield-key',
to: '/command-permission',
},
{
title: 'core.navigation.extension',
icon: 'mdi-puzzle',
Expand Down
5 changes: 5 additions & 0 deletions dashboard/src/router/MainRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ const MainRoutes = {
name: 'About',
path: '/about',
component: () => import('@/views/AboutPage.vue')
},
{
name: 'CommandPermission',
path: '/command-permission',
component: () => import('@/views/CommandPermissionPage.vue')
}
]
};
Expand Down
46 changes: 35 additions & 11 deletions dashboard/src/stores/common.js → dashboard/src/stores/common.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
import { defineStore } from 'pinia';
import axios from 'axios';

interface LogCacheItem {
type: string;
data: string;
}

interface PluginData {
name: string;
desc: string;
author: string;
repo: string;
installed: boolean;
version: string;
social_link?: string;
tags: string[];
logo: string;
pinned: boolean;
stars: number;
updated_at: string;
}

export const useCommonStore = defineStore({
id: 'common',
state: () => ({
// @ts-ignore
eventSource: null,
log_cache: [],
eventSource: null as AbortController | null,
log_cache: [] as LogCacheItem[],
sse_connected: false,

log_cache_max_len: 1000,
startTime: -1,

pluginMarketData: [],
pluginMarketData: [] as PluginData[],
}),
actions: {
async createEventSource() {
Expand Down Expand Up @@ -52,12 +71,16 @@ export const useCommonStore = defineStore({
console.log('SSE stream opened');
this.sse_connected = true;

if (!response.body) {
throw new Error('Response body is null');
}

const reader = response.body.getReader();
const decoder = new TextDecoder();

let incompleteLine = ""; // 用于存储不完整的行

const handleIncompleteLine = (line) => {
const handleIncompleteLine = (line: string): LogCacheItem | null => {
incompleteLine += line;
// if can parse as JSON, return it
try {
Expand All @@ -69,7 +92,8 @@ export const useCommonStore = defineStore({
}
}

const processStream = ({ done, value }) => {
const processStream = (result: ReadableStreamReadResult<Uint8Array>): Promise<any> | undefined => {
const { done, value } = result;
// get bytes length
const bytesLength = value ? value.byteLength : 0;
console.log(`Received ${bytesLength} bytes from live log`);
Expand All @@ -91,7 +115,7 @@ export const useCommonStore = defineStore({
if (line.startsWith('data:')) {
const data = line.substring(5).trim();
// {"type":"log","data":"[2021-08-01 00:00:00] INFO: Hello, world!"}
let data_json = {}
let data_json: any = {}
try {
data_json = JSON.parse(data);
} catch (e) {
Expand All @@ -104,16 +128,16 @@ export const useCommonStore = defineStore({
return; // 如果无法解析,跳过当前行
}
}
if (data_json.type === 'log') {
this.log_cache.push(data_json);
if (data_json && data_json.type === 'log') {
this.log_cache.push(data_json as LogCacheItem);
if (this.log_cache.length > this.log_cache_max_len) {
this.log_cache.shift();
}
}
} else {
const parsedData = handleIncompleteLine(line);
if (parsedData && parsedData.type === 'log') {
this.log_cache.push(parsedData);
this.log_cache.push(parsedData as LogCacheItem);
if (this.log_cache.length > this.log_cache_max_len) {
this.log_cache.shift();
}
Expand All @@ -127,7 +151,7 @@ export const useCommonStore = defineStore({
}).catch(error => {
console.error('SSE error:', error);
// Attempt to reconnect after a delay
this.log_cache.push('SSE Connection failed, retrying in 5 seconds...');
this.log_cache.push({ type: 'log', data: 'SSE Connection failed, retrying in 5 seconds...' });
setTimeout(() => {
this.eventSource = null;
this.createEventSource();
Expand Down
Loading