@@ -644,14 +644,16 @@ func (provider *BedrockProvider) ChatCompletionStream(ctx context.Context, postH
644644 reader := bufio .NewReader (resp .Body )
645645 buffer := make ([]byte , 1024 * 1024 ) // 1MB buffer
646646 var accumulator []byte // Accumulate data across reads
647+ startTime := time .Now ()
648+ lastChunkTime := startTime
647649
648650 for {
649651 n , err := reader .Read (buffer )
650652 if err != nil {
651653 if err == io .EOF {
652654 // Process any remaining data in the accumulator
653655 if len (accumulator ) > 0 {
654- _ = provider .processAWSEventStreamData (ctx , postHookRunner , accumulator , & messageID , & chunkIndex , & usage , & finishReason , request .Model , providerName , responseChan )
656+ _ = provider .processAWSEventStreamData (ctx , postHookRunner , accumulator , & messageID , & chunkIndex , & usage , & finishReason , request .Model , providerName , responseChan , & lastChunkTime )
655657 }
656658 break
657659 }
@@ -668,14 +670,15 @@ func (provider *BedrockProvider) ChatCompletionStream(ctx context.Context, postH
668670 accumulator = append (accumulator , buffer [:n ]... )
669671
670672 // Process the accumulated data and get the remaining unprocessed part
671- remaining := provider .processAWSEventStreamData (ctx , postHookRunner , accumulator , & messageID , & chunkIndex , & usage , & finishReason , request .Model , providerName , responseChan )
673+ remaining := provider .processAWSEventStreamData (ctx , postHookRunner , accumulator , & messageID , & chunkIndex , & usage , & finishReason , request .Model , providerName , responseChan , & lastChunkTime )
672674
673675 // Reset accumulator with remaining data
674676 accumulator = remaining
675677 }
676678
677679 // Send final response
678680 response := createBifrostChatCompletionChunkResponse (messageID , usage , finishReason , chunkIndex , schemas .ChatCompletionStreamRequest , providerName , request .Model )
681+ response .ExtraFields .Latency = time .Since (startTime ).Milliseconds ()
679682 handleStreamEndWithSuccess (ctx , response , postHookRunner , responseChan , provider .logger )
680683 }()
681684
@@ -695,6 +698,7 @@ func (provider *BedrockProvider) processAWSEventStreamData(
695698 model string ,
696699 providerName schemas.ModelProvider ,
697700 responseChan chan * schemas.BifrostStream ,
701+ lastChunkTime * time.Time ,
698702) []byte {
699703 lastProcessed := 0
700704 depth := 0
@@ -741,7 +745,7 @@ func (provider *BedrockProvider) processAWSEventStreamData(
741745 bytes .Contains (jsonBytes , []byte (`metadata` ))
742746
743747 if hasQuotes && hasRelevantContent {
744- provider .processEventBuffer (ctx , postHookRunner , jsonBytes , messageID , chunkIndex , usage , finishReason , model , providerName , responseChan )
748+ provider .processEventBuffer (ctx , postHookRunner , jsonBytes , messageID , chunkIndex , usage , finishReason , model , providerName , responseChan , lastChunkTime )
745749 lastProcessed = i + 1
746750 }
747751 objStart = - 1
@@ -759,7 +763,7 @@ func (provider *BedrockProvider) processAWSEventStreamData(
759763}
760764
761765// processEventBuffer processes AWS Event Stream JSON payloads using typed Bedrock stream events
762- func (provider * BedrockProvider ) processEventBuffer (ctx context.Context , postHookRunner schemas.PostHookRunner , eventBuffer []byte , messageID * string , chunkIndex * int , usage * * schemas.LLMUsage , finishReason * * string , model string , providerName schemas.ModelProvider , responseChan chan * schemas.BifrostStream ) {
766+ func (provider * BedrockProvider ) processEventBuffer (ctx context.Context , postHookRunner schemas.PostHookRunner , eventBuffer []byte , messageID * string , chunkIndex * int , usage * * schemas.LLMUsage , finishReason * * string , model string , providerName schemas.ModelProvider , responseChan chan * schemas.BifrostStream , lastChunkTime * time. Time ) {
763767 // Parse the JSON event into our typed structure
764768 var streamEvent bedrock.BedrockStreamEvent
765769 if err := sonic .Unmarshal (eventBuffer , & streamEvent ); err != nil {
@@ -798,9 +802,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
798802 Provider : providerName ,
799803 ModelRequested : model ,
800804 ChunkIndex : * chunkIndex ,
805+ Latency : time .Since (* lastChunkTime ).Milliseconds (),
801806 },
802807 }
803808
809+ * lastChunkTime = time .Now ()
804810 processAndSendResponse (ctx , postHookRunner , streamResponse , responseChan , provider .logger )
805811
806812 case streamEvent .Start != nil && streamEvent .Start .ToolUse != nil :
@@ -838,9 +844,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
838844 Provider : providerName ,
839845 ModelRequested : model ,
840846 ChunkIndex : * chunkIndex ,
847+ Latency : time .Since (* lastChunkTime ).Milliseconds (),
841848 },
842849 }
843850
851+ * lastChunkTime = time .Now ()
844852 processAndSendResponse (ctx , postHookRunner , streamResponse , responseChan , provider .logger )
845853
846854 case streamEvent .ContentBlockIndex != nil && streamEvent .Delta != nil :
@@ -872,9 +880,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
872880 Provider : providerName ,
873881 ModelRequested : model ,
874882 ChunkIndex : * chunkIndex ,
883+ Latency : time .Since (* lastChunkTime ).Milliseconds (),
875884 },
876885 }
877886
887+ * lastChunkTime = time .Now ()
878888 processAndSendResponse (ctx , postHookRunner , streamResponse , responseChan , provider .logger )
879889 }
880890
@@ -909,9 +919,11 @@ func (provider *BedrockProvider) processEventBuffer(ctx context.Context, postHoo
909919 Provider : providerName ,
910920 ModelRequested : model ,
911921 ChunkIndex : * chunkIndex ,
922+ Latency : time .Since (* lastChunkTime ).Milliseconds (),
912923 },
913924 }
914925
926+ * lastChunkTime = time .Now ()
915927 processAndSendResponse (ctx , postHookRunner , streamResponse , responseChan , provider .logger )
916928 }
917929
0 commit comments