Commit 06e12ff

feat: add per-token latency tracking for streaming responses (#596)
## Summary

Add per-token latency tracking for streaming responses to improve observability and performance monitoring.

## Changes

- Added latency tracking for each token in streaming responses across all providers (Anthropic, Bedrock, Cohere, Gemini, OpenAI)
- Implemented two types of latency measurements:
  - Per-chunk latency: time since the last chunk was received
  - Total latency: time from the start of the request to the final chunk
- Added a new Prometheus metric `bifrost_stream_token_latency_seconds` to track token latency
- Enhanced the telemetry plugin to record these metrics for each streaming chunk
- Improved JSON marshaling for BifrostStream to prevent field conflicts
- Simplified the streaming response handler in the HTTP transport

## Type of change

- [x] Feature
- [x] Refactor

## Affected areas

- [x] Core (Go)
- [x] Transports (HTTP)
- [x] Providers/Integrations
- [x] Plugins

## How to test

Test streaming responses with different providers and verify that latency metrics are being recorded:

```sh
# Start Bifrost with the telemetry plugin enabled
go run cmd/bifrost/main.go

# Make streaming requests to different providers
curl -X POST http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model":"gpt-3.5-turbo","messages":[{"role":"user","content":"Write a short story"}],"stream":true}'

# Check Prometheus metrics
curl http://localhost:8000/metrics | grep bifrost_stream_token_latency_seconds
```

## Breaking changes

- [x] No

## Related issues

Improves observability for streaming responses, which helps diagnose performance issues.

## Security considerations

No security implications; this only adds internal performance tracking.

## Checklist

- [x] I added/updated tests where appropriate
- [x] I verified builds succeed (Go and UI)
2 parents 38338a2 + 45d8ac2 commit 06e12ff
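
The summary above names a new histogram, `bifrost_stream_token_latency_seconds`, recorded by the telemetry plugin for every streaming chunk. As a rough sketch of what that recording could look like with prometheus/client_golang — the label set, bucket layout, and the `observeChunkLatency` helper are illustrative assumptions, not code from this commit:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Histogram matching the metric name from the PR description; the label
// names and buckets here are assumptions for illustration only.
var streamTokenLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "bifrost_stream_token_latency_seconds",
		Help:    "Latency between consecutive streaming chunks, in seconds.",
		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), // 1ms .. ~8s
	},
	[]string{"provider", "model"},
)

// observeChunkLatency (hypothetical helper) converts the millisecond latency
// carried in a chunk's extra fields into seconds and records it.
func observeChunkLatency(provider, model string, latencyMs int64) {
	streamTokenLatency.
		WithLabelValues(provider, model).
		Observe(float64(latencyMs) / 1000.0)
}

func main() {
	prometheus.MustRegister(streamTokenLatency)
	observeChunkLatency("anthropic", "claude-3-haiku", 42)
	fmt.Println("recorded one observation")
}
```

Chunks carry latency in milliseconds (see `ExtraFields.Latency` in the diffs below), so the helper divides by 1000 to match the metric's `_seconds` suffix.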

File tree

31 files changed: +219 −131

core/changelog.md

Lines changed: 5 additions & 1 deletion

```diff
@@ -1,4 +1,8 @@
 <!-- The pattern we follow here is to keep the changelog for the latest version -->
 <!-- Old changelogs are automatically attached to the GitHub releases -->

-- Fix: Anthropic tool results aggregation logic.
+- Feat: Stream token latency sent back in extra fields.
+- Feat: Plugin interface extended with TransportInterceptor method.
+- Feat: Add Anthropic thinking parameter
+- Feat: Add Custom key selector logic and send back request latency in extra fields.
+- Bug: Fallbacks not working occasionally.
```
core/providers/anthropic.go

Lines changed: 16 additions & 0 deletions

```diff
@@ -444,6 +444,9 @@ func handleAnthropicStreaming(
 	scanner := bufio.NewScanner(resp.Body)
 	chunkIndex := -1

+	startTime := time.Now()
+	lastChunkTime := startTime
+
 	// Track minimal state needed for response format
 	var messageID string
 	var modelName string
@@ -527,8 +530,10 @@ func handleAnthropicStreaming(
 					Provider:       providerType,
 					ModelRequested: modelName,
 					ChunkIndex:     chunkIndex,
+					Latency:        time.Since(lastChunkTime).Milliseconds(),
 				},
 			}
+			lastChunkTime = time.Now()

 			// Use utility function to process and send response
 			processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
@@ -573,8 +578,10 @@ func handleAnthropicStreaming(
 					Provider:       providerType,
 					ModelRequested: modelName,
 					ChunkIndex:     chunkIndex,
+					Latency:        time.Since(lastChunkTime).Milliseconds(),
 				},
 			}
+			lastChunkTime = time.Now()

 			// Use utility function to process and send response
 			processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
@@ -610,8 +617,10 @@ func handleAnthropicStreaming(
 					Provider:       providerType,
 					ModelRequested: modelName,
 					ChunkIndex:     chunkIndex,
+					Latency:        time.Since(lastChunkTime).Milliseconds(),
 				},
 			}
+			lastChunkTime = time.Now()

 			// Use utility function to process and send response
 			processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
@@ -646,8 +655,10 @@ func handleAnthropicStreaming(
 					Provider:       providerType,
 					ModelRequested: modelName,
 					ChunkIndex:     chunkIndex,
+					Latency:        time.Since(lastChunkTime).Milliseconds(),
 				},
 			}
+			lastChunkTime = time.Now()

 			// Use utility function to process and send response
 			processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
@@ -683,8 +694,10 @@ func handleAnthropicStreaming(
 					Provider:       providerType,
 					ModelRequested: modelName,
 					ChunkIndex:     chunkIndex,
+					Latency:        time.Since(lastChunkTime).Milliseconds(),
 				},
 			}
+			lastChunkTime = time.Now()

 			// Use utility function to process and send response
 			processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
@@ -713,8 +726,10 @@ func handleAnthropicStreaming(
 					Provider:       providerType,
 					ModelRequested: modelName,
 					ChunkIndex:     chunkIndex,
+					Latency:        time.Since(lastChunkTime).Milliseconds(),
 				},
 			}
+			lastChunkTime = time.Now()

 			// Use utility function to process and send response
 			processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
@@ -774,6 +789,7 @@ func handleAnthropicStreaming(
 			processAndSendError(ctx, postHookRunner, err, responseChan, schemas.ChatCompletionStreamRequest, providerType, modelName, logger)
 		} else {
 			response := createBifrostChatCompletionChunkResponse(messageID, usage, finishReason, chunkIndex, schemas.ChatCompletionStreamRequest, providerType, modelName)
+			response.ExtraFields.Latency = time.Since(startTime).Milliseconds()
 			handleStreamEndWithSuccess(ctx, response, postHookRunner, responseChan, logger)
 		}
 	}()
```

core/providers/bedrock.go

Lines changed: 16 additions & 4 deletions

```diff
@@ -644,14 +644,16 @@ func (provider *BedrockProvider) ChatCompletionStream(ctx context.Context, postH
 	reader := bufio.NewReader(resp.Body)
 	buffer := make([]byte, 1024*1024) // 1MB buffer
 	var accumulator []byte // Accumulate data across reads
+	startTime := time.Now()
+	lastChunkTime := startTime

 	for {
 		n, err := reader.Read(buffer)
 		if err != nil {
 			if err == io.EOF {
 				// Process any remaining data in the accumulator
 				if len(accumulator) > 0 {
-					_ = provider.processAWSEventStreamData(ctx, postHookRunner, accumulator, &messageID, &chunkIndex, &usage, &finishReason, request.Model, providerName, responseChan)
+					_ = provider.processAWSEventStreamData(ctx, postHookRunner, accumulator, &messageID, &chunkIndex, &usage, &finishReason, request.Model, providerName, responseChan, &lastChunkTime)
 				}
 				break
 			}
@@ -668,14 +670,15 @@ func (provider *BedrockProvider) ChatCompletionStream(ctx context.Context, postH
 		accumulator = append(accumulator, buffer[:n]...)

 		// Process the accumulated data and get the remaining unprocessed part
-		remaining := provider.processAWSEventStreamData(ctx, postHookRunner, accumulator, &messageID, &chunkIndex, &usage, &finishReason, request.Model, providerName, responseChan)
+		remaining := provider.processAWSEventStreamData(ctx, postHookRunner, accumulator, &messageID, &chunkIndex, &usage, &finishReason, request.Model, providerName, responseChan, &lastChunkTime)

 		// Reset accumulator with remaining data
 		accumulator = remaining
 	}

 	// Send final response
 	response := createBifrostChatCompletionChunkResponse(messageID, usage, finishReason, chunkIndex, schemas.ChatCompletionStreamRequest, providerName, request.Model)
+	response.ExtraFields.Latency = time.Since(startTime).Milliseconds()
 	handleStreamEndWithSuccess(ctx, response, postHookRunner, responseChan, provider.logger)
 }()

@@ -695,6 +698,7 @@ func (provider *BedrockProvider) processAWSEventStreamData(
 	model string,
 	providerName schemas.ModelProvider,
 	responseChan chan *schemas.BifrostStream,
+	lastChunkTime *time.Time,
 ) []byte {
 	lastProcessed := 0
 	depth := 0
@@ -741,7 +745,7 @@ func (provider *BedrockProvider) processAWSEventStreamData(
 				bytes.Contains(jsonBytes, []byte(`metadata`))

 			if hasQuotes && hasRelevantContent {
-				provider.processEventBuffer(ctx, postHookRunner, jsonBytes, messageID, chunkIndex, usage, finishReason, model, providerName, responseChan)
+				provider.processEventBuffer(ctx, postHookRunner, jsonBytes, messageID, chunkIndex, usage, finishReason, model, providerName, responseChan, lastChunkTime)
 				lastProcessed = i + 1
 			}
 			objStart = -1
@@ -759,7 +763,7 @@
 }

 // processEventBuffer processes AWS Event Stream JSON payloads using typed Bedrock stream events
-func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHookRunner schemas.PostHookRunner, eventBuffer []byte, messageID *string, chunkIndex *int, usage **schemas.LLMUsage, finishReason **string, model string, providerName schemas.ModelProvider, responseChan chan *schemas.BifrostStream) {
+func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHookRunner schemas.PostHookRunner, eventBuffer []byte, messageID *string, chunkIndex *int, usage **schemas.LLMUsage, finishReason **string, model string, providerName schemas.ModelProvider, responseChan chan *schemas.BifrostStream, lastChunkTime *time.Time) {
 	// Parse the JSON event into our typed structure
 	var streamEvent bedrock.BedrockStreamEvent
 	if err := sonic.Unmarshal(eventBuffer, &streamEvent); err != nil {
@@ -798,9 +802,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
 				Provider:       providerName,
 				ModelRequested: model,
 				ChunkIndex:     *chunkIndex,
+				Latency:        time.Since(*lastChunkTime).Milliseconds(),
 			},
 		}

+		*lastChunkTime = time.Now()
 		processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, provider.logger)

 	case streamEvent.Start != nil && streamEvent.Start.ToolUse != nil:
@@ -838,9 +844,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
 				Provider:       providerName,
 				ModelRequested: model,
 				ChunkIndex:     *chunkIndex,
+				Latency:        time.Since(*lastChunkTime).Milliseconds(),
 			},
 		}

+		*lastChunkTime = time.Now()
 		processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, provider.logger)

 	case streamEvent.ContentBlockIndex != nil && streamEvent.Delta != nil:
@@ -872,9 +880,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
 				Provider:       providerName,
 				ModelRequested: model,
 				ChunkIndex:     *chunkIndex,
+				Latency:        time.Since(*lastChunkTime).Milliseconds(),
 			},
 		}

+		*lastChunkTime = time.Now()
 		processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, provider.logger)
 	}

@@ -909,9 +919,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
 				Provider:       providerName,
 				ModelRequested: model,
 				ChunkIndex:     *chunkIndex,
+				Latency:        time.Since(*lastChunkTime).Milliseconds(),
 			},
 		}

+		*lastChunkTime = time.Now()
 		processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, provider.logger)
 	}

```
core/providers/cohere.go

Lines changed: 5 additions & 0 deletions

```diff
@@ -436,6 +436,8 @@ func (provider *CohereProvider) ChatCompletionStream(ctx context.Context, postHo

 	scanner := bufio.NewScanner(resp.Body)
 	var responseID string
+	startTime := time.Now()
+	lastChunkTime := startTime

 	for scanner.Scan() {
 		line := scanner.Text()
@@ -487,8 +489,10 @@ func (provider *CohereProvider) ChatCompletionStream(ctx context.Context, postHo
 				Provider:       providerName,
 				ModelRequested: request.Model,
 				ChunkIndex:     chunkIndex,
+				Latency:        time.Since(lastChunkTime).Milliseconds(),
 			},
 		}
+		lastChunkTime = time.Now()

 		switch event.Type {
 		case cohere.StreamEventMessageStart:
@@ -569,6 +573,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx context.Context, postHo
 			}

 			ctx = context.WithValue(ctx, schemas.BifrostContextKeyStreamEndIndicator, true)
+			response.ExtraFields.Latency = time.Since(startTime).Milliseconds()
 		}

 		case cohere.StreamEventToolCallEnd, cohere.StreamEventContentEnd:
```
core/providers/gemini.go

Lines changed: 12 additions & 0 deletions

```diff
@@ -363,6 +363,8 @@ func (provider *GeminiProvider) SpeechStream(ctx context.Context, postHookRunner
 	scanner.Buffer(buf, 1024*1024) // Allow up to 1MB tokens
 	chunkIndex := -1
 	usage := &schemas.AudioLLMUsage{}
+	startTime := time.Now()
+	lastChunkTime := startTime

 	for scanner.Scan() {
 		line := scanner.Text()
@@ -452,8 +454,10 @@ func (provider *GeminiProvider) SpeechStream(ctx context.Context, postHookRunner
 				Provider:       providerName,
 				ModelRequested: request.Model,
 				ChunkIndex:     chunkIndex,
+				Latency:        time.Since(lastChunkTime).Milliseconds(),
 			},
 		}
+		lastChunkTime = time.Now()

 		// Process response through post-hooks and send to channel
 		processAndSendResponse(ctx, postHookRunner, response, responseChan, provider.logger)
@@ -475,9 +479,11 @@ func (provider *GeminiProvider) SpeechStream(ctx context.Context, postHookRunner
 				Provider:       providerName,
 				ModelRequested: request.Model,
 				ChunkIndex:     chunkIndex + 1,
+				Latency:        time.Since(startTime).Milliseconds(),
 			},
 		}

+		ctx = context.WithValue(ctx, schemas.BifrostContextKeyStreamEndIndicator, true)
 		handleStreamEndWithSuccess(ctx, response, postHookRunner, responseChan, provider.logger)
 	}
 }()
@@ -629,6 +635,8 @@ func (provider *GeminiProvider) TranscriptionStream(ctx context.Context, postHoo
 	scanner := bufio.NewScanner(resp.Body)
 	chunkIndex := -1
 	usage := &schemas.TranscriptionUsage{}
+	startTime := time.Now()
+	lastChunkTime := startTime

 	var fullTranscriptionText string

@@ -727,8 +735,10 @@ func (provider *GeminiProvider) TranscriptionStream(ctx context.Context, postHoo
 				Provider:       providerName,
 				ModelRequested: request.Model,
 				ChunkIndex:     chunkIndex,
+				Latency:        time.Since(lastChunkTime).Milliseconds(),
 			},
 		}
+		lastChunkTime = time.Now()

 		// Process response through post-hooks and send to channel
 		processAndSendResponse(ctx, postHookRunner, response, responseChan, provider.logger)
@@ -756,9 +766,11 @@ func (provider *GeminiProvider) TranscriptionStream(ctx context.Context, postHoo
 				Provider:       providerName,
 				ModelRequested: request.Model,
 				ChunkIndex:     chunkIndex + 1,
+				Latency:        time.Since(startTime).Milliseconds(),
 			},
 		}

+		ctx = context.WithValue(ctx, schemas.BifrostContextKeyStreamEndIndicator, true)
 		handleStreamEndWithSuccess(ctx, response, postHookRunner, responseChan, provider.logger)
 	}
 }()
```
