From 8f61af337755de94d91ce14b38897066c398178c Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 30 Sep 2025 22:17:14 +0530 Subject: [PATCH 1/4] baggage propagation --- .../OpenTelemetryTracingModule.java | 8 ++++ .../OpenTelemetryTracingModuleTest.java | 43 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index 8c42a189ac2..997d10e3e09 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -38,6 +38,7 @@ import io.grpc.ServerStreamTracer; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; @@ -58,6 +59,11 @@ final class OpenTelemetryTracingModule { @VisibleForTesting final io.grpc.Context.Key otelSpan = io.grpc.Context.key("opentelemetry-span-key"); + + @VisibleForTesting + final io.grpc.Context.Key baggageKey = + io.grpc.Context.key("opentelemetry-baggage-key"); + @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; @Nullable @@ -330,6 +336,8 @@ public ServerCall.Listener interceptCall(ServerCall(next.startCall(call, headers), serverCallContext); } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java index bca6be94b9f..389454878dd 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -59,6 +59,7 @@ import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcServerRule; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanId; @@ -750,6 +751,48 @@ public void onComplete() { } } + @Test + public void serverSpanPropagationInterceptor_propagatesBaggage() { + // 1. Arrange: Setup the module, interceptor, and mocks. + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + ServerInterceptor interceptor = tracingModule.getServerSpanPropagationInterceptor(); + + @SuppressWarnings("unchecked") + ServerCallHandler handler = mock(ServerCallHandler.class); + ServerCall call = new NoopServerCall<>(); + Metadata metadata = new Metadata(); + final AtomicReference capturedBaggage = new AtomicReference<>(); + + // Mock the handler to capture the Baggage from the current context when it's called. + when(handler.startCall(any(), any())).thenAnswer(invocation -> { + capturedBaggage.set(io.opentelemetry.api.baggage.Baggage.current()); + return mockServerCallListener; + }); + + // Create a test Span and Baggage to be propagated. + Span parentSpan = tracerRule.spanBuilder("parent-span").startSpan(); + io.opentelemetry.api.baggage.Baggage testBaggage = + io.opentelemetry.api.baggage.Baggage.builder().put("testKey", "testValue").build(); + + // Attach the Span and Baggage to the gRPC context. + io.grpc.Context grpcContext = io.grpc.Context.current() + .withValue(tracingModule.otelSpan, parentSpan) + .withValue(tracingModule.baggageKey, testBaggage); + + io.grpc.Context previous = grpcContext.attach(); + try { + // 2. Act: Call the interceptor. + interceptor.interceptCall(call, metadata, handler); + } finally { + grpcContext.detach(previous); + } + + // 3. Assert: Verify the handler was called and the correct Baggage was propagated. + verify(handler).startCall(same(call), same(metadata)); + assertEquals(testBaggage, capturedBaggage.get()); + } + @Test public void generateTraceSpanName() { assertEquals( From f5cd36f3ef34d87d292c3f48da3a0374a9bc7cff Mon Sep 17 00:00:00 2001 From: AgraVator Date: Thu, 30 Oct 2025 23:55:14 +0530 Subject: [PATCH 2/4] openTelemetry: propagate baggage --- opentelemetry/build.gradle | 4 + .../grpc/opentelemetry/GrpcOpenTelemetry.java | 4 +- .../OpenTelemetryMetricsModule.java | 45 ++++-- .../OpenTelemetryTracingModule.java | 25 ++-- .../internal/OpenTelemetryConstants.java | 6 + .../OpenTelemetryMetricsModuleTest.java | 132 ++++++++++++++++++ .../OpenTelemetryTracingModuleTest.java | 123 ++++++++++++---- 7 files changed, 289 insertions(+), 50 deletions(-) diff --git a/opentelemetry/build.gradle b/opentelemetry/build.gradle index b729f393e4b..ff517aed7de 100644 --- a/opentelemetry/build.gradle +++ b/opentelemetry/build.gradle @@ -15,11 +15,15 @@ dependencies { libraries.auto.value.annotations testImplementation project(':grpc-testing'), + project(':grpc-testing-proto'), project(':grpc-inprocess'), + project(':grpc-stub'), + project(':grpc-protobuf'), testFixtures(project(':grpc-core')), testFixtures(project(':grpc-api')), libraries.opentelemetry.sdk.testing, libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj + libraries.protobuf.java annotationProcessor libraries.auto.value diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index e0af0f80ed3..4341b27daa4 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -179,12 +179,14 @@ public void configureChannelBuilder(ManagedChannelBuilder builder) { * @param serverBuilder the server builder to configure */ public void configureServerBuilder(ServerBuilder serverBuilder) { - serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory()); + /* To ensure baggage propagation to metrics, we need the tracing + tracers to be initialised before metrics */ if (ENABLE_OTEL_TRACING) { serverBuilder.addStreamTracerFactory( openTelemetryTracingModule.getServerTracerFactory()); serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor()); } + serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory()); } @VisibleForTesting diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 1d77f9ee3e4..1c044c20980 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; @@ -44,7 +45,9 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StreamTracer; +import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -94,8 +97,8 @@ final class OpenTelemetryMetricsModule { private final ImmutableList plugins; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, - OpenTelemetryMetricsResource resource, Collection optionalLabels, - List plugins) { + OpenTelemetryMetricsResource resource, + Collection optionalLabels, List plugins) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); @@ -128,6 +131,14 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) return isGeneratedMethod ? fullMethodName : "other"; } + private static Context otelContextWithBaggage() { + Baggage baggage = BAGGAGE_KEY.get(io.grpc.Context.current()); + if (baggage == null) { + return Context.current(); + } + return Context.current().with(baggage); + } + private static final class ClientTracer extends ClientStreamTracer { @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; @Nullable private static final AtomicLongFieldUpdater inboundWireSizeUpdater; @@ -243,6 +254,7 @@ public void streamClosed(Status status) { } void recordFinishedAttempt() { + Context otelContext = otelContextWithBaggage(); AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, fullMethodName) .put(TARGET_KEY, target) @@ -268,15 +280,15 @@ void recordFinishedAttempt() { if (module.resource.clientAttemptDurationCounter() != null ) { module.resource.clientAttemptDurationCounter() - .record(attemptNanos * SECONDS_PER_NANO, attribute); + .record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext); } if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) { module.resource.clientTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attribute); + .record(outboundWireSize, attribute, otelContext); } if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.clientTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attribute); + .record(inboundWireSize, attribute, otelContext); } } } @@ -408,6 +420,7 @@ void callEnded(Status status) { } void recordFinishedCall() { + Context otelContext = otelContextWithBaggage(); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); @@ -429,7 +442,8 @@ void recordFinishedCall() { callLatencyNanos * SECONDS_PER_NANO, baseAttributes.toBuilder() .put(STATUS_KEY, status.getCode().toString()) - .build() + .build(), + otelContext ); } @@ -437,7 +451,8 @@ void recordFinishedCall() { if (module.resource.clientCallRetriesCounter() != null) { long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0); if (retriesPerCall > 0) { - module.resource.clientCallRetriesCounter().record(retriesPerCall, baseAttributes); + module.resource.clientCallRetriesCounter() + .record(retriesPerCall, baseAttributes, otelContext); } } @@ -446,7 +461,7 @@ void recordFinishedCall() { long hedges = hedgedAttemptsPerCall.get(); if (hedges > 0) { module.resource.clientCallHedgesCounter() - .record(hedges, baseAttributes); + .record(hedges, baseAttributes, otelContext); } } @@ -454,8 +469,8 @@ void recordFinishedCall() { if (module.resource.clientCallTransparentRetriesCounter() != null) { long transparentRetries = transparentRetriesPerCall.get(); if (transparentRetries > 0) { - module.resource.clientCallTransparentRetriesCounter().record( - transparentRetries, baseAttributes); + module.resource.clientCallTransparentRetriesCounter() + .record(transparentRetries, baseAttributes, otelContext); } } @@ -463,7 +478,8 @@ void recordFinishedCall() { if (module.resource.clientCallRetryDelayCounter() != null) { module.resource.clientCallRetryDelayCounter().record( retryDelayNanos * SECONDS_PER_NANO, - baseAttributes + baseAttributes, + otelContext ); } } @@ -562,6 +578,7 @@ public void inboundWireSize(long bytes) { */ @Override public void streamClosed(Status status) { + Context otelContext = otelContextWithBaggage(); if (streamClosedUpdater != null) { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; @@ -584,15 +601,15 @@ public void streamClosed(Status status) { if (module.resource.serverCallDurationCounter() != null) { module.resource.serverCallDurationCounter() - .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes); + .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext); } if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) { module.resource.serverTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attributes); + .record(outboundWireSize, attributes, otelContext); } if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.serverTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attributes); + .record(inboundWireSize, attributes, otelContext); } } } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index 997d10e3e09..fa0dad6b320 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; @@ -60,10 +61,6 @@ final class OpenTelemetryTracingModule { @VisibleForTesting final io.grpc.Context.Key otelSpan = io.grpc.Context.key("opentelemetry-span-key"); - @VisibleForTesting - final io.grpc.Context.Key baggageKey = - io.grpc.Context.key("opentelemetry-baggage-key"); - @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; @Nullable @@ -248,13 +245,15 @@ private final class ServerTracer extends ServerStreamTracer { private final Span span; volatile int streamClosed; private int seqNo; + private Baggage baggage; - ServerTracer(String fullMethodName, @Nullable Span remoteSpan) { + ServerTracer(String fullMethodName, @Nullable Span remoteSpan, Baggage baggage) { checkNotNull(fullMethodName, "fullMethodName"); this.span = otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName)) .setParent(remoteSpan == null ? null : Context.current().with(remoteSpan)) .startSpan(); + this.baggage = baggage; } /** @@ -280,7 +279,9 @@ public void streamClosed(io.grpc.Status status) { @Override public io.grpc.Context filterContext(io.grpc.Context context) { - return context.withValue(otelSpan, span); + return context + .withValue(otelSpan, span) + .withValue(BAGGAGE_KEY, baggage); } @Override @@ -320,7 +321,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata if (remoteSpan == Span.getInvalid()) { remoteSpan = null; } - return new ServerTracer(fullMethodName, remoteSpan); + Baggage baggage = Baggage.fromContext(context); + return new ServerTracer(fullMethodName, remoteSpan, baggage); } } @@ -335,9 +337,12 @@ public ServerCall.Listener interceptCall(ServerCall(next.startCall(call, headers), serverCallContext); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index ff2b88acbfd..2c7123198c4 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -16,7 +16,9 @@ package io.grpc.opentelemetry.internal; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributeKey; import java.util.List; @@ -42,6 +44,10 @@ public final class OpenTelemetryConstants { public static final AttributeKey SECURITY_LEVEL_KEY = AttributeKey.stringKey("grpc.security_level"); + @VisibleForTesting + public static final io.grpc.Context.Key BAGGAGE_KEY = + io.grpc.Context.key("opentelemetry-baggage-key"); + public static final List LATENCY_BUCKETS = ImmutableList.of( 0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d, diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 6d1234497d6..58759294fca 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; @@ -47,11 +48,21 @@ import io.grpc.ServerStreamTracer.ServerCallInfo; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; +import io.grpc.testing.protobuf.SimpleRequest; +import io.grpc.testing.protobuf.SimpleResponse; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -65,6 +76,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -161,6 +173,14 @@ public String parse(InputStream stream) { private ServerCall.Listener mockServerCallListener; @Captor private ArgumentCaptor statusCaptor; + @Mock + private DoubleHistogram mockServerCallDurationHistogram; + @Captor + private ArgumentCaptor contextCaptor; + private io.grpc.Server server; + private io.grpc.ManagedChannel channel; + private OpenTelemetryMetricsResource resource; + private final String serverName = "E2ETestServer-" + Math.random(); private final FakeClock fakeClock = new FakeClock(); private final MethodDescriptor method = @@ -180,6 +200,19 @@ public String parse(InputStream stream) { public void setUp() throws Exception { testMeter = openTelemetryTesting.getOpenTelemetry() .getMeter(OpenTelemetryConstants.INSTRUMENTATION_SCOPE); + resource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(mockServerCallDurationHistogram) + .build(); + } + + @After + public void tearDown() { + if (channel != null) { + channel.shutdownNow(); + } + if (server != null) { + server.shutdownNow(); + } } @Test @@ -1595,6 +1628,45 @@ public void serverBasicMetrics() { } + @Test + public void serverBaggagePropagationToMetrics() { + // 1. Create module and tracer factory using the mock resource + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); + ServerStreamTracer tracer = + tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); + + // 2. Define the test baggage and gRPC context + Baggage testBaggage = Baggage.builder() + .put("user-id", "67") + .build(); + + // This simulates the context that the Tracing module would have created + io.grpc.Context grpcContext = io.grpc.Context.current() + .withValue(OpenTelemetryConstants.BAGGAGE_KEY, testBaggage); + + // 3. Attach the gRPC context, trigger metric recording, and detach + io.grpc.Context previousContext = grpcContext.attach(); + try { + tracer.streamClosed(Status.OK); + } finally { + grpcContext.detach(previousContext); + } + + // 4. Verify the record call and capture the OTel Context + verify(mockServerCallDurationHistogram).record( + anyDouble(), + any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + // 5. Assert on the captured OTel Context + io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); + Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); + + assertEquals("67", capturedBaggage.getEntryValue("user-id")); + } + private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource) { return new OpenTelemetryMetricsModule( @@ -1631,4 +1703,64 @@ public String getAuthority() { return authority; } } + + @Test + public void serverBaggagePropagation_EndToEnd() throws Exception { + // 1. Create Both Modules + OpenTelemetry otel = openTelemetryTesting.getOpenTelemetry(); + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(otel); + OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + + // 2. Create Server with *both* tracer factories + server = InProcessServerBuilder.forName(serverName) + .addService(new SimpleServiceImpl()) // <-- Uses the helper class below + .addStreamTracerFactory(tracingModule.getServerTracerFactory()) + .addStreamTracerFactory(metricsModule.getServerTracerFactory()) + .build() + .start(); + + // 3. Create Client Channel + channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + + // 4. Manually create baggage headers + Metadata headers = new Metadata(); + headers.put(Metadata.Key.of("baggage", Metadata.ASCII_STRING_MARSHALLER), + "choice=red_pill_or_blue_pill"); + + // 5. Make the gRPC call with these headers + ClientInterceptor headerAttachingInterceptor = + MetadataUtils.newAttachHeadersInterceptor(headers); + + // Now, create the stub and apply that interceptor + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub(channel) + .withInterceptors(headerAttachingInterceptor); + + // Use the imported SimpleRequest + stub.unaryRpc(SimpleRequest.getDefaultInstance()); + + // 6. Verify the Mock + verify(mockServerCallDurationHistogram).record( + anyDouble(), + any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + // 7. Assert on the captured Context + io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); + Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); + + assertEquals("red_pill_or_blue_pill", capturedBaggage.getEntryValue("choice")); + } + + /** + * A simple service implementation for the E2E test. + */ + private static class SimpleServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase { + @Override + public void unaryRpc(SimpleRequest request, StreamObserver responseObserver) { + responseObserver.onNext(SimpleResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java index 389454878dd..e6759aadb1e 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -17,12 +17,15 @@ package io.grpc.opentelemetry; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -62,9 +65,12 @@ import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanId; import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.TracerBuilder; import io.opentelemetry.api.trace.TracerProvider; @@ -751,46 +757,113 @@ public void onComplete() { } } + /** + * Tests that baggage from the initial context is propagated + * to the context active during the next handler's execution. + */ @Test - public void serverSpanPropagationInterceptor_propagatesBaggage() { - // 1. Arrange: Setup the module, interceptor, and mocks. + public void testBaggageIsPropagatedToHandlerContext() { + // 1. ARRANGE OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( openTelemetryRule.getOpenTelemetry()); ServerInterceptor interceptor = tracingModule.getServerSpanPropagationInterceptor(); + // Create mocks for the gRPC call chain @SuppressWarnings("unchecked") - ServerCallHandler handler = mock(ServerCallHandler.class); - ServerCall call = new NoopServerCall<>(); - Metadata metadata = new Metadata(); + ServerCallHandler mockHandler = mock(ServerCallHandler.class); + @SuppressWarnings("unchecked") + ServerCall.Listener mockListener = mock(ServerCall.Listener.class); + ServerCall mockCall = new NoopServerCall<>(); + Metadata mockHeaders = new Metadata(); + + // Create a non-null Span (required to pass the first 'if' check) + Span testSpan = Span.wrap( + SpanContext.create("time-period", "star-wars", + TraceFlags.getSampled(), TraceState.getDefault())); + + // Create the test Baggage + Baggage testBaggage = Baggage.builder().put("best-bot", "R2D2").build(); + + // Create the initial gRPC context that the interceptor will read from + io.grpc.Context initialGrpcContext = io.grpc.Context.current() + .withValue(tracingModule.otelSpan, testSpan) + .withValue(BAGGAGE_KEY, testBaggage); + + // This AtomicReference will capture the Baggage from *within* the handler final AtomicReference capturedBaggage = new AtomicReference<>(); - // Mock the handler to capture the Baggage from the current context when it's called. - when(handler.startCall(any(), any())).thenAnswer(invocation -> { - capturedBaggage.set(io.opentelemetry.api.baggage.Baggage.current()); - return mockServerCallListener; - }); + // Stub the handler to capture the *current* context when it's called + doAnswer(invocation -> { + // Baggage.current() gets baggage from io.opentelemetry.context.Context.current() + capturedBaggage.set(Baggage.current()); + return mockListener; + }).when(mockHandler).startCall(any(), any()); - // Create a test Span and Baggage to be propagated. - Span parentSpan = tracerRule.spanBuilder("parent-span").startSpan(); - io.opentelemetry.api.baggage.Baggage testBaggage = - io.opentelemetry.api.baggage.Baggage.builder().put("testKey", "testValue").build(); + // 2. ACT + // Run the interceptCall method within the prepared context + io.grpc.Context previous = initialGrpcContext.attach(); + try { + interceptor.interceptCall(mockCall, mockHeaders, mockHandler); + } finally { + initialGrpcContext.detach(previous); + } - // Attach the Span and Baggage to the gRPC context. - io.grpc.Context grpcContext = io.grpc.Context.current() - .withValue(tracingModule.otelSpan, parentSpan) - .withValue(tracingModule.baggageKey, testBaggage); + // 3. ASSERT + // Verify the next handler was called + verify(mockHandler).startCall(same(mockCall), same(mockHeaders)); - io.grpc.Context previous = grpcContext.attach(); + // Check the baggage that was captured + assertNotNull("Baggage should not be null in handler context", capturedBaggage.get()); + assertEquals("Baggage was not correctly propagated to the handler's context", + "R2D2", capturedBaggage.get().getEntryValue("best-bot")); + } + + /** + * Tests that the interceptor proceeds correctly if baggage is null or empty. + */ + @Test + public void testNullBaggageIsHandledGracefully() { + // 1. ARRANGE + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + ServerInterceptor interceptor = tracingModule.getServerSpanPropagationInterceptor(); + + @SuppressWarnings("unchecked") + ServerCallHandler mockHandler = mock(ServerCallHandler.class); + @SuppressWarnings("unchecked") + ServerCall.Listener mockListener = mock(ServerCall.Listener.class); + ServerCall mockCall = new NoopServerCall<>(); + Metadata mockHeaders = new Metadata(); + + Span testSpan = Span.getInvalid(); // A non-null span + + // No baggage is set in the context + io.grpc.Context initialGrpcContext = io.grpc.Context.current() + .withValue(tracingModule.otelSpan, testSpan); + + final AtomicReference capturedBaggage = new AtomicReference<>(); + + // Stub the handler to capture the *current* context when it's called + doAnswer(invocation -> { + // Baggage.current() gets baggage from io.opentelemetry.context.Context.current() + capturedBaggage.set(Baggage.current()); + return mockListener; + }).when(mockHandler).startCall(any(), any()); + + // 2. ACT + io.grpc.Context previous = initialGrpcContext.attach(); try { - // 2. Act: Call the interceptor. - interceptor.interceptCall(call, metadata, handler); + interceptor.interceptCall(mockCall, mockHeaders, mockHandler); } finally { - grpcContext.detach(previous); + initialGrpcContext.detach(previous); } - // 3. Assert: Verify the handler was called and the correct Baggage was propagated. - verify(handler).startCall(same(call), same(metadata)); - assertEquals(testBaggage, capturedBaggage.get()); + // 3. ASSERT + verify(mockHandler).startCall(same(mockCall), same(mockHeaders)); + + // Baggage should be null in the downstream context + assertEquals("Baggage should be empty when not provided", + Baggage.empty(), capturedBaggage.get()); } @Test From 154e7c1a3ad28e746c7e4ad3cd799bf31b1b9ebc Mon Sep 17 00:00:00 2001 From: AgraVator Date: Fri, 31 Oct 2025 13:44:45 +0530 Subject: [PATCH 3/4] suggested changes --- opentelemetry/build.gradle | 3 --- .../java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java | 2 +- .../java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/opentelemetry/build.gradle b/opentelemetry/build.gradle index ff517aed7de..c856ad8dcb9 100644 --- a/opentelemetry/build.gradle +++ b/opentelemetry/build.gradle @@ -17,13 +17,10 @@ dependencies { testImplementation project(':grpc-testing'), project(':grpc-testing-proto'), project(':grpc-inprocess'), - project(':grpc-stub'), - project(':grpc-protobuf'), testFixtures(project(':grpc-core')), testFixtures(project(':grpc-api')), libraries.opentelemetry.sdk.testing, libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj - libraries.protobuf.java annotationProcessor libraries.auto.value diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 1c044c20980..3e5137e0034 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -132,7 +132,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) } private static Context otelContextWithBaggage() { - Baggage baggage = BAGGAGE_KEY.get(io.grpc.Context.current()); + Baggage baggage = BAGGAGE_KEY.get(); if (baggage == null) { return Context.current(); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index fa0dad6b320..75648644a72 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -339,7 +339,7 @@ public ServerCall.Listener interceptCall(ServerCall Date: Mon, 3 Nov 2025 21:30:24 +0300 Subject: [PATCH 4/4] add log for no baggage in server interceptCall --- .../java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index 75648644a72..ac3f5e828e9 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -342,6 +342,9 @@ public ServerCall.Listener interceptCall(ServerCall(next.startCall(call, headers), serverCallContext);