diff --git a/core/providers/utils.go b/core/providers/utils.go index 79fdd264c..51a5cf44b 100644 --- a/core/providers/utils.go +++ b/core/providers/utils.go @@ -592,9 +592,7 @@ func getResponsesChunkConverterCombinedPostHookRunner(postHookRunner schemas.Pos result.ExtraFields.RequestType = schemas.ResponsesStreamRequest } else if err != nil { // Ensure downstream knows this is a Responses stream even on errors - if err.ExtraFields.RequestType == "" { - err.ExtraFields.RequestType = schemas.ResponsesStreamRequest - } + err.ExtraFields.RequestType = schemas.ResponsesStreamRequest } return result, err } diff --git a/core/schemas/mux.go b/core/schemas/mux.go index 8a604d2cb..ec10fac30 100644 --- a/core/schemas/mux.go +++ b/core/schemas/mux.go @@ -339,12 +339,12 @@ func (cm *ChatMessage) ToResponsesMessages() []ResponsesMessage { rm.Content = &ResponsesMessageContent{ ContentBlocks: []ResponsesMessageContentBlock{refusalBlock}, } - } else if cm.Content.ContentStr != nil { + } else if cm.Content != nil && cm.Content.ContentStr != nil { // Convert regular string content rm.Content = &ResponsesMessageContent{ ContentStr: cm.Content.ContentStr, } - } else if cm.Content.ContentBlocks != nil { + } else if cm.Content != nil && cm.Content.ContentBlocks != nil { // Convert content blocks responseBlocks := make([]ResponsesMessageContentBlock, len(cm.Content.ContentBlocks)) for i, block := range cm.Content.ContentBlocks { @@ -910,6 +910,7 @@ func (br *BifrostResponse) ToResponsesStream() { delta := choice.BifrostStreamResponseChoice.Delta streamResp := &ResponsesStreamResponse{ SequenceNumber: br.ExtraFields.ChunkIndex, + ContentIndex: Ptr(0), OutputIndex: &choice.Index, } diff --git a/core/schemas/providers/anthropic/responses.go b/core/schemas/providers/anthropic/responses.go index 8c5b2d9b4..277c13175 100644 --- a/core/schemas/providers/anthropic/responses.go +++ b/core/schemas/providers/anthropic/responses.go @@ -341,12 +341,26 @@ func (chunk *AnthropicStreamEvent) 
ToBifrostResponsesStream(sequenceNumber int) Text: schemas.Ptr(""), // Empty text initially } case AnthropicContentBlockTypeToolUse: - // This is a function call starting - contentType = schemas.ResponsesInputMessageContentBlockTypeText // Will be updated to function call - part = &schemas.ResponsesMessageContentBlock{ - Type: contentType, - Text: schemas.Ptr(""), // Will contain function call info + // This is a function call starting - create function call message + + item := &schemas.ResponsesMessage{ + ID: chunk.ContentBlock.ToolUseID, + Type: schemas.Ptr(schemas.ResponsesMessageTypeFunctionCall), + ResponsesToolMessage: &schemas.ResponsesToolMessage{ + CallID: chunk.ContentBlock.ToolUseID, + Name: chunk.ContentBlock.Name, + Arguments: schemas.Ptr(""), // Arguments will be filled by deltas + }, } + + return &schemas.BifrostResponse{ + ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ + Type: schemas.ResponsesStreamResponseTypeOutputItemAdded, + SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), + Item: item, + }, + }, nil, false } if part != nil { @@ -389,7 +403,7 @@ func (chunk *AnthropicStreamEvent) ToBifrostResponsesStream(sequenceNumber int) SequenceNumber: sequenceNumber, OutputIndex: schemas.Ptr(0), ContentIndex: chunk.Index, - Arguments: chunk.Delta.PartialJSON, + Delta: chunk.Delta.PartialJSON, }, }, nil, false } diff --git a/core/schemas/providers/bedrock/responses.go b/core/schemas/providers/bedrock/responses.go index 2e2bf55fe..f48f7bf3e 100644 --- a/core/schemas/providers/bedrock/responses.go +++ b/core/schemas/providers/bedrock/responses.go @@ -211,16 +211,19 @@ func (bedrockResp *BedrockConverseResponse) ToResponsesBifrostResponse() (*schem // Convert usage information usage := &schemas.LLMUsage{ - ResponsesExtendedResponseUsage: &schemas.ResponsesExtendedResponseUsage{ - InputTokens: bedrockResp.Usage.InputTokens, - OutputTokens: bedrockResp.Usage.OutputTokens, - }, - TotalTokens: bedrockResp.Usage.TotalTokens, + 
ResponsesExtendedResponseUsage: &schemas.ResponsesExtendedResponseUsage{}, + } + + // Handle potential nil Usage field + if bedrockResp.Usage != nil { + usage.ResponsesExtendedResponseUsage.InputTokens = bedrockResp.Usage.InputTokens + usage.ResponsesExtendedResponseUsage.OutputTokens = bedrockResp.Usage.OutputTokens + usage.TotalTokens = bedrockResp.Usage.TotalTokens } bifrostResp.Usage = usage // Convert output message to Responses format - if bedrockResp.Output.Message != nil { + if bedrockResp.Output != nil && bedrockResp.Output.Message != nil { outputMessages := convertBedrockMessageToResponsesMessages(*bedrockResp.Output.Message) bifrostResp.ResponsesResponse.Output = outputMessages } @@ -277,142 +280,144 @@ func convertResponsesItemsToBedrockMessages(messages []schemas.ResponsesMessage) for _, msg := range messages { // Handle Responses items + msgType := schemas.ResponsesMessageTypeMessage if msg.Type != nil { - switch *msg.Type { - case "message": - // Check if Role is present, skip message if not - if msg.Role == nil { - continue - } + msgType = *msg.Type + } + switch msgType { + case schemas.ResponsesMessageTypeMessage: + // Check if Role is present, skip message if not + if msg.Role == nil { + continue + } - // Extract role from the Responses message structure - role := *msg.Role - - if role == schemas.ResponsesInputMessageRoleSystem { - // Convert to system message - // Ensure Content and ContentStr are present - if msg.Content != nil { - if msg.Content.ContentStr != nil { - systemMessages = append(systemMessages, BedrockSystemMessage{ - Text: msg.Content.ContentStr, - }) - } else if msg.Content.ContentBlocks != nil { - for _, block := range msg.Content.ContentBlocks { - if block.Text != nil { - systemMessages = append(systemMessages, BedrockSystemMessage{ - Text: block.Text, - }) - } + // Extract role from the Responses message structure + role := *msg.Role + + if role == schemas.ResponsesInputMessageRoleSystem { + // Convert to system message + // 
Ensure Content and ContentStr are present + if msg.Content != nil { + if msg.Content.ContentStr != nil { + systemMessages = append(systemMessages, BedrockSystemMessage{ + Text: msg.Content.ContentStr, + }) + } else if msg.Content.ContentBlocks != nil { + for _, block := range msg.Content.ContentBlocks { + if block.Text != nil { + systemMessages = append(systemMessages, BedrockSystemMessage{ + Text: block.Text, + }) } } } - // Skip system messages with no content - } else { - // Convert regular message - // Ensure Content is present - if msg.Content == nil { - // Skip messages without content or create with empty content - continue - } - - bedrockMsg := BedrockMessage{ - Role: BedrockMessageRole(role), - } + } + // Skip system messages with no content + } else { + // Convert regular message + // Ensure Content is present + if msg.Content == nil { + // Skip messages without content or create with empty content + continue + } - // Convert content - contentBlocks, err := convertBifrostResponsesMessageContentBlocksToBedrockContentBlocks(*msg.Content) - if err != nil { - return nil, nil, fmt.Errorf("failed to convert content blocks: %w", err) - } - bedrockMsg.Content = contentBlocks + bedrockMsg := BedrockMessage{ + Role: BedrockMessageRole(role), + } - bedrockMessages = append(bedrockMessages, bedrockMsg) + // Convert content + contentBlocks, err := convertBifrostResponsesMessageContentBlocksToBedrockContentBlocks(*msg.Content) + if err != nil { + return nil, nil, fmt.Errorf("failed to convert content blocks: %w", err) } + bedrockMsg.Content = contentBlocks - case "function_call": - // Handle function calls from Responses - if msg.ResponsesToolMessage != nil { - // Create tool use content block - var toolUseID string - if msg.ResponsesToolMessage.CallID != nil { - toolUseID = *msg.ResponsesToolMessage.CallID - } + bedrockMessages = append(bedrockMessages, bedrockMsg) + } - // Get function name from ToolMessage - var functionName string - if msg.ResponsesToolMessage != 
nil && msg.ResponsesToolMessage.Name != nil { - functionName = *msg.ResponsesToolMessage.Name - } + case schemas.ResponsesMessageTypeFunctionCall: + // Handle function calls from Responses + if msg.ResponsesToolMessage != nil { + // Create tool use content block + var toolUseID string + if msg.ResponsesToolMessage.CallID != nil { + toolUseID = *msg.ResponsesToolMessage.CallID + } - // Parse JSON arguments into interface{} - var input interface{} = map[string]interface{}{} - if msg.ResponsesToolMessage.Arguments != nil { - var parsedInput interface{} - if err := json.Unmarshal([]byte(*msg.ResponsesToolMessage.Arguments), &parsedInput); err != nil { - return nil, nil, fmt.Errorf("failed to parse tool arguments JSON: %w", err) - } - input = parsedInput - } + // Get function name from ToolMessage + var functionName string + if msg.ResponsesToolMessage != nil && msg.ResponsesToolMessage.Name != nil { + functionName = *msg.ResponsesToolMessage.Name + } - toolUseBlock := BedrockContentBlock{ - ToolUse: &BedrockToolUse{ - ToolUseID: toolUseID, - Name: functionName, - Input: input, - }, + // Parse JSON arguments into interface{} + var input interface{} = map[string]interface{}{} + if msg.ResponsesToolMessage.Arguments != nil { + var parsedInput interface{} + if err := json.Unmarshal([]byte(*msg.ResponsesToolMessage.Arguments), &parsedInput); err != nil { + return nil, nil, fmt.Errorf("failed to parse tool arguments JSON: %w", err) } + input = parsedInput + } - // Create assistant message with tool use - assistantMsg := BedrockMessage{ - Role: BedrockMessageRoleAssistant, - Content: []BedrockContentBlock{toolUseBlock}, - } - bedrockMessages = append(bedrockMessages, assistantMsg) + toolUseBlock := BedrockContentBlock{ + ToolUse: &BedrockToolUse{ + ToolUseID: toolUseID, + Name: functionName, + Input: input, + }, + } + // Create assistant message with tool use + assistantMsg := BedrockMessage{ + Role: BedrockMessageRoleAssistant, + Content: []BedrockContentBlock{toolUseBlock}, 
} + bedrockMessages = append(bedrockMessages, assistantMsg) - case "function_call_output": - // Handle function call outputs from Responses - if msg.ResponsesToolMessage != nil && msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput != nil { - var toolUseID string - if msg.ResponsesToolMessage.CallID != nil { - toolUseID = *msg.ResponsesToolMessage.CallID - } - toolResultBlock := BedrockContentBlock{ - ToolResult: &BedrockToolResult{ - ToolUseID: toolUseID, - }, - } - // Set content based on available data - if msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputStr != nil { - raw := *msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputStr - var parsed interface{} - if err := json.Unmarshal([]byte(raw), &parsed); err == nil { - toolResultBlock.ToolResult.Content = []BedrockContentBlock{ - {JSON: parsed}, - } - } else { - toolResultBlock.ToolResult.Content = []BedrockContentBlock{ - {Text: &raw}, - } + } + + case schemas.ResponsesMessageTypeFunctionCallOutput: + // Handle function call outputs from Responses + if msg.ResponsesToolMessage != nil && msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput != nil { + var toolUseID string + if msg.ResponsesToolMessage.CallID != nil { + toolUseID = *msg.ResponsesToolMessage.CallID + } + toolResultBlock := BedrockContentBlock{ + ToolResult: &BedrockToolResult{ + ToolUseID: toolUseID, + }, + } + // Set content based on available data + if msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputStr != nil { + raw := *msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputStr + var parsed interface{} + if err := json.Unmarshal([]byte(raw), &parsed); err == nil { + toolResultBlock.ToolResult.Content = []BedrockContentBlock{ + {JSON: parsed}, } - } else if msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputBlocks != nil { - toolResultContent, err := 
convertBifrostResponsesMessageContentBlocksToBedrockContentBlocks(schemas.ResponsesMessageContent{ - ContentBlocks: msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputBlocks, - }) - if err != nil { - return nil, nil, fmt.Errorf("failed to convert tool result content blocks: %w", err) + } else { + toolResultBlock.ToolResult.Content = []BedrockContentBlock{ + {Text: &raw}, } - toolResultBlock.ToolResult.Content = toolResultContent } - - // Create user message with tool result - userMsg := BedrockMessage{ - Role: "user", - Content: []BedrockContentBlock{toolResultBlock}, + } else if msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputBlocks != nil { + toolResultContent, err := convertBifrostResponsesMessageContentBlocksToBedrockContentBlocks(schemas.ResponsesMessageContent{ + ContentBlocks: msg.ResponsesToolMessage.ResponsesFunctionToolCallOutput.ResponsesFunctionToolCallOutputBlocks, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to convert tool result content blocks: %w", err) } - bedrockMessages = append(bedrockMessages, userMsg) + toolResultBlock.ToolResult.Content = toolResultContent + } + + // Create user message with tool result + userMsg := BedrockMessage{ + Role: BedrockMessageRoleUser, + Content: []BedrockContentBlock{toolResultBlock}, } + bedrockMessages = append(bedrockMessages, userMsg) } } } @@ -559,27 +564,50 @@ func (chunk *BedrockStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (* }, nil, false case chunk.Start != nil: - // Handle content block start (text content or tool use) - create content part added event + // Handle content block start (text content or tool use) contentBlockIndex := 0 if chunk.ContentBlockIndex != nil { contentBlockIndex = *chunk.ContentBlockIndex } - // Create content part for any content type - part := &schemas.ResponsesMessageContentBlock{ - Type: schemas.ResponsesOutputMessageContentTypeText, - Text: schemas.Ptr(""), // Empty 
initially - } + // Check if this is a tool use start + if chunk.Start.ToolUse != nil { + // This is a function call starting - create function call message + item := &schemas.ResponsesMessage{ + ID: &chunk.Start.ToolUse.ToolUseID, + Type: schemas.Ptr(schemas.ResponsesMessageTypeFunctionCall), + ResponsesToolMessage: &schemas.ResponsesToolMessage{ + CallID: &chunk.Start.ToolUse.ToolUseID, + Name: &chunk.Start.ToolUse.Name, + Arguments: schemas.Ptr(""), // Arguments will be filled by deltas + }, + } - return &schemas.BifrostResponse{ - ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ - Type: schemas.ResponsesStreamResponseTypeContentPartAdded, - SequenceNumber: sequenceNumber, - OutputIndex: schemas.Ptr(0), - ContentIndex: &contentBlockIndex, - Part: part, - }, - }, nil, false + return &schemas.BifrostResponse{ + ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ + Type: schemas.ResponsesStreamResponseTypeOutputItemAdded, + SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), + Item: item, + }, + }, nil, false + } else { + // This is text content - create content part added event + part := &schemas.ResponsesMessageContentBlock{ + Type: schemas.ResponsesOutputMessageContentTypeText, + Text: schemas.Ptr(""), // Empty initially + } + + return &schemas.BifrostResponse{ + ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ + Type: schemas.ResponsesStreamResponseTypeContentPartAdded, + SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), + ContentIndex: &contentBlockIndex, + Part: part, + }, + }, nil, false + } case chunk.ContentBlockIndex != nil && chunk.Delta != nil: // Handle contentBlockDelta event @@ -608,11 +636,11 @@ func (chunk *BedrockStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (* if toolUseDelta.Input != "" { return &schemas.BifrostResponse{ ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ - Type: schemas.ResponsesStreamResponseTypeFunctionCallArgumentsAdded, + Type: 
schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDelta, SequenceNumber: sequenceNumber, OutputIndex: schemas.Ptr(0), ContentIndex: &contentBlockIndex, - Arguments: &toolUseDelta.Input, + Delta: &toolUseDelta.Input, }, }, nil, false } diff --git a/core/schemas/providers/cohere/responses.go b/core/schemas/providers/cohere/responses.go index db73b6dda..3461cfe50 100644 --- a/core/schemas/providers/cohere/responses.go +++ b/core/schemas/providers/cohere/responses.go @@ -448,6 +448,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeOutputItemAdded, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), Item: item, }, }, nil, false @@ -478,6 +479,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeContentPartAdded, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), ContentIndex: chunk.Index, Part: part, }, @@ -492,6 +494,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeOutputTextDelta, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), ContentIndex: chunk.Index, Delta: chunk.Delta.Message.Content.Text, }, @@ -506,6 +509,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeContentPartDone, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), ContentIndex: chunk.Index, }, }, nil, false @@ -517,6 +521,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: 
schemas.ResponsesStreamResponseTypeReasoningSummaryTextDelta, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), ContentIndex: schemas.Ptr(0), // Tool plan is typically at index 0 Delta: chunk.Delta.Message.ToolPlan, }, @@ -544,6 +549,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeOutputItemAdded, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), Item: item, }, }, nil, false @@ -557,10 +563,11 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s if toolCall.Function != nil { return &schemas.BifrostResponse{ ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ - Type: schemas.ResponsesStreamResponseTypeFunctionCallArgumentsAdded, + Type: schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDelta, SequenceNumber: sequenceNumber, ContentIndex: chunk.Index, - Arguments: schemas.Ptr(toolCall.Function.Arguments), + OutputIndex: schemas.Ptr(0), + Delta: schemas.Ptr(toolCall.Function.Arguments), }, }, nil, false } @@ -573,6 +580,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDone, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), ContentIndex: chunk.Index, }, }, nil, false @@ -618,6 +626,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeOutputTextAnnotationAdded, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), ContentIndex: schemas.Ptr(citation.ContentIndex), Annotation: annotation, AnnotationIndex: chunk.Index, @@ -632,6 +641,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: 
&schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeOutputTextAnnotationAdded, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), ContentIndex: chunk.Index, AnnotationIndex: chunk.Index, }, @@ -643,6 +653,7 @@ func (chunk *CohereStreamEvent) ToBifrostResponsesStream(sequenceNumber int) (*s ResponsesStreamResponse: &schemas.ResponsesStreamResponse{ Type: schemas.ResponsesStreamResponseTypeCompleted, SequenceNumber: sequenceNumber, + OutputIndex: schemas.Ptr(0), Response: &schemas.ResponsesStreamResponseStruct{}, // Initialize Response field }, } diff --git a/framework/logstore/tables.go b/framework/logstore/tables.go index eaab94f1e..8513c840f 100644 --- a/framework/logstore/tables.go +++ b/framework/logstore/tables.go @@ -76,10 +76,11 @@ type Log struct { Model string `gorm:"type:varchar(255);index;not null" json:"model"` InputHistory string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ChatMessage OutputMessage string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ChatMessage + ResponsesOutput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ResponsesMessage EmbeddingOutput string `gorm:"type:text" json:"-"` // JSON serialized [][]float32 Params string `gorm:"type:text" json:"-"` // JSON serialized *schemas.ModelParameters Tools string `gorm:"type:text" json:"-"` // JSON serialized []schemas.Tool - ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall + ToolCalls string `gorm:"type:text" json:"-"` // JSON serialized []schemas.ToolCall (For backward compatibility, tool calls are now in the content) SpeechInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.SpeechInput TranscriptionInput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.TranscriptionInput SpeechOutput string `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostSpeech @@ -104,10 +105,11 @@ type Log struct { // Virtual fields for JSON output - these will be populated 
when needed InputHistoryParsed []schemas.ChatMessage `gorm:"-" json:"input_history,omitempty"` OutputMessageParsed *schemas.ChatMessage `gorm:"-" json:"output_message,omitempty"` + ResponsesOutputParsed []schemas.ResponsesMessage `gorm:"-" json:"responses_output,omitempty"` EmbeddingOutputParsed []schemas.BifrostEmbedding `gorm:"-" json:"embedding_output,omitempty"` ParamsParsed interface{} `gorm:"-" json:"params,omitempty"` ToolsParsed []schemas.ChatTool `gorm:"-" json:"tools,omitempty"` - ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"` + ToolCallsParsed []schemas.ChatAssistantMessageToolCall `gorm:"-" json:"tool_calls,omitempty"` // For backward compatibility, tool calls are now in the content TokenUsageParsed *schemas.LLMUsage `gorm:"-" json:"token_usage,omitempty"` ErrorDetailsParsed *schemas.BifrostError `gorm:"-" json:"error_details,omitempty"` SpeechInputParsed *schemas.SpeechInput `gorm:"-" json:"speech_input,omitempty"` @@ -158,6 +160,14 @@ func (l *Log) SerializeFields() error { } } + if l.ResponsesOutputParsed != nil { + if data, err := json.Marshal(l.ResponsesOutputParsed); err != nil { + return err + } else { + l.ResponsesOutput = string(data) + } + } + if l.EmbeddingOutputParsed != nil { if data, err := json.Marshal(l.EmbeddingOutputParsed); err != nil { return err @@ -273,6 +283,13 @@ func (l *Log) DeserializeFields() error { } } + if l.ResponsesOutput != "" { + if err := json.Unmarshal([]byte(l.ResponsesOutput), &l.ResponsesOutputParsed); err != nil { + // Log error but don't fail the operation - initialize as nil + l.ResponsesOutputParsed = []schemas.ResponsesMessage{} + } + } + if l.EmbeddingOutput != "" { if err := json.Unmarshal([]byte(l.EmbeddingOutput), &l.EmbeddingOutputParsed); err != nil { // Log error but don't fail the operation - initialize as nil @@ -398,6 +415,30 @@ func (l *Log) BuildContentSummary() string { } } + // Add responses output content + if l.ResponsesOutputParsed != nil { + for 
_, msg := range l.ResponsesOutputParsed { + if msg.Content != nil { + if msg.Content.ContentStr != nil && *msg.Content.ContentStr != "" { + parts = append(parts, *msg.Content.ContentStr) + } + // If content blocks exist, extract text from them + if msg.Content.ContentBlocks != nil { + for _, block := range msg.Content.ContentBlocks { + if block.Text != nil && *block.Text != "" { + parts = append(parts, *block.Text) + } + } + } + } + if msg.ResponsesReasoning != nil { + for _, summary := range msg.ResponsesReasoning.Summary { + parts = append(parts, summary.Text) + } + } + } + } + // Add speech input content if l.SpeechInputParsed != nil && l.SpeechInputParsed.Input != "" { parts = append(parts, l.SpeechInputParsed.Input) diff --git a/framework/pricing/main.go b/framework/pricing/main.go index baa269974..365adac99 100644 --- a/framework/pricing/main.go +++ b/framework/pricing/main.go @@ -166,6 +166,11 @@ func (pm *PricingManager) CalculateCost(result *schemas.BifrostResponse) float64 if result.Transcribe.Usage.InputTokenDetails != nil { audioTokenDetails = result.Transcribe.Usage.InputTokenDetails } + } else if result.ResponsesStreamResponse != nil && result.ResponsesStreamResponse.Response != nil && result.ResponsesStreamResponse.Response.Usage != nil { + usage = &schemas.LLMUsage{ + ResponsesExtendedResponseUsage: result.ResponsesStreamResponse.Response.Usage.ResponsesExtendedResponseUsage, + TotalTokens: result.ResponsesStreamResponse.Response.Usage.TotalTokens, + } } cost := 0.0 diff --git a/framework/streaming/accumulator.go b/framework/streaming/accumulator.go index bbc003bd1..99f5cfc8f 100644 --- a/framework/streaming/accumulator.go +++ b/framework/streaming/accumulator.go @@ -18,6 +18,7 @@ type Accumulator struct { streamAccumulators sync.Map // Track accumulators by request ID (atomic) chatStreamChunkPool sync.Pool // Pool for reusing StreamChunk structs + responsesStreamChunkPool sync.Pool // Pool for reusing ResponsesStreamChunk structs 
audioStreamChunkPool sync.Pool // Pool for reusing AudioStreamChunk structs transcriptionStreamChunkPool sync.Pool // Pool for reusing TranscriptionStreamChunk structs @@ -80,14 +81,32 @@ func (a *Accumulator) putTranscriptionStreamChunk(chunk *TranscriptionStreamChun a.transcriptionStreamChunkPool.Put(chunk) } +// getResponsesStreamChunk gets a responses stream chunk from the pool +func (a *Accumulator) getResponsesStreamChunk() *ResponsesStreamChunk { + return a.responsesStreamChunkPool.Get().(*ResponsesStreamChunk) +} + +// putResponsesStreamChunk returns a responses stream chunk to the pool +func (a *Accumulator) putResponsesStreamChunk(chunk *ResponsesStreamChunk) { + chunk.Timestamp = time.Time{} + chunk.StreamResponse = nil + chunk.Cost = nil + chunk.SemanticCacheDebug = nil + chunk.ErrorDetails = nil + chunk.FinishReason = nil + chunk.TokenUsage = nil + a.responsesStreamChunkPool.Put(chunk) +} + // CreateStreamAccumulator creates a new stream accumulator for a request func (a *Accumulator) createStreamAccumulator(requestID string) *StreamAccumulator { sc := &StreamAccumulator{ - RequestID: requestID, - ChatStreamChunks: make([]*ChatStreamChunk, 0), - IsComplete: false, - Timestamp: time.Now(), - Object: "", + RequestID: requestID, + ChatStreamChunks: make([]*ChatStreamChunk, 0), + ResponsesStreamChunks: make([]*ResponsesStreamChunk, 0), + IsComplete: false, + Timestamp: time.Now(), + Object: "", } a.streamAccumulators.Store(requestID, sc) return sc @@ -174,6 +193,30 @@ func (a *Accumulator) addAudioStreamChunk(requestID string, chunk *AudioStreamCh return nil } +// addResponsesStreamChunk adds a responses stream chunk to the stream accumulator +func (a *Accumulator) addResponsesStreamChunk(requestID string, chunk *ResponsesStreamChunk, object string, isFinalChunk bool) error { + accumulator := a.getOrCreateStreamAccumulator(requestID) + // Lock the accumulator + accumulator.mu.Lock() + defer accumulator.mu.Unlock() + if accumulator.StartTimestamp.IsZero() { 
+ accumulator.StartTimestamp = chunk.Timestamp + } + // Store object type once (from first chunk) + if accumulator.Object == "" && object != "" { + accumulator.Object = object + } + // Add chunk to the list (chunks arrive in order) + accumulator.ResponsesStreamChunks = append(accumulator.ResponsesStreamChunks, chunk) + // Check if this is the final chunk + // Set FinalTimestamp when either FinishReason is present or token usage exists + // This handles both normal completion chunks and usage-only last chunks + if isFinalChunk { + accumulator.FinalTimestamp = chunk.Timestamp + } + return nil +} + // cleanupStreamAccumulator removes the stream accumulator for a request func (a *Accumulator) cleanupStreamAccumulator(requestID string) { if accumulator, exists := a.streamAccumulators.Load(requestID); exists { @@ -182,6 +225,9 @@ func (a *Accumulator) cleanupStreamAccumulator(requestID string) { for _, chunk := range acc.ChatStreamChunks { a.putChatStreamChunk(chunk) } + for _, chunk := range acc.ResponsesStreamChunks { + a.putResponsesStreamChunk(chunk) + } for _, chunk := range acc.AudioStreamChunks { a.putAudioStreamChunk(chunk) } @@ -263,7 +309,7 @@ func (a *Accumulator) appendContentToMessage(message *schemas.ChatMessage, newCo } // ProcessStreamingResponse processes a streaming response -// It handles both audio and chat streaming responses +// It handles chat, audio, and responses streaming responses func (a *Accumulator) ProcessStreamingResponse(ctx *context.Context, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error) { // Check if this is a streaming response if result == nil { @@ -272,6 +318,8 @@ func (a *Accumulator) ProcessStreamingResponse(ctx *context.Context, result *sch requestType := result.ExtraFields.RequestType isAudioStreaming := requestType == schemas.SpeechStreamRequest || requestType == schemas.TranscriptionStreamRequest isChatStreaming := requestType == schemas.ChatCompletionStreamRequest || 
requestType == schemas.TextCompletionStreamRequest + isResponsesStreaming := requestType == schemas.ResponsesStreamRequest + if isChatStreaming { // Handle text-based streaming with ordered accumulation return a.processChatStreamingResponse(ctx, result, bifrostErr) @@ -283,6 +331,9 @@ func (a *Accumulator) ProcessStreamingResponse(ctx *context.Context, result *sch if requestType == schemas.SpeechStreamRequest { return a.processAudioStreamingResponse(ctx, result, bifrostErr) } + } else if isResponsesStreaming { + // Handle responses streaming with responses accumulation + return a.processResponsesStreamingResponse(ctx, result, bifrostErr) } return nil, fmt.Errorf("request type missing/invalid for accumulator") } @@ -295,6 +346,9 @@ func (a *Accumulator) Cleanup() { for _, chunk := range accumulator.ChatStreamChunks { a.chatStreamChunkPool.Put(chunk) } + for _, chunk := range accumulator.ResponsesStreamChunks { + a.responsesStreamChunkPool.Put(chunk) + } for _, chunk := range accumulator.TranscriptionStreamChunks { a.transcriptionStreamChunkPool.Put(chunk) } @@ -360,6 +414,11 @@ func NewAccumulator(pricingManager *pricing.PricingManager, logger schemas.Logge return &ChatStreamChunk{} }, }, + responsesStreamChunkPool: sync.Pool{ + New: func() any { + return &ResponsesStreamChunk{} + }, + }, audioStreamChunkPool: sync.Pool{ New: func() any { return &AudioStreamChunk{} @@ -381,6 +440,7 @@ func NewAccumulator(pricingManager *pricing.PricingManager, logger schemas.Logge // Prewarm the pools for better performance at startup for range 1000 { a.chatStreamChunkPool.Put(&ChatStreamChunk{}) + a.responsesStreamChunkPool.Put(&ResponsesStreamChunk{}) a.audioStreamChunkPool.Put(&AudioStreamChunk{}) a.transcriptionStreamChunkPool.Put(&TranscriptionStreamChunk{}) } diff --git a/framework/streaming/responses.go b/framework/streaming/responses.go new file mode 100644 index 000000000..13cf67842 --- /dev/null +++ b/framework/streaming/responses.go @@ -0,0 +1,433 @@ +package streaming + 
+import ( + "context" + "fmt" + "sort" + "time" + + bifrost "github.com/maximhq/bifrost/core" + "github.com/maximhq/bifrost/core/schemas" +) + +// buildCompleteMessageFromResponsesStreamChunks builds complete messages from accumulated responses stream chunks +func (a *Accumulator) buildCompleteMessageFromResponsesStreamChunks(chunks []*ResponsesStreamChunk) []schemas.ResponsesMessage { + var messages []schemas.ResponsesMessage + + // Sort chunks by sequence number to ensure correct processing order + sort.Slice(chunks, func(i, j int) bool { + if chunks[i].StreamResponse == nil || chunks[j].StreamResponse == nil { + return false + } + return chunks[i].StreamResponse.SequenceNumber < chunks[j].StreamResponse.SequenceNumber + }) + + for _, chunk := range chunks { + if chunk.StreamResponse == nil { + continue + } + + resp := chunk.StreamResponse + switch resp.Type { + case schemas.ResponsesStreamResponseTypeOutputItemAdded: + // Always append new items - this fixes multiple function calls issue + if resp.Item != nil { + messages = append(messages, *resp.Item) + } + + case schemas.ResponsesStreamResponseTypeContentPartAdded: + // Add content part to the most recent message, create message if none exists + if resp.Part != nil { + if len(messages) == 0 { + messages = append(messages, createNewMessage()) + } + + lastMsg := &messages[len(messages)-1] + if lastMsg.Content == nil { + lastMsg.Content = &schemas.ResponsesMessageContent{} + } + if lastMsg.Content.ContentBlocks == nil { + lastMsg.Content.ContentBlocks = make([]schemas.ResponsesMessageContentBlock, 0) + } + lastMsg.Content.ContentBlocks = append(lastMsg.Content.ContentBlocks, *resp.Part) + } + + case schemas.ResponsesStreamResponseTypeOutputTextDelta: + if len(messages) == 0 { + messages = append(messages, createNewMessage()) + } + // Append text delta to the most recent message + if resp.Delta != nil && resp.ContentIndex != nil && len(messages) > 0 { + 
a.appendTextDeltaToResponsesMessage(&messages[len(messages)-1], *resp.Delta, *resp.ContentIndex) + } + + case schemas.ResponsesStreamResponseTypeRefusalDelta: + if len(messages) == 0 { + messages = append(messages, createNewMessage()) + } + // Append refusal delta to the most recent message + if resp.Refusal != nil && resp.ContentIndex != nil && len(messages) > 0 { + a.appendRefusalDeltaToResponsesMessage(&messages[len(messages)-1], *resp.Refusal, *resp.ContentIndex) + } + + case schemas.ResponsesStreamResponseTypeFunctionCallArgumentsDelta: + if len(messages) == 0 { + messages = append(messages, createNewMessage()) + } + if resp.Item != nil { + messages = append(messages, *resp.Item) + } + // Append arguments to the most recent message + if resp.Delta != nil && len(messages) > 0 { + a.appendFunctionArgumentsDeltaToResponsesMessage(&messages[len(messages)-1], *resp.Delta) + } + } + } + + return messages +} + +func createNewMessage() schemas.ResponsesMessage { + return schemas.ResponsesMessage{ + Type: schemas.Ptr(schemas.ResponsesMessageTypeMessage), + Role: schemas.Ptr(schemas.ResponsesInputMessageRoleAssistant), + Content: &schemas.ResponsesMessageContent{ + ContentBlocks: make([]schemas.ResponsesMessageContentBlock, 0), + }, + } +} + +// appendTextDeltaToResponsesMessage appends text delta to a responses message +func (a *Accumulator) appendTextDeltaToResponsesMessage(message *schemas.ResponsesMessage, delta string, contentIndex int) { + if message.Content == nil { + message.Content = &schemas.ResponsesMessageContent{} + } + + // If we don't have content blocks yet, create them + if message.Content.ContentBlocks == nil { + message.Content.ContentBlocks = make([]schemas.ResponsesMessageContentBlock, contentIndex+1) + } + + // Ensure we have enough content blocks + for len(message.Content.ContentBlocks) <= contentIndex { + message.Content.ContentBlocks = append(message.Content.ContentBlocks, schemas.ResponsesMessageContentBlock{}) + } + + // Initialize the content 
block if needed + if message.Content.ContentBlocks[contentIndex].Type == "" { + message.Content.ContentBlocks[contentIndex].Type = schemas.ResponsesOutputMessageContentTypeText + message.Content.ContentBlocks[contentIndex].ResponsesOutputMessageContentText = &schemas.ResponsesOutputMessageContentText{} + } + + // Append to existing text or create new text + if message.Content.ContentBlocks[contentIndex].Text == nil { + message.Content.ContentBlocks[contentIndex].Text = &delta + } else { + *message.Content.ContentBlocks[contentIndex].Text += delta + } +} + +// appendRefusalDeltaToResponsesMessage appends refusal delta to a responses message +func (a *Accumulator) appendRefusalDeltaToResponsesMessage(message *schemas.ResponsesMessage, refusal string, contentIndex int) { + if message.Content == nil { + message.Content = &schemas.ResponsesMessageContent{} + } + + // If we don't have content blocks yet, create them + if message.Content.ContentBlocks == nil { + message.Content.ContentBlocks = make([]schemas.ResponsesMessageContentBlock, contentIndex+1) + } + + // Ensure we have enough content blocks + for len(message.Content.ContentBlocks) <= contentIndex { + message.Content.ContentBlocks = append(message.Content.ContentBlocks, schemas.ResponsesMessageContentBlock{}) + } + + // Initialize the content block if needed + if message.Content.ContentBlocks[contentIndex].Type == "" { + message.Content.ContentBlocks[contentIndex].Type = schemas.ResponsesOutputMessageContentTypeRefusal + message.Content.ContentBlocks[contentIndex].ResponsesOutputMessageContentRefusal = &schemas.ResponsesOutputMessageContentRefusal{} + } + + // Append to existing refusal text + if message.Content.ContentBlocks[contentIndex].ResponsesOutputMessageContentRefusal == nil { + message.Content.ContentBlocks[contentIndex].ResponsesOutputMessageContentRefusal = &schemas.ResponsesOutputMessageContentRefusal{ + Refusal: refusal, + } + } else { + 
message.Content.ContentBlocks[contentIndex].ResponsesOutputMessageContentRefusal.Refusal += refusal + } +} + +// appendFunctionArgumentsDeltaToResponsesMessage appends function arguments delta to a responses message +func (a *Accumulator) appendFunctionArgumentsDeltaToResponsesMessage(message *schemas.ResponsesMessage, arguments string) { + if message.ResponsesToolMessage == nil { + message.ResponsesToolMessage = &schemas.ResponsesToolMessage{} + } + + if message.ResponsesToolMessage.Arguments == nil { + message.ResponsesToolMessage.Arguments = &arguments + } else { + *message.ResponsesToolMessage.Arguments += arguments + } +} + +// processAccumulatedResponsesStreamingChunks processes all accumulated responses streaming chunks in order +func (a *Accumulator) processAccumulatedResponsesStreamingChunks(requestID string, respErr *schemas.BifrostError, isFinalChunk bool) (*AccumulatedData, error) { + accumulator := a.getOrCreateStreamAccumulator(requestID) + // Lock the accumulator + accumulator.mu.Lock() + defer func() { + accumulator.mu.Unlock() + if isFinalChunk { + // Cleanup runs after the unlock above, when this deferred closure returns + defer a.cleanupStreamAccumulator(requestID) + } + }() + + // Initialize accumulated data + data := &AccumulatedData{ + RequestID: requestID, + Status: "success", + Stream: true, + StartTimestamp: accumulator.StartTimestamp, + EndTimestamp: accumulator.FinalTimestamp, + Latency: 0, + OutputMessages: nil, + ToolCalls: nil, + ErrorDetails: respErr, + TokenUsage: nil, + CacheDebug: nil, + Cost: nil, + Object: "", + } + + // Build complete messages from accumulated chunks + completeMessages := a.buildCompleteMessageFromResponsesStreamChunks(accumulator.ResponsesStreamChunks) + + if !isFinalChunk { + data.OutputMessages = completeMessages + return data, nil + } + + // Update database with complete messages + data.Status = "success" + if respErr != nil { + data.Status = "error" + } + + if accumulator.StartTimestamp.IsZero() || accumulator.FinalTimestamp.IsZero() { +
data.Latency = 0 + } else { + data.Latency = accumulator.FinalTimestamp.Sub(accumulator.StartTimestamp).Nanoseconds() / 1e6 + } + + data.EndTimestamp = accumulator.FinalTimestamp + data.OutputMessages = completeMessages + + // Extract tool calls from messages + for _, msg := range completeMessages { + if msg.ResponsesToolMessage != nil { + // TODO: add tool call info to accumulated data (currently a no-op) + // This is simplified - you might want to extract specific tool call info + } + } + + data.ErrorDetails = respErr + + // Update token usage from final chunk if available + if len(accumulator.ResponsesStreamChunks) > 0 { + lastChunk := accumulator.ResponsesStreamChunks[len(accumulator.ResponsesStreamChunks)-1] + if lastChunk.TokenUsage != nil { + data.TokenUsage = lastChunk.TokenUsage + } + // Handle cache debug + if lastChunk.SemanticCacheDebug != nil { + data.CacheDebug = lastChunk.SemanticCacheDebug + } + } + + // Update cost from final chunk if available + if len(accumulator.ResponsesStreamChunks) > 0 { + lastChunk := accumulator.ResponsesStreamChunks[len(accumulator.ResponsesStreamChunks)-1] + if lastChunk.Cost != nil { + data.Cost = lastChunk.Cost + } + data.FinishReason = lastChunk.FinishReason + } + + // Update object field from accumulator (stored once for the entire stream) + if accumulator.Object != "" { + data.Object = accumulator.Object + } + + return data, nil +} + +// processResponsesStreamingResponse processes a responses streaming response +func (a *Accumulator) processResponsesStreamingResponse(ctx *context.Context, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error) { + a.logger.Debug("[streaming] processing responses streaming response") + + // Extract request ID from context + requestID, ok := (*ctx).Value(schemas.BifrostContextKeyRequestID).(string) + if !ok || requestID == "" { + return nil, fmt.Errorf("request-id not found in context or is empty") + } + + _, provider, model := bifrost.GetRequestFields(result,
bifrostErr) + + accumulator := a.getOrCreateStreamAccumulator(requestID) + accumulator.mu.Lock() + startTimestamp := accumulator.StartTimestamp + endTimestamp := accumulator.FinalTimestamp + accumulator.mu.Unlock() + + // For OpenAI provider, the last chunk already contains the whole accumulated response + // so just return it as is + if provider == "openai" { + isFinalChunk := bifrost.IsFinalChunk(ctx) + if isFinalChunk { + // For OpenAI, the final chunk contains the complete response + // Extract the complete response and return it + if result != nil && result.ResponsesStreamResponse != nil { + // Build the complete response from the final chunk + data := &AccumulatedData{ + RequestID: requestID, + Status: "success", + Stream: true, + StartTimestamp: startTimestamp, + EndTimestamp: endTimestamp, + Latency: result.ExtraFields.Latency, + ErrorDetails: bifrostErr, + Object: result.Object, + } + + if bifrostErr != nil { + data.Status = "error" + } + + // Extract the complete response from the stream response + if result.ResponsesStreamResponse.Response != nil && result.ResponsesStreamResponse.Response.ResponsesResponse != nil { + data.OutputMessages = result.ResponsesStreamResponse.Response.ResponsesResponse.Output + if result.ResponsesStreamResponse.Response.Usage != nil { + // Convert ResponsesResponseUsage to schemas.LLMUsage + data.TokenUsage = &schemas.LLMUsage{ + ResponsesExtendedResponseUsage: &schemas.ResponsesExtendedResponseUsage{ + InputTokens: result.ResponsesStreamResponse.Response.Usage.InputTokens, + OutputTokens: result.ResponsesStreamResponse.Response.Usage.OutputTokens, + }, + TotalTokens: result.ResponsesStreamResponse.Response.Usage.TotalTokens, + } + } + } + + if a.pricingManager != nil { + cost := a.pricingManager.CalculateCostWithCacheDebug(result) + data.Cost = bifrost.Ptr(cost) + } + + return &ProcessedStreamResponse{ + Type: StreamResponseTypeFinal, + RequestID: requestID, + StreamType: StreamTypeResponses, + Provider: provider, + Model: 
model, + Data: data, + }, nil + } + } + + // For non-final chunks from OpenAI, just pass through + return &ProcessedStreamResponse{ + Type: StreamResponseTypeDelta, + RequestID: requestID, + StreamType: StreamTypeResponses, + Provider: provider, + Model: model, + Data: nil, // No accumulated data for delta responses + }, nil + } + + // For non-OpenAI providers, use the accumulation logic + isFinalChunk := bifrost.IsFinalChunk(ctx) + chunk := a.getResponsesStreamChunk() + chunk.Timestamp = time.Now() + chunk.ErrorDetails = bifrostErr + + if bifrostErr != nil { + chunk.FinishReason = bifrost.Ptr("error") + } else if result != nil && result.ResponsesStreamResponse != nil { + // Store the stream response + chunk.StreamResponse = result.ResponsesStreamResponse + + // Extract token usage from stream response if available + if result.ResponsesStreamResponse.Response != nil && + result.ResponsesStreamResponse.Response.Usage != nil { + chunk.TokenUsage = &schemas.LLMUsage{ + ResponsesExtendedResponseUsage: &schemas.ResponsesExtendedResponseUsage{ + InputTokens: result.ResponsesStreamResponse.Response.Usage.InputTokens, + OutputTokens: result.ResponsesStreamResponse.Response.Usage.OutputTokens, + }, + TotalTokens: result.ResponsesStreamResponse.Response.Usage.TotalTokens, + } + } + } + + // Add chunk to accumulator synchronously to maintain order + object := "" + if result != nil { + if isFinalChunk { + if a.pricingManager != nil { + cost := a.pricingManager.CalculateCostWithCacheDebug(result) + chunk.Cost = bifrost.Ptr(cost) + } + chunk.SemanticCacheDebug = result.ExtraFields.CacheDebug + } + object = result.Object + } + + if addErr := a.addResponsesStreamChunk(requestID, chunk, object, isFinalChunk); addErr != nil { + return nil, fmt.Errorf("failed to add responses stream chunk for request %s: %w", requestID, addErr) + } + + // If this is the final chunk, process accumulated chunks + if isFinalChunk { + shouldProcess := false + // Get the accumulator to check if processing 
has already been triggered + accumulator := a.getOrCreateStreamAccumulator(requestID) + accumulator.mu.Lock() + shouldProcess = !accumulator.IsComplete + // Mark as complete when we're about to process + if shouldProcess { + accumulator.IsComplete = true + } + accumulator.mu.Unlock() + + if shouldProcess { + data, processErr := a.processAccumulatedResponsesStreamingChunks(requestID, bifrostErr, isFinalChunk) + if processErr != nil { + a.logger.Error("failed to process accumulated responses chunks for request %s: %v", requestID, processErr) + return nil, processErr + } + + return &ProcessedStreamResponse{ + Type: StreamResponseTypeFinal, + RequestID: requestID, + StreamType: StreamTypeResponses, + Provider: provider, + Model: model, + Data: data, + }, nil + } + return nil, nil + } + + return &ProcessedStreamResponse{ + Type: StreamResponseTypeDelta, + RequestID: requestID, + StreamType: StreamTypeResponses, + Provider: provider, + Model: model, + Data: nil, + }, nil +} diff --git a/framework/streaming/types.go b/framework/streaming/types.go index 93e9b8635..11bfa92e2 100644 --- a/framework/streaming/types.go +++ b/framework/streaming/types.go @@ -14,6 +14,7 @@ const ( StreamTypeChat StreamType = "chat.completion" StreamTypeAudio StreamType = "audio.speech" StreamTypeTranscription StreamType = "audio.transcription" + StreamTypeResponses StreamType = "responses" ) type StreamResponseType string @@ -33,6 +34,7 @@ type AccumulatedData struct { StartTimestamp time.Time EndTimestamp time.Time OutputMessage *schemas.ChatMessage + OutputMessages []schemas.ResponsesMessage // For responses API ToolCalls []schemas.ChatAssistantMessageToolCall ErrorDetails *schemas.BifrostError TokenUsage *schemas.LLMUsage @@ -78,11 +80,23 @@ type ChatStreamChunk struct { ErrorDetails *schemas.BifrostError // Error if any } +// ResponsesStreamChunk represents a single responses streaming chunk +type ResponsesStreamChunk struct { + Timestamp time.Time // When chunk was received + StreamResponse 
*schemas.ResponsesStreamResponse // The actual stream response + FinishReason *string // If this is the final chunk + TokenUsage *schemas.LLMUsage // Token usage if available + SemanticCacheDebug *schemas.BifrostCacheDebug // Semantic cache debug if available + Cost *float64 // Cost in dollars from pricing plugin + ErrorDetails *schemas.BifrostError // Error if any +} + // StreamAccumulator manages accumulation of streaming chunks type StreamAccumulator struct { RequestID string StartTimestamp time.Time ChatStreamChunks []*ChatStreamChunk + ResponsesStreamChunks []*ResponsesStreamChunk TranscriptionStreamChunks []*TranscriptionStreamChunk AudioStreamChunks []*AudioStreamChunk IsComplete bool @@ -139,6 +153,13 @@ func (p *ProcessedStreamResponse) ToBifrostResponse() *schemas.BifrostResponse { choice, } } + // Handle responses output + if p.Data.OutputMessages != nil { + resp.ResponsesResponse = &schemas.ResponsesResponse{ + CreatedAt: int(p.Data.StartTimestamp.Unix()), + Output: p.Data.OutputMessages, + } + } resp.Model = p.Model resp.Created = int(p.Data.StartTimestamp.Unix()) if p.Data.TokenUsage != nil { diff --git a/plugins/logging/main.go b/plugins/logging/main.go index 6debdd029..606986a39 100644 --- a/plugins/logging/main.go +++ b/plugins/logging/main.go @@ -5,9 +5,7 @@ package logging import ( "context" - "errors" "fmt" - "strings" "sync" "sync/atomic" "time" @@ -38,7 +36,6 @@ const ( // Context keys for logging optimization const ( DroppedCreateContextKey ContextKey = "logging-dropped" - CreatedTimestampKey ContextKey = "logging-created-timestamp" ) // UpdateLogData contains data for log entry updates @@ -46,12 +43,12 @@ type UpdateLogData struct { Status string TokenUsage *schemas.LLMUsage Cost *float64 // Cost in dollars from pricing plugin - OutputMessage *schemas.ChatMessage + ChatOutput *schemas.ChatMessage + ResponsesOutput []schemas.ResponsesMessage EmbeddingOutput []schemas.BifrostEmbedding - ToolCalls []schemas.ChatAssistantMessageToolCall 
ErrorDetails *schemas.BifrostError - Model string // May be different from request - Object string // May be different from request + Model string + Object string SpeechOutput *schemas.BifrostSpeech // For non-streaming speech responses TranscriptionOutput *schemas.BifrostTranscribe // For non-streaming transcription responses RawResponse interface{} @@ -63,6 +60,7 @@ type LogMessage struct { RequestID string // Unique ID for the request ParentRequestID string // Unique ID for the parent request Timestamp time.Time // Of the preHook/postHook call + Latency int64 // For latency updates InitialData *InitialLogData // For create operations SemanticCacheDebug *schemas.BifrostCacheDebug // For semantic cache operations UpdateData *UpdateLogData // For update operations @@ -101,39 +99,6 @@ type LoggerPlugin struct { accumulator *streaming.Accumulator // Accumulator for streaming chunks } -// retryOnNotFound retries a function up to 3 times with 1-second delays if it returns logstore.ErrNotFound -func retryOnNotFound(ctx context.Context, operation func() error) error { - const maxRetries = 3 - const retryDelay = time.Second - - var lastErr error - for attempt := 0; attempt < maxRetries; attempt++ { - err := operation() - if err == nil { - return nil - } - - // Check if the error is logstore.ErrNotFound - if !errors.Is(err, logstore.ErrNotFound) { - return err - } - - lastErr = err - - // Don't wait after the last attempt - if attempt < maxRetries-1 { - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(retryDelay): - // Continue to next retry - } - } - } - - return lastErr -} - // Init creates new logger plugin with given log store func Init(ctx context.Context, logger schemas.Logger, logsStore logstore.LogStore, pricingManager *pricing.PricingManager) (*LoggerPlugin, error) { if logsStore == nil { @@ -237,14 +202,12 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest if bifrost.IsStreamRequestType(req.RequestType) { 
p.accumulator.CreateStreamAccumulator(requestID, createdTimestamp) } - // Prepare initial log data - objectType := p.determineObjectType(req.RequestType) inputHistory := p.extractInputHistory(req) initialData := &InitialLogData{ Provider: string(req.Provider), Model: req.Model, - Object: objectType, + Object: string(req.RequestType), InputHistory: inputHistory, } @@ -275,7 +238,7 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest initialData.Params = req.TranscriptionRequest.Params initialData.TranscriptionInput = req.TranscriptionRequest.Input } - *ctx = context.WithValue(*ctx, CreatedTimestampKey, createdTimestamp) + // Queue the log creation message (non-blocking) - Using sync.Pool logMsg := p.getLogMessage() logMsg.Operation = LogOperationCreate @@ -325,7 +288,6 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest // PostHook is called after a response is received - FULLY ASYNC, NO DATABASE I/O func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) { - p.logger.Debug("running post-hook for plugin logging") if ctx == nil { // Log error but don't fail the request p.logger.Error("context is nil in PostHook") @@ -350,7 +312,13 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes // Queue the log update message (non-blocking) - use same pattern for both streaming and regular logMsg := p.getLogMessage() logMsg.RequestID = requestID - logMsg.Timestamp = time.Now() + + if result != nil { + logMsg.Latency = result.ExtraFields.Latency + } else { + logMsg.Latency = 0 + } + // If response is nil, and there is an error, we update log with error if result == nil && bifrostErr != nil { // If request type is streaming, then we trigger cleanup as well @@ -363,7 +331,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes ErrorDetails: 
bifrostErr, } processingErr := retryOnNotFound(p.ctx, func() error { - return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.UpdateData) + return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Latency, logMsg.SemanticCacheDebug, logMsg.UpdateData) }) if processingErr != nil { p.logger.Error("failed to process log update for request %s: %v", logMsg.RequestID, processingErr) @@ -395,7 +363,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes go func() { defer p.putLogMessage(logMsg) // Return to pool when done processingErr := retryOnNotFound(p.ctx, func() error { - return p.updateStreamingLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.StreamResponse, streamResponse.Type == streaming.StreamResponseTypeFinal) + return p.updateStreamingLogEntry(p.ctx, logMsg.RequestID, logMsg.SemanticCacheDebug, logMsg.StreamResponse, streamResponse.Type == streaming.StreamResponseTypeFinal) }) if processingErr != nil { p.logger.Error("failed to process stream update for request %s: %v", logMsg.RequestID, processingErr) @@ -412,7 +380,6 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes } }() } - } else { // Handle regular response logMsg.Operation = LogOperationUpdate @@ -426,11 +393,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes // Success case updateData.Status = "success" if result.Model != "" { - updateData.Model = result.Model - } - // Update object type if available - if result.Object != "" { - updateData.Object = result.Object + updateData.Model = result.ExtraFields.ModelRequested } // Token usage if result.Usage != nil && result.Usage.TotalTokens > 0 { @@ -443,7 +406,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes if len(result.Choices) > 0 { choice := result.Choices[0] if choice.BifrostTextCompletionResponseChoice != nil { - updateData.OutputMessage 
= &schemas.ChatMessage{ + updateData.ChatOutput = &schemas.ChatMessage{ Role: schemas.ChatMessageRoleAssistant, Content: &schemas.ChatMessageContent{ ContentStr: choice.BifrostTextCompletionResponseChoice.Text, @@ -452,29 +415,11 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes } // Check if this is a non-stream response choice if choice.BifrostNonStreamResponseChoice != nil { - updateData.OutputMessage = choice.BifrostNonStreamResponseChoice.Message - // Extract tool calls if present - if choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage != nil && - choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage.ToolCalls != nil { - updateData.ToolCalls = choice.BifrostNonStreamResponseChoice.Message.ChatAssistantMessage.ToolCalls - } + updateData.ChatOutput = choice.BifrostNonStreamResponseChoice.Message } } if result.ResponsesResponse != nil { - outputMessages := result.ResponsesResponse.Output - if len(outputMessages) > 0 { - chatMessages := schemas.ToChatMessages(outputMessages) - if len(chatMessages) > 0 { - lastMessage := chatMessages[len(chatMessages)-1] - updateData.OutputMessage = &lastMessage - - // Extract tool calls if present - if lastMessage.ChatAssistantMessage != nil && - lastMessage.ChatAssistantMessage.ToolCalls != nil { - updateData.ToolCalls = lastMessage.ChatAssistantMessage.ToolCalls - } - } - } + updateData.ResponsesOutput = result.ResponsesResponse.Output } if result.Data != nil { updateData.EmbeddingOutput = result.Data @@ -528,7 +473,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes } // Here we pass plugin level context for background processing to avoid context cancellation processingErr := retryOnNotFound(p.ctx, func() error { - return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Timestamp, logMsg.SemanticCacheDebug, logMsg.UpdateData) + return p.updateLogEntry(p.ctx, logMsg.RequestID, logMsg.Latency, logMsg.SemanticCacheDebug, 
logMsg.UpdateData) }) if processingErr != nil { p.logger.Error("failed to process log update for request %s: %v", logMsg.RequestID, processingErr) @@ -562,92 +507,3 @@ func (p *LoggerPlugin) Cleanup() error { // GORM handles connection cleanup automatically return nil } - -// Helper methods - -// determineObjectType determines the object type from request input -func (p *LoggerPlugin) determineObjectType(requestType schemas.RequestType) string { - switch requestType { - case schemas.TextCompletionRequest, schemas.TextCompletionStreamRequest: - return "text.completion" - case schemas.ChatCompletionRequest: - return "chat.completion" - case schemas.ChatCompletionStreamRequest: - return "chat.completion.chunk" - case schemas.ResponsesRequest: - return "response" - case schemas.ResponsesStreamRequest: - return "response.completion.chunk" - case schemas.EmbeddingRequest: - return "list" - case schemas.SpeechRequest: - return "audio.speech" - case schemas.SpeechStreamRequest: - return "audio.speech.chunk" - case schemas.TranscriptionRequest: - return "audio.transcription" - case schemas.TranscriptionStreamRequest: - return "audio.transcription.chunk" - } - return "unknown" -} - -// extractInputHistory extracts input history from request input -// extractInputHistory extracts input history from request input -func (p *LoggerPlugin) extractInputHistory(request *schemas.BifrostRequest) []schemas.ChatMessage { - if request.ChatRequest != nil { - return request.ChatRequest.Input - } - if request.ResponsesRequest != nil { - messages := schemas.ToChatMessages(request.ResponsesRequest.Input) - if len(messages) > 0 { - return messages - } - } - if request.TextCompletionRequest != nil { - var text string - if request.TextCompletionRequest.Input.PromptStr != nil { - text = *request.TextCompletionRequest.Input.PromptStr - } else { - var stringBuilder strings.Builder - for _, prompt := range request.TextCompletionRequest.Input.PromptArray { - stringBuilder.WriteString(prompt) - } - 
text = stringBuilder.String() - } - return []schemas.ChatMessage{ - { - Role: schemas.ChatMessageRoleUser, - Content: &schemas.ChatMessageContent{ - ContentStr: &text, - }, - }, - } - } - if request.EmbeddingRequest != nil { - texts := request.EmbeddingRequest.Input.Texts - - if len(texts) == 0 && request.EmbeddingRequest.Input.Text != nil { - texts = []string{*request.EmbeddingRequest.Input.Text} - } - - contentBlocks := make([]schemas.ChatContentBlock, len(texts)) - for i, text := range texts { - // Create a per-iteration copy to avoid reusing the same memory address - t := text - contentBlocks[i] = schemas.ChatContentBlock{ - Type: schemas.ChatContentBlockTypeText, - Text: &t, - } - } - return []schemas.ChatMessage{ - { - Role: schemas.ChatMessageRoleUser, - Content: &schemas.ChatMessageContent{ - ContentBlocks: contentBlocks, - }, - }, - } - } - return []schemas.ChatMessage{} -} diff --git a/plugins/logging/operations.go b/plugins/logging/operations.go index 8bd44aa1f..083c8e145 100644 --- a/plugins/logging/operations.go +++ b/plugins/logging/operations.go @@ -39,15 +39,10 @@ func (p *LoggerPlugin) insertInitialLogEntry(ctx context.Context, requestID stri } // updateLogEntry updates an existing log entry using GORM -func (p *LoggerPlugin) updateLogEntry(ctx context.Context, requestID string, timestamp time.Time, cacheDebug *schemas.BifrostCacheDebug, data *UpdateLogData) error { +func (p *LoggerPlugin) updateLogEntry(ctx context.Context, requestID string, latency int64, cacheDebug *schemas.BifrostCacheDebug, data *UpdateLogData) error { updates := make(map[string]interface{}) - if !timestamp.IsZero() { - // Try to get original timestamp from context first for latency calculation - latency, err := p.calculateLatency(ctx, requestID, timestamp) - if err != nil { - return err - } - updates["latency"] = latency + if latency != 0 { + updates["latency"] = float64(latency) } updates["status"] = data.Status if data.Model != "" { @@ -58,8 +53,8 @@ func (p *LoggerPlugin) 
updateLogEntry(ctx context.Context, requestID string, tim } // Handle JSON fields by setting them on a temporary entry and serializing tempEntry := &logstore.Log{} - if data.OutputMessage != nil { - tempEntry.OutputMessageParsed = data.OutputMessage + if data.ChatOutput != nil { + tempEntry.OutputMessageParsed = data.ChatOutput if err := tempEntry.SerializeFields(); err != nil { p.logger.Error("failed to serialize output message: %v", err) } else { @@ -68,21 +63,21 @@ func (p *LoggerPlugin) updateLogEntry(ctx context.Context, requestID string, tim } } - if data.EmbeddingOutput != nil { - tempEntry.EmbeddingOutputParsed = data.EmbeddingOutput + if data.ResponsesOutput != nil { + tempEntry.ResponsesOutputParsed = data.ResponsesOutput if err := tempEntry.SerializeFields(); err != nil { - p.logger.Error("failed to serialize embedding output: %v", err) + p.logger.Error("failed to serialize responses output: %v", err) } else { - updates["embedding_output"] = tempEntry.EmbeddingOutput + updates["responses_output"] = tempEntry.ResponsesOutput } } - if data.ToolCalls != nil { - tempEntry.ToolCallsParsed = data.ToolCalls + if data.EmbeddingOutput != nil { + tempEntry.EmbeddingOutputParsed = data.EmbeddingOutput if err := tempEntry.SerializeFields(); err != nil { - p.logger.Error("failed to serialize tool calls: %v", err) + p.logger.Error("failed to serialize embedding output: %v", err) } else { - updates["tool_calls"] = tempEntry.ToolCalls + updates["embedding_output"] = tempEntry.EmbeddingOutput } } @@ -153,26 +148,11 @@ func (p *LoggerPlugin) updateLogEntry(ctx context.Context, requestID string, tim } // updateStreamingLogEntry handles streaming updates using GORM -func (p *LoggerPlugin) updateStreamingLogEntry(ctx context.Context, requestID string, timestamp time.Time, cacheDebug *schemas.BifrostCacheDebug, streamResponse *streaming.ProcessedStreamResponse, isFinalChunk bool) error { +func (p *LoggerPlugin) updateStreamingLogEntry(ctx context.Context, requestID string, 
cacheDebug *schemas.BifrostCacheDebug, streamResponse *streaming.ProcessedStreamResponse, isFinalChunk bool) error { p.logger.Debug("[logging] updating streaming log entry %s", requestID) updates := make(map[string]interface{}) // Handle error case first if streamResponse.Data.ErrorDetails != nil { - latency, err := p.calculateLatency(ctx, requestID, timestamp) - if err != nil { - // If we can't get created_at, just update status and error - tempEntry := &logstore.Log{} - tempEntry.ErrorDetailsParsed = streamResponse.Data.ErrorDetails - if err := tempEntry.SerializeFields(); err == nil { - return p.store.Update(ctx, requestID, map[string]interface{}{ - "status": "error", - "error_details": tempEntry.ErrorDetails, - "timestamp": timestamp, - }) - } - return err - } - tempEntry := &logstore.Log{} tempEntry.ErrorDetailsParsed = streamResponse.Data.ErrorDetails if err := tempEntry.SerializeFields(); err != nil { @@ -180,20 +160,18 @@ func (p *LoggerPlugin) updateStreamingLogEntry(ctx context.Context, requestID st } return p.store.Update(ctx, requestID, map[string]interface{}{ "status": "error", - "latency": latency, - "timestamp": timestamp, + "latency": float64(streamResponse.Data.Latency), "error_details": tempEntry.ErrorDetails, }) } // Always mark as streaming and update timestamp updates["stream"] = true - updates["timestamp"] = timestamp // Calculate latency when stream finishes tempEntry := &logstore.Log{} - updates["latency"] = streamResponse.Data.Latency + updates["latency"] = float64(streamResponse.Data.Latency) // Update model if provided if streamResponse.Data.Model != "" { @@ -252,14 +230,6 @@ func (p *LoggerPlugin) updateStreamingLogEntry(ctx context.Context, requestID st updates["cache_debug"] = tempEntry.CacheDebug } } - if streamResponse.Data.ToolCalls != nil { - tempEntry.ToolCallsParsed = streamResponse.Data.ToolCalls - if err := tempEntry.SerializeFields(); err != nil { - p.logger.Error("failed to serialize tool calls: %v", err) - } else { - 
updates["tool_calls"] = tempEntry.ToolCalls - } - } // Create content summary if streamResponse.Data.OutputMessage != nil { tempEntry.OutputMessageParsed = streamResponse.Data.OutputMessage @@ -270,6 +240,15 @@ func (p *LoggerPlugin) updateStreamingLogEntry(ctx context.Context, requestID st updates["content_summary"] = tempEntry.ContentSummary } } + // Handle responses output from stream updates + if streamResponse.Data.OutputMessages != nil { + tempEntry.ResponsesOutputParsed = streamResponse.Data.OutputMessages + if err := tempEntry.SerializeFields(); err != nil { + p.logger.Error("failed to serialize responses output: %v", err) + } else { + updates["responses_output"] = tempEntry.ResponsesOutput + } + } // Only perform update if there's something to update if len(updates) > 0 { return p.store.Update(ctx, requestID, updates) @@ -277,24 +256,6 @@ func (p *LoggerPlugin) updateStreamingLogEntry(ctx context.Context, requestID st return nil } -// calculateLatency computes latency in milliseconds from creation time -func (p *LoggerPlugin) calculateLatency(ctx context.Context, requestID string, currentTime time.Time) (float64, error) { - // Try to get original timestamp from context first - if ctxTimestamp, ok := ctx.Value(CreatedTimestampKey).(time.Time); ok { - return float64(currentTime.Sub(ctxTimestamp).Nanoseconds()) / 1e6, nil - } - var originalEntry *logstore.Log - err := retryOnNotFound(ctx, func() error { - var opErr error - originalEntry, opErr = p.store.FindFirst(ctx, map[string]interface{}{"id": requestID}, "created_at") - return opErr - }) - if err != nil { - return 0, err - } - return float64(currentTime.Sub(originalEntry.CreatedAt).Nanoseconds()) / 1e6, nil -} - // getLogEntry retrieves a log entry by ID using GORM func (p *LoggerPlugin) getLogEntry(ctx context.Context, requestID string) (*logstore.Log, error) { entry, err := p.store.FindFirst(ctx, map[string]interface{}{"id": requestID}) diff --git a/plugins/logging/pool.go b/plugins/logging/pool.go 
index afdd7ee0e..24a532bab 100644 --- a/plugins/logging/pool.go +++ b/plugins/logging/pool.go @@ -17,9 +17,10 @@ func (p *LoggerPlugin) putLogMessage(msg *LogMessage) { msg.Timestamp = time.Time{} msg.InitialData = nil - // Don't reset UpdateData and StreamUpdateData here since they're returned + // Don't reset UpdateData and StreamResponse here since they're returned // to their own pools in the defer function - just clear the pointers - msg.UpdateData = nil + msg.UpdateData = nil + msg.StreamResponse = nil p.logMsgPool.Put(msg) } @@ -34,14 +35,14 @@ func (p *LoggerPlugin) putUpdateLogData(data *UpdateLogData) { // Reset all fields to avoid memory leaks data.Status = "" data.TokenUsage = nil - data.OutputMessage = nil - data.ToolCalls = nil + data.ChatOutput = nil + data.ResponsesOutput = nil data.ErrorDetails = nil data.Model = "" data.Object = "" data.SpeechOutput = nil data.TranscriptionOutput = nil - data.EmbeddingOutput = nil - data.Cost = nil + data.EmbeddingOutput = nil + data.Cost = nil p.updateDataPool.Put(data) } diff --git a/plugins/logging/utils.go b/plugins/logging/utils.go index 22b985a02..db5150c96 100644 --- a/plugins/logging/utils.go +++ b/plugins/logging/utils.go @@ -3,8 +3,12 @@ package logging import ( "context" + "errors" "fmt" + "strings" + "time" + "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/framework/logstore" ) @@ -47,3 +51,95 @@ func (p *LoggerPlugin) GetPluginLogManager() *PluginLogManager { plugin: p, } } + +// retryOnNotFound retries a function up to 3 times with 1-second delays if it returns logstore.ErrNotFound +func retryOnNotFound(ctx context.Context, operation func() error) error { + const maxRetries = 3 + const retryDelay = time.Second + + var lastErr error + for attempt := range maxRetries { + err := operation() + if err == nil { + return nil + } + + // Check if the error is logstore.ErrNotFound + if !errors.Is(err, logstore.ErrNotFound) { + return err + } + + lastErr = err + + // Don't wait after the 
last attempt + if attempt < maxRetries-1 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryDelay): + // Continue to next retry + } + } + } + + return lastErr +} + +// extractInputHistory extracts input history from request input +func (p *LoggerPlugin) extractInputHistory(request *schemas.BifrostRequest) []schemas.ChatMessage { + if request.ChatRequest != nil { + return request.ChatRequest.Input + } + if request.ResponsesRequest != nil { + messages := schemas.ToChatMessages(request.ResponsesRequest.Input) + if len(messages) > 0 { + return messages + } + } + if request.TextCompletionRequest != nil { + var text string + if request.TextCompletionRequest.Input.PromptStr != nil { + text = *request.TextCompletionRequest.Input.PromptStr + } else { + var stringBuilder strings.Builder + for _, prompt := range request.TextCompletionRequest.Input.PromptArray { + stringBuilder.WriteString(prompt) + } + text = stringBuilder.String() + } + return []schemas.ChatMessage{ + { + Role: schemas.ChatMessageRoleUser, + Content: &schemas.ChatMessageContent{ + ContentStr: &text, + }, + }, + } + } + if request.EmbeddingRequest != nil { + texts := request.EmbeddingRequest.Input.Texts + + if len(texts) == 0 && request.EmbeddingRequest.Input.Text != nil { + texts = []string{*request.EmbeddingRequest.Input.Text} + } + + contentBlocks := make([]schemas.ChatContentBlock, len(texts)) + for i, text := range texts { + // Create a per-iteration copy to avoid reusing the same memory address + t := text + contentBlocks[i] = schemas.ChatContentBlock{ + Type: schemas.ChatContentBlockTypeText, + Text: &t, + } + } + return []schemas.ChatMessage{ + { + Role: schemas.ChatMessageRoleUser, + Content: &schemas.ChatMessageContent{ + ContentBlocks: contentBlocks, + }, + }, + } + } + return []schemas.ChatMessage{} +} diff --git a/transports/bifrost-http/handlers/inference.go b/transports/bifrost-http/handlers/inference.go index 94ed91f85..cd5dca839 100644 --- 
a/transports/bifrost-http/handlers/inference.go +++ b/transports/bifrost-http/handlers/inference.go @@ -668,6 +668,10 @@ func (h *CompletionHandler) transcription(ctx *fasthttp.RequestCtx) { } provider, modelName := schemas.ParseModelString(modelValues[0], "") + if provider == "" || modelName == "" { + SendError(ctx, fasthttp.StatusBadRequest, "model should be in provider/model format", h.logger) + return + } // Extract file (required) fileHeaders := form.File["file"] diff --git a/ui/app/logs/views/logChatMessageView.tsx b/ui/app/logs/views/logChatMessageView.tsx new file mode 100644 index 000000000..879b01c4c --- /dev/null +++ b/ui/app/logs/views/logChatMessageView.tsx @@ -0,0 +1,181 @@ +import { ChatMessage, ContentBlock } from "@/lib/types/logs"; +import { CodeEditor } from "./codeEditor"; +import { isJson, cleanJson } from "@/lib/utils/validation"; + +interface LogChatMessageViewProps { + message: ChatMessage; +} + +const renderContentBlock = (block: ContentBlock, index: number) => { + return ( +