Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions opentelemetry/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ dependencies {
libraries.auto.value.annotations

testImplementation project(':grpc-testing'),
project(':grpc-testing-proto'),
project(':grpc-inprocess'),
project(':grpc-stub'),
project(':grpc-protobuf'),
testFixtures(project(':grpc-core')),
testFixtures(project(':grpc-api')),
libraries.opentelemetry.sdk.testing,
libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj
libraries.protobuf.java

annotationProcessor libraries.auto.value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,14 @@ public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
* @param serverBuilder the server builder to configure
*/
public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
/* To ensure baggage propagation to metrics, we need the tracing
tracers to be initialised before metrics */
if (ENABLE_OTEL_TRACING) {
serverBuilder.addStreamTracerFactory(
openTelemetryTracingModule.getServerTracerFactory());
serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor());
}
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
Expand All @@ -44,7 +45,9 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StreamTracer;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -94,8 +97,8 @@ final class OpenTelemetryMetricsModule {
private final ImmutableList<OpenTelemetryPlugin> plugins;

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
List<OpenTelemetryPlugin> plugins) {
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
Expand Down Expand Up @@ -128,6 +131,14 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get(io.grpc.Context.current());
if (baggage == null) {
return Context.current();
}
return Context.current().with(baggage);
}

private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
Expand Down Expand Up @@ -243,6 +254,7 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand All @@ -268,15 +280,15 @@ void recordFinishedAttempt() {

if (module.resource.clientAttemptDurationCounter() != null ) {
module.resource.clientAttemptDurationCounter()
.record(attemptNanos * SECONDS_PER_NANO, attribute);
.record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
}
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
module.resource.clientTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attribute);
.record(outboundWireSize, attribute, otelContext);
}
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attribute);
.record(inboundWireSize, attribute, otelContext);
}
}
}
Expand Down Expand Up @@ -408,6 +420,7 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand All @@ -429,15 +442,17 @@ void recordFinishedCall() {
callLatencyNanos * SECONDS_PER_NANO,
baseAttributes.toBuilder()
.put(STATUS_KEY, status.getCode().toString())
.build()
.build(),
otelContext
);
}

// Retry counts
if (module.resource.clientCallRetriesCounter() != null) {
long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
if (retriesPerCall > 0) {
module.resource.clientCallRetriesCounter().record(retriesPerCall, baseAttributes);
module.resource.clientCallRetriesCounter()
.record(retriesPerCall, baseAttributes, otelContext);
}
}

Expand All @@ -446,24 +461,25 @@ void recordFinishedCall() {
long hedges = hedgedAttemptsPerCall.get();
if (hedges > 0) {
module.resource.clientCallHedgesCounter()
.record(hedges, baseAttributes);
.record(hedges, baseAttributes, otelContext);
}
}

// Transparent Retry counts
if (module.resource.clientCallTransparentRetriesCounter() != null) {
long transparentRetries = transparentRetriesPerCall.get();
if (transparentRetries > 0) {
module.resource.clientCallTransparentRetriesCounter().record(
transparentRetries, baseAttributes);
module.resource.clientCallTransparentRetriesCounter()
.record(transparentRetries, baseAttributes, otelContext);
}
}

// Retry delay
if (module.resource.clientCallRetryDelayCounter() != null) {
module.resource.clientCallRetryDelayCounter().record(
retryDelayNanos * SECONDS_PER_NANO,
baseAttributes
baseAttributes,
otelContext
);
}
}
Expand Down Expand Up @@ -562,6 +578,7 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand All @@ -584,15 +601,15 @@ public void streamClosed(Status status) {

if (module.resource.serverCallDurationCounter() != null) {
module.resource.serverCallDurationCounter()
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
}
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
module.resource.serverTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attributes);
.record(outboundWireSize, attributes, otelContext);
}
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attributes);
.record(inboundWireSize, attributes, otelContext);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
Expand All @@ -38,6 +39,7 @@
import io.grpc.ServerStreamTracer;
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
Expand All @@ -58,6 +60,7 @@ final class OpenTelemetryTracingModule {

@VisibleForTesting
final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");

@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
@Nullable
Expand Down Expand Up @@ -242,13 +245,15 @@ private final class ServerTracer extends ServerStreamTracer {
private final Span span;
volatile int streamClosed;
private int seqNo;
private Baggage baggage;

ServerTracer(String fullMethodName, @Nullable Span remoteSpan) {
ServerTracer(String fullMethodName, @Nullable Span remoteSpan, Baggage baggage) {
checkNotNull(fullMethodName, "fullMethodName");
this.span =
otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName))
.setParent(remoteSpan == null ? null : Context.current().with(remoteSpan))
.startSpan();
this.baggage = baggage;
}

/**
Expand All @@ -274,7 +279,9 @@ public void streamClosed(io.grpc.Status status) {

@Override
public io.grpc.Context filterContext(io.grpc.Context context) {
return context.withValue(otelSpan, span);
return context
.withValue(otelSpan, span)
.withValue(BAGGAGE_KEY, baggage);
}

@Override
Expand Down Expand Up @@ -314,7 +321,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
if (remoteSpan == Span.getInvalid()) {
remoteSpan = null;
}
return new ServerTracer(fullMethodName, remoteSpan);
Baggage baggage = Baggage.fromContext(context);
return new ServerTracer(fullMethodName, remoteSpan, baggage);
}
}

Expand All @@ -329,7 +337,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
+ "tracing must be set.");
return next.startCall(call, headers);
}
Context serverCallContext = Context.current().with(span);
Context serverCallContext = Context.current();
serverCallContext = serverCallContext.with(span);
Baggage baggage = BAGGAGE_KEY.get(io.grpc.Context.current());
if (baggage != null) {
serverCallContext = serverCallContext.with(baggage);
}
try (Scope scope = serverCallContext.makeCurrent()) {
return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package io.grpc.opentelemetry.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributeKey;
import java.util.List;

Expand All @@ -42,6 +44,10 @@ public final class OpenTelemetryConstants {
public static final AttributeKey<String> SECURITY_LEVEL_KEY =
AttributeKey.stringKey("grpc.security_level");

@VisibleForTesting
public static final io.grpc.Context.Key<Baggage> BAGGAGE_KEY =
io.grpc.Context.key("opentelemetry-baggage-key");

public static final List<Double> LATENCY_BUCKETS =
ImmutableList.of(
0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,
Expand Down
Loading