Skip to content

Commit 7de973f

Browse files
authored
Merge pull request #26 from wangle201210/feat/stream-op
ChatStream opt
2 parents 7994929 + 9de879d commit 7de973f

File tree

3 files changed

+94
-76
lines changed

3 files changed

+94
-76
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ server/manifest/config/config.yaml
3535
/data/
3636
/server/manifest/config/config-docker.yaml
3737
/server/manifest/config/frpc.toml
38+
/fe/src/components.d.ts
3839
**/node_modules/
3940
**/dist/
4041
**/build/

server/core/common/stream.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,92 @@
11
package common
22

3-
import "github.com/cloudwego/eino/schema"
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"time"
8+
9+
"github.com/bytedance/sonic"
10+
"github.com/cloudwego/eino/schema"
11+
"github.com/gogf/gf/v2/frame/g"
12+
"github.com/gogf/gf/v2/net/ghttp"
13+
"github.com/google/uuid"
14+
)
415

516
type StreamData struct {
617
Id string `json:"id"` // 同一个消息里面的id是相同的
718
Created int64 `json:"created"` // 消息初始生成时间
819
Content string `json:"content"` // 消息具体内容
920
Document []*schema.Document `json:"document"`
1021
}
22+
23+
func SteamResponse(ctx context.Context, streamReader *schema.StreamReader[*schema.Message], docs []*schema.Document) (err error) {
24+
// 获取HTTP响应对象
25+
httpReq := ghttp.RequestFromCtx(ctx)
26+
httpResp := httpReq.Response
27+
// 设置响应头
28+
httpResp.Header().Set("Content-Type", "text/event-stream")
29+
httpResp.Header().Set("Cache-Control", "no-cache")
30+
httpResp.Header().Set("Connection", "keep-alive")
31+
httpResp.Header().Set("X-Accel-Buffering", "no") // 禁用Nginx缓冲
32+
httpResp.Header().Set("Access-Control-Allow-Origin", "*")
33+
sd := &StreamData{
34+
Id: uuid.NewString(),
35+
Created: time.Now().Unix(),
36+
}
37+
if len(docs) > 0 {
38+
sd.Document = docs
39+
marshal, _ := sonic.Marshal(sd)
40+
writeSSEDocuments(httpResp, string(marshal))
41+
}
42+
sd.Document = nil // 置空,发一次就够了
43+
// 处理流式响应
44+
for {
45+
chunk, err := streamReader.Recv()
46+
if err == io.EOF {
47+
break
48+
}
49+
if err != nil {
50+
writeSSEError(httpResp, err)
51+
break
52+
}
53+
if len(chunk.Content) == 0 {
54+
continue
55+
}
56+
57+
sd.Content = chunk.Content
58+
marshal, _ := sonic.Marshal(sd)
59+
// 发送数据事件
60+
writeSSEData(httpResp, string(marshal))
61+
}
62+
// 发送结束事件
63+
writeSSEDone(httpResp)
64+
return nil
65+
}
66+
67+
// writeSSEData 写入SSE事件
68+
func writeSSEData(resp *ghttp.Response, data string) {
69+
if len(data) == 0 {
70+
return
71+
}
72+
// g.Log().Infof(context.Background(), "data: %s", data)
73+
resp.Writeln(fmt.Sprintf("data:%s\n", data))
74+
resp.Flush()
75+
}
76+
77+
func writeSSEDone(resp *ghttp.Response) {
78+
resp.Writeln(fmt.Sprintf("data:%s\n", "[DONE]"))
79+
resp.Flush()
80+
}
81+
82+
func writeSSEDocuments(resp *ghttp.Response, data string) {
83+
resp.Writeln(fmt.Sprintf("documents:%s\n", data))
84+
resp.Flush()
85+
}
86+
87+
// writeSSEError 写入SSE错误
88+
func writeSSEError(resp *ghttp.Response, err error) {
89+
g.Log().Error(context.Background(), err)
90+
resp.Writeln(fmt.Sprintf("event: error\ndata: %s\n\n", err.Error()))
91+
resp.Flush()
92+
}

server/internal/controller/rag/rag_v1_chat_stream.go

Lines changed: 10 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,17 @@ package rag
22

33
import (
44
"context"
5-
"fmt"
6-
"io"
7-
"time"
85

9-
"github.com/bytedance/sonic"
6+
"github.com/cloudwego/eino/schema"
107
"github.com/gogf/gf/v2/frame/g"
11-
"github.com/gogf/gf/v2/net/ghttp"
12-
"github.com/google/uuid"
138
v1 "github.com/wangle201210/go-rag/server/api/rag/v1"
149
"github.com/wangle201210/go-rag/server/core/common"
1510
"github.com/wangle201210/go-rag/server/internal/logic/chat"
1611
)
1712

1813
// ChatStream 流式输出接口
1914
func (c *ControllerV1) ChatStream(ctx context.Context, req *v1.ChatStreamReq) (res *v1.ChatStreamRes, err error) {
20-
// 获取HTTP响应对象
21-
httpReq := ghttp.RequestFromCtx(ctx)
22-
httpResp := httpReq.Response
23-
// 设置响应头
24-
httpResp.Header().Set("Content-Type", "text/event-stream")
25-
httpResp.Header().Set("Cache-Control", "no-cache")
26-
httpResp.Header().Set("Connection", "keep-alive")
27-
httpResp.Header().Set("X-Accel-Buffering", "no") // 禁用Nginx缓冲
28-
httpResp.Header().Set("Access-Control-Allow-Origin", "*")
29-
15+
var streamReader *schema.StreamReader[*schema.Message]
3016
// 获取检索结果
3117
retriever, err := c.Retriever(ctx, &v1.RetrieverReq{
3218
Question: req.Question,
@@ -35,73 +21,22 @@ func (c *ControllerV1) ChatStream(ctx context.Context, req *v1.ChatStreamReq) (r
3521
KnowledgeName: req.KnowledgeName,
3622
})
3723
if err != nil {
38-
writeSSEError(httpResp, err)
39-
return &v1.ChatStreamRes{}, nil
40-
}
41-
sd := &common.StreamData{
42-
Id: uuid.NewString(),
43-
Created: time.Now().Unix(),
44-
Document: retriever.Document,
24+
g.Log().Error(ctx, err)
25+
return
4526
}
46-
marshal, _ := sonic.Marshal(sd)
47-
writeSSEDocuments(httpResp, string(marshal))
4827
// 获取Chat实例
4928
chatI := chat.GetChat()
5029
// 获取流式响应
51-
streamReader, err := chatI.GetAnswerStream(ctx, req.ConvID, retriever.Document, req.Question)
30+
streamReader, err = chatI.GetAnswerStream(ctx, req.ConvID, retriever.Document, req.Question)
5231
if err != nil {
53-
writeSSEError(httpResp, err)
32+
g.Log().Error(ctx, err)
5433
return &v1.ChatStreamRes{}, nil
5534
}
5635
defer streamReader.Close()
57-
58-
// 处理流式响应
59-
for {
60-
chunk, err := streamReader.Recv()
61-
if err == io.EOF {
62-
break
63-
}
64-
if err != nil {
65-
writeSSEError(httpResp, err)
66-
break
67-
}
68-
if len(chunk.Content) == 0 {
69-
continue
70-
}
71-
72-
sd.Content = chunk.Content
73-
marshal, _ := sonic.Marshal(sd)
74-
// 发送数据事件
75-
writeSSEData(httpResp, string(marshal))
76-
}
77-
// 发送结束事件
78-
writeSSEDone(httpResp)
79-
return &v1.ChatStreamRes{}, nil
80-
}
81-
82-
// writeSSEData 写入SSE事件
83-
func writeSSEData(resp *ghttp.Response, data string) {
84-
if len(data) == 0 {
36+
err = common.SteamResponse(ctx, streamReader, retriever.Document)
37+
if err != nil {
38+
g.Log().Error(ctx, err)
8539
return
8640
}
87-
// g.Log().Infof(context.Background(), "data: %s", data)
88-
resp.Writeln(fmt.Sprintf("data:%s\n", data))
89-
resp.Flush()
90-
}
91-
92-
func writeSSEDone(resp *ghttp.Response) {
93-
resp.Writeln(fmt.Sprintf("data:%s\n", "[DONE]"))
94-
resp.Flush()
95-
}
96-
97-
func writeSSEDocuments(resp *ghttp.Response, data string) {
98-
resp.Writeln(fmt.Sprintf("documents:%s\n", data))
99-
resp.Flush()
100-
}
101-
102-
// writeSSEError 写入SSE错误
103-
func writeSSEError(resp *ghttp.Response, err error) {
104-
g.Log().Error(context.Background(), err)
105-
resp.Writeln(fmt.Sprintf("event: error\ndata: %s\n\n", err.Error()))
106-
resp.Flush()
41+
return &v1.ChatStreamRes{}, nil
10742
}

0 commit comments

Comments
 (0)