Skip to content

Commit 2161710

Browse files
authored
Merge branch 'main' into fix-Display-all-AI-SDK-LLM-message
2 parents d700c19 + 82dbdfc commit 2161710

File tree

51 files changed

+5259
-296
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+5259
-296
lines changed

.github/workflows/lib-integration-tests-runner.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ on:
1515
- all
1616
- openai
1717
- langchain
18+
- langchain_legacy
1819
- llama_index
1920
- anthropic
2021
- aisuite
@@ -70,6 +71,12 @@ jobs:
7071
uses: ./.github/workflows/lib-langchain-tests.yml
7172
secrets: inherit
7273

74+
langchain_legacy_tests:
75+
needs: [init_environment]
76+
if: contains(fromJSON('["langchain_legacy", "all"]'), needs.init_environment.outputs.LIBS)
77+
uses: ./.github/workflows/lib-langchain-legacy-tests.yml
78+
secrets: inherit
79+
7380
llama_index_tests:
7481
needs: [init_environment]
7582
if: contains(fromJSON('["llama_index", "all"]'), needs.init_environment.outputs.LIBS)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Workflow to run Langchain tests
2+
#
3+
# Please read inputs to provide correct values.
4+
#
5+
name: SDK Lib Langchain Tests for langchain < 1.0.0
6+
run-name: "SDK Lib Langchain Tests ${{ github.ref_name }} by @${{ github.actor }}"
7+
permissions:
8+
contents: read
9+
env:
10+
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
11+
OPENAI_ORG_ID: ${{ secrets.OPENAI_ORG_ID }}
12+
GCP_CREDENTIALS_JSON: ${{ secrets.GCP_CREDENTIALS_JSON }}
13+
GOOGLE_CLOUD_LOCATION: us-east1
14+
GOOGLE_CLOUD_PROJECT: opik-sdk-tests
15+
GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
16+
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
17+
AWS_DEFAULT_REGION: us-east-1
18+
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
19+
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
20+
OPIK_ENABLE_LITELLM_MODELS_MONITORING: False
21+
OPIK_SENTRY_ENABLE: False
22+
GROQ_API_KEY: ${{ secrets.GROQ_API_KEY }}
23+
on:
24+
workflow_call:
25+
26+
jobs:
27+
tests:
28+
name: Langchain Python ${{matrix.python_version}}
29+
runs-on: ubuntu-latest
30+
timeout-minutes: 30
31+
defaults:
32+
run:
33+
working-directory: sdks/python
34+
35+
strategy:
36+
fail-fast: true
37+
matrix:
38+
python_version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
39+
40+
steps:
41+
- name: Check out code
42+
uses: actions/checkout@v4
43+
44+
- name: Setup Python ${{matrix.python_version}}
45+
uses: actions/setup-python@v5
46+
with:
47+
python-version: ${{matrix.python_version}}
48+
49+
- name: Install opik
50+
run: pip install .
51+
52+
- name: Install test tools
53+
run: |
54+
cd ./tests
55+
pip install --no-cache-dir --disable-pip-version-check -r test_requirements.txt
56+
57+
- name: Install lib
58+
run: |
59+
cd ./tests
60+
pip install --no-cache-dir --disable-pip-version-check -r library_integration/langchain/legacy_requirements.txt
61+
62+
- name: change aws role
63+
uses: aws-actions/configure-aws-credentials@v4
64+
with:
65+
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
66+
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
67+
role-to-assume: ${{ vars.BEDROCK_ROLE }}
68+
aws-region: us-east-1
69+
role-chaining: true
70+
role-skip-session-tagging: true
71+
72+
- name: Run tests
73+
run: |
74+
cd ./tests/library_integration/langchain/
75+
python -m pytest -vv .

.github/workflows/lib-langchain-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
strategy:
3636
fail-fast: true
3737
matrix:
38-
python_version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
38+
python_version: ["3.10", "3.11", "3.12", "3.13"]
3939

4040
steps:
4141
- name: Check out code

apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/BaseRedisSubscriber.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.comet.opik.infrastructure.StreamConfiguration;
44
import io.dropwizard.lifecycle.Managed;
55
import io.opentelemetry.api.GlobalOpenTelemetry;
6+
import io.opentelemetry.api.metrics.LongCounter;
67
import io.opentelemetry.api.metrics.LongHistogram;
78
import io.opentelemetry.api.metrics.Meter;
89
import lombok.NonNull;
@@ -51,6 +52,7 @@ public abstract class BaseRedisSubscriber<M> implements Managed {
5152
protected final LongHistogram messageProcessingTime;
5253
protected final LongHistogram messageQueueDelay;
5354
protected final String payloadField;
55+
protected final LongCounter backpressureDropCounter;
5456

5557
protected BaseRedisSubscriber(@NonNull StreamConfiguration config, @NonNull RedissonReactiveClient redisson,
5658
@NonNull String metricsBaseName, @NonNull String payloadField) {
@@ -78,6 +80,11 @@ protected BaseRedisSubscriber(@NonNull StreamConfiguration config, @NonNull Redi
7880
.setUnit("ms")
7981
.ofLongs()
8082
.build();
83+
84+
this.backpressureDropCounter = meter
85+
.counterBuilder("%s_backpressure_drops_total".formatted(metricNamespace))
86+
.setDescription("Total number of events dropped due to backpressure")
87+
.build();
8188
}
8289

8390
protected abstract String getMetricNamespace();
@@ -170,7 +177,13 @@ private void enforceConsumerGroup(RStreamReactive<String, M> stream) {
170177

171178
private void setupStreamListener(RStreamReactive<String, M> stream) {
172179
this.streamSubscription = Flux.interval(config.getPoolingInterval().toJavaDuration())
173-
.onBackpressureDrop()
180+
.onBackpressureDrop(dropped -> {
181+
log.warn(
182+
"Backpressure drop detected: Unable to keep up with polling intervals. Polling interval tick dropped (sequence number: '{}').",
183+
dropped);
184+
// Record metric for dropped polling intervals
185+
backpressureDropCounter.add(1);
186+
})
174187
.flatMap(i -> stream.readGroup(config.getConsumerGroupName(), consumerId, redisReadConfig))
175188
.onErrorContinue((throwable, object) -> log.error("Error reading from Redis stream", throwable))
176189
.flatMapIterable(Map::entrySet)

apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/WebhookSubscriber.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import com.comet.opik.api.events.webhooks.WebhookEvent;
1010
import com.comet.opik.api.resources.v1.events.webhooks.WebhookHttpClient;
1111
import com.comet.opik.infrastructure.WebhookConfig;
12+
import com.comet.opik.infrastructure.auth.RequestContext;
1213
import com.comet.opik.utils.JsonUtils;
1314
import com.fasterxml.jackson.core.type.TypeReference;
1415
import io.opentelemetry.api.common.Attributes;
16+
import io.opentelemetry.api.metrics.LongCounter;
1517
import jakarta.inject.Inject;
1618
import lombok.NonNull;
1719
import lombok.extern.slf4j.Slf4j;
@@ -24,8 +26,6 @@
2426
import java.util.List;
2527
import java.util.Map;
2628

27-
import static com.comet.opik.infrastructure.auth.RequestContext.WORKSPACE_ID;
28-
2929
/**
3030
* Service responsible for sending webhook events to external HTTP endpoints.
3131
* This service ONLY sends webhooks and does not handle aggregation or debouncing.
@@ -62,13 +62,21 @@ public class WebhookSubscriber extends BaseRedisSubscriber<WebhookEvent<?>> {
6262
private static final TypeReference<List<Guardrail>> LIST_GUARDRAIL_TYPE_REFERENCE = new TypeReference<>() {
6363
};
6464

65+
private final LongCounter webhookEventProcessedCounter;
66+
6567
@Inject
6668
public WebhookSubscriber(@NonNull WebhookConfig webhookConfig,
6769
@NonNull RedissonReactiveClient redisson,
6870
@NonNull WebhookHttpClient webhookHttpClient) {
6971
super(webhookConfig, redisson, METRICS_BASE_NAME, WebhookConfig.PAYLOAD_FIELD);
7072
this.webhookHttpClient = webhookHttpClient;
7173
this.webhookConfig = webhookConfig;
74+
75+
// Pre-build metrics during initialization
76+
this.webhookEventProcessedCounter = meter
77+
.counterBuilder("opik_webhook_events_processed_total")
78+
.setDescription("Total number of webhook events processed")
79+
.build();
7280
}
7381

7482
@Override
@@ -81,28 +89,33 @@ protected Mono<Void> processEvent(@NonNull WebhookEvent<?> event) {
8189
log.debug("Processing webhook event: id='{}', type='{}', url='{}'",
8290
event.getId(), event.getEventType(), event.getUrl());
8391

84-
// Record metrics
85-
var attributes = Attributes.builder()
86-
.put("event_type", event.getEventType().getValue())
87-
.put("workspace_id", event.getWorkspaceId())
88-
.build();
89-
90-
return Mono.defer(() -> validateEvent(event))
91-
.then(Mono.defer(() -> webhookHttpClient
92-
.sendWebhook(deserializeEventPayload((WebhookEvent<Map<String, Object>>) event))))
93-
.contextWrite(ctx -> ctx.put(WORKSPACE_ID, event.getWorkspaceId()))
92+
return validateEvent(event)
93+
.then(Mono.fromCallable(() -> Attributes.builder()
94+
.put("event_type", event.getEventType().getValue())
95+
.put("workspace_id", event.getWorkspaceId())
96+
.build())
97+
.flatMap(attributes -> {
98+
@SuppressWarnings("unchecked")
99+
WebhookEvent<Map<String, Object>> webhookEvent = (WebhookEvent<Map<String, Object>>) event;
100+
101+
return Mono.fromCallable(() -> deserializeEventPayload(webhookEvent))
102+
.subscribeOn(Schedulers.boundedElastic())
103+
.flatMap(webhookHttpClient::sendWebhook)
104+
.doOnSuccess(unused -> {
105+
log.info("Successfully sent webhook: id='{}', type='{}', url='{}'",
106+
event.getId(), event.getEventType(), event.getUrl());
107+
108+
// Record success metrics
109+
webhookEventProcessedCounter.add(1,
110+
attributes.toBuilder().put("status", "success").build());
111+
})
112+
.onErrorResume(
113+
throwable -> handlePermanentFailure(event, throwable).then(Mono.empty()));
114+
}))
115+
.onErrorResume(
116+
throwable -> handlePermanentFailure(event, throwable).then(Mono.empty()))
117+
.contextWrite(ctx -> ctx.put(RequestContext.WORKSPACE_ID, event.getWorkspaceId()))
94118
.subscribeOn(Schedulers.boundedElastic())
95-
.doOnSuccess(unused -> {
96-
log.info("Successfully sent webhook: id='{}', type='{}', url='{}'",
97-
event.getId(), event.getEventType(), event.getUrl());
98-
99-
// Record success metrics
100-
meter.counterBuilder("opik_webhook_events_processed_total")
101-
.setDescription("Total number of webhook events processed")
102-
.build()
103-
.add(1, attributes.toBuilder().put("status", "success").build());
104-
})
105-
.onErrorResume(throwable -> handlePermanentFailure(event, throwable).then(Mono.empty()))
106119
.then();
107120
}
108121

@@ -155,6 +168,7 @@ private Mono<Void> validateEvent(@NonNull WebhookEvent<?> event) {
155168
public static WebhookEvent<Map<String, Object>> deserializeEventPayload(
156169
@NonNull WebhookEvent<Map<String, Object>> event) {
157170
Map<String, Object> payload = event.getPayload();
171+
@SuppressWarnings("unchecked")
158172
List<String> metadatas = (List<String>) payload.getOrDefault("metadata", List.of());
159173

160174
var deserializeMetadata = metadatas.stream()

0 commit comments

Comments
 (0)