diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java index dac8a010cd..2a19e7e8a2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java @@ -65,6 +65,7 @@ public class HttpStorageOptions extends StorageOptions { private transient RetryDependenciesAdapter retryDepsAdapter; private final BlobWriteSessionConfig blobWriteSessionConfig; + private final boolean enableHttpClientsMetrics; private transient OpenTelemetry openTelemetry; private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { @@ -75,6 +76,7 @@ private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { builder.storageRetryStrategy, defaults().getStorageRetryStrategy())); retryDepsAdapter = new RetryDependenciesAdapter(); blobWriteSessionConfig = builder.blobWriteSessionConfig; + enableHttpClientsMetrics = builder.enableHttpClientsMetrics; openTelemetry = builder.openTelemetry; } @@ -99,6 +101,15 @@ StorageRpc getStorageRpcV1() { @BetaApi @Override public OpenTelemetry getOpenTelemetry() { + if (openTelemetry == null || openTelemetry == OpenTelemetry.noop()) { + if (enableHttpClientsMetrics) { + openTelemetry = + OpenTelemetryBootstrappingUtils.getHttpOpenTelemetrySdk( + getProjectId(), getUniverseDomain(), getHost(), true); + } else { + return OpenTelemetry.noop(); + } + } return openTelemetry; } @@ -110,7 +121,11 @@ public HttpStorageOptions.Builder toBuilder() { @Override public int hashCode() { return Objects.hash( - retryAlgorithmManager, blobWriteSessionConfig, openTelemetry, baseHashCode()); + retryAlgorithmManager, + blobWriteSessionConfig, + enableHttpClientsMetrics, + openTelemetry, + baseHashCode()); } @Override @@ -124,6 +139,7 @@ public boolean equals(Object o) { HttpStorageOptions that = (HttpStorageOptions) o; return Objects.equals(retryAlgorithmManager, that.retryAlgorithmManager) && Objects.equals(blobWriteSessionConfig, that.blobWriteSessionConfig) + && Objects.equals(enableHttpClientsMetrics, that.enableHttpClientsMetrics) && Objects.equals(openTelemetry, that.openTelemetry) && this.baseEquals(that); } @@ -156,6 +172,7 @@ public static class Builder extends StorageOptions.Builder { private StorageRetryStrategy storageRetryStrategy; private BlobWriteSessionConfig blobWriteSessionConfig = HttpStorageDefaults.INSTANCE.getDefaultStorageWriterConfig(); + private boolean enableHttpClientsMetrics = false; private OpenTelemetry openTelemetry = HttpStorageDefaults.INSTANCE.getDefaultOpenTelemetry(); Builder() {} @@ -165,6 +182,7 @@ public static class Builder extends StorageOptions.Builder { HttpStorageOptions hso = (HttpStorageOptions) options; this.storageRetryStrategy = hso.retryAlgorithmManager.retryStrategy; this.blobWriteSessionConfig = hso.blobWriteSessionConfig; + this.enableHttpClientsMetrics = hso.enableHttpClientsMetrics; this.openTelemetry = hso.getOpenTelemetry(); } @@ -317,6 +335,18 @@ public HttpStorageOptions.Builder setOpenTelemetry(OpenTelemetry openTelemetry) this.openTelemetry = openTelemetry; return this; } + + /** + * Option for whether this client should emit internal Otel HTTP client metrics to + * Cloud Monitoring. To enable metric reporting, set this to true. False by default. + * + * @since 2.62.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public HttpStorageOptions.Builder setEnableHttpClientsMetrics(boolean enableHttpClientsMetrics) { + this.enableHttpClientsMetrics = enableHttpClientsMetrics; + return this; + } } public static final class HttpStorageDefaults extends StorageDefaults { @@ -360,6 +390,11 @@ public BlobWriteSessionConfig getDefaultStorageWriterConfig() { public OpenTelemetry getDefaultOpenTelemetry() { return OpenTelemetry.noop(); } + + @BetaApi + public boolean getDefaultEnableHttpClientsMetrics() { + return false; + } } /** diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java index 53fea0b8b6..5b6e34054d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import io.grpc.ManagedChannelBuilder; import io.grpc.opentelemetry.GrpcOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; @@ -124,7 +125,7 @@ static ChannelConfigurator enableGrpcMetrics( String metricServiceEndpoint = getCloudMonitoringEndpoint(endpoint, universeDomain); SdkMeterProvider provider = createMeterProvider( - metricServiceEndpoint, projectIdToUse, detectedAttributes, shouldSuppressExceptions); + metricServiceEndpoint, projectIdToUse, detectedAttributes, shouldSuppressExceptions, true); OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder().setMeterProvider(provider).build(); @@ -142,6 +143,49 @@ static ChannelConfigurator enableGrpcMetrics( return otelConfigurator.andThen(channelConfigurator); } + @NonNull + static OpenTelemetry getHttpOpenTelemetrySdk( + String projectId, + String universeDomain, + String host, + boolean shouldSuppressExceptions) { + GCPResourceProvider resourceProvider = new GCPResourceProvider(); + Attributes detectedAttributes = resourceProvider.getAttributes(); + + @Nullable + String detectedProjectId = detectedAttributes.get(AttributeKey.stringKey("cloud.account.id")); + if (projectId == null && detectedProjectId == null) { + log.warning( + "Unable to determine the Project ID in order to report metrics. No HTTP client metrics" + + " will be reported."); + return OpenTelemetry.noop(); + } + + String projectIdToUse = detectedProjectId == null ? projectId : detectedProjectId; + if (!projectIdToUse.equals(projectId)) { + log.warning( + "The Project ID configured for HTTP client metrics is " + + projectIdToUse + + ", but the Project ID of the storage client is " + + projectId + + ". Make sure that the service account in use has the required metric writing role " + + "(roles/monitoring.metricWriter) in the project " + + projectIdToUse + + ", or metrics will not be written."); + } + + String metricServiceEndpoint = getCloudMonitoringEndpoint(host, universeDomain); + SdkMeterProvider provider = + createMeterProvider( + metricServiceEndpoint, + projectIdToUse, + detectedAttributes, + shouldSuppressExceptions, + false); + + return OpenTelemetrySdk.builder().setMeterProvider(provider).build(); + } + @SuppressWarnings("rawtypes") // ManagedChannelBuilder @FunctionalInterface interface ChannelConfigurator extends ApiFunction { @@ -210,7 +254,8 @@ static SdkMeterProvider createMeterProvider( String metricServiceEndpoint, String projectIdToUse, Attributes detectedAttributes, - boolean shouldSuppressExceptions) { + boolean shouldSuppressExceptions, + boolean enableGrpcViews) { MonitoredResourceDescription monitoredResourceDescription = new MonitoredResourceDescription( @@ -233,11 +278,13 @@ static SdkMeterProvider createMeterProvider( // This replaces the dots with slashes in each metric, which is the format needed for this // monitored resource - for (String metric : - ImmutableList.copyOf(Iterables.concat(METRICS_TO_ENABLE, METRICS_ENABLED_BY_DEFAULT))) { - providerBuilder.registerView( - InstrumentSelector.builder().setName(metric).build(), - View.builder().setName(metric.replace(".", "/")).build()); + if (enableGrpcViews) { + for (String metric : + ImmutableList.copyOf(Iterables.concat(METRICS_TO_ENABLE, METRICS_ENABLED_BY_DEFAULT))) { + providerBuilder.registerView( + InstrumentSelector.builder().setName(metric).build(), + View.builder().setName(metric.replace(".", "/")).build()); + } } MetricExporter exporter = shouldSuppressExceptions @@ -274,18 +321,20 @@ static SdkMeterProvider createMeterProvider( .build()) .setResource(Resource.create(attributesBuilder.build())); - addHistogramView( - providerBuilder, latencyHistogramBoundaries(), "grpc/client/attempt/duration", "s"); - addHistogramView( - providerBuilder, - sizeHistogramBoundaries(), - "grpc/client/attempt/rcvd_total_compressed_message_size", - "By"); - addHistogramView( - providerBuilder, - sizeHistogramBoundaries(), - "grpc/client/attempt/sent_total_compressed_message_size", - "By"); + if (enableGrpcViews) { + addHistogramView( + providerBuilder, latencyHistogramBoundaries(), "grpc/client/attempt/duration", "s"); + addHistogramView( + providerBuilder, + sizeHistogramBoundaries(), + "grpc/client/attempt/rcvd_total_compressed_message_size", + "By"); + addHistogramView( + providerBuilder, + sizeHistogramBoundaries(), + "grpc/client/attempt/sent_total_compressed_message_size", + "By"); + } return providerBuilder.build(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java index f5e7080fed..0c85e3e46f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java @@ -36,9 +36,13 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; /** - * A decorator for {@link MultipartUploadClient} that adds OpenTelemetry tracing. + * A decorator for {@link MultipartUploadClient} that adds OpenTelemetry tracing and metrics. * * @since 2.62.0 This new api is in preview and is subject to breaking changes. */ @@ -47,6 +51,17 @@ final class OtelMultipartUploadClientDecorator extends MultipartUploadClient { private final MultipartUploadClient delegate; private final Tracer tracer; + + private final Meter meter; + private final DoubleHistogram createMultipartUploadLatency; + private final DoubleHistogram listPartsLatency; + private final DoubleHistogram abortMultipartUploadLatency; + private final DoubleHistogram completeMultipartUploadLatency; + private final DoubleHistogram uploadPartLatency; + private final DoubleHistogram listMultipartUploadsLatency; + + private final LongCounter uploadedBytes; + private final LongHistogram partSize; private OtelMultipartUploadClientDecorator( MultipartUploadClient delegate, OpenTelemetry otel, Attributes baseAttributes) { @@ -54,21 +69,82 @@ private OtelMultipartUploadClientDecorator( this.tracer = OtelStorageDecorator.TracerDecorator.decorate( null, otel, baseAttributes, MultipartUploadClient.class.getName() + "/"); + + this.meter = otel.meterBuilder(MultipartUploadClient.class.getName()) + .build(); + + this.createMultipartUploadLatency = meter + .histogramBuilder("storage.multipart_upload.create_multipart_upload.latency") + .setDescription("Latency of Create Multipart Upload API calls") + .setUnit("ms") + .build(); + this.listPartsLatency = meter + .histogramBuilder("storage.multipart_upload.list_parts.latency") + .setDescription("Latency of List Parts API calls") + .setUnit("ms") + .build(); + this.abortMultipartUploadLatency = meter + .histogramBuilder("storage.multipart_upload.abort_multipart_upload.latency") + .setDescription("Latency of Abort Multipart Upload API calls") + .setUnit("ms") + .build(); + this.completeMultipartUploadLatency = meter + .histogramBuilder("storage.multipart_upload.complete_multipart_upload.latency") + .setDescription("Latency of Complete Multipart Upload API calls") + .setUnit("ms") + .build(); + this.uploadPartLatency = meter + .histogramBuilder("storage.multipart_upload.upload_part.latency") + .setDescription("Latency of Upload Part API calls") + .setUnit("ms") + .build(); + this.listMultipartUploadsLatency = meter + .histogramBuilder("storage.multipart_upload.list_multipart_uploads.latency") + .setDescription("Latency of List Multipart Uploads API calls") + .setUnit("ms") + .build(); + this.uploadedBytes = meter + .counterBuilder("storage.multipart_upload.uploaded_bytes") + .setDescription("Total bytes uploaded via Multipart Upload") + .setUnit("By") + .build(); + this.partSize = meter + .histogramBuilder("storage.multipart_upload.part_size") + .ofLongs() + .setDescription("Size of parts uploaded via Multipart Upload") + .setUnit("By") + .build(); } @Override public CreateMultipartUploadResponse createMultipartUpload(CreateMultipartUploadRequest request) { + long startTime = System.currentTimeMillis(); Span span = tracer .spanBuilder("createMultipartUpload") .setAttribute( "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "createMultipartUpload") + .build(); try (Scope ignore = span.makeCurrent()) { - return delegate.createMultipartUpload(request); + CreateMultipartUploadResponse response = delegate.createMultipartUpload(request); + long duration = System.currentTimeMillis() - startTime; + createMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + createMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); throw t; } finally { span.end(); @@ -77,17 +153,33 @@ public CreateMultipartUploadResponse createMultipartUpload(CreateMultipartUpload @Override public ListPartsResponse listParts(ListPartsRequest request) { + long startTime = System.currentTimeMillis(); Span span = tracer .spanBuilder("listParts") .setAttribute( "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "listParts") + .build(); try (Scope ignore = span.makeCurrent()) { - return delegate.listParts(request); + ListPartsResponse response = delegate.listParts(request); + long duration = System.currentTimeMillis() - startTime; + listPartsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + listPartsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); throw t; } finally { span.end(); @@ -96,17 +188,33 @@ public ListPartsResponse listParts(ListPartsRequest request) { @Override public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadRequest request) { + long startTime = System.currentTimeMillis(); Span span = tracer .spanBuilder("abortMultipartUpload") .setAttribute( "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "abortMultipartUpload") + .build(); try (Scope ignore = span.makeCurrent()) { - return delegate.abortMultipartUpload(request); + AbortMultipartUploadResponse response = delegate.abortMultipartUpload(request); + long duration = System.currentTimeMillis() - startTime; + abortMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + abortMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); throw t; } finally { span.end(); @@ -116,17 +224,33 @@ public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadReq @Override public CompleteMultipartUploadResponse completeMultipartUpload( CompleteMultipartUploadRequest request) { + long startTime = System.currentTimeMillis(); Span span = tracer .spanBuilder("completeMultipartUpload") .setAttribute( "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "completeMultipartUpload") + .build(); try (Scope ignore = span.makeCurrent()) { - return delegate.completeMultipartUpload(request); + CompleteMultipartUploadResponse response = delegate.completeMultipartUpload(request); + long duration = System.currentTimeMillis() - startTime; + completeMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + completeMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); throw t; } finally { span.end(); @@ -135,6 +259,8 @@ public CompleteMultipartUploadResponse completeMultipartUpload( @Override public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requestBody) { + long startTime = System.currentTimeMillis(); + Span span = tracer .spanBuilder("uploadPart") @@ -142,11 +268,39 @@ public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requ "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) .setAttribute("partNumber", request.partNumber()) .startSpan(); + + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("partNumber", request.partNumber()) + .put("method", "uploadPart") + .build(); + try (Scope ignore = span.makeCurrent()) { - return delegate.uploadPart(request, requestBody); + UploadPartResponse response = delegate.uploadPart(request, requestBody); + + long duration = System.currentTimeMillis() - startTime; + uploadPartLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + uploadedBytes.add(requestBody.getContent().getLength(), metricAttributes.toBuilder() + .put("status", "success") + .build()); + partSize.record(requestBody.getContent().getLength(), metricAttributes.toBuilder() + .put("status", "success") + .build()); + + return response; } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + + long duration = System.currentTimeMillis() - startTime; + uploadPartLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); + throw t; } finally { span.end(); @@ -155,16 +309,31 @@ public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requ @Override public ListMultipartUploadsResponse listMultipartUploads(ListMultipartUploadsRequest request) { + long startTime = System.currentTimeMillis(); Span span = tracer .spanBuilder("listMultipartUploads") .setAttribute("gsutil.uri", String.format("gs://%s/", request.bucket())) .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("method", "listMultipartUploads") + .build(); try (Scope ignore = span.makeCurrent()) { - return delegate.listMultipartUploads(request); + ListMultipartUploadsResponse response = delegate.listMultipartUploads(request); + long duration = System.currentTimeMillis() - startTime; + listMultipartUploadsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + listMultipartUploadsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); throw t; } finally { span.end(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/HttpStorageOptionsOtelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/HttpStorageOptionsOtelTest.java new file mode 100644 index 0000000000..2853144cce --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/HttpStorageOptionsOtelTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.common.truth.Truth.assertThat; + +import io.opentelemetry.api.OpenTelemetry; +import org.junit.Test; + +public class HttpStorageOptionsOtelTest { + + @Test + public void testEnableHttpClientsMetrics() { + HttpStorageOptions options = + HttpStorageOptions.newBuilder() + .setProjectId("test-project") + .setEnableHttpClientsMetrics(true) + .build(); + + OpenTelemetry otel = options.getOpenTelemetry(); + assertThat(otel).isNotNull(); + assertThat(otel).isNotEqualTo(OpenTelemetry.noop()); + } + + @Test + public void testDefaultHttpClientsMetrics() { + HttpStorageOptions options = + HttpStorageOptions.newBuilder() + .setProjectId("test-project") + .build(); + + OpenTelemetry otel = options.getOpenTelemetry(); + assertThat(otel).isEqualTo(OpenTelemetry.noop()); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java index e71b9077dc..e18a20667e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java @@ -45,7 +45,7 @@ public void testGrpcMetrics() { "monitoring.googleapis.com:443", grpcStorageOptions.getProjectId(), detectedAttributes, - false); + true); /* * SDKMeterProvider doesn't expose the relevant fields we want to test, but they are present in diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java index e1a83ba6eb..c6df27bdf5 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java @@ -24,23 +24,29 @@ import com.google.cloud.storage.it.runner.annotations.CrossRun; import com.google.cloud.storage.it.runner.annotations.Inject; import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadRequest; import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; import com.google.cloud.storage.multipartupload.model.CompletedMultipartUpload; import com.google.cloud.storage.multipartupload.model.CompletedPart; import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadRequest; import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; import com.google.cloud.storage.multipartupload.model.ListMultipartUploadsRequest; +import com.google.cloud.storage.multipartupload.model.ListPartsRequest; import com.google.cloud.storage.multipartupload.model.UploadPartRequest; import com.google.cloud.storage.multipartupload.model.UploadPartResponse; import com.google.cloud.storage.otel.TestExporter; import com.google.common.collect.ImmutableList; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Collection; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; @@ -52,14 +58,11 @@ public final class ITOpenTelemetryMPUTest { @Inject public Storage storage; - @Inject public BucketInfo bucket; - @Inject public Generator generator; - @Inject public Transport transport; @Test - public void checkMPUInstrumentation() throws Exception { + public void checkMPUTracing() throws Exception { TestExporter exporter = new TestExporter(); OpenTelemetrySdk openTelemetrySdk = @@ -70,12 +73,158 @@ public void checkMPUInstrumentation() throws Exception { .build()) .build(); + String objectName = generator.randomObjectName(); + runMpuOperations(openTelemetrySdk, objectName); + + List spans = exporter.getExportedSpans(); + assertThat(spans).hasSize(7); + + SpanData createSpan = spans.get(0); + assertThat(createSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/createMultipartUpload"); + assertThat(createSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + + SpanData uploadSpan = spans.get(1); + assertThat(uploadSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/uploadPart"); + assertThat(uploadSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + assertThat(uploadSpan.getAttributes().get(AttributeKey.longKey("partNumber"))).isEqualTo(1); + + SpanData listSpan = spans.get(2); + assertThat(listSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/listParts"); + assertThat(listSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + + SpanData completeSpan = spans.get(3); + assertThat(completeSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/completeMultipartUpload"); + assertThat(completeSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + + SpanData listUploadsSpan = spans.get(4); + assertThat(listUploadsSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/listMultipartUploads"); + assertThat(listUploadsSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/", bucket.getName())); + + SpanData create2Span = spans.get(5); + assertThat(create2Span.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/createMultipartUpload"); + + SpanData abortSpan = spans.get(6); + assertThat(abortSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/abortMultipartUpload"); + assertThat(abortSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s-abort", bucket.getName(), objectName)); + } + + @Test + public void checkMPUMetrics() throws Exception { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); + + String objectName = generator.randomObjectName(); + runMpuOperations(openTelemetrySdk, objectName); + + Collection metrics = metricReader.collectAllMetrics(); + System.err.println("Exported metrics count: " + metrics.size()); + metrics.forEach(m -> System.err.println("Metric: " + m.getName())); + + assertThat(metrics).hasSize(8); + + MetricData createMetric = + metrics.stream() + .filter(m -> m.getName().contains("create_multipart_upload")) + .findFirst() + .orElseThrow(() -> new RuntimeException("create_multipart_upload metric not found")); + assertThat(createMetric.getName()) + .isEqualTo("storage.multipart_upload.create_multipart_upload.latency"); + assertThat(createMetric.getData().getPoints()).hasSize(2); // 2 create calls + + MetricData uploadPartMetric = + metrics.stream() + .filter(m -> m.getName().contains("upload_part")) + .findFirst() + .orElseThrow(() -> new RuntimeException("upload_part metric not found")); + assertThat(uploadPartMetric.getName()) + .isEqualTo("storage.multipart_upload.upload_part.latency"); + assertThat(uploadPartMetric.getData().getPoints()).hasSize(1); + + MetricData completeMetric = + metrics.stream() + .filter(m -> m.getName().contains("complete_multipart_upload")) + .findFirst() + .orElseThrow(() -> new RuntimeException("complete_multipart_upload metric not found")); + assertThat(completeMetric.getName()) + .isEqualTo("storage.multipart_upload.complete_multipart_upload.latency"); + assertThat(completeMetric.getData().getPoints()).hasSize(1); + + MetricData listPartsMetric = + metrics.stream() + .filter(m -> m.getName().contains("list_parts")) + .findFirst() + .orElseThrow(() -> new RuntimeException("list_parts metric not found")); + assertThat(listPartsMetric.getName()) + .isEqualTo("storage.multipart_upload.list_parts.latency"); + assertThat(listPartsMetric.getData().getPoints()).hasSize(1); + + MetricData listUploadsMetric = + metrics.stream() + .filter(m -> m.getName().contains("list_multipart_uploads")) + .findFirst() + .orElseThrow(() -> new RuntimeException("list_multipart_uploads metric not found")); + assertThat(listUploadsMetric.getName()) + .isEqualTo("storage.multipart_upload.list_multipart_uploads.latency"); + assertThat(listUploadsMetric.getData().getPoints()).hasSize(1); + + MetricData abortMetric = + metrics.stream() + .filter(m -> m.getName().contains("abort_multipart_upload")) + .findFirst() + .orElseThrow(() -> new RuntimeException("abort_multipart_upload metric not found")); + assertThat(abortMetric.getName()) + .isEqualTo("storage.multipart_upload.abort_multipart_upload.latency"); + assertThat(abortMetric.getData().getPoints()).hasSize(1); + + MetricData uploadedBytesMetric = + metrics.stream() + .filter(m -> m.getName().contains("uploaded_bytes")) + .findFirst() + .orElseThrow(() -> new RuntimeException("uploaded_bytes metric not found")); + assertThat(uploadedBytesMetric.getName()) + .isEqualTo("storage.multipart_upload.uploaded_bytes"); + assertThat(uploadedBytesMetric.getData().getPoints()).hasSize(1); + + // "Hello, World!" is 13 bytes + assertThat(uploadedBytesMetric.getLongSumData().getPoints().iterator().next().getValue()) + .isEqualTo(13); + + MetricData partSizeMetric = + metrics.stream() + .filter(m -> m.getName().contains("part_size")) + .findFirst() + .orElseThrow(() -> new RuntimeException("part_size metric not found")); + assertThat(partSizeMetric.getName()) + .isEqualTo("storage.multipart_upload.part_size"); + assertThat(partSizeMetric.getData().getPoints()).hasSize(1); + assertThat(partSizeMetric.getHistogramData().getPoints().iterator().next().getSum()) + .isEqualTo(13); + } + + private void runMpuOperations(OpenTelemetrySdk openTelemetrySdk, String objectName) { HttpStorageOptions httpStorageOptions = (HttpStorageOptions) storage.getOptions(); StorageOptions storageOptions = httpStorageOptions.toBuilder().setOpenTelemetry(openTelemetrySdk).build(); - String objectName = generator.randomObjectName(); - try (Storage storage = storageOptions.getService()) { MultipartUploadClient mpuClient = MultipartUploadClient.create( @@ -100,6 +249,13 @@ public void checkMPUInstrumentation() throws Exception { .build(), body); + mpuClient.listParts( + ListPartsRequest.builder() + .bucket(bucket.getName()) + .key(objectName) + .uploadId(create.uploadId()) + .build()); + mpuClient.completeMultipartUpload( CompleteMultipartUploadRequest.builder() .bucket(bucket.getName()) @@ -115,34 +271,21 @@ public void checkMPUInstrumentation() throws Exception { mpuClient.listMultipartUploads( ListMultipartUploadsRequest.builder().bucket(bucket.getName()).build()); - } - - List spans = exporter.getExportedSpans(); - assertThat(spans).hasSize(4); - - SpanData createSpan = spans.get(0); - assertThat(createSpan.getName()) - .isEqualTo("com.google.cloud.storage.MultipartUploadClient/createMultipartUpload"); - assertThat(createSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) - .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); - SpanData uploadSpan = spans.get(1); - assertThat(uploadSpan.getName()) - .isEqualTo("com.google.cloud.storage.MultipartUploadClient/uploadPart"); - assertThat(uploadSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) - .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); - assertThat(uploadSpan.getAttributes().get(AttributeKey.longKey("partNumber"))).isEqualTo(1); - - SpanData completeSpan = spans.get(2); - assertThat(completeSpan.getName()) - .isEqualTo("com.google.cloud.storage.MultipartUploadClient/completeMultipartUpload"); - assertThat(completeSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) - .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); - - SpanData listSpan = spans.get(3); - assertThat(listSpan.getName()) - .isEqualTo("com.google.cloud.storage.MultipartUploadClient/listMultipartUploads"); - assertThat(listSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) - .isEqualTo(String.format("gs://%s/", bucket.getName())); + CreateMultipartUploadResponse create2 = + mpuClient.createMultipartUpload( + CreateMultipartUploadRequest.builder() + .bucket(bucket.getName()) + .key(objectName + "-abort") + .build()); + mpuClient.abortMultipartUpload( + AbortMultipartUploadRequest.builder() + .bucket(bucket.getName()) + .key(objectName + "-abort") + .uploadId(create2.uploadId()) + .build()); + } catch (Exception e) { + throw new RuntimeException(e); + } } }