Skip to content

Commit bf5a247

Browse files
feat: responses streaming added
1 parent 48c06ab commit bf5a247

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

48 files changed

+2377
-769
lines changed

core/providers/azure.go

Lines changed: 62 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -273,58 +273,6 @@ func (provider *AzureProvider) ChatCompletion(ctx context.Context, key schemas.K
273273
return response, nil
274274
}
275275

276-
// Responses performs a responses request to Azure's API.
277-
// It formats the request, sends it to Azure, and processes the response.
278-
// Returns a BifrostResponse containing the completion results or an error if the request fails.
279-
func (provider *AzureProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
280-
response, err := provider.ChatCompletion(ctx, key, request.ToChatRequest())
281-
if err != nil {
282-
return nil, err
283-
}
284-
285-
response.ToResponsesOnly()
286-
response.ExtraFields.RequestType = schemas.ResponsesRequest
287-
response.ExtraFields.Provider = provider.GetProviderKey()
288-
response.ExtraFields.ModelRequested = request.Model
289-
290-
return response, nil
291-
}
292-
293-
// Embedding generates embeddings for the given input text(s) using Azure OpenAI.
294-
// The input can be either a single string or a slice of strings for batch embedding.
295-
// Returns a BifrostResponse containing the embedding(s) and any error that occurred.
296-
func (provider *AzureProvider) Embedding(ctx context.Context, key schemas.Key, request *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
297-
// Use centralized converter
298-
reqBody := openai.ToOpenAIEmbeddingRequest(request)
299-
if reqBody == nil {
300-
return nil, newBifrostOperationError("embedding input is not provided", nil, schemas.Azure)
301-
}
302-
303-
responseBody, latency, err := provider.completeRequest(ctx, reqBody, "embeddings", key, request.Model)
304-
if err != nil {
305-
return nil, err
306-
}
307-
308-
response := &schemas.BifrostResponse{}
309-
310-
// Use enhanced response handler with pre-allocated response
311-
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
312-
if bifrostErr != nil {
313-
return nil, bifrostErr
314-
}
315-
316-
response.ExtraFields.Provider = schemas.Azure
317-
response.ExtraFields.Latency = latency.Milliseconds()
318-
response.ExtraFields.ModelRequested = request.Model
319-
response.ExtraFields.RequestType = schemas.EmbeddingRequest
320-
321-
if provider.sendBackRawResponse {
322-
response.ExtraFields.RawResponse = rawResponse
323-
}
324-
325-
return response, nil
326-
}
327-
328276
// ChatCompletionStream performs a streaming chat completion request to Azure's OpenAI API.
329277
// It supports real-time streaming of responses using Server-Sent Events (SSE).
330278
// Uses Azure-specific URL construction with deployments and supports both api-key and Bearer token authentication.
@@ -369,7 +317,7 @@ func (provider *AzureProvider) ChatCompletionStream(ctx context.Context, postHoo
369317
}
370318

371319
// Use shared streaming logic from OpenAI
372-
return handleOpenAIStreaming(
320+
return handleOpenAIChatCompletionStreaming(
373321
ctx,
374322
provider.streamClient,
375323
fullURL,
@@ -383,6 +331,67 @@ func (provider *AzureProvider) ChatCompletionStream(ctx context.Context, postHoo
383331
)
384332
}
385333

334+
// Responses performs a responses request to Azure's API.
335+
// It formats the request, sends it to Azure, and processes the response.
336+
// Returns a BifrostResponse containing the completion results or an error if the request fails.
337+
func (provider *AzureProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
338+
response, err := provider.ChatCompletion(ctx, key, request.ToChatRequest())
339+
if err != nil {
340+
return nil, err
341+
}
342+
343+
response.ToResponsesOnly()
344+
response.ExtraFields.RequestType = schemas.ResponsesRequest
345+
response.ExtraFields.Provider = provider.GetProviderKey()
346+
response.ExtraFields.ModelRequested = request.Model
347+
348+
return response, nil
349+
}
350+
351+
func (provider *AzureProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
352+
return provider.ChatCompletionStream(
353+
ctx,
354+
getResponsesChunkConverterCombinedPostHookRunner(postHookRunner),
355+
key,
356+
request.ToChatRequest(),
357+
)
358+
}
359+
360+
// Embedding generates embeddings for the given input text(s) using Azure OpenAI.
361+
// The input can be either a single string or a slice of strings for batch embedding.
362+
// Returns a BifrostResponse containing the embedding(s) and any error that occurred.
363+
func (provider *AzureProvider) Embedding(ctx context.Context, key schemas.Key, request *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
364+
// Use centralized converter
365+
reqBody := openai.ToOpenAIEmbeddingRequest(request)
366+
if reqBody == nil {
367+
return nil, newBifrostOperationError("embedding input is not provided", nil, schemas.Azure)
368+
}
369+
370+
responseBody, latency, err := provider.completeRequest(ctx, reqBody, "embeddings", key, request.Model)
371+
if err != nil {
372+
return nil, err
373+
}
374+
375+
response := &schemas.BifrostResponse{}
376+
377+
// Use enhanced response handler with pre-allocated response
378+
rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse)
379+
if bifrostErr != nil {
380+
return nil, bifrostErr
381+
}
382+
383+
response.ExtraFields.Provider = schemas.Azure
384+
response.ExtraFields.Latency = latency.Milliseconds()
385+
response.ExtraFields.ModelRequested = request.Model
386+
response.ExtraFields.RequestType = schemas.EmbeddingRequest
387+
388+
if provider.sendBackRawResponse {
389+
response.ExtraFields.RawResponse = rawResponse
390+
}
391+
392+
return response, nil
393+
}
394+
386395
func (provider *AzureProvider) Speech(ctx context.Context, key schemas.Key, request *schemas.BifrostSpeechRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
387396
return nil, newUnsupportedOperationError("speech", "azure")
388397
}
@@ -398,7 +407,3 @@ func (provider *AzureProvider) Transcription(ctx context.Context, key schemas.Ke
398407
func (provider *AzureProvider) TranscriptionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostTranscriptionRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
399408
return nil, newUnsupportedOperationError("transcription stream", "azure")
400409
}
401-
402-
func (provider *AzureProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
403-
return nil, newUnsupportedOperationError("responses stream", "azure")
404-
}

core/providers/cerebras.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -111,32 +111,13 @@ func (provider *CerebrasProvider) ChatCompletion(ctx context.Context, key schema
111111
)
112112
}
113113

114-
func (provider *CerebrasProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
115-
response, err := provider.ChatCompletion(ctx, key, request.ToChatRequest())
116-
if err != nil {
117-
return nil, err
118-
}
119-
120-
response.ToResponsesOnly()
121-
response.ExtraFields.RequestType = schemas.ResponsesRequest
122-
response.ExtraFields.Provider = provider.GetProviderKey()
123-
response.ExtraFields.ModelRequested = request.Model
124-
125-
return response, nil
126-
}
127-
128-
// Embedding is not supported by the Cerebras provider.
129-
func (provider *CerebrasProvider) Embedding(ctx context.Context, key schemas.Key, request *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
130-
return nil, newUnsupportedOperationError("embedding", "cerebras")
131-
}
132-
133114
// ChatCompletionStream performs a streaming chat completion request to the Cerebras API.
134115
// It supports real-time streaming of responses using Server-Sent Events (SSE).
135116
// Uses Cerebras's OpenAI-compatible streaming format.
136117
// Returns a channel containing BifrostResponse objects representing the stream or an error if the request fails.
137118
func (provider *CerebrasProvider) ChatCompletionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostChatRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
138119
// Use shared OpenAI-compatible streaming logic
139-
return handleOpenAIStreaming(
120+
return handleOpenAIChatCompletionStreaming(
140121
ctx,
141122
provider.streamClient,
142123
provider.networkConfig.BaseURL+"/v1/chat/completions",
@@ -150,6 +131,34 @@ func (provider *CerebrasProvider) ChatCompletionStream(ctx context.Context, post
150131
)
151132
}
152133

134+
func (provider *CerebrasProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
135+
response, err := provider.ChatCompletion(ctx, key, request.ToChatRequest())
136+
if err != nil {
137+
return nil, err
138+
}
139+
140+
response.ToResponsesOnly()
141+
response.ExtraFields.RequestType = schemas.ResponsesRequest
142+
response.ExtraFields.Provider = provider.GetProviderKey()
143+
response.ExtraFields.ModelRequested = request.Model
144+
145+
return response, nil
146+
}
147+
148+
func (provider *CerebrasProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
149+
return provider.ChatCompletionStream(
150+
ctx,
151+
getResponsesChunkConverterCombinedPostHookRunner(postHookRunner),
152+
key,
153+
request.ToChatRequest(),
154+
)
155+
}
156+
157+
// Embedding is not supported by the Cerebras provider.
158+
func (provider *CerebrasProvider) Embedding(ctx context.Context, key schemas.Key, request *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
159+
return nil, newUnsupportedOperationError("embedding", "cerebras")
160+
}
161+
153162
func (provider *CerebrasProvider) Speech(ctx context.Context, key schemas.Key, request *schemas.BifrostSpeechRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
154163
return nil, newUnsupportedOperationError("speech", "cerebras")
155164
}
@@ -165,7 +174,3 @@ func (provider *CerebrasProvider) Transcription(ctx context.Context, key schemas
165174
func (provider *CerebrasProvider) TranscriptionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostTranscriptionRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
166175
return nil, newUnsupportedOperationError("transcription stream", "cerebras")
167176
}
168-
169-
func (provider *CerebrasProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
170-
return nil, newUnsupportedOperationError("responses stream", "cerebras")
171-
}

core/providers/gemini.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (provider *GeminiProvider) ChatCompletionStream(ctx context.Context, postHo
177177
}
178178

179179
// Use shared OpenAI-compatible streaming logic
180-
return handleOpenAIStreaming(
180+
return handleOpenAIChatCompletionStreaming(
181181
ctx,
182182
provider.streamClient,
183183
provider.networkConfig.BaseURL+"/openai/chat/completions",

core/providers/groq.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -151,32 +151,13 @@ func (provider *GroqProvider) ChatCompletion(ctx context.Context, key schemas.Ke
151151
)
152152
}
153153

154-
func (provider *GroqProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
155-
response, err := provider.ChatCompletion(ctx, key, request.ToChatRequest())
156-
if err != nil {
157-
return nil, err
158-
}
159-
160-
response.ToResponsesOnly()
161-
response.ExtraFields.RequestType = schemas.ResponsesRequest
162-
response.ExtraFields.Provider = provider.GetProviderKey()
163-
response.ExtraFields.ModelRequested = request.Model
164-
165-
return response, nil
166-
}
167-
168-
// Embedding is not supported by the Groq provider.
169-
func (provider *GroqProvider) Embedding(ctx context.Context, key schemas.Key, request *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
170-
return nil, newUnsupportedOperationError("embedding", "groq")
171-
}
172-
173154
// ChatCompletionStream performs a streaming chat completion request to the Groq API.
174155
// It supports real-time streaming of responses using Server-Sent Events (SSE).
175156
// Uses Groq's OpenAI-compatible streaming format.
176157
// Returns a channel containing BifrostResponse objects representing the stream or an error if the request fails.
177158
func (provider *GroqProvider) ChatCompletionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostChatRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
178159
// Use shared OpenAI-compatible streaming logic
179-
return handleOpenAIStreaming(
160+
return handleOpenAIChatCompletionStreaming(
180161
ctx,
181162
provider.streamClient,
182163
provider.networkConfig.BaseURL+"/v1/chat/completions",
@@ -190,6 +171,34 @@ func (provider *GroqProvider) ChatCompletionStream(ctx context.Context, postHook
190171
)
191172
}
192173

174+
func (provider *GroqProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
175+
response, err := provider.ChatCompletion(ctx, key, request.ToChatRequest())
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
response.ToResponsesOnly()
181+
response.ExtraFields.RequestType = schemas.ResponsesRequest
182+
response.ExtraFields.Provider = provider.GetProviderKey()
183+
response.ExtraFields.ModelRequested = request.Model
184+
185+
return response, nil
186+
}
187+
188+
func (provider *GroqProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
189+
return provider.ChatCompletionStream(
190+
ctx,
191+
getResponsesChunkConverterCombinedPostHookRunner(postHookRunner),
192+
key,
193+
request.ToChatRequest(),
194+
)
195+
}
196+
197+
// Embedding is not supported by the Groq provider.
198+
func (provider *GroqProvider) Embedding(ctx context.Context, key schemas.Key, request *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
199+
return nil, newUnsupportedOperationError("embedding", "groq")
200+
}
201+
193202
func (provider *GroqProvider) Speech(ctx context.Context, key schemas.Key, request *schemas.BifrostSpeechRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
194203
return nil, newUnsupportedOperationError("speech", "groq")
195204
}
@@ -205,7 +214,3 @@ func (provider *GroqProvider) Transcription(ctx context.Context, key schemas.Key
205214
func (provider *GroqProvider) TranscriptionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostTranscriptionRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
206215
return nil, newUnsupportedOperationError("transcription stream", "groq")
207216
}
208-
209-
func (provider *GroqProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
210-
return nil, newUnsupportedOperationError("responses stream", "groq")
211-
}

core/providers/mistral.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,26 @@ func (provider *MistralProvider) ChatCompletion(ctx context.Context, key schemas
9393
)
9494
}
9595

96+
// ChatCompletionStream performs a streaming chat completion request to the Mistral API.
97+
// It supports real-time streaming of responses using Server-Sent Events (SSE).
98+
// Uses Mistral's OpenAI-compatible streaming format.
99+
// Returns a channel containing BifrostResponse objects representing the stream or an error if the request fails.
100+
func (provider *MistralProvider) ChatCompletionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostChatRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
101+
// Use shared OpenAI-compatible streaming logic
102+
return handleOpenAIChatCompletionStreaming(
103+
ctx,
104+
provider.streamClient,
105+
provider.networkConfig.BaseURL+"/v1/chat/completions",
106+
request,
107+
map[string]string{"Authorization": "Bearer " + key.Value},
108+
provider.networkConfig.ExtraHeaders,
109+
provider.sendBackRawResponse,
110+
schemas.Mistral,
111+
postHookRunner,
112+
provider.logger,
113+
)
114+
}
115+
96116
func (provider *MistralProvider) Responses(ctx context.Context, key schemas.Key, request *schemas.BifrostResponsesRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
97117
response, err := provider.ChatCompletion(ctx, key, request.ToChatRequest())
98118
if err != nil {
@@ -107,6 +127,15 @@ func (provider *MistralProvider) Responses(ctx context.Context, key schemas.Key,
107127
return response, nil
108128
}
109129

130+
func (provider *MistralProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
131+
return provider.ChatCompletionStream(
132+
ctx,
133+
getResponsesChunkConverterCombinedPostHookRunner(postHookRunner),
134+
key,
135+
request.ToChatRequest(),
136+
)
137+
}
138+
110139
// Embedding generates embeddings for the given input text(s) using the Mistral API.
111140
// Supports Mistral's embedding models and returns a BifrostResponse containing the embedding(s).
112141
func (provider *MistralProvider) Embedding(ctx context.Context, key schemas.Key, request *schemas.BifrostEmbeddingRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
@@ -124,26 +153,6 @@ func (provider *MistralProvider) Embedding(ctx context.Context, key schemas.Key,
124153
)
125154
}
126155

127-
// ChatCompletionStream performs a streaming chat completion request to the Mistral API.
128-
// It supports real-time streaming of responses using Server-Sent Events (SSE).
129-
// Uses Mistral's OpenAI-compatible streaming format.
130-
// Returns a channel containing BifrostResponse objects representing the stream or an error if the request fails.
131-
func (provider *MistralProvider) ChatCompletionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostChatRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
132-
// Use shared OpenAI-compatible streaming logic
133-
return handleOpenAIStreaming(
134-
ctx,
135-
provider.streamClient,
136-
provider.networkConfig.BaseURL+"/v1/chat/completions",
137-
request,
138-
map[string]string{"Authorization": "Bearer " + key.Value},
139-
provider.networkConfig.ExtraHeaders,
140-
provider.sendBackRawResponse,
141-
schemas.Mistral,
142-
postHookRunner,
143-
provider.logger,
144-
)
145-
}
146-
147156
func (provider *MistralProvider) Speech(ctx context.Context, key schemas.Key, request *schemas.BifrostSpeechRequest) (*schemas.BifrostResponse, *schemas.BifrostError) {
148157
return nil, newUnsupportedOperationError("speech", "mistral")
149158
}
@@ -159,7 +168,3 @@ func (provider *MistralProvider) Transcription(ctx context.Context, key schemas.
159168
func (provider *MistralProvider) TranscriptionStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostTranscriptionRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
160169
return nil, newUnsupportedOperationError("transcription stream", "mistral")
161170
}
162-
163-
func (provider *MistralProvider) ResponsesStream(ctx context.Context, postHookRunner schemas.PostHookRunner, key schemas.Key, request *schemas.BifrostResponsesRequest) (chan *schemas.BifrostStream, *schemas.BifrostError) {
164-
return nil, newUnsupportedOperationError("responses stream", "mistral")
165-
}

0 commit comments

Comments
 (0)