Skip to content

Commit c66593f

Browse files
Micro66hongyu9
andauthored
feat(chat stream): 支持流式输出恢复 (#291)
Co-authored-by: hongyu9 <hongyu9@staff.weibo.com>
1 parent bc4f5f9 commit c66593f

File tree

19 files changed

+2753
-257
lines changed

19 files changed

+2753
-257
lines changed

backend/app/api/endpoints/adapter/chat.py

Lines changed: 503 additions & 4 deletions
Large diffs are not rendered by default.

backend/app/api/endpoints/adapter/tasks.py

Lines changed: 115 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from app.api.dependencies import get_db
1313
from app.core import security
1414
from app.core.config import settings
15+
from app.models.subtask import Subtask, SubtaskRole, SubtaskStatus
1516
from app.models.user import User
1617
from app.schemas.task import (
1718
TaskCreate,
@@ -44,6 +45,22 @@ async def call_executor_cancel(task_id: int):
4445
)
4546

4647

48+
async def call_chat_shell_cancel(subtask_id: int, partial_content: str = ""):
49+
"""Background task to cancel Chat Shell streaming via session manager"""
50+
try:
51+
from app.services.chat.session_manager import session_manager
52+
53+
success = await session_manager.cancel_stream(subtask_id)
54+
if success:
55+
logger.info(f"Chat Shell stream cancelled successfully for subtask {subtask_id}")
56+
else:
57+
logger.warning(f"Failed to cancel Chat Shell stream for subtask {subtask_id}")
58+
except Exception as e:
59+
logger.error(
60+
f"Error cancelling Chat Shell stream for subtask {subtask_id}: {str(e)}"
61+
)
62+
63+
4764
@router.post("", response_model=dict)
4865
def create_task_id(
4966
current_user: User = Depends(security.get_current_user),
@@ -170,7 +187,10 @@ async def cancel_task(
170187
current_user: User = Depends(security.get_current_user),
171188
db: Session = Depends(get_db),
172189
):
173-
"""Cancel a running task by calling executor_manager"""
190+
"""Cancel a running task by calling executor_manager or Chat Shell cancel"""
191+
from app.models.kind import Kind
192+
from app.schemas.kind import Task
193+
174194
# Verify user owns this task
175195
task = task_kinds_service.get_task_detail(
176196
db=db, task_id=task_id, user_id=current_user.id
@@ -196,24 +216,101 @@ async def cancel_task(
196216
logger.info(f"Task {task_id} is already being cancelled")
197217
return {"message": "Task is already being cancelled", "status": "CANCELLING"}
198218

199-
# Update task status to CANCELLING immediately
200-
try:
201-
task_kinds_service.update_task(
202-
db=db,
203-
task_id=task_id,
204-
obj_in=TaskUpdate(status="CANCELLING"),
205-
user_id=current_user.id,
206-
)
207-
logger.info(
208-
f"Task {task_id} status updated to CANCELLING by user {current_user.id}"
219+
# Check if this is a Chat Shell task by looking at the source label
220+
is_chat_shell = False
221+
task_kind = (
222+
db.query(Kind)
223+
.filter(
224+
Kind.id == task_id,
225+
Kind.user_id == current_user.id,
226+
Kind.kind == "Task",
227+
Kind.is_active == True,
209228
)
210-
except Exception as e:
211-
logger.error(f"Failed to update task {task_id} status to CANCELLING: {str(e)}")
212-
raise HTTPException(
213-
status_code=500, detail=f"Failed to update task status: {str(e)}"
229+
.first()
230+
)
231+
232+
if task_kind and task_kind.json:
233+
task_crd = Task.model_validate(task_kind.json)
234+
if task_crd.metadata.labels:
235+
source = task_crd.metadata.labels.get("source", "")
236+
is_chat_shell = source == "chat_shell"
237+
238+
logger.info(f"Task {task_id} is_chat_shell={is_chat_shell}")
239+
240+
if is_chat_shell:
241+
# For Chat Shell tasks, find the running subtask and cancel via session manager
242+
running_subtask = (
243+
db.query(Subtask)
244+
.filter(
245+
Subtask.task_id == task_id,
246+
Subtask.user_id == current_user.id,
247+
Subtask.role == SubtaskRole.ASSISTANT,
248+
Subtask.status == SubtaskStatus.RUNNING,
249+
)
250+
.first()
214251
)
252+
253+
if running_subtask:
254+
# Cancel the Chat Shell stream
255+
background_tasks.add_task(call_chat_shell_cancel, running_subtask.id)
256+
257+
# Update subtask status to COMPLETED (not CANCELLED, to show partial content)
258+
from datetime import datetime
259+
running_subtask.status = SubtaskStatus.COMPLETED
260+
running_subtask.progress = 100
261+
running_subtask.completed_at = datetime.now()
262+
running_subtask.updated_at = datetime.now()
263+
running_subtask.error_message = ""
264+
db.commit()
265+
266+
# Update task status to COMPLETED (not CANCELLING, for Chat Shell)
267+
try:
268+
task_kinds_service.update_task(
269+
db=db,
270+
task_id=task_id,
271+
obj_in=TaskUpdate(status="COMPLETED"),
272+
user_id=current_user.id,
273+
)
274+
logger.info(
275+
f"Chat Shell task {task_id} cancelled and marked as COMPLETED"
276+
)
277+
except Exception as e:
278+
logger.error(f"Failed to update Chat Shell task {task_id} status: {str(e)}")
279+
280+
return {"message": "Chat stopped successfully", "status": "COMPLETED"}
281+
else:
282+
# No running subtask found, just mark task as completed
283+
try:
284+
task_kinds_service.update_task(
285+
db=db,
286+
task_id=task_id,
287+
obj_in=TaskUpdate(status="COMPLETED"),
288+
user_id=current_user.id,
289+
)
290+
except Exception as e:
291+
logger.error(f"Failed to update task {task_id} status: {str(e)}")
292+
293+
return {"message": "No running stream to cancel", "status": "COMPLETED"}
294+
else:
295+
# For non-Chat Shell tasks, use executor_manager
296+
# Update task status to CANCELLING immediately
297+
try:
298+
task_kinds_service.update_task(
299+
db=db,
300+
task_id=task_id,
301+
obj_in=TaskUpdate(status="CANCELLING"),
302+
user_id=current_user.id,
303+
)
304+
logger.info(
305+
f"Task {task_id} status updated to CANCELLING by user {current_user.id}"
306+
)
307+
except Exception as e:
308+
logger.error(f"Failed to update task {task_id} status to CANCELLING: {str(e)}")
309+
raise HTTPException(
310+
status_code=500, detail=f"Failed to update task status: {str(e)}"
311+
)
215312

216-
# Call executor_manager in the background
217-
background_tasks.add_task(call_executor_cancel, task_id)
313+
# Call executor_manager in the background
314+
background_tasks.add_task(call_executor_cancel, task_id)
218315

219-
return {"message": "Cancel request accepted", "status": "CANCELLING"}
316+
return {"message": "Cancel request accepted", "status": "CANCELLING"}

backend/app/core/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ class Settings(BaseSettings):
5252
CHAT_HISTORY_MAX_MESSAGES: int = 50 # Maximum messages to keep in history
5353
CHAT_API_TIMEOUT_SECONDS: int = 300 # LLM API call timeout (5 minutes)
5454

55+
# Streaming incremental save configuration
56+
STREAMING_REDIS_SAVE_INTERVAL: float = 1.0 # Redis save interval (seconds)
57+
STREAMING_DB_SAVE_INTERVAL: float = 5.0 # Database save interval (seconds)
58+
STREAMING_REDIS_TTL: int = 300 # Redis streaming cache TTL (seconds)
59+
STREAMING_MIN_CHARS_TO_SAVE: int = 50 # Minimum characters to save on disconnect
60+
5561
# Task append expiration (hours)
5662
APPEND_CHAT_TASK_EXPIRE_HOURS: int = 2
5763
APPEND_CODE_TASK_EXPIRE_HOURS: int = 24

backend/app/models/subtask_attachment.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,21 @@
1010
from datetime import datetime
1111
from enum import Enum as PyEnum
1212

13-
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
13+
from sqlalchemy import Column, DateTime, ForeignKey, Integer, LargeBinary, String, Text
1414
from sqlalchemy import Enum as SQLEnum
1515
from sqlalchemy.dialects.mysql import LONGBLOB, LONGTEXT
1616
from sqlalchemy.sql import func
1717

1818
from app.db.base import Base
1919

2020

21+
# Type adapter for binary data - uses LONGBLOB for MySQL, LargeBinary for others
22+
BinaryDataType = LargeBinary().with_variant(LONGBLOB, "mysql")
23+
24+
# Type adapter for long text - uses LONGTEXT for MySQL, Text for others
25+
LongTextType = Text().with_variant(LONGTEXT, "mysql")
26+
27+
2128
class AttachmentStatus(str, PyEnum):
2229
"""Attachment processing status."""
2330
UPLOADING = "uploading"
@@ -56,18 +63,18 @@ class SubtaskAttachment(Base):
5663
file_size = Column(Integer, nullable=False) # File size in bytes
5764
mime_type = Column(String(100), nullable=False)
5865

59-
# Binary data storage (LONGBLOB for MySQL - supports up to 4GB)
60-
binary_data = Column(LONGBLOB, nullable=False)
66+
# Binary data storage (LONGBLOB for MySQL, LargeBinary for SQLite - supports up to 4GB)
67+
binary_data = Column(BinaryDataType, nullable=False)
6168

62-
# Image base64 encoding (for vision models, LONGTEXT for large images)
69+
# Image base64 encoding (for vision models, LONGTEXT for MySQL, Text for SQLite)
6370
# Note: MySQL doesn't allow default values for TEXT/BLOB columns, so nullable=True
6471
# Empty string or None means no image data
65-
image_base64 = Column(LONGTEXT, nullable=True, default="")
72+
image_base64 = Column(LongTextType, nullable=True, default="")
6673

67-
# Extracted text content (LONGTEXT for MySQL - supports up to 4GB)
74+
# Extracted text content (LONGTEXT for MySQL, Text for SQLite - supports up to 4GB)
6875
# Note: MySQL doesn't allow default values for TEXT/BLOB columns, so nullable=True
6976
# Empty string or None means no extracted text
70-
extracted_text = Column(LONGTEXT, nullable=True, default="")
77+
extracted_text = Column(LongTextType, nullable=True, default="")
7178
text_length = Column(Integer, nullable=False, default=0) # Character count of extracted text
7279

7380
# Processing status

0 commit comments

Comments
 (0)