Skip to content

Commit 7ceab9c

Browse files
enhancement: added anthropic stream chunk convertor
1 parent 06e12ff commit 7ceab9c

File tree

15 files changed

+331
-368
lines changed

15 files changed

+331
-368
lines changed

core/providers/anthropic.go

Lines changed: 33 additions & 275 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ func (provider *AnthropicProvider) ChatCompletionStream(ctx context.Context, pos
355355
reqBody,
356356
headers,
357357
provider.networkConfig.ExtraHeaders,
358+
provider.sendBackRawResponse,
358359
provider.GetProviderKey(),
359360
postHookRunner,
360361
provider.logger,
@@ -370,6 +371,7 @@ func handleAnthropicStreaming(
370371
requestBody interface{},
371372
headers map[string]string,
372373
extraHeaders map[string]string,
374+
sendBackRawResponse bool,
373375
providerType schemas.ModelProvider,
374376
postHookRunner schemas.PostHookRunner,
375377
logger schemas.Logger,
@@ -442,7 +444,7 @@ func handleAnthropicStreaming(
442444
defer resp.Body.Close()
443445

444446
scanner := bufio.NewScanner(resp.Body)
445-
chunkIndex := -1
447+
chunkIndex := 0
446448

447449
startTime := time.Now()
448450
lastChunkTime := startTime
@@ -466,8 +468,8 @@ func handleAnthropicStreaming(
466468
}
467469

468470
// Parse SSE event - track event type and data separately
469-
if strings.HasPrefix(line, "event: ") {
470-
eventType = strings.TrimPrefix(line, "event: ")
471+
if after, ok := strings.CutPrefix(line, "event: "); ok {
472+
eventType = after
471473
continue
472474
} else if strings.HasPrefix(line, "data: ") {
473475
eventData = strings.TrimPrefix(line, "data: ")
@@ -497,286 +499,42 @@ func handleAnthropicStreaming(
497499
mappedReason := anthropic.MapAnthropicFinishReasonToBifrost(*event.Delta.StopReason)
498500
finishReason = &mappedReason
499501
}
502+
if event.Message != nil {
503+
// Handle different event types
504+
messageID = event.Message.ID
505+
modelName = event.Message.Model
506+
}
500507

501-
// Handle different event types
502-
switch eventType {
503-
case "message_start":
504-
if event.Message != nil {
505-
messageID = event.Message.ID
506-
modelName = event.Message.Model
507-
508-
// Send first chunk with role
509-
if event.Message.Role != "" {
510-
chunkIndex++
511-
role := event.Message.Role
512-
513-
// Create streaming response for message start with role
514-
streamResponse := &schemas.BifrostResponse{
515-
ID: messageID,
516-
Object: "chat.completion.chunk",
517-
Model: modelName,
518-
Choices: []schemas.BifrostChatResponseChoice{
519-
{
520-
Index: 0,
521-
BifrostStreamResponseChoice: &schemas.BifrostStreamResponseChoice{
522-
Delta: &schemas.BifrostStreamDelta{
523-
Role: &role,
524-
},
525-
},
526-
},
527-
},
528-
ExtraFields: schemas.BifrostResponseExtraFields{
529-
RequestType: schemas.ChatCompletionStreamRequest,
530-
Provider: providerType,
531-
ModelRequested: modelName,
532-
ChunkIndex: chunkIndex,
533-
Latency: time.Since(lastChunkTime).Milliseconds(),
534-
},
535-
}
536-
lastChunkTime = time.Now()
537-
538-
// Use utility function to process and send response
539-
processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
540-
}
508+
response, bifrostErr, isLastChunk := event.ToBifrostStream()
509+
if response != nil {
510+
response.ExtraFields = schemas.BifrostResponseExtraFields{
511+
RequestType: schemas.ChatCompletionStreamRequest,
512+
Provider: providerType,
513+
ModelRequested: modelName,
514+
ChunkIndex: chunkIndex,
515+
Latency: time.Since(lastChunkTime).Milliseconds(),
541516
}
517+
lastChunkTime = time.Now()
518+
chunkIndex++
542519

543-
case "content_block_start":
544-
if event.Index != nil && event.ContentBlock != nil {
545-
chunkIndex++
546-
547-
// Handle different content block types
548-
switch event.ContentBlock.Type {
549-
case "tool_use":
550-
// Tool use content block initialization
551-
if event.ContentBlock.Name != nil && *event.ContentBlock.Name != "" &&
552-
event.ContentBlock.ID != nil && *event.ContentBlock.ID != "" {
553-
// Create streaming response for tool start
554-
streamResponse := &schemas.BifrostResponse{
555-
ID: messageID,
556-
Object: "chat.completion.chunk",
557-
Model: modelName,
558-
Choices: []schemas.BifrostChatResponseChoice{
559-
{
560-
Index: *event.Index,
561-
BifrostStreamResponseChoice: &schemas.BifrostStreamResponseChoice{
562-
Delta: &schemas.BifrostStreamDelta{
563-
ToolCalls: []schemas.ChatAssistantMessageToolCall{
564-
{
565-
Type: func() *string { s := "function"; return &s }(),
566-
ID: event.ContentBlock.ID,
567-
Function: schemas.ChatAssistantMessageToolCallFunction{
568-
Name: event.ContentBlock.Name,
569-
},
570-
},
571-
},
572-
},
573-
},
574-
},
575-
},
576-
ExtraFields: schemas.BifrostResponseExtraFields{
577-
RequestType: schemas.ChatCompletionStreamRequest,
578-
Provider: providerType,
579-
ModelRequested: modelName,
580-
ChunkIndex: chunkIndex,
581-
Latency: time.Since(lastChunkTime).Milliseconds(),
582-
},
583-
}
584-
lastChunkTime = time.Now()
585-
586-
// Use utility function to process and send response
587-
processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
588-
}
589-
default:
590-
thought := ""
591-
if event.ContentBlock.Thinking != nil && *event.ContentBlock.Thinking != "" {
592-
thought = *event.ContentBlock.Thinking
593-
}
594-
content := ""
595-
if event.ContentBlock.Text != nil && *event.ContentBlock.Text != "" {
596-
content = *event.ContentBlock.Text
597-
}
598-
599-
// Send empty message for other content block types
600-
streamResponse := &schemas.BifrostResponse{
601-
ID: messageID,
602-
Object: "chat.completion.chunk",
603-
Model: modelName,
604-
Choices: []schemas.BifrostChatResponseChoice{
605-
{
606-
Index: *event.Index,
607-
BifrostStreamResponseChoice: &schemas.BifrostStreamResponseChoice{
608-
Delta: &schemas.BifrostStreamDelta{
609-
Thought: &thought,
610-
Content: &content,
611-
},
612-
},
613-
},
614-
},
615-
ExtraFields: schemas.BifrostResponseExtraFields{
616-
RequestType: schemas.ChatCompletionStreamRequest,
617-
Provider: providerType,
618-
ModelRequested: modelName,
619-
ChunkIndex: chunkIndex,
620-
Latency: time.Since(lastChunkTime).Milliseconds(),
621-
},
622-
}
623-
lastChunkTime = time.Now()
624-
625-
// Use utility function to process and send response
626-
processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
627-
}
520+
if sendBackRawResponse {
521+
response.ExtraFields.RawResponse = eventData
628522
}
629523

630-
case "content_block_delta":
631-
if event.Index != nil && event.Delta != nil {
632-
chunkIndex++
633-
634-
// Handle different delta types
635-
switch event.Delta.Type {
636-
case "text_delta":
637-
if event.Delta.Text != "" {
638-
// Create streaming response for this delta
639-
streamResponse := &schemas.BifrostResponse{
640-
ID: messageID,
641-
Object: "chat.completion.chunk",
642-
Model: modelName,
643-
Choices: []schemas.BifrostChatResponseChoice{
644-
{
645-
Index: *event.Index,
646-
BifrostStreamResponseChoice: &schemas.BifrostStreamResponseChoice{
647-
Delta: &schemas.BifrostStreamDelta{
648-
Content: &event.Delta.Text,
649-
},
650-
},
651-
},
652-
},
653-
ExtraFields: schemas.BifrostResponseExtraFields{
654-
RequestType: schemas.ChatCompletionStreamRequest,
655-
Provider: providerType,
656-
ModelRequested: modelName,
657-
ChunkIndex: chunkIndex,
658-
Latency: time.Since(lastChunkTime).Milliseconds(),
659-
},
660-
}
661-
lastChunkTime = time.Now()
662-
663-
// Use utility function to process and send response
664-
processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
665-
}
666-
667-
case "input_json_delta":
668-
// Handle tool use streaming - accumulate partial JSON
669-
if event.Delta.PartialJSON != "" {
670-
// Create streaming response for tool input delta
671-
streamResponse := &schemas.BifrostResponse{
672-
ID: messageID,
673-
Object: "chat.completion.chunk",
674-
Model: modelName,
675-
Choices: []schemas.BifrostChatResponseChoice{
676-
{
677-
Index: *event.Index,
678-
BifrostStreamResponseChoice: &schemas.BifrostStreamResponseChoice{
679-
Delta: &schemas.BifrostStreamDelta{
680-
ToolCalls: []schemas.ChatAssistantMessageToolCall{
681-
{
682-
Type: func() *string { s := "function"; return &s }(),
683-
Function: schemas.ChatAssistantMessageToolCallFunction{
684-
Arguments: event.Delta.PartialJSON,
685-
},
686-
},
687-
},
688-
},
689-
},
690-
},
691-
},
692-
ExtraFields: schemas.BifrostResponseExtraFields{
693-
RequestType: schemas.ChatCompletionStreamRequest,
694-
Provider: providerType,
695-
ModelRequested: modelName,
696-
ChunkIndex: chunkIndex,
697-
Latency: time.Since(lastChunkTime).Milliseconds(),
698-
},
699-
}
700-
lastChunkTime = time.Now()
701-
702-
// Use utility function to process and send response
703-
processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
704-
}
705-
706-
case "thinking_delta":
707-
// Handle thinking content streaming
708-
if event.Delta.Thinking != "" {
709-
// Create streaming response for thinking delta
710-
streamResponse := &schemas.BifrostResponse{
711-
ID: messageID,
712-
Object: "chat.completion.chunk",
713-
Model: modelName,
714-
Choices: []schemas.BifrostChatResponseChoice{
715-
{
716-
Index: *event.Index,
717-
BifrostStreamResponseChoice: &schemas.BifrostStreamResponseChoice{
718-
Delta: &schemas.BifrostStreamDelta{
719-
Thought: &event.Delta.Thinking,
720-
},
721-
},
722-
},
723-
},
724-
ExtraFields: schemas.BifrostResponseExtraFields{
725-
RequestType: schemas.ChatCompletionStreamRequest,
726-
Provider: providerType,
727-
ModelRequested: modelName,
728-
ChunkIndex: chunkIndex,
729-
Latency: time.Since(lastChunkTime).Milliseconds(),
730-
},
731-
}
732-
lastChunkTime = time.Now()
733-
734-
// Use utility function to process and send response
735-
processAndSendResponse(ctx, postHookRunner, streamResponse, responseChan, logger)
736-
}
737-
738-
case "signature_delta":
739-
// Handle signature verification for thinking content
740-
// This is used to verify the integrity of thinking content
741-
742-
}
524+
processAndSendResponse(ctx, postHookRunner, response, responseChan, logger)
525+
if isLastChunk {
526+
break
743527
}
744-
745-
case "content_block_stop":
746-
// Content block is complete, no specific action needed for streaming
747-
continue
748-
749-
case "message_delta":
750-
continue
751-
752-
case "message_stop":
753-
continue
754-
755-
case "ping":
756-
// Ping events are just keepalive, no action needed
757-
continue
758-
759-
case "error":
760-
if event.Error != nil {
761-
// Send error through channel before closing
762-
bifrostErr := &schemas.BifrostError{
763-
IsBifrostError: false,
764-
Error: &schemas.ErrorField{
765-
Type: &event.Error.Type,
766-
Message: event.Error.Message,
767-
},
768-
}
769-
770-
ctx = context.WithValue(ctx, schemas.BifrostContextKeyStreamEndIndicator, true)
771-
processAndSendBifrostError(ctx, postHookRunner, bifrostErr, responseChan, logger)
528+
}
529+
if bifrostErr != nil {
530+
bifrostErr.ExtraFields = schemas.BifrostErrorExtraFields{
531+
RequestType: schemas.ChatCompletionStreamRequest,
532+
Provider: providerType,
533+
ModelRequested: modelName,
772534
}
773-
return
774535

775-
default:
776-
// Unknown event type - handle gracefully as per Anthropic's versioning policy
777-
// New event types may be added, so we should not error but log and continue
778-
logger.Debug(fmt.Sprintf("Unknown %s stream event type: %s, data: %s", providerType, eventType, eventData))
779-
continue
536+
processAndSendBifrostError(ctx, postHookRunner, bifrostErr, responseChan, logger)
537+
break
780538
}
781539

782540
// Reset for next event

core/providers/azure.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ func (provider *AzureProvider) TextCompletionStream(ctx context.Context, postHoo
227227
request,
228228
authHeader,
229229
provider.networkConfig.ExtraHeaders,
230+
provider.sendBackRawResponse,
230231
provider.GetProviderKey(),
231232
postHookRunner,
232233
provider.logger,
@@ -375,6 +376,7 @@ func (provider *AzureProvider) ChatCompletionStream(ctx context.Context, postHoo
375376
request,
376377
authHeader,
377378
provider.networkConfig.ExtraHeaders,
379+
provider.sendBackRawResponse,
378380
schemas.Azure,
379381
postHookRunner,
380382
provider.logger,

core/providers/cerebras.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func (provider *CerebrasProvider) TextCompletionStream(ctx context.Context, post
8989
request,
9090
map[string]string{"Authorization": "Bearer " + key.Value},
9191
provider.networkConfig.ExtraHeaders,
92+
provider.sendBackRawResponse,
9293
provider.GetProviderKey(),
9394
postHookRunner,
9495
provider.logger,
@@ -104,8 +105,8 @@ func (provider *CerebrasProvider) ChatCompletion(ctx context.Context, key schema
104105
request,
105106
key,
106107
provider.networkConfig.ExtraHeaders,
107-
provider.GetProviderKey(),
108108
provider.sendBackRawResponse,
109+
provider.GetProviderKey(),
109110
provider.logger,
110111
)
111112
}
@@ -142,6 +143,7 @@ func (provider *CerebrasProvider) ChatCompletionStream(ctx context.Context, post
142143
request,
143144
map[string]string{"Authorization": "Bearer " + key.Value},
144145
provider.networkConfig.ExtraHeaders,
146+
provider.sendBackRawResponse,
145147
schemas.Cerebras,
146148
postHookRunner,
147149
provider.logger,

0 commit comments

Comments
 (0)