Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.internal.BulkGetFuture;

public class BulkGetCompletionListener extends CompletionListener<BulkGetFuture<?>>
Expand All @@ -24,13 +26,29 @@ private BulkGetCompletionListener(Context parentContext, SpymemcachedRequest req
@Nullable
public static BulkGetCompletionListener create(
Context parentContext, MemcachedConnection connection, String methodName) {
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName);
MemcachedNode handlingNode = getNodeFromConnection(connection);
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName, handlingNode);
if (!instrumenter().shouldStart(parentContext, request)) {
return null;
}
return new BulkGetCompletionListener(parentContext, request);
}

@Nullable
private static MemcachedNode getNodeFromConnection(MemcachedConnection connection) {
try {
Collection<MemcachedNode> allNodes = connection.getLocator().getAll();
if (allNodes.size() == 1) {
return allNodes.iterator().next();
}
// For multiple nodes, return null - bulk operations span multiple servers
// and we cannot accurately attribute to a single server
return null;
} catch (RuntimeException e) {
return null;
}
}

@Override
public void onComplete(BulkGetFuture<?> future) {
closeAsyncSpan(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@

package io.opentelemetry.javaagent.instrumentation.spymemcached;

import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.FUTURE_OPERATION;
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.instrumenter;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.ops.Operation;

public class GetCompletionListener extends CompletionListener<GetFuture<?>>
implements net.spy.memcached.internal.GetCompletionListener {
Expand All @@ -23,14 +26,27 @@ private GetCompletionListener(Context parentContext, SpymemcachedRequest request

@Nullable
public static GetCompletionListener create(
Context parentContext, MemcachedConnection connection, String methodName) {
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName);
Context parentContext,
MemcachedConnection connection,
String methodName,
GetFuture<?> future) {
MemcachedNode handlingNode = extractHandlingNodeFromFuture(future);
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName, handlingNode);
if (!instrumenter().shouldStart(parentContext, request)) {
return null;
}
return new GetCompletionListener(parentContext, request);
}

@Nullable
private static MemcachedNode extractHandlingNodeFromFuture(GetFuture<?> future) {
Operation operation = FUTURE_OPERATION.get(future);
if (operation != null) {
return operation.getHandlingNode();
}
return null;
}

@Override
public void onComplete(GetFuture<?> future) {
closeAsyncSpan(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static void methodExit(
if (future != null) {
OperationCompletionListener listener =
OperationCompletionListener.create(
currentContext(), client.getConnection(), methodName);
currentContext(), client.getConnection(), methodName, future);
if (listener != null) {
future.addListener(listener);
}
Expand Down Expand Up @@ -106,7 +106,8 @@ public static void methodExit(

if (future != null) {
GetCompletionListener listener =
GetCompletionListener.create(currentContext(), client.getConnection(), methodName);
GetCompletionListener.create(
currentContext(), client.getConnection(), methodName, future);
if (listener != null) {
future.addListener(listener);
}
Expand Down Expand Up @@ -156,15 +157,16 @@ private AdviceScope(CallDepth callDepth, @Nullable SyncCompletionListener listen
this.listener = listener;
}

public static AdviceScope start(MemcachedClient client, String methodName) {
public static AdviceScope start(MemcachedClient client, String methodName, String key) {
CallDepth callDepth = CallDepth.forClass(MemcachedClient.class);
if (callDepth.getAndIncrement() > 0) {
return new AdviceScope(callDepth, null);
}

return new AdviceScope(
callDepth,
SyncCompletionListener.create(Context.current(), client.getConnection(), methodName));
SyncCompletionListener.create(
Context.current(), client.getConnection(), methodName, key));
}

public void end(@Nullable Throwable throwable) {
Expand All @@ -178,8 +180,10 @@ public void end(@Nullable Throwable throwable) {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static AdviceScope methodEnter(
@Advice.This MemcachedClient client, @Advice.Origin("#m") String methodName) {
return AdviceScope.start(client, methodName);
@Advice.This MemcachedClient client,
@Advice.Origin("#m") String methodName,
@Advice.Argument(0) String key) {
return AdviceScope.start(client, methodName, key);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@

package io.opentelemetry.javaagent.instrumentation.spymemcached;

import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.FUTURE_OPERATION;
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.instrumenter;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.Operation;

public class OperationCompletionListener extends CompletionListener<OperationFuture<?>>
implements net.spy.memcached.internal.OperationCompletionListener {
Expand All @@ -23,14 +26,27 @@ private OperationCompletionListener(Context parentContext, SpymemcachedRequest r

@Nullable
public static OperationCompletionListener create(
Context parentContext, MemcachedConnection connection, String methodName) {
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName);
Context parentContext,
MemcachedConnection connection,
String methodName,
OperationFuture<?> future) {
MemcachedNode handlingNode = extractHandlingNodeFromFuture(future);
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName, handlingNode);
if (!instrumenter().shouldStart(parentContext, request)) {
return null;
}
return new OperationCompletionListener(parentContext, request);
}

@Nullable
private static MemcachedNode extractHandlingNodeFromFuture(OperationFuture<?> future) {
Operation operation = FUTURE_OPERATION.get(future);
if (operation != null) {
return operation.getHandlingNode();
}
return null;
}

@Override
public void onComplete(OperationFuture<?> future) {
closeAsyncSpan(future);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spymemcached;

import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.FUTURE_OPERATION;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.Future;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.Operation;

public class SetOperationInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"net.spy.memcached.internal.OperationFuture", "net.spy.memcached.internal.GetFuture");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(named("setOperation")).and(takesArguments(1)),
this.getClass().getName() + "$SetOperationAdvice");
}

@SuppressWarnings("unused")
public static class SetOperationAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This Object future, @Advice.Argument(0) Operation operation) {

if (future instanceof OperationFuture || future instanceof GetFuture) {
FUTURE_OPERATION.set((Future<?>) future, operation);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.spymemcached;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
Expand All @@ -23,7 +23,7 @@ public SpymemcachedInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new MemcachedClientInstrumentation());
return asList(new MemcachedClientInstrumentation(), new SetOperationInstrumentation());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spymemcached;

import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter;
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.annotation.Nullable;
import net.spy.memcached.MemcachedNode;

final class SpymemcachedNetworkAttributesGetter
implements ServerAttributesGetter<SpymemcachedRequest>,
NetworkAttributesGetter<SpymemcachedRequest, Object> {

@Nullable
@Override
public String getServerAddress(SpymemcachedRequest request) {
MemcachedNode handlingNode = request.getHandlingNode();
if (handlingNode != null) {
SocketAddress socketAddress = handlingNode.getSocketAddress();
if (socketAddress instanceof InetSocketAddress) {
return ((InetSocketAddress) socketAddress).getHostString();
}
}
return null;
}

@Nullable
@Override
public Integer getServerPort(SpymemcachedRequest request) {
MemcachedNode handlingNode = request.getHandlingNode();
if (handlingNode != null) {
SocketAddress socketAddress = handlingNode.getSocketAddress();
if (socketAddress instanceof InetSocketAddress) {
return ((InetSocketAddress) socketAddress).getPort();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@
package io.opentelemetry.javaagent.instrumentation.spymemcached;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;

@AutoValue
public abstract class SpymemcachedRequest {

public static SpymemcachedRequest create(MemcachedConnection connection, String statement) {
return new AutoValue_SpymemcachedRequest(connection, statement);
public static SpymemcachedRequest create(
MemcachedConnection connection, String statement, @Nullable MemcachedNode handlingNode) {
return new AutoValue_SpymemcachedRequest(connection, statement, handlingNode);
}

public abstract MemcachedConnection getConnection();

public abstract String getStatement();

@Nullable
public abstract MemcachedNode getHandlingNode();

public String dbOperation() {
String statement = getStatement();
if (statement.startsWith("async")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,34 @@
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.concurrent.Future;
import net.spy.memcached.ops.Operation;

public final class SpymemcachedSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spymemcached-2.12";

private static final Instrumenter<SpymemcachedRequest, Object> INSTRUMENTER;

public static final VirtualField<Future<?>, Operation> FUTURE_OPERATION;

static {
SpymemcachedAttributesGetter dbAttributesGetter = new SpymemcachedAttributesGetter();
SpymemcachedNetworkAttributesGetter netAttributesGetter =
new SpymemcachedNetworkAttributesGetter();

INSTRUMENTER =
Instrumenter.builder(
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
DbClientSpanNameExtractor.create(dbAttributesGetter))
.addAttributesExtractor(DbClientAttributesExtractor.create(dbAttributesGetter))
.addAttributesExtractor(ServerAttributesExtractor.create(netAttributesGetter))
.addOperationMetrics(DbClientMetrics.get())
.buildInstrumenter(SpanKindExtractor.alwaysClient());

FUTURE_OPERATION = VirtualField.find(Future.class, Operation.class);
}

public static Instrumenter<SpymemcachedRequest, Object> instrumenter() {
Expand Down
Loading
Loading