Skip to content

Commit 667fe68

Browse files
committed
fix(plugin): 修复退出时插件信息持久化以及加载插件信息 等问题
1 parent 2cb3397 commit 667fe68

File tree

7 files changed

+158
-42
lines changed

7 files changed

+158
-42
lines changed

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func main() {
3838
zerolog.SetGlobalLevel(zerolog.InfoLevel)
3939
gin.SetMode(gin.ReleaseMode)
4040
}
41+
defer logging.Close() // 保存日志
4142
//// 在初始化完成后输出所有缓冲日志
4243
//logging.Logger.Flush(zerolog.GlobalLevel())
4344
//logging.Logger.SetActive(false) // 取消缓存,正常日志输出

rikkabot/adapter/adapter.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (a *Adapter) HandleCovert() {
4343
for {
4444
select {
4545
case <-a.ctx.Done():
46-
logging.ErrorWithErr(a.ctx.Err(), "handle covert exit")
46+
logging.WarnWithErr(a.ctx.Err(), "handle covert exit")
4747
return
4848
case msg := <-a.cli.GetMsgChan(): // 转换收到的消息
4949
logging.Debug("rikka-bot received message", map[string]interface{}{"sdk-msg": msg})
@@ -57,12 +57,15 @@ func (a *Adapter) HandleCovert() {
5757
for {
5858
select {
5959
case <-a.ctx.Done():
60-
logging.ErrorWithErr(a.ctx.Err(), "handle send exit")
60+
logging.WarnWithErr(a.ctx.Err(), "handle send exit")
6161
return
6262
case respMsg := <-sendChan: // 接收到回复消息
6363
logging.Debug("rikka-bot send message", map[string]interface{}{"sdk-msg": respMsg})
6464
//rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
6565
//time.Sleep(time.Duration((rnd.Intn(1000) + 1000)) * time.Millisecond)
66+
if respMsg == nil {
67+
continue
68+
}
6669
err := a.sendMsg(respMsg)
6770
if err != nil {
6871
logging.ErrorWithErr(err, "sendMsg fail skip send")

rikkabot/message/message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Message struct {
3434
SenderName string `json:"sender_name"` // 消息发送者用户昵称
3535
IsAtMe bool `json:"is_at"` // 群组中是否艾特本人
3636
IsGroup bool `json:"is_group"` // 是否为群聊消息
37-
IsFriend bool `json:"is_friend"` // 是否为好友私聊消息
37+
IsFriend bool `json:"is_friend"` // 是否为好友发送的消息 (注意 不是私聊消息)
3838
IsGH bool `json:"is_gh"` // 是否为公众号
3939
IsMySelf bool `json:"is_my_self"` // 消息是否为自己发送的
4040
IsSystem bool `json:"is_system"` // 是否为系统消息

rikkabot/processor/cache/cache.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ type Cache struct {
2020
*cacheExported // 隐藏字段
2121
config *config.CommonConfig
2222

23-
done chan struct{}
24-
wg sync.WaitGroup
23+
done chan struct{}
24+
wg sync.WaitGroup
25+
closeOnce sync.Once
2526
}
2627

2728
var (
@@ -308,9 +309,12 @@ func (c *Cache) cycleSave() {
308309
}
309310

310311
func (c *Cache) Close() {
311-
close(c.done)
312-
c.wg.Wait()
313-
logging.Info("cache closed")
312+
c.closeOnce.Do(func() {
313+
c.handleSave(false)
314+
close(c.done)
315+
c.wg.Wait()
316+
logging.Info("cache closed")
317+
})
314318
}
315319

316320
func (c *Cache) handleSave(firstLoad bool) {

rikkabot/processor/processor.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,21 @@ import (
1010
"sync/atomic"
1111
"time"
1212

13+
"github.com/Clov614/rikka-bot-wechat/rikkabot/processor/cache"
14+
1315
"github.com/Clov614/logging"
1416
wcf "github.com/Clov614/wcf-rpc-sdk"
1517

1618
"github.com/Clov614/rikka-bot-wechat/rikkabot/message"
1719
"github.com/Clov614/rikka-bot-wechat/rikkabot/plugins"
1820

1921
/* 下方为插件的导入 */
20-
_ "github.com/Clov614/rikka-bot-wechat/rikkabot/plugins/admin" // 管理员模块
21-
/* 从上到下对应优先级由高到低 */
22+
_ "github.com/Clov614/rikka-bot-wechat/rikkabot/plugins/admin" // 管理员模块
2223
_ "github.com/Clov614/rikka-bot-wechat/rikkabot/plugins/biliDecoder" // bilibili链接解析
23-
)
24+
25+
/* 从上到下对应优先级由高到低 */
26+
_ "github.com/Clov614/rikka-bot-wechat/rikkabot/plugins/ai" // AI对话
27+
/* 从上到下对应优先级由高到低 */)
2428

2529
type Processor struct {
2630
ctx context.Context
@@ -207,11 +211,15 @@ func (p *Processor) Close() {
207211
}
208212
}
209213
close(done)
214+
logging.Debug("所有插件关闭完成")
210215
}()
211216
// 缓存插件设置信息
212217
ag := plugins.GetAutoRegister()
213218
ag.CachePlugins()
219+
logging.Debug("保存插件信息完毕")
214220

221+
c := cache.GetCache()
222+
c.Close() // 保存并关闭缓存
215223
select {
216224
case <-done:
217225
case <-time.After(time.Second * 5): // 设置一个超时时间

rikkabot/processor/processor_test.go

Lines changed: 115 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,41 +6,44 @@ package processor
66

77
import (
88
"context"
9-
wcf "github.com/Clov614/wcf-rpc-sdk"
109
"testing"
1110
"time"
1211

12+
wcf "github.com/Clov614/wcf-rpc-sdk"
13+
1314
"github.com/Clov614/rikka-bot-wechat/rikkabot/message"
1415
"github.com/Clov614/rikka-bot-wechat/rikkabot/plugins"
16+
"github.com/Clov614/rikka-bot-wechat/rikkabot/processor/cache"
1517
)
1618

17-
type pTestMatcher struct {
18-
isThought bool
19-
}
20-
21-
func (m *pTestMatcher) Match(ctx context.Context, msg *message.Message) bool {
22-
return m.isThought
23-
}
24-
25-
// TestPlugin 是一个真实的插件,用于测试
19+
// 定义一个专门用于测试的 TestPlugin 结构体
2620
type TestPlugin struct {
27-
*plugins.Plugin // 嵌入标准的 Plugin 结构
28-
executeCount int
21+
*plugins.Plugin // 嵌入标准的 Plugin 结构
22+
executeCount int // 执行计数器
2923
}
3024

25+
// AddExecCount 增加执行计数
3126
func (p *TestPlugin) AddExecCount(delta int) {
3227
p.executeCount += delta
3328
}
3429

3530
// NewTestPlugin 创建一个新的 TestPlugin 实例
3631
func NewTestPlugin(name string, level plugins.PluginLevel) *TestPlugin {
3732
p := &TestPlugin{
38-
Plugin: plugins.DefaultPlugin(name).AsEnable(), // 使用默认的 Plugin 初始化 // 设置插件名称
33+
Plugin: plugins.DefaultPlugin(name).AsEnable(), // 使用默认的 Plugin 初始化并启用
3934
}
4035
p.Plugin.PluginOpt.Level = level // 设置插件级别
4136
return p
4237
}
4338

39+
type pTestMatcher struct {
40+
isThought bool
41+
}
42+
43+
func (m *pTestMatcher) Match(ctx context.Context, msg *message.Message) bool {
44+
return m.isThought
45+
}
46+
4447
// TestProcessor_RegisterAndStart 测试插件注册和启动流程
4548
func TestProcessor_RegisterAndStart(t *testing.T) {
4649
plugin := NewTestPlugin("test RegistAndStart-01", plugins.MediumLevel)
@@ -153,3 +156,102 @@ func TestProcessor_MessageFlow(t *testing.T) {
153156
t.Logf("Sending message: %s", outputMsg.Content)
154157
}
155158
}
159+
160+
// TestProcessor_CloseAndCache 测试 Processor 的 Close 方法、cache 的联动以及重启后的加载
161+
func TestProcessor_CloseAndCache(t *testing.T) {
162+
// 准备阶段:创建并注册插件
163+
pluginName := "testCloseAndCache-01"
164+
plugin := NewTestPlugin(pluginName, plugins.MediumLevel)
165+
plugin.AsAction(&plugins.ActionHandler{
166+
Name: "test action",
167+
Matcher: &pTestMatcher{true},
168+
Action: func(ctx context.Context, recvMsg *message.Message) (reply message.Message, ok bool, err error) {
169+
plugin.AddExecCount(1) // 增加执行计数
170+
return *recvMsg, true, nil
171+
},
172+
})
173+
plugins.GetAutoRegister().RegisterPlugin(plugin) // 直接获取并注册
174+
175+
// 阶段 1: 首次运行并关闭,测试缓存
176+
func() {
177+
ctx, cancel := context.WithCancel(context.Background())
178+
defer cancel()
179+
inputChan := make(chan *message.Message)
180+
sendChan := make(chan *message.Message, 10)
181+
processor := NewProcessor(ctx, wcf.NewClient(10, false, false))
182+
183+
processor.Start(inputChan, sendChan)
184+
inputChan <- &message.Message{Content: "test message"} // 触发插件执行
185+
<-time.After(time.Second * 1) // 等待执行
186+
187+
processor.Close() // 关闭并触发缓存
188+
189+
// 验证插件执行次数和缓存状态
190+
if plugin.executeCount != 1 {
191+
t.Errorf("Expected plugin executeCount to be 1, got: %d", plugin.executeCount)
192+
}
193+
cachedPlugin, ok := cache.GetCache().GetPluginInfo(pluginName)
194+
if !ok {
195+
t.Fatalf("Plugin %s not found in cache", pluginName)
196+
}
197+
// 验证缓存的是 PluginOpt
198+
cachedOpt, ok := cachedPlugin.(plugins.PluginOpt)
199+
if !ok {
200+
t.Fatalf("Expected cached plugin info to be PluginOpt, got: %T", cachedPlugin)
201+
}
202+
if !cachedOpt.Enable {
203+
t.Errorf("Expected plugin %s to be enabled in cache", pluginName)
204+
}
205+
}() // 使用匿名函数隔离作用域
206+
207+
// 阶段 2: 重启并验证加载
208+
func() {
209+
ctx, cancel := context.WithCancel(context.Background())
210+
defer cancel()
211+
inputChan := make(chan *message.Message)
212+
sendChan := make(chan *message.Message, 10)
213+
processor := NewProcessor(ctx, wcf.NewClient(10, false, false))
214+
215+
// 注意:这里不需要重新注册插件,因为 AutoRegister 会从 cache 加载
216+
processor.Start(inputChan, sendChan)
217+
218+
// 验证插件已从 cache 加载
219+
registeredPlugins := processor.LevelLayer[plugins.MediumLevel].GetPlugins()
220+
// 遍历找到我们的测试插件
221+
var plugin2 *TestPlugin
222+
for _, p := range registeredPlugins {
223+
if (*p).GetName() == pluginName {
224+
var ok bool
225+
plugin2, ok = (*p).(*TestPlugin)
226+
if !ok {
227+
t.Fatalf("Expected plugin '%s' to be *TestPlugin, got: %T", pluginName, *p)
228+
}
229+
break
230+
}
231+
}
232+
233+
if plugin2 == nil {
234+
t.Fatalf("Plugin '%s' not found in registered plugins", pluginName)
235+
}
236+
237+
// 从缓存中恢复
238+
cachedOpt, ok := cache.GetCache().GetPluginInfo(pluginName)
239+
if !ok {
240+
t.Fatalf("Plugin %s not found in cache", pluginName)
241+
}
242+
plugin2.PluginOpt = cachedOpt.(plugins.PluginOpt)
243+
244+
if !plugin2.PluginOpt.Enable {
245+
t.Errorf("Expected plugin %s to be enabled after restart", pluginName)
246+
}
247+
248+
// 再次触发插件执行,验证其功能
249+
inputChan <- &message.Message{Content: "test message"}
250+
<-time.After(time.Second * 1)
251+
if plugin2.executeCount != 2 { // 执行次数应为 2
252+
t.Errorf("Expected plugin executeCount to be 2, got: %d", plugin2.executeCount)
253+
}
254+
255+
processor.Close() // 关闭
256+
}() // 使用匿名函数隔离作用域
257+
}

rikkabot/rikkabot.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
8+
"time"
9+
710
"github.com/Clov614/logging"
811
"github.com/Clov614/rikka-bot-wechat/rikkabot/config"
912
"github.com/Clov614/rikka-bot-wechat/rikkabot/message"
@@ -12,8 +15,6 @@ import (
1215
"github.com/Clov614/rikka-bot-wechat/rikkabot/utils/timeutil"
1316
wcf "github.com/Clov614/wcf-rpc-sdk"
1417
"github.com/google/uuid"
15-
"sync"
16-
"time"
1718
)
1819

1920
type RikkaBot struct {
@@ -155,35 +156,32 @@ func (r *RikkaBot) Start() {
155156
r.EnableProcess = true // 防止生产者阻塞
156157
}
157158

159+
var onceExit sync.Once
160+
158161
// Exit 主动退出 rikkabot
159162
func (r *RikkaBot) Exit() {
160-
logging.Info("rikka bot exited")
161-
r.EventPool.Close()
162-
r.Processor.Close()
163-
r.cancel()
164-
r.cli.Close()
163+
onceExit.Do(func() {
164+
logging.Info("rikka bot exited")
165+
r.EventPool.Close()
166+
r.Processor.Close()
167+
r.cli.Close()
168+
r.cancel()
169+
})
165170
}
166171

167172
// ExitWithErr 异常退出 rikkabot
168173
func (r *RikkaBot) ExitWithErr(code int, msg string) {
169-
logging.Info("rikka bot exited")
170174
logging.Error("异常退出")
171175
logging.Error(msg, map[string]interface{}{"exit code": code})
172-
r.EventPool.Close()
173-
r.Processor.Close()
174-
r.cancel()
175-
r.cli.Close()
176+
r.Exit()
176177
}
177178

178-
var onceExit sync.Once
179-
180179
// Block 当发生错误,该方法会立即返回,否则会一直阻塞
181180
func (r *RikkaBot) Block() error {
182181
<-r.ctx.Done()
183-
onceExit.Do(func() { r.Exit() })
184-
logging.Close() // 关闭日志文件
185-
logging.Info("主程序将在5s后退出...")
186-
time.Sleep(5 * time.Second) // 增加一个固定延迟,简单的确保退出 todo perf -> wg
182+
r.Exit()
183+
logging.Info("主程序将在10s后退出...")
184+
time.Sleep(10 * time.Second) // 增加一个固定延迟,简单的确保退出 todo perf -> wg
187185
return r.err
188186
}
189187

0 commit comments

Comments
 (0)