Skip to content

Commit 12a3d96

Browse files
feat: 增加fetch请求,优化axios对流式数据处理的不友好 (#640)
fix: 修复消息刷新后的更新问题 Co-authored-by: kim <weizhong.jin@dragonplus.com>
1 parent 6067325 commit 12a3d96

File tree

3 files changed

+221
-76
lines changed

3 files changed

+221
-76
lines changed

src/api/index.ts

Lines changed: 50 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import type { AnnounceConfig, AuditConfig, ConfigState, GiftCard, KeyConfig, MailConfig, SearchConfig, SiteConfig, Status, UserInfo, UserPassword, UserPrompt } from '@/components/common/Setting/model'
22
import type { SettingsState } from '@/store/modules/user/helper'
3-
import { useAuthStore, useUserStore } from '@/store'
3+
import { useUserStore } from '@/store'
44
import { get, post } from '@/utils/request'
5+
import fetchService from '@/utils/request/fetchService'
56

67
export function fetchAnnouncement<T = any>() {
78
return post<T>({
@@ -26,7 +27,7 @@ interface SSEEventHandlers {
2627
onEnd?: () => void
2728
}
2829

29-
// SSE chat processing function
30+
// SSE chat processing function using custom fetch service
3031
export function fetchChatAPIProcessSSE(
3132
params: {
3233
roomId: number
@@ -40,7 +41,6 @@ export function fetchChatAPIProcessSSE(
4041
handlers: SSEEventHandlers,
4142
): Promise<void> {
4243
const userStore = useUserStore()
43-
const authStore = useAuthStore()
4444

4545
const data: Record<string, any> = {
4646
roomId: params.roomId,
@@ -55,92 +55,66 @@ export function fetchChatAPIProcessSSE(
5555
}
5656

5757
return new Promise((resolve, reject) => {
58-
const baseURL = import.meta.env.VITE_GLOB_API_URL || ''
59-
const url = `${baseURL}/api/chat-process`
60-
61-
fetch(url, {
62-
method: 'POST',
63-
headers: {
64-
'Content-Type': 'application/json',
65-
'Authorization': authStore.token ? `Bearer ${authStore.token}` : '',
58+
fetchService.postStream(
59+
{
60+
url: '/chat-process',
61+
body: data,
62+
signal: params.signal,
6663
},
67-
body: JSON.stringify(data),
68-
signal: params.signal,
69-
}).then((response) => {
70-
if (!response.ok) {
71-
throw new Error(`HTTP error! status: ${response.status}`)
72-
}
73-
74-
const reader = response.body?.getReader()
75-
if (!reader) {
76-
throw new Error('No reader available')
77-
}
78-
79-
const decoder = new TextDecoder()
80-
let buffer = ''
81-
82-
function readStream(): void {
83-
reader!.read().then(({ done, value }) => {
84-
if (done) {
85-
handlers.onEnd?.()
86-
resolve()
64+
{
65+
onChunk: (line: string) => {
66+
if (line.trim() === '')
8767
return
88-
}
8968

90-
buffer += decoder.decode(value, { stream: true })
91-
const lines = buffer.split('\n')
92-
buffer = lines.pop() || '' // Keep the incomplete line in buffer
69+
if (line.startsWith('event: ')) {
70+
// const _eventType = line.substring(7).trim()
71+
return
72+
}
9373

94-
for (const line of lines) {
95-
if (line.trim() === '')
96-
continue
74+
if (line.startsWith('data: ')) {
75+
const data = line.substring(6).trim()
9776

98-
if (line.startsWith('event: ')) {
99-
// const _eventType = line.substring(7).trim()
100-
continue
77+
if (data === '[DONE]') {
78+
handlers.onEnd?.()
79+
resolve()
80+
return
10181
}
10282

103-
if (line.startsWith('data: ')) {
104-
const data = line.substring(6).trim()
83+
try {
84+
const jsonData = JSON.parse(data)
10585

106-
if (data === '[DONE]') {
107-
handlers.onEnd?.()
108-
resolve()
109-
return
86+
// Dispatch to different handlers based on data type
87+
if (jsonData.message) {
88+
handlers.onError?.(jsonData.message)
11089
}
111-
112-
try {
113-
const jsonData = JSON.parse(data)
114-
115-
// 根据前面的 event 类型分发到不同的处理器
116-
if (jsonData.message) {
117-
handlers.onError?.(jsonData.message)
118-
}
119-
else if (jsonData.searchQuery) {
120-
handlers.onSearchQuery?.(jsonData)
121-
}
122-
else if (jsonData.searchResults) {
123-
handlers.onSearchResults?.(jsonData)
124-
}
125-
else if (jsonData.m) {
126-
handlers.onDelta?.(jsonData.m)
127-
}
128-
else {
129-
handlers.onMessage?.(jsonData)
130-
}
90+
else if (jsonData.searchQuery) {
91+
handlers.onSearchQuery?.(jsonData)
92+
}
93+
else if (jsonData.searchResults) {
94+
handlers.onSearchResults?.(jsonData)
13195
}
132-
catch (e) {
133-
console.error('Failed to parse SSE data:', data, e)
96+
else if (jsonData.m) {
97+
handlers.onDelta?.(jsonData.m)
13498
}
99+
else {
100+
handlers.onMessage?.(jsonData)
101+
}
102+
}
103+
catch (e) {
104+
console.error('Failed to parse SSE data:', data, e)
135105
}
136106
}
137-
138-
readStream()
139-
}).catch(reject)
140-
}
141-
142-
readStream()
143-
}).catch(reject)
107+
},
108+
onError: (error: Error) => {
109+
handlers.onError?.(error.message)
110+
reject(error)
111+
},
112+
onComplete: () => {
113+
handlers.onEnd?.()
114+
resolve()
115+
},
116+
},
117+
)
144118
})
145119
}
146120

src/utils/request/fetchService.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import { useAuthStore } from '@/store'
2+
3+
export interface FetchRequestConfig {
4+
url: string
5+
method?: string
6+
headers?: Record<string, string>
7+
body?: any
8+
signal?: AbortSignal
9+
}
10+
11+
export interface FetchResponse<T = any> {
12+
data: T
13+
status: number
14+
statusText: string
15+
headers: Headers
16+
}
17+
18+
export interface SSEStreamOptions {
19+
onChunk?: (chunk: string) => void
20+
onError?: (error: Error) => void
21+
onComplete?: () => void
22+
}
23+
24+
class FetchService {
25+
private baseURL: string
26+
private defaultHeaders: Record<string, string>
27+
28+
constructor() {
29+
this.baseURL = import.meta.env.VITE_GLOB_API_URL || ''
30+
this.defaultHeaders = {
31+
'Content-Type': 'application/json',
32+
}
33+
}
34+
35+
// Request interceptor - automatically add authentication headers and other configurations
36+
private requestInterceptor(config: FetchRequestConfig): FetchRequestConfig {
37+
const token = useAuthStore().token
38+
const headers = { ...this.defaultHeaders, ...config.headers }
39+
40+
if (token) {
41+
headers.Authorization = `Bearer ${token}`
42+
}
43+
44+
return {
45+
...config,
46+
headers,
47+
}
48+
}
49+
50+
// Response interceptor - handle error status
51+
private async responseInterceptor(response: Response): Promise<Response> {
52+
if (!response.ok) {
53+
throw new Error(`HTTP error! status: ${response.status}`)
54+
}
55+
return response
56+
}
57+
58+
// POST request
59+
async post<T = any>(config: FetchRequestConfig): Promise<FetchResponse<T>> {
60+
const processedConfig = this.requestInterceptor(config)
61+
const url = `${this.baseURL}${processedConfig.url}`
62+
63+
const response = await fetch(url, {
64+
method: 'POST',
65+
headers: processedConfig.headers,
66+
body: typeof processedConfig.body === 'object'
67+
? JSON.stringify(processedConfig.body)
68+
: processedConfig.body,
69+
signal: processedConfig.signal,
70+
})
71+
72+
const processedResponse = await this.responseInterceptor(response)
73+
const data = await processedResponse.json()
74+
75+
return {
76+
data,
77+
status: processedResponse.status,
78+
statusText: processedResponse.statusText,
79+
headers: processedResponse.headers,
80+
}
81+
}
82+
83+
// SSE streaming request
84+
async postStream(config: FetchRequestConfig, options: SSEStreamOptions): Promise<void> {
85+
const processedConfig = this.requestInterceptor(config)
86+
const url = `${this.baseURL}${processedConfig.url}`
87+
88+
try {
89+
const response = await fetch(url, {
90+
method: 'POST',
91+
headers: processedConfig.headers,
92+
body: typeof processedConfig.body === 'object'
93+
? JSON.stringify(processedConfig.body)
94+
: processedConfig.body,
95+
signal: processedConfig.signal,
96+
})
97+
98+
await this.responseInterceptor(response)
99+
100+
if (!response.body) {
101+
throw new Error('ReadableStream not supported')
102+
}
103+
104+
const reader = response.body.getReader()
105+
const decoder = new TextDecoder()
106+
let buffer = ''
107+
108+
try {
109+
while (true) {
110+
const { done, value } = await reader.read()
111+
112+
if (done) {
113+
options.onComplete?.()
114+
break
115+
}
116+
117+
// Decode the chunk and add to buffer
118+
buffer += decoder.decode(value, { stream: true })
119+
120+
// Process complete lines
121+
const lines = buffer.split('\n')
122+
// Keep the last potentially incomplete line
123+
buffer = lines.pop() || ''
124+
125+
for (const line of lines) {
126+
if (line.trim()) {
127+
options.onChunk?.(line)
128+
}
129+
}
130+
}
131+
}
132+
catch (error) {
133+
options.onError?.(error as Error)
134+
throw error
135+
}
136+
}
137+
catch (error) {
138+
options.onError?.(error as Error)
139+
throw error
140+
}
141+
}
142+
}
143+
144+
// Create singleton instance
145+
const fetchService = new FetchService()
146+
147+
export default fetchService

src/views/chat/index.vue

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,10 @@ async function onRegenerate(index: number) {
369369
let lastText = ''
370370
let accumulatedReasoning = ''
371371
const fetchChatAPIOnce = async () => {
372+
let searchQuery: string
373+
let searchResults: Chat.SearchResult[]
374+
let searchUsageTime: number
375+
372376
await fetchChatAPIProcessSSE({
373377
roomId: currentChatRoom.value!.roomId,
374378
uuid: chatUuid || Date.now(),
@@ -377,6 +381,13 @@ async function onRegenerate(index: number) {
377381
options,
378382
signal: controller.signal,
379383
}, {
384+
onSearchQuery: (data) => {
385+
searchQuery = data.searchQuery
386+
},
387+
onSearchResults: (data) => {
388+
searchResults = data.searchResults
389+
searchUsageTime = data.searchUsageTime
390+
},
380391
onDelta: async (delta) => {
381392
// 处理增量数据
382393
if (delta.text) {
@@ -391,6 +402,9 @@ async function onRegenerate(index: number) {
391402
index,
392403
{
393404
dateTime: new Date().toLocaleString(),
405+
searchQuery,
406+
searchResults,
407+
searchUsageTime,
394408
reasoning: accumulatedReasoning,
395409
text: lastText,
396410
inversion: false,
@@ -406,6 +420,13 @@ async function onRegenerate(index: number) {
406420
},
407421
onMessage: async (data) => {
408422
// Handle complete message data (compatibility mode)
423+
if (data.searchQuery)
424+
searchQuery = data.searchQuery
425+
if (data.searchResults)
426+
searchResults = data.searchResults
427+
if (data.searchUsageTime)
428+
searchUsageTime = data.searchUsageTime
429+
// Handle complete message data (compatibility mode)
409430
const usage = (data.detail && data.detail.usage)
410431
? {
411432
completion_tokens: data.detail.usage.completion_tokens || null,
@@ -419,6 +440,9 @@ async function onRegenerate(index: number) {
419440
index,
420441
{
421442
dateTime: new Date().toLocaleString(),
443+
searchQuery,
444+
searchResults,
445+
searchUsageTime,
422446
reasoning: data?.reasoning,
423447
finish_reason: data?.finish_reason,
424448
text: data.text ?? '',

0 commit comments

Comments
 (0)