Skip to content
This repository was archived by the owner on Jul 29, 2025. It is now read-only.

feat(autocompcat): for non interactive mode #336

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 228 additions & 15 deletions internal/llm/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,7 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
return a.err(fmt.Errorf("failed to get session: %w", err))
}
if session.SummaryMessageID != "" {
summaryMsgInex := -1
for i, msg := range msgs {
if msg.ID == session.SummaryMessageID {
summaryMsgInex = i
break
}
}
if summaryMsgInex != -1 {
msgs = msgs[summaryMsgInex:]
msgs[0].Role = message.User
}
msgs = a.filterMessagesFromSummary(msgs, session.SummaryMessageID)
}

userMsg, err := a.createUserMessage(ctx, sessionID, content, attachmentParts)
Expand All @@ -272,6 +262,8 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
}
// Append the new user message to the conversation history.
msgHistory := append(msgs, userMsg)
compactionAttempts := 0 // Track compaction attempts to prevent infinite loops
maxCompactionAttempts := 2 // Allow at most 2 compaction attempts per request

for {
// Check for cancellation before each iteration
Expand All @@ -281,6 +273,47 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
default:
// Continue processing
}

// Check if auto-compaction should be triggered before each model call
// This is crucial for long tool use loops that can exceed context limits
if cfg.AutoCompact && compactionAttempts < maxCompactionAttempts && a.shouldTriggerAutoCompactionFromHistory(msgHistory) {
compactionAttempts++
logging.Info("Auto-compaction triggered during tool use loop", "session_id", sessionID, "history_length", len(msgHistory), "attempt", compactionAttempts)

// Perform synchronous compaction to shrink context
if err := a.performSynchronousCompaction(ctx, sessionID); err != nil {
logging.Warn("Failed to perform auto-compaction during tool use", "error", err, "attempt", compactionAttempts)
// Continue anyway - better to risk context overflow than stop completely
} else {
// After successful compaction, reload messages and rebuild msgHistory
msgs, err := a.messages.List(ctx, sessionID)
if err != nil {
return a.err(fmt.Errorf("failed to reload messages after compaction: %w", err))
}

session, err := a.sessions.Get(ctx, sessionID)
if err != nil {
return a.err(fmt.Errorf("failed to get session after compaction: %w", err))
}
msgs = a.filterMessagesFromSummary(msgs, session.SummaryMessageID)

msgHistory = append(msgs, userMsg)
logging.Info("Context compacted, continuing with reduced history", "session_id", sessionID, "new_history_length", len(msgHistory), "attempt", compactionAttempts)

// NOTE: Check if compaction actually reduced the context size
// If it's still above threshold, we need to break the loop to prevent infinite compaction
if a.shouldTriggerAutoCompactionFromHistory(msgHistory) {
logging.Warn("Auto-compaction did not sufficiently reduce context size, proceeding anyway to prevent infinite loop",
"session_id", sessionID, "history_length", len(msgHistory), "attempt", compactionAttempts)
// Don't continue - proceed with the current msgHistory to avoid infinite loop
} else {
// After compaction, continue to the next iteration to re-check context size
// This prevents sending an oversized request immediately after compaction
continue
}
}
}

agentMessage, toolResults, err := a.streamAndHandleEvents(ctx, sessionID, msgHistory)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -297,10 +330,31 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
} else {
logging.Info("Result", "message", agentMessage.FinishReason(), "toolResults", toolResults)
}
if (agentMessage.FinishReason() == message.FinishReasonToolUse) && toolResults != nil {
// We are not done, we need to respond with the tool response
msgHistory = append(msgHistory, agentMessage, *toolResults)
continue
if agentMessage.FinishReason() == message.FinishReasonToolUse {
if toolResults != nil {
// We have tool results, continue with the tool response
msgHistory = append(msgHistory, agentMessage, *toolResults)
continue
} else {
// Tool results are nil (tool execution failed or returned empty)
// Create an empty tool results message to allow the LLM to provide a final response
logging.Warn("Tool results are nil, creating empty tool results message to allow final response", "session_id", sessionID)
emptyToolMsg, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
Role: message.Tool,
Parts: []message.ContentPart{message.TextContent{Text: "Tool execution completed with no results."}},
})
if err != nil {
logging.Warn("Failed to create empty tool results message", "error", err)
// If we can't create the message, just return what we have
return AgentEvent{
Type: AgentEventTypeResponse,
Message: agentMessage,
Done: true,
}
}
msgHistory = append(msgHistory, agentMessage, emptyToolMsg)
continue
}
}
return AgentEvent{
Type: AgentEventTypeResponse,
Expand Down Expand Up @@ -532,6 +586,165 @@ func (a *agent) Update(agentName config.AgentName, modelID models.ModelID) (mode
return a.provider.Model(), nil
}

// shouldTriggerAutoCompaction checks if the session should trigger auto-compaction
// based on token usage approaching the context window limit
// filterMessagesFromSummary filters messages to start from the summary message if one exists
// This reduces context size by excluding messages before the summary
func (a *agent) filterMessagesFromSummary(msgs []message.Message, summaryMessageID string) []message.Message {
if summaryMessageID == "" {
return msgs
}

summaryMsgIndex := -1
for i, msg := range msgs {
if msg.ID == summaryMessageID {
summaryMsgIndex = i
break
}
}

if summaryMsgIndex != -1 {
filteredMsgs := msgs[summaryMsgIndex:]
// Convert the summary message role to User so it can be used in conversation
filteredMsgs[0].Role = message.User
return filteredMsgs
}

return msgs
}

func (a *agent) shouldTriggerAutoCompaction(session session.Session) bool {
model := a.provider.Model()
contextWindow := model.ContextWindow

// If context window is not defined, we can't determine if compaction is needed
if contextWindow <= 0 {
return false
}

totalTokens := session.CompletionTokens + session.PromptTokens
threshold := int64(float64(contextWindow) * 0.95) // 95% threshold

return totalTokens >= threshold
}

// shouldTriggerAutoCompactionFromHistory estimates token usage from message history
// and determines if auto-compaction should be triggered. This is used during tool use loops
// where we don't have real-time token counts from the session.
func (a *agent) shouldTriggerAutoCompactionFromHistory(msgHistory []message.Message) bool {
model := a.provider.Model()
contextWindow := model.ContextWindow

// If context window is not defined, we can't determine if compaction is needed
if contextWindow <= 0 {
return false
}

// Estimate tokens from message history
// This is a rough estimation: ~4 characters per token for most models
totalChars := 0
for _, msg := range msgHistory {
for _, part := range msg.Parts {
if textPart, ok := part.(message.TextContent); ok {
totalChars += len(textPart.Text)
}
// For tool calls and other content types, add some overhead
totalChars += 100 // rough estimate for metadata
}
}

// Convert characters to estimated tokens (rough approximation)
estimatedTokens := int64(totalChars / 4)
threshold := int64(float64(contextWindow) * 0.90) // Use 90% for history-based estimation to be more conservative

logging.Debug("Token estimation for auto-compaction",
"estimated_tokens", estimatedTokens,
"threshold", threshold,
"context_window", contextWindow,
"message_count", len(msgHistory))

return estimatedTokens >= threshold
}

// performSynchronousCompaction performs summarization synchronously and waits for completion
// This is used for auto-compaction in non-interactive mode to shrink context before continuing
func (a *agent) performSynchronousCompaction(ctx context.Context, sessionID string) error {
if a.summarizeProvider == nil {
return fmt.Errorf("summarize provider not available")
}

// Note: We don't check IsSessionBusy here because this is called from within
// an active request processing loop, so the session is already marked as busy

logging.Info("Starting synchronous compaction", "session_id", sessionID)

// Get all messages from the session
msgs, err := a.messages.List(ctx, sessionID)
if err != nil {
return fmt.Errorf("failed to list messages: %w", err)
}

if len(msgs) == 0 {
return fmt.Errorf("no messages to summarize")
}

// Add session context
summarizeCtx := context.WithValue(ctx, tools.SessionIDContextKey, sessionID)

// Add a system message to guide the summarization
summarizePrompt := "Provide a detailed but concise summary of our conversation above. Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next."

// Create a new message with the summarize prompt
promptMsg := message.Message{
Role: message.User,
Parts: []message.ContentPart{message.TextContent{Text: summarizePrompt}},
}

// Append the prompt to the messages
msgsWithPrompt := append(msgs, promptMsg)

// Send the messages to the summarize provider
response, err := a.summarizeProvider.SendMessages(
summarizeCtx,
msgsWithPrompt,
make([]tools.BaseTool, 0),
)
if err != nil {
return fmt.Errorf("failed to summarize: %w", err)
}

summary := strings.TrimSpace(response.Content)
if summary == "" {
return fmt.Errorf("empty summary returned")
}

// Get the session to update
oldSession, err := a.sessions.Get(summarizeCtx, sessionID)
if err != nil {
return fmt.Errorf("failed to get session: %w", err)
}

// Create a new message with the summary
msg, err := a.messages.Create(summarizeCtx, oldSession.ID, message.CreateMessageParams{
Role: message.User,
Parts: []message.ContentPart{message.TextContent{Text: summary}},
Model: a.summarizeProvider.Model().ID,
})
if err != nil {
return fmt.Errorf("failed to create summary message: %w", err)
}

// Update the session with the summary message ID
oldSession.SummaryMessageID = msg.ID
_, err = a.sessions.Save(summarizeCtx, oldSession)
if err != nil {
return fmt.Errorf("failed to save session: %w", err)
}

logging.Info("Synchronous compaction completed successfully", "session_id", sessionID)
return nil
}

func (a *agent) Summarize(ctx context.Context, sessionID string) error {
if a.summarizeProvider == nil {
return fmt.Errorf("summarize provider not available")
Expand Down
Loading