Skip to content

Commit 1fe62ac

Browse files
Pratham-Mishra04 authored and TejasGhatte committed
feat: responses streaming added to anthropic
1 parent ea5a3e4 commit 1fe62ac

File tree

8 files changed

+649
-86
lines changed

8 files changed

+649
-86
lines changed

core/providers/anthropic.go

Lines changed: 259 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -272,57 +272,6 @@ func (provider *AnthropicProvider) ChatCompletion(ctx context.Context, key schem
272272
return bifrostResponse, nil
273273
}
274274

275-
// Responses performs a non-streaming Responses API request against Anthropic's /v1/messages endpoint.
276-
// It converts the request to Anthropic's format, sends it, and converts the reply into Bifrost's response shape.
277-
// Returns a BifrostResponse containing the results, or a BifrostError if the operation is not allowed, the request cannot be converted, or the call fails.
278-
func (provider *AnthropicProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
279-
if err := checkOperationAllowed(schemas.Anthropic, provider.customProviderConfig, schemas.ResponsesRequest); err != nil {
280-
return nil, err
281-
}
282-
283-
// Convert to Anthropic format using the centralized converter; a nil result is reported as missing input
284-
reqBody := anthropic.ToAnthropicResponsesRequest(request)
285-
if reqBody == nil {
286-
return nil, newBifrostOperationError("responses input is not provided", nil, provider.GetProviderKey())
287-
}
288-
289-
// Pass the request struct directly; completeRequest is given the body, the endpoint URL, and the API key value
290-
responseBody, latency, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/messages", key.Value)
291-
if err != nil {
292-
return nil, err
293-
}
294-
295-
// Acquire a response object from the pool and release it when this method returns
296-
response := acquireAnthropicChatResponse()
297-
defer releaseAnthropicChatResponse(response)
298-
299-
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
300-
if bifrostErr != nil {
301-
return nil, bifrostErr
302-
}
303-
304-
// Convert the decoded Anthropic response into the final Bifrost response
305-
bifrostResponse := response.ToResponsesBifrostResponse()
306-
307-
// Set ExtraFields (provider, requested model, request type, latency in milliseconds)
308-
bifrostResponse.ExtraFields.Provider = provider.GetProviderKey()
309-
bifrostResponse.ExtraFields.ModelRequested = request.Model
310-
bifrostResponse.ExtraFields.RequestType = schemas.ResponsesRequest
311-
bifrostResponse.ExtraFields.Latency = latency.Milliseconds()
312-
313-
// Attach the raw provider response only when the provider is configured to send it back
314-
if provider.sendBackRawResponse {
315-
bifrostResponse.ExtraFields.RawResponse = rawResponse
316-
}
317-
318-
return bifrostResponse, nil
319-
}
320-
321-
// Embedding is not supported by the Anthropic provider; it returns the error built by newUnsupportedOperationError.
322-
func (provider *AnthropicProvider) Embedding(ctx context.Context, key schemas.Key, input *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
323-
return nil, newUnsupportedOperationError("embedding", "anthropic")
324-
}
325-
326275
// ChatCompletionStream performs a streaming chat completion request to the Anthropic API.
327276
// It supports real-time streaming of responses using Server-Sent Events (SSE).
328277
// Returns a channel containing BifrostResponse objects representing the stream or an error if the request fails.
@@ -348,7 +297,7 @@ func (provider *AnthropicProvider) ChatCompletionStream(ctx context.Context, pos
348297
}
349298

350299
// Use shared Anthropic streaming logic
351-
return handleAnthropicStreaming(
300+
return handleAnthropicChatCompletionStreaming(
352301
ctx,
353302
provider.streamClient,
354303
provider.networkConfig.BaseURL+"/v1/messages",
@@ -364,7 +313,7 @@ func (provider *AnthropicProvider) ChatCompletionStream(ctx context.Context, pos
364313

365314
// handleAnthropicChatCompletionStreaming handles chat completion streaming for Anthropic-compatible APIs (Anthropic, Vertex Claude models).
366315
// This shared function reduces code duplication between providers that use the same SSE event format.
367-
func handleAnthropicStreaming(
316+
func handleAnthropicChatCompletionStreaming(
368317
ctx context.Context,
369318
httpClient *http.Client,
370319
url string,
@@ -376,7 +325,6 @@ func handleAnthropicStreaming(
376325
postHookRunner schemas.PostHookRunner,
377326
logger schemas.Logger,
378327
) (chan *schemas.BifrostStream, *schemas.BifrostError) {
379-
380328
jsonBody, err := sonic.Marshal(requestBody)
381329
if err != nil {
382330
return nil, newBifrostOperationError(schemas.ErrProviderJSONMarshaling, err, providerType)
@@ -505,7 +453,7 @@ func handleAnthropicStreaming(
505453
modelName = event.Message.Model
506454
}
507455

508-
response, bifrostErr, isLastChunk := event.ToBifrostStream()
456+
response, bifrostErr, isLastChunk := event.ToBifrostChatCompletionStream()
509457
if response != nil {
510458
response.ExtraFields = schemas.BifrostResponseExtraFields{
511459
RequestType: schemas.ChatCompletionStreamRequest,
@@ -555,6 +503,262 @@ func handleAnthropicStreaming(
555503
return responseChan, nil
556504
}
557505

506+
// Responses performs a chat completion request to Anthropic's API.
507+
// It formats the request, sends it to Anthropic, and processes the response.
508+
// Returns a BifrostResponse containing the completion results or an error if the request fails.
509+
func (provider *AnthropicProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
510+
if err := checkOperationAllowed(schemas.Anthropic, provider.customProviderConfig, schemas.ResponsesRequest); err != nil {
511+
return nil, err
512+
}
513+
514+
// Convert to Anthropic format using the centralized converter
515+
reqBody := anthropic.ToAnthropicResponsesRequest(request)
516+
if reqBody == nil {
517+
return nil, newBifrostOperationError("responses input is not provided", nil, provider.GetProviderKey())
518+
}
519+
520+
// Use struct directly for JSON marshaling
521+
responseBody, latency, err := provider.completeRequest(ctx, reqBody, provider.networkConfig.BaseURL+"/v1/messages", key.Value)
522+
if err != nil {
523+
return nil, err
524+
}
525+
526+
// Create response object from pool
527+
response := acquireAnthropicChatResponse()
528+
defer releaseAnthropicChatResponse(response)
529+
530+
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
531+
if bifrostErr != nil {
532+
return nil, bifrostErr
533+
}
534+
535+
// Create final response
536+
bifrostResponse := response.ToResponsesBifrostResponse()
537+
538+
// Set ExtraFields
539+
bifrostResponse.ExtraFields.Provider = provider.GetProviderKey()
540+
bifrostResponse.ExtraFields.ModelRequested = request.Model
541+
bifrostResponse.ExtraFields.RequestType = schemas.ResponsesRequest
542+
bifrostResponse.ExtraFields.Latency = latency.Milliseconds()
543+
544+
// Set raw response if enabled
545+
if provider.sendBackRawResponse {
546+
bifrostResponse.ExtraFields.RawResponse = rawResponse
547+
}
548+
549+
return bifrostResponse, nil
550+
}
551+
552+
func (provider *AnthropicProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
553+
if err := checkOperationAllowed(schemas.Anthropic, provider.customProviderConfig, schemas.ResponsesStreamRequest); err != nil {
554+
return nil, err
555+
}
556+
557+
// Convert to Anthropic format using the centralized converter
558+
reqBody := anthropic.ToAnthropicResponsesRequest(request)
559+
if reqBody == nil {
560+
return nil, newBifrostOperationError("failed to convert request", fmt.Errorf("conversion returned nil"), provider.GetProviderKey())
561+
}
562+
reqBody.Stream = schemas.Ptr(true)
563+
564+
// Prepare Anthropic headers
565+
headers := map[string]string{
566+
"Content-Type": "application/json",
567+
"x-api-key": key.Value,
568+
"anthropic-version": provider.apiVersion,
569+
"Accept": "text/event-stream",
570+
"Cache-Control": "no-cache",
571+
}
572+
573+
jsonBody, err := sonic.Marshal(reqBody)
574+
if err != nil {
575+
return nil, newBifrostOperationError(schemas.ErrProviderJSONMarshaling, err, provider.GetProviderKey())
576+
}
577+
578+
// Create HTTP request for streaming
579+
req, err := http.NewRequestWithContext(ctx, "POST", provider.networkConfig.BaseURL+"/v1/messages", bytes.NewReader(jsonBody))
580+
if err != nil {
581+
if errors.Is(err, context.Canceled) {
582+
return nil, &schemas.BifrostError{
583+
IsBifrostError: false,
584+
Error: &schemas.ErrorField{
585+
Type: schemas.Ptr(schemas.RequestCancelled),
586+
Message: schemas.ErrRequestCancelled,
587+
Error: err,
588+
},
589+
}
590+
}
591+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
592+
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, provider.GetProviderKey())
593+
}
594+
return nil, newBifrostOperationError(schemas.ErrProviderRequest, err, provider.GetProviderKey())
595+
}
596+
597+
// Set headers
598+
for key, value := range headers {
599+
req.Header.Set(key, value)
600+
}
601+
602+
// Set any extra headers from network config
603+
setExtraHeadersHTTP(req, provider.networkConfig.ExtraHeaders, nil)
604+
605+
// Make the request
606+
resp, err := provider.streamClient.Do(req)
607+
if err != nil {
608+
if errors.Is(err, context.Canceled) {
609+
return nil, &schemas.BifrostError{
610+
IsBifrostError: false,
611+
Error: &schemas.ErrorField{
612+
Type: schemas.Ptr(schemas.RequestCancelled),
613+
Message: schemas.ErrRequestCancelled,
614+
Error: err,
615+
},
616+
}
617+
}
618+
if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
619+
return nil, newBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, provider.GetProviderKey())
620+
}
621+
return nil, newBifrostOperationError(schemas.ErrProviderRequest, err, provider.GetProviderKey())
622+
}
623+
624+
// Check for HTTP errors
625+
if resp.StatusCode != http.StatusOK {
626+
body, _ := io.ReadAll(resp.Body)
627+
resp.Body.Close()
628+
return nil, newProviderAPIError(fmt.Sprintf("HTTP error from %s: %d", provider.GetProviderKey(), resp.StatusCode), fmt.Errorf("%s", string(body)), resp.StatusCode, provider.GetProviderKey(), nil, nil)
629+
}
630+
631+
// Create response channel
632+
responseChan := make(chan *schemas.BifrostStream, schemas.DefaultStreamBufferSize)
633+
634+
// Start streaming in a goroutine
635+
go func() {
636+
defer close(responseChan)
637+
defer resp.Body.Close()
638+
639+
scanner := bufio.NewScanner(resp.Body)
640+
chunkIndex := 0
641+
642+
startTime := time.Now()
643+
lastChunkTime := startTime
644+
645+
// Track minimal state needed for response format
646+
var usage *schemas.LLMUsage
647+
648+
// Track SSE event parsing state
649+
var eventType string
650+
var eventData string
651+
652+
for scanner.Scan() {
653+
line := scanner.Text()
654+
655+
// Skip empty lines and comments
656+
if line == "" || strings.HasPrefix(line, ":") {
657+
continue
658+
}
659+
660+
// Parse SSE event - track event type and data separately
661+
if after, ok := strings.CutPrefix(line, "event: "); ok {
662+
eventType = after
663+
continue
664+
} else if strings.HasPrefix(line, "data: ") {
665+
eventData = strings.TrimPrefix(line, "data: ")
666+
} else {
667+
continue
668+
}
669+
670+
// Skip if we don't have both event type and data
671+
if eventType == "" || eventData == "" {
672+
continue
673+
}
674+
675+
var event anthropic.AnthropicStreamEvent
676+
if err := sonic.Unmarshal([]byte(eventData), &event); err != nil {
677+
provider.logger.Warn(fmt.Sprintf("Failed to parse message_start event: %v", err))
678+
continue
679+
}
680+
681+
if chunkIndex == 0 {
682+
sendCreatedEventResponsesChunk(ctx, postHookRunner, provider.GetProviderKey(), request.Model, startTime, responseChan, provider.logger)
683+
sendInProgressEventResponsesChunk(ctx, postHookRunner, provider.GetProviderKey(), request.Model, startTime, responseChan, provider.logger)
684+
chunkIndex = 2
685+
}
686+
687+
if event.Usage != nil {
688+
usage = &schemas.LLMUsage{
689+
PromptTokens: event.Usage.InputTokens,
690+
CompletionTokens: event.Usage.OutputTokens,
691+
TotalTokens: event.Usage.InputTokens + event.Usage.OutputTokens,
692+
}
693+
}
694+
695+
response, bifrostErr, isLastChunk := event.ToBifrostResponsesStream(chunkIndex)
696+
if response != nil {
697+
response.ExtraFields = schemas.BifrostResponseExtraFields{
698+
RequestType: schemas.ResponsesStreamRequest,
699+
Provider: provider.GetProviderKey(),
700+
ModelRequested: request.Model,
701+
ChunkIndex: chunkIndex,
702+
Latency: time.Since(lastChunkTime).Milliseconds(),
703+
}
704+
lastChunkTime = time.Now()
705+
chunkIndex++
706+
707+
if provider.sendBackRawResponse {
708+
response.ExtraFields.RawResponse = eventData
709+
}
710+
711+
if isLastChunk {
712+
if response.ResponsesStreamResponse == nil {
713+
response.ResponsesStreamResponse = &schemas.ResponsesStreamResponse{
714+
Response: &schemas.ResponsesStreamResponseStruct{},
715+
}
716+
} else if response.ResponsesStreamResponse.Response == nil {
717+
response.ResponsesStreamResponse.Response = &schemas.ResponsesStreamResponseStruct{}
718+
}
719+
response.ResponsesStreamResponse.Response.Usage = &schemas.ResponsesResponseUsage{
720+
ResponsesExtendedResponseUsage: &schemas.ResponsesExtendedResponseUsage{
721+
InputTokens: usage.PromptTokens,
722+
OutputTokens: usage.CompletionTokens,
723+
},
724+
TotalTokens: usage.TotalTokens,
725+
}
726+
response.ExtraFields.Latency = time.Since(startTime).Milliseconds()
727+
handleStreamEndWithSuccess(ctx, response, postHookRunner, responseChan, provider.logger)
728+
break
729+
}
730+
processAndSendResponse(ctx, postHookRunner, response, responseChan, provider.logger)
731+
}
732+
if bifrostErr != nil {
733+
bifrostErr.ExtraFields = schemas.BifrostErrorExtraFields{
734+
RequestType: schemas.ResponsesStreamRequest,
735+
Provider: provider.GetProviderKey(),
736+
ModelRequested: request.Model,
737+
}
738+
739+
processAndSendBifrostError(ctx, postHookRunner, bifrostErr, responseChan, provider.logger)
740+
break
741+
}
742+
743+
// Reset for next event
744+
eventType = ""
745+
eventData = ""
746+
}
747+
748+
if err := scanner.Err(); err != nil {
749+
provider.logger.Warn(fmt.Sprintf("Error reading %s stream: %v", provider.GetProviderKey(), err))
750+
processAndSendError(ctx, postHookRunner, err, responseChan, schemas.ResponsesStreamRequest, provider.GetProviderKey(), request.Model, provider.logger)
751+
}
752+
}()
753+
754+
return responseChan, nil
755+
}
756+
757+
// Embedding is not supported by the Anthropic provider.
758+
func (provider *AnthropicProvider) Embedding(ctx context.Context, key schemas.Key, input *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
759+
return nil, newUnsupportedOperationError("embedding", "anthropic")
760+
}
761+
558762
// Speech is unavailable on the Anthropic provider.
// It performs no request and returns the error built by newUnsupportedOperationError.
func (provider *AnthropicProvider) Speech(ctx context.Context, key schemas.Key, request *schemas.BifrostSpeechRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
	unsupported := newUnsupportedOperationError("speech", "anthropic")
	return nil, unsupported
}
@@ -570,7 +774,3 @@ func (provider *AnthropicProvider) Transcription(ctx context.Context, key schema
570774
// TranscriptionStream is unavailable on the Anthropic provider.
// It opens no stream and returns the error built by newUnsupportedOperationError.
func (provider *AnthropicProvider) TranscriptionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostTranscriptionRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
	unsupported := newUnsupportedOperationError("transcription stream", "anthropic")
	return nil, unsupported
}
573-
574-
// ResponsesStream is not supported in this (pre-commit) version of the Anthropic provider; it returns the error built by newUnsupportedOperationError.
func (provider *AnthropicProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
575-
return nil, newUnsupportedOperationError("responses stream", "anthropic")
576-
}

0 commit comments

Comments
 (0)