From a62f2eecb143ce7f6639fde5e2e763ec9edaa18c Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Fri, 9 May 2025 11:03:53 -0700 Subject: [PATCH 01/13] Add OTLP HTTP MetricExporter max export batch size --- .../proto/http/metric_exporter/__init__.py | 454 ++++++++++++++++- .../metrics/test_otlp_metrics_exporter.py | 474 ++++++++++++++++++ 2 files changed, 910 insertions(+), 18 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 4feea8d4302..75ebe53d648 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -116,7 +116,29 @@ def __init__( preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, + max_export_batch_size: int | None = None, ): + """OTLP HTTP metrics exporter + + Args: + endpoint: Target URL to which the exporter is going to send metrics + certificate_file: Path to the certificate file to use for any TLS + client_key_file: Path to the client key file to use for any TLS + client_certificate_file: Path to the client certificate file to use for any TLS + headers: Headers to be sent with HTTP requests at export + timeout: Timeout in seconds for export + compression: Compression to use; one of none, gzip, deflate + session: Requests session to use at export + preferred_temporality: Map of preferred temporality for each metric type. + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what + preferred temporality is. + preferred_aggregation: Map of preferred aggregation for each metric type. + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what + preferred aggregation is. + max_export_batch_size: Maximum number of data points to export in a single request. + If not set there is no limit to the number of data points in a request. + If it is set and the number of data points exceeds the max, the request will be split. + """ self._endpoint = endpoint or environ.get( OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, _append_metrics_path( @@ -165,6 +187,7 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._max_export_batch_size: int | None = max_export_batch_size def _export(self, serialized_data: bytes): data = serialized_data @@ -219,27 +242,422 @@ def export( if delay == self._MAX_RETRY_TIMEOUT: return MetricExportResult.FAILURE - resp = self._export(serialized_data.SerializeToString()) - # pylint: disable=no-else-return - if resp.ok: - return MetricExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue + if self._max_export_batch_size is None: + resp = self._export(serialized_data.SerializeToString()) + # pylint: disable=no-else-return + if resp.ok: + return MetricExportResult.SUCCESS + elif self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return MetricExportResult.FAILURE + + # Else, attempt export in batches for this retry else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return MetricExportResult.FAILURE + export_result = MetricExportResult.SUCCESS + for split_metrics_data in self._split_metrics_data(serialized_data): + split_resp = self._export( + split_metrics_data.SerializeToString() + ) + + if split_resp.ok: + export_result = MetricExportResult.SUCCESS + elif self._retryable(split_resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + split_resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + split_resp.status_code, + split_resp.text, + ) + export_result = MetricExportResult.FAILURE + + # Return result after all batches are attempted + return export_result + return MetricExportResult.FAILURE + def _split_metrics_data( + self, + metrics_data: pb2.MetricsData, + ) -> Iterable[pb2.MetricsData]: + """Splits metrics data into several MetricsData (copies protobuf originals), + based on configured data point max export batch size. + + Args: + metrics_data: metrics object based on HTTP protocol buffer definition + + Returns: + Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing + pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points + """ + batch_size: int = 0 + # Stores split metrics data as editable references + # used to write batched pb2 objects for export when finalized + split_resource_metrics = [] + + for resource_metrics in metrics_data.resource_metrics: + split_scope_metrics = [] + split_resource_metrics.append( + { + "resource": resource_metrics.resource, + "schema_url": resource_metrics.schema_url, + "scope_metrics": split_scope_metrics, + } + ) + + for scope_metrics in resource_metrics.scope_metrics: + split_metrics = [] + split_scope_metrics.append( + { + "scope": scope_metrics.scope, + "schema_url": scope_metrics.schema_url, + "metrics": split_metrics, + } + ) + + for metric in scope_metrics.metrics: + split_data_points = [] + + # protobuf specifies metrics types (e.g. Sum, Histogram) + # with different accessors for data points, etc + # We maintain these structures throughout batch calculation + current_data_points = [] + if metric.HasField("sum"): + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "sum": { + "aggregation_temporality": metric.sum.aggregation_temporality, + "is_monotonic": metric.sum.is_monotonic, + "data_points": split_data_points, + } + } + ) + current_data_points = metric.sum.data_points + elif metric.HasField("histogram"): + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "histogram": { + "aggregation_temporality": metric.histogram.aggregation_temporality, + "data_points": split_data_points, + } + } + ) + current_data_points = metric.histogram.data_points + elif metric.HasField("exponential_histogram"): + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "exponential_histogram": { + "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, + "data_points": split_data_points, + } + } + ) + current_data_points = metric.exponential_histogram.data_points + elif metric.HasField("gauge"): + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "gauge": { + "data_points": split_data_points, + } + } + ) + current_data_points = metric.gauge.data_points + elif metric.HasField("summary"): + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "summary": { + "data_points": split_data_points, + } + } + ) + else: + _logger.warning("Tried to split and export an unsupported metric type. Skipping.") + continue + + for data_point in current_data_points: + split_data_points.append(data_point) + batch_size += 1 + + if batch_size >= self._max_export_batch_size: + yield pb2.MetricsData( + resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics) + ) + + # Reset all the reference variables with current metrics_data position + # minus yielded data_points. Need to clear data_points and keep metric + # to avoid duplicate data_point export + batch_size = 0 + split_data_points = [] + + if metric.HasField("sum"): + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "sum": { + "aggregation_temporality": metric.sum.aggregation_temporality, + "is_monotonic": metric.sum.is_monotonic, + "data_points": split_data_points, + } + } + ] + elif metric.HasField("histogram"): + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "histogram": { + "aggregation_temporality": metric.histogram.aggregation_temporality, + "data_points": split_data_points, + } + } + ] + elif metric.HasField("exponential_histogram"): + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "exponential_histogram": { + "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, + "data_points": split_data_points, + } + } + ] + elif metric.HasField("gauge"): + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "gauge": { + "data_points": split_data_points, + } + } + ] + elif metric.HasField("summary"): + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "summary": { + "data_points": split_data_points, + } + } + ] + + split_scope_metrics = [ + { + "scope": scope_metrics.scope, + "schema_url": scope_metrics.schema_url, + "metrics": split_metrics, + } + ] + split_resource_metrics = [ + { + "resource": resource_metrics.resource, + "schema_url": resource_metrics.schema_url, + "scope_metrics": split_scope_metrics, + } + ] + + if not split_data_points: + # If data_points is empty remove the whole metric + split_metrics.pop() + + if not split_metrics: + # If metrics is empty remove the whole scope_metrics + split_scope_metrics.pop() + + if not split_scope_metrics: + # If scope_metrics is empty remove the whole resource_metrics + split_resource_metrics.pop() + + if batch_size > 0: + yield pb2.MetricsData( + resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics) + ) + + def _get_split_resource_metrics_pb2( + self, + split_resource_metrics: List[Dict], + ) -> List[pb2.ResourceMetrics]: + """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics. + Example input: + + ```python + [ + { + "resource": , + "schema_url": "http://foo-bar", + "scope_metrics": [ + "scope": , + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "apples", + "description": "number of apples purchased", + "sum": { + "aggregation_temporality": 1, + "is_monotonic": "false", + "data_points": [ + { + start_time_unix_nano: 1000 + time_unix_nano: 1001 + exemplars { + time_unix_nano: 1002 + span_id: "foo-span" + trace_id: "foo-trace" + as_int: 5 + } + as_int: 5 + } + ] + } + }, + ], + ], + }, + ] + ``` + + Args: + split_resource_metrics: A list of dict representations of ResourceMetrics, + ScopeMetrics, Metrics, and data points. + + Returns: + List[pb2.ResourceMetrics]: A list of pb2.ResourceMetrics objects containing + pb2.ScopeMetrics, pb2.Metrics, and data points + """ + split_resource_metrics_pb = [] + for resource_metrics in split_resource_metrics: + new_resource_metrics = pb2.ResourceMetrics( + resource=resource_metrics.get("resource"), + scope_metrics=[], + schema_url=resource_metrics.get("schema_url"), + ) + for scope_metrics in resource_metrics.get("scope_metrics", []): + new_scope_metrics = pb2.ScopeMetrics( + scope=scope_metrics.get("scope"), + metrics=[], + schema_url=scope_metrics.get("schema_url"), + ) + + for metric in scope_metrics.get("metrics", []): + new_metric = None + data_points = [] + + if "sum" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + sum=pb2.Sum( + data_points=[], + aggregation_temporality=metric.get("sum").get("aggregation_temporality"), + is_monotonic=metric.get("sum").get("is_monotonic"), + ) + ) + data_points = metric.get("sum").get("data_points") + elif "histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + histogram=pb2.Histogram( + data_points=[], + aggregation_temporality=metric.get("histogram").get("aggregation_temporality"), + ), + ) + data_points = metric.get("histogram").get("data_points") + elif "exponential_histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + exponential_histogram=pb2.ExponentialHistogram( + data_points=[], + aggregation_temporality=metric.get("exponential_histogram").get("aggregation_temporality"), + ), + ) + data_points = metric.get("exponential_histogram").get("data_points") + elif "gauge" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + gauge=pb2.Gauge( + data_points=[], + ) + ) + data_points = metric.get("gauge").get("data_points") + elif "summary" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + summary=pb2.Summary( + data_points=[], + ) + ) + data_points = metric.get("summary").get("data_points") + else: + _logger.warning("Tried to split and export an unsupported metric type. Skipping.") + continue + + for data_point in data_points: + if "sum" in metric: + new_metric.sum.data_points.append(data_point) + elif "histogram" in metric: + new_metric.histogram.data_points.append(data_point) + elif "exponential_histogram" in metric: + new_metric.exponential_histogram.data_points.append(data_point) + elif "gauge" in metric: + new_metric.gauge.data_points.append(data_point) + elif "summary" in metric: + new_metric.summary.data_points.append(data_point) + + new_scope_metrics.metrics.append(new_metric) + new_resource_metrics.scope_metrics.append(new_scope_metrics) + split_resource_metrics_pb.append(new_resource_metrics) + return split_resource_metrics_pb + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 16bb3e54286..d5a4c734d09 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -14,6 +14,7 @@ from logging import WARNING from os import environ +from typing import List from unittest import TestCase from unittest.mock import MagicMock, Mock, call, patch @@ -33,6 +34,12 @@ OTLPMetricExporter, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.common.v1.common_pb2 import ( + InstrumentationScope, + KeyValue, +) +from opentelemetry.proto.resource.v1.resource_pb2 import Resource as Pb2Resource from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -86,6 +93,7 @@ # pylint: disable=protected-access class TestOTLPMetricExporter(TestCase): + # pylint: disable=too-many-public-methods def setUp(self): self.metrics = { "sum_int": MetricsData( @@ -331,6 +339,426 @@ def test_serialization(self, mock_post): cert=exporter._client_cert, ) + def test_split_metrics_data_many_data_points(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_nb_data_points_equal_batch_size(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=3)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_many_resources_scopes_metrics(self): + # GIVEN + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_get_split_resource_metrics_pb2_one_of_each(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo-scope", version="1.0.0"), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "foo-metric", + "description": "foo-description", + "unit": "foo-unit", + "sum": { + "aggregation_temporality": 1, + "is_monotonic": True, + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue(key="dp_key", value={"string_value": "dp_value"}) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + } + ] + + result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], pb2.ResourceMetrics) + self.assertEqual(result[0].schema_url, "http://foo-bar") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope") + self.assertEqual(len(result[0].scope_metrics[0].metrics), 1) + self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric") + self.assertEqual(result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True) + + def test_get_split_resource_metrics_pb2_multiples(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[KeyValue(key="foo1", value={"string_value": "bar2"})], + ), + "schema_url": "http://foo-bar-1", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo-scope-1", version="1.0.0"), + "schema_url": "http://foo-baz-1", + "metrics": [ + { + "name": "foo-metric-1", + "description": "foo-description-1", + "unit": "foo-unit-1", + "gauge": { + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue(key="dp_key", value={"string_value": "dp_value"}) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + }, + { + "resource": Pb2Resource( + attributes=[KeyValue(key="foo2", value={"string_value": "bar2"})], + ), + "schema_url": "http://foo-bar-2", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo-scope-2", version="2.0.0"), + "schema_url": "http://foo-baz-2", + "metrics": [ + { + "name": "foo-metric-2", + "description": "foo-description-2", + "unit": "foo-unit-2", + "histogram": { + "aggregation_temporality": 2, + "data_points": [ + pb2.HistogramDataPoint( + attributes=[KeyValue(key="dp_key", value={"string_value": "dp_value"})], + start_time_unix_nano=12345, + time_unix_nano=12350, + ) + ], + }, + } + ], + } + ], + }, + ] + + result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].schema_url, "http://foo-bar-1") + self.assertEqual(result[1].schema_url, "http://foo-bar-2") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(len(result[1].scope_metrics), 1) + self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope-1") + self.assertEqual(result[1].scope_metrics[0].scope.name, "foo-scope-2") + self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric-1") + self.assertEqual(result[1].scope_metrics[0].metrics[0].name, "foo-metric-2") + + def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[KeyValue(key="foo", value={"string_value": "bar"})], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope(name="foo", version="1.0.0"), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "unsupported-metric", + "description": "foo-bar", + "unit": "foo-bar", + "unsupported_metric_type": {}, + } + ], + } + ], + } + ] + + with self.assertLogs(level="WARNING") as log: + result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 1) + self.assertIn("Tried to split and export an unsupported metric type", log.output[0]) + @activate @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") def test_exponential_backoff(self, mock_sleep): @@ -523,3 +951,49 @@ def test_preferred_aggregation_override(self): self.assertEqual( exporter._preferred_aggregation[Histogram], histogram_aggregation ) + + + +def _resource_metrics( + index: int, scope_metrics: List[pb2.ScopeMetrics] +) -> pb2.ResourceMetrics: + return pb2.ResourceMetrics( + resource={ + "attributes": [ + KeyValue(key="a", value={"int_value": index}) + ], + }, + schema_url=f"resource_url_{index}", + scope_metrics=scope_metrics, + ) + + +def _scope_metrics(index: int, metrics: List[pb2.Metric]) -> pb2.ScopeMetrics: + return pb2.ScopeMetrics( + scope=InstrumentationScope(name=f"scope_{index}"), + schema_url=f"scope_url_{index}", + metrics=metrics, + ) + + +def _gauge(index: int, data_points: List[pb2.NumberDataPoint]) -> pb2.Metric: + return pb2.Metric( + name=f"gauge_{index}", + description="description", + unit="unit", + gauge=pb2.Gauge( + data_points=data_points + ), + ) + + +def _number_data_point(value: int) -> pb2.NumberDataPoint: + return pb2.NumberDataPoint( + attributes=[ + KeyValue(key="a", value={"int_value": 1}), + KeyValue(key="b", value={"bool_value": True}), + ], + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + as_int=value, + ) From d09febaf1571c19d725c795666a762847fd12b9c Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Fri, 9 May 2025 11:20:34 -0700 Subject: [PATCH 02/13] Changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc6b3db5227..c8f98afd0cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter + ([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576)) + ## Version 1.33.0/0.54b0 (2025-05-09) - Fix intermittent `Connection aborted` error when using otlp/http exporters From dc86036a11e84ba593bb7632ec5ca7258747c546 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Fri, 9 May 2025 11:50:45 -0700 Subject: [PATCH 03/13] Lint --- .../proto/http/metric_exporter/__init__.py | 107 +++++++++++------- .../metrics/test_otlp_metrics_exporter.py | 100 +++++++++++----- 2 files changed, 140 insertions(+), 67 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 75ebe53d648..033b1404bbb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -22,6 +22,7 @@ Any, Callable, Dict, + Iterable, List, Mapping, Sequence, @@ -262,11 +263,13 @@ def export( resp.text, ) return MetricExportResult.FAILURE - + # Else, attempt export in batches for this retry else: export_result = MetricExportResult.SUCCESS - for split_metrics_data in self._split_metrics_data(serialized_data): + for split_metrics_data in self._split_metrics_data( + serialized_data + ): split_resp = self._export( split_metrics_data.SerializeToString() ) @@ -288,17 +291,17 @@ def export( split_resp.text, ) export_result = MetricExportResult.FAILURE - + # Return result after all batches are attempted return export_result - + return MetricExportResult.FAILURE def _split_metrics_data( self, metrics_data: pb2.MetricsData, ) -> Iterable[pb2.MetricsData]: - """Splits metrics data into several MetricsData (copies protobuf originals), + """Splits metrics data into several MetricsData (copies protobuf originals), based on configured data point max export batch size. Args: @@ -350,7 +353,7 @@ def _split_metrics_data( "aggregation_temporality": metric.sum.aggregation_temporality, "is_monotonic": metric.sum.is_monotonic, "data_points": split_data_points, - } + }, } ) current_data_points = metric.sum.data_points @@ -363,7 +366,7 @@ def _split_metrics_data( "histogram": { "aggregation_temporality": metric.histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ) current_data_points = metric.histogram.data_points @@ -376,10 +379,12 @@ def _split_metrics_data( "exponential_histogram": { "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ) - current_data_points = metric.exponential_histogram.data_points + current_data_points = ( + metric.exponential_histogram.data_points + ) elif metric.HasField("gauge"): split_metrics.append( { @@ -388,7 +393,7 @@ def _split_metrics_data( "unit": metric.unit, "gauge": { "data_points": split_data_points, - } + }, } ) current_data_points = metric.gauge.data_points @@ -400,11 +405,13 @@ def _split_metrics_data( "unit": metric.unit, "summary": { "data_points": split_data_points, - } + }, } ) else: - _logger.warning("Tried to split and export an unsupported metric type. Skipping.") + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." + ) continue for data_point in current_data_points: @@ -413,7 +420,9 @@ def _split_metrics_data( if batch_size >= self._max_export_batch_size: yield pb2.MetricsData( - resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics) + resource_metrics=self._get_split_resource_metrics_pb2( + split_resource_metrics + ) ) # Reset all the reference variables with current metrics_data position @@ -432,7 +441,7 @@ def _split_metrics_data( "aggregation_temporality": metric.sum.aggregation_temporality, "is_monotonic": metric.sum.is_monotonic, "data_points": split_data_points, - } + }, } ] elif metric.HasField("histogram"): @@ -444,7 +453,7 @@ def _split_metrics_data( "histogram": { "aggregation_temporality": metric.histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ] elif metric.HasField("exponential_histogram"): @@ -456,7 +465,7 @@ def _split_metrics_data( "exponential_histogram": { "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, "data_points": split_data_points, - } + }, } ] elif metric.HasField("gauge"): @@ -467,7 +476,7 @@ def _split_metrics_data( "unit": metric.unit, "gauge": { "data_points": split_data_points, - } + }, } ] elif metric.HasField("summary"): @@ -478,7 +487,7 @@ def _split_metrics_data( "unit": metric.unit, "summary": { "data_points": split_data_points, - } + }, } ] @@ -511,7 +520,9 @@ def _split_metrics_data( if batch_size > 0: yield pb2.MetricsData( - resource_metrics=self._get_split_resource_metrics_pb2(split_resource_metrics) + resource_metrics=self._get_split_resource_metrics_pb2( + split_resource_metrics + ) ) def _get_split_resource_metrics_pb2( @@ -568,16 +579,16 @@ def _get_split_resource_metrics_pb2( split_resource_metrics_pb = [] for resource_metrics in split_resource_metrics: new_resource_metrics = pb2.ResourceMetrics( - resource=resource_metrics.get("resource"), - scope_metrics=[], - schema_url=resource_metrics.get("schema_url"), - ) + resource=resource_metrics.get("resource"), + scope_metrics=[], + schema_url=resource_metrics.get("schema_url"), + ) for scope_metrics in resource_metrics.get("scope_metrics", []): new_scope_metrics = pb2.ScopeMetrics( - scope=scope_metrics.get("scope"), - metrics=[], - schema_url=scope_metrics.get("schema_url"), - ) + scope=scope_metrics.get("scope"), + metrics=[], + schema_url=scope_metrics.get("schema_url"), + ) for metric in scope_metrics.get("metrics", []): new_metric = None @@ -590,9 +601,13 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), sum=pb2.Sum( data_points=[], - aggregation_temporality=metric.get("sum").get("aggregation_temporality"), - is_monotonic=metric.get("sum").get("is_monotonic"), - ) + aggregation_temporality=metric.get("sum").get( + "aggregation_temporality" + ), + is_monotonic=metric.get("sum").get( + "is_monotonic" + ), + ), ) data_points = metric.get("sum").get("data_points") elif "histogram" in metric: @@ -602,10 +617,14 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), histogram=pb2.Histogram( data_points=[], - aggregation_temporality=metric.get("histogram").get("aggregation_temporality"), + aggregation_temporality=metric.get( + "histogram" + ).get("aggregation_temporality"), ), ) - data_points = metric.get("histogram").get("data_points") + data_points = metric.get("histogram").get( + "data_points" + ) elif "exponential_histogram" in metric: new_metric = pb2.Metric( name=metric.get("name"), @@ -613,10 +632,14 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), exponential_histogram=pb2.ExponentialHistogram( data_points=[], - aggregation_temporality=metric.get("exponential_histogram").get("aggregation_temporality"), + aggregation_temporality=metric.get( + "exponential_histogram" + ).get("aggregation_temporality"), ), ) - data_points = metric.get("exponential_histogram").get("data_points") + data_points = metric.get("exponential_histogram").get( + "data_points" + ) elif "gauge" in metric: new_metric = pb2.Metric( name=metric.get("name"), @@ -624,7 +647,7 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), gauge=pb2.Gauge( data_points=[], - ) + ), ) data_points = metric.get("gauge").get("data_points") elif "summary" in metric: @@ -634,11 +657,13 @@ def _get_split_resource_metrics_pb2( unit=metric.get("unit"), summary=pb2.Summary( data_points=[], - ) + ), ) data_points = metric.get("summary").get("data_points") else: - _logger.warning("Tried to split and export an unsupported metric type. Skipping.") + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." + ) continue for data_point in data_points: @@ -647,15 +672,17 @@ def _get_split_resource_metrics_pb2( elif "histogram" in metric: new_metric.histogram.data_points.append(data_point) elif "exponential_histogram" in metric: - new_metric.exponential_histogram.data_points.append(data_point) + new_metric.exponential_histogram.data_points.append( + data_point + ) elif "gauge" in metric: new_metric.gauge.data_points.append(data_point) elif "summary" in metric: new_metric.summary.data_points.append(data_point) - + new_scope_metrics.metrics.append(new_metric) new_resource_metrics.scope_metrics.append(new_scope_metrics) - split_resource_metrics_pb.append(new_resource_metrics) + split_resource_metrics_pb.append(new_resource_metrics) return split_resource_metrics_pb def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index d5a4c734d09..6751c9bd651 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=too-many-lines from logging import WARNING from os import environ from typing import List @@ -34,12 +35,14 @@ OTLPMetricExporter, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ -from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 from opentelemetry.proto.common.v1.common_pb2 import ( InstrumentationScope, KeyValue, ) -from opentelemetry.proto.resource.v1.resource_pb2 import Resource as Pb2Resource +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as Pb2Resource, +) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -617,7 +620,9 @@ def test_get_split_resource_metrics_pb2_one_of_each(self): "schema_url": "http://foo-bar", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo-scope", version="1.0.0"), + "scope": InstrumentationScope( + name="foo-scope", version="1.0.0" + ), "schema_url": "http://foo-baz", "metrics": [ { @@ -630,7 +635,12 @@ def test_get_split_resource_metrics_pb2_one_of_each(self): "data_points": [ pb2.NumberDataPoint( attributes=[ - KeyValue(key="dp_key", value={"string_value": "dp_value"}) + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) ], start_time_unix_nano=12345, time_unix_nano=12350, @@ -645,26 +655,36 @@ def test_get_split_resource_metrics_pb2_one_of_each(self): } ] - result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) self.assertEqual(len(result), 1) self.assertIsInstance(result[0], pb2.ResourceMetrics) self.assertEqual(result[0].schema_url, "http://foo-bar") self.assertEqual(len(result[0].scope_metrics), 1) self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope") self.assertEqual(len(result[0].scope_metrics[0].metrics), 1) - self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric") - self.assertEqual(result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True) + self.assertEqual( + result[0].scope_metrics[0].metrics[0].name, "foo-metric" + ) + self.assertEqual( + result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True + ) def test_get_split_resource_metrics_pb2_multiples(self): split_resource_metrics = [ { "resource": Pb2Resource( - attributes=[KeyValue(key="foo1", value={"string_value": "bar2"})], + attributes=[ + KeyValue(key="foo1", value={"string_value": "bar2"}) + ], ), "schema_url": "http://foo-bar-1", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo-scope-1", version="1.0.0"), + "scope": InstrumentationScope( + name="foo-scope-1", version="1.0.0" + ), "schema_url": "http://foo-baz-1", "metrics": [ { @@ -675,7 +695,12 @@ def test_get_split_resource_metrics_pb2_multiples(self): "data_points": [ pb2.NumberDataPoint( attributes=[ - KeyValue(key="dp_key", value={"string_value": "dp_value"}) + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) ], start_time_unix_nano=12345, time_unix_nano=12350, @@ -690,12 +715,16 @@ def test_get_split_resource_metrics_pb2_multiples(self): }, { "resource": Pb2Resource( - attributes=[KeyValue(key="foo2", value={"string_value": "bar2"})], + attributes=[ + KeyValue(key="foo2", value={"string_value": "bar2"}) + ], ), "schema_url": "http://foo-bar-2", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo-scope-2", version="2.0.0"), + "scope": InstrumentationScope( + name="foo-scope-2", version="2.0.0" + ), "schema_url": "http://foo-baz-2", "metrics": [ { @@ -706,7 +735,14 @@ def test_get_split_resource_metrics_pb2_multiples(self): "aggregation_temporality": 2, "data_points": [ pb2.HistogramDataPoint( - attributes=[KeyValue(key="dp_key", value={"string_value": "dp_value"})], + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], start_time_unix_nano=12345, time_unix_nano=12350, ) @@ -719,7 +755,9 @@ def test_get_split_resource_metrics_pb2_multiples(self): }, ] - result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) self.assertEqual(len(result), 2) self.assertEqual(result[0].schema_url, "http://foo-bar-1") self.assertEqual(result[1].schema_url, "http://foo-bar-2") @@ -727,19 +765,27 @@ def test_get_split_resource_metrics_pb2_multiples(self): self.assertEqual(len(result[1].scope_metrics), 1) self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope-1") self.assertEqual(result[1].scope_metrics[0].scope.name, "foo-scope-2") - self.assertEqual(result[0].scope_metrics[0].metrics[0].name, "foo-metric-1") - self.assertEqual(result[1].scope_metrics[0].metrics[0].name, "foo-metric-2") + self.assertEqual( + result[0].scope_metrics[0].metrics[0].name, "foo-metric-1" + ) + self.assertEqual( + result[1].scope_metrics[0].metrics[0].name, "foo-metric-2" + ) def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): split_resource_metrics = [ { "resource": Pb2Resource( - attributes=[KeyValue(key="foo", value={"string_value": "bar"})], + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], ), "schema_url": "http://foo-bar", "scope_metrics": [ { - "scope": InstrumentationScope(name="foo", version="1.0.0"), + "scope": InstrumentationScope( + name="foo", version="1.0.0" + ), "schema_url": "http://foo-baz", "metrics": [ { @@ -755,9 +801,14 @@ def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): ] with self.assertLogs(level="WARNING") as log: - result = OTLPMetricExporter()._get_split_resource_metrics_pb2(split_resource_metrics) + result = OTLPMetricExporter()._get_split_resource_metrics_pb2( + split_resource_metrics + ) self.assertEqual(len(result), 1) - self.assertIn("Tried to split and export an unsupported metric type", log.output[0]) + self.assertIn( + "Tried to split and export an unsupported metric type", + log.output[0], + ) @activate @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") @@ -953,15 +1004,12 @@ def test_preferred_aggregation_override(self): ) - def _resource_metrics( index: int, scope_metrics: List[pb2.ScopeMetrics] ) -> pb2.ResourceMetrics: return pb2.ResourceMetrics( resource={ - "attributes": [ - KeyValue(key="a", value={"int_value": index}) - ], + "attributes": [KeyValue(key="a", value={"int_value": index})], }, schema_url=f"resource_url_{index}", scope_metrics=scope_metrics, @@ -981,9 +1029,7 @@ def _gauge(index: int, data_points: List[pb2.NumberDataPoint]) -> pb2.Metric: name=f"gauge_{index}", description="description", unit="unit", - gauge=pb2.Gauge( - data_points=data_points - ), + gauge=pb2.Gauge(data_points=data_points), ) From f1ef6c404faab53a8eb9685206e8e8010a0b5f77 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 12 May 2025 15:01:37 -0700 Subject: [PATCH 04/13] HTTP metrics export batching does not retry if big failure --- .../proto/http/metric_exporter/__init__.py | 80 ++++++++------ .../metrics/test_otlp_metrics_exporter.py | 102 ++++++++++++++++++ 2 files changed, 147 insertions(+), 35 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 033b1404bbb..78c7043b994 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -237,13 +237,14 @@ def export( **kwargs, ) -> MetricExportResult: serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: - return MetricExportResult.FAILURE - if self._max_export_batch_size is None: + if self._max_export_batch_size is None: + for delay in _create_exp_backoff_generator( + max_value=self._MAX_RETRY_TIMEOUT + ): + if delay == self._MAX_RETRY_TIMEOUT: + return MetricExportResult.FAILURE + resp = self._export(serialized_data.SerializeToString()) # pylint: disable=no-else-return if resp.ok: @@ -264,38 +265,47 @@ def export( ) return MetricExportResult.FAILURE - # Else, attempt export in batches for this retry - else: - export_result = MetricExportResult.SUCCESS - for split_metrics_data in self._split_metrics_data( - serialized_data - ): - split_resp = self._export( - split_metrics_data.SerializeToString() - ) + return MetricExportResult.FAILURE - if split_resp.ok: - export_result = MetricExportResult.SUCCESS - elif self._retryable(split_resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - split_resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - split_resp.status_code, - split_resp.text, - ) - export_result = MetricExportResult.FAILURE + # Else, attempt export in batches + split_metrics_batches = list(self._split_metrics_data(serialized_data)) + export_result = MetricExportResult.SUCCESS + + for split_metrics_data in split_metrics_batches: + # Export current batch until success, non-transient error, or timeout reached + for delay in _create_exp_backoff_generator( + max_value=self._MAX_RETRY_TIMEOUT + ): + if delay == self._MAX_RETRY_TIMEOUT: + export_result = MetricExportResult.FAILURE + break - # Return result after all batches are attempted - return export_result + split_resp = self._export( + split_metrics_data.SerializeToString() + ) + # pylint: disable=no-else-return + if split_resp.ok: + export_result = MetricExportResult.SUCCESS + break + elif self._retryable(split_resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch, retrying in %ss.", + split_resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export batch code: %s, reason: %s", + split_resp.status_code, + split_resp.text, + ) + export_result = MetricExportResult.FAILURE + break - return MetricExportResult.FAILURE + # Return last result after all batches are attempted + return export_result def _split_metrics_data( self, diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 6751c9bd651..68fb526e801 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -831,6 +831,108 @@ def test_exponential_backoff(self, mock_sleep): [call(1), call(2), call(4), call(8), call(16), call(32)] ) + @patch.object(OTLPMetricExporter, "_export") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter._create_exp_backoff_generator" + ) + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching( + self, + mock_encode_metrics, + mock_sleep, + mock_backoff_generator, + mock_export, + ): + mock_backoff_generator.return_value = iter([1, 2, 4]) + mock_export.side_effect = [ + # Non-retryable + MagicMock(ok=False, status_code=400, reason="bad request"), + # Retryable + MagicMock( + ok=False, status_code=500, reason="internal server error" + ), + # Success + MagicMock(ok=True), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 3) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString()), + call(batch_2.SerializeToString()), + call(batch_2.SerializeToString()), + ] + ) + def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter() From 5b4efdde0a4d09641a32c83fef2c5f2821afed5d Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Thu, 3 Jul 2025 14:28:15 -0700 Subject: [PATCH 05/13] Fix changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b325d5c87ab..262b79a05e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4649](https://github.com/open-telemetry/opentelemetry-python/pull/4649)) - proto: relax protobuf version requirement to support v6 ([#4620](https://github.com/open-telemetry/opentelemetry-python/pull/4620)) +- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter + ([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576)) ## Version 1.34.0/0.55b0 (2025-06-04) @@ -57,8 +59,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4599](https://github.com/open-telemetry/opentelemetry-python/pull/4599)) - Drop support for Python 3.8 ([#4520](https://github.com/open-telemetry/opentelemetry-python/pull/4520)) -- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter - ([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576)) ## Version 1.33.0/0.54b0 (2025-05-09) From d748c128cc8ea39f5875cfb0824286e4d5072f93 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 14 Jul 2025 16:44:03 -0700 Subject: [PATCH 06/13] Extract helper _export_with_retries --- .../proto/http/metric_exporter/__init__.py | 112 +++++++++--------- 1 file changed, 53 insertions(+), 59 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index e47eb93b7e2..21e4e6bfa92 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -226,6 +226,48 @@ def _export(self, serialized_data: bytes, timeout_sec: float): ) return resp + def _export_with_retries( + self, + serialized_data: bytes, + deadline_sec: float, + ) -> MetricExportResult: + """Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out. + + Args: + serialized_data: serialized metrics data to export + deadline_sec: timestamp deadline for the export + + Returns: + MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise + """ + for retry_num in range(_MAX_RETRYS): + resp = self._export(serialized_data, deadline_sec - time()) + if resp.ok: + return MetricExportResult.SUCCESS + + # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. + backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + if ( + not _is_retryable(resp) + or retry_num + 1 == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + ): + _logger.error( + "Failed to export metrics batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return MetricExportResult.FAILURE + + _logger.warning( + "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", + resp.reason, + backoff_seconds, + ) + sleep(backoff_seconds) + + return MetricExportResult.FAILURE + def export( self, metrics_data: MetricsData, @@ -241,71 +283,23 @@ def export( # If no batch size configured, export as single batch with retries as configured if self._max_export_batch_size is None: - for retry_num in range(_MAX_RETRYS): - resp = self._export( - serialized_data.SerializeToString(), deadline_sec - time() - ) - if resp.ok: - return MetricExportResult.SUCCESS - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - if ( - not _is_retryable(resp) - or retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - ): - _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return MetricExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", - resp.reason, - backoff_seconds, - ) - sleep(backoff_seconds) + return self._export_with_retries( + serialized_data.SerializeToString(), deadline_sec + ) # Else, export in batches of configured size split_metrics_batches = list(self._split_metrics_data(serialized_data)) export_result = MetricExportResult.SUCCESS for split_metrics_data in split_metrics_batches: - # Export current batch until success, non-transient error, or retries maxed out - for retry_num in range(_MAX_RETRYS): - split_resp = self._export( - split_metrics_data.SerializeToString(), - deadline_sec - time(), - ) - if split_resp.ok: - export_result = MetricExportResult.SUCCESS - # Move on to next batch - break - - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - if ( - not _is_retryable(split_resp) - or retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - ): - _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - split_resp.status_code, - split_resp.text, - ) - export_result = MetricExportResult.FAILURE - # Don't retry; move on to next batch - break - - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %.2fs.", - split_resp.reason, - backoff_seconds, - ) - sleep(backoff_seconds) - continue + batch_result = self._export_with_retries( + split_metrics_data.SerializeToString(), + deadline_sec, + ) + + if batch_result == MetricExportResult.FAILURE: + export_result = MetricExportResult.FAILURE + # Don't retry; move on to next batch # Return last result after all batches are attempted return export_result From aa6c822939d6412e36e5ffdcb7f744195d0289d9 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 14 Jul 2025 16:50:50 -0700 Subject: [PATCH 07/13] Rm unused imports --- .../exporter/otlp/proto/http/metric_exporter/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 21e4e6bfa92..2e8b58f9a5a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -25,9 +25,7 @@ Dict, Iterable, List, - Mapping, Optional, - Sequence, ) import requests From acc1c7c6f5746a401253f8f3a8af8b85f25829e3 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 14 Jul 2025 16:51:14 -0700 Subject: [PATCH 08/13] WhichOneof instead of HasField --- .../proto/http/metric_exporter/__init__.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 2e8b58f9a5a..b796844c66a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -348,7 +348,8 @@ def _split_metrics_data( # with different accessors for data points, etc # We maintain these structures throughout batch calculation current_data_points = [] - if metric.HasField("sum"): + field_name = metric.WhichOneof("data") + if field_name == "sum": split_metrics.append( { "name": metric.name, @@ -362,7 +363,7 @@ def _split_metrics_data( } ) current_data_points = metric.sum.data_points - elif metric.HasField("histogram"): + elif field_name == "histogram": split_metrics.append( { "name": metric.name, @@ -375,7 +376,7 @@ def _split_metrics_data( } ) current_data_points = metric.histogram.data_points - elif metric.HasField("exponential_histogram"): + elif field_name == "exponential_histogram": split_metrics.append( { "name": metric.name, @@ -390,7 +391,7 @@ def _split_metrics_data( current_data_points = ( metric.exponential_histogram.data_points ) - elif metric.HasField("gauge"): + elif field_name == "gauge": split_metrics.append( { "name": metric.name, @@ -402,7 +403,7 @@ def _split_metrics_data( } ) current_data_points = metric.gauge.data_points - elif metric.HasField("summary"): + elif field_name == "summary": split_metrics.append( { "name": metric.name, @@ -436,7 +437,8 @@ def _split_metrics_data( batch_size = 0 split_data_points = [] - if metric.HasField("sum"): + field_name = metric.WhichOneof("data") + if field_name == "sum": split_metrics = [ { "name": metric.name, @@ -449,7 +451,7 @@ def _split_metrics_data( }, } ] - elif metric.HasField("histogram"): + elif field_name == "histogram": split_metrics = [ { "name": metric.name, @@ -461,7 +463,7 @@ def _split_metrics_data( }, } ] - elif metric.HasField("exponential_histogram"): + elif field_name == "exponential_histogram": split_metrics = [ { "name": metric.name, @@ -473,7 +475,7 @@ def _split_metrics_data( }, } ] - elif metric.HasField("gauge"): + elif field_name == "gauge": split_metrics = [ { "name": metric.name, @@ -484,7 +486,7 @@ def _split_metrics_data( }, } ] - elif metric.HasField("summary"): + elif field_name == "summary": split_metrics = [ { "name": metric.name, From e7c80e5c5a286843079bd30b0d012686099daa95 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 14 Jul 2025 16:59:35 -0700 Subject: [PATCH 09/13] Move _get_split_resource_metrics_pb2 out of class --- .../proto/http/metric_exporter/__init__.py | 320 +++++++++--------- 1 file changed, 158 insertions(+), 162 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index b796844c66a..55a95703922 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -426,7 +426,7 @@ def _split_metrics_data( if batch_size >= self._max_export_batch_size: yield pb2.MetricsData( - resource_metrics=self._get_split_resource_metrics_pb2( + resource_metrics=_get_split_resource_metrics_pb2( split_resource_metrics ) ) @@ -527,171 +527,11 @@ def _split_metrics_data( if batch_size > 0: yield pb2.MetricsData( - resource_metrics=self._get_split_resource_metrics_pb2( + resource_metrics=_get_split_resource_metrics_pb2( split_resource_metrics ) ) - def _get_split_resource_metrics_pb2( - self, - split_resource_metrics: List[Dict], - ) -> List[pb2.ResourceMetrics]: - """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics. - Example input: - - ```python - [ - { - "resource": , - "schema_url": "http://foo-bar", - "scope_metrics": [ - "scope": , - "schema_url": "http://foo-baz", - "metrics": [ - { - "name": "apples", - "description": "number of apples purchased", - "sum": { - "aggregation_temporality": 1, - "is_monotonic": "false", - "data_points": [ - { - start_time_unix_nano: 1000 - time_unix_nano: 1001 - exemplars { - time_unix_nano: 1002 - span_id: "foo-span" - trace_id: "foo-trace" - as_int: 5 - } - as_int: 5 - } - ] - } - }, - ], - ], - }, - ] - ``` - - Args: - split_resource_metrics: A list of dict representations of ResourceMetrics, - ScopeMetrics, Metrics, and data points. - - Returns: - List[pb2.ResourceMetrics]: A list of pb2.ResourceMetrics objects containing - pb2.ScopeMetrics, pb2.Metrics, and data points - """ - split_resource_metrics_pb = [] - for resource_metrics in split_resource_metrics: - new_resource_metrics = pb2.ResourceMetrics( - resource=resource_metrics.get("resource"), - scope_metrics=[], - schema_url=resource_metrics.get("schema_url"), - ) - for scope_metrics in resource_metrics.get("scope_metrics", []): - new_scope_metrics = pb2.ScopeMetrics( - scope=scope_metrics.get("scope"), - metrics=[], - schema_url=scope_metrics.get("schema_url"), - ) - - for metric in scope_metrics.get("metrics", []): - new_metric = None - data_points = [] - - if "sum" in metric: - new_metric = pb2.Metric( - name=metric.get("name"), - description=metric.get("description"), - unit=metric.get("unit"), - sum=pb2.Sum( - data_points=[], - aggregation_temporality=metric.get("sum").get( - "aggregation_temporality" - ), - is_monotonic=metric.get("sum").get( - "is_monotonic" - ), - ), - ) - data_points = metric.get("sum").get("data_points") - elif "histogram" in metric: - new_metric = pb2.Metric( - name=metric.get("name"), - description=metric.get("description"), - unit=metric.get("unit"), - histogram=pb2.Histogram( - data_points=[], - aggregation_temporality=metric.get( - "histogram" - ).get("aggregation_temporality"), - ), - ) - data_points = metric.get("histogram").get( - "data_points" - ) - elif "exponential_histogram" in metric: - new_metric = pb2.Metric( - name=metric.get("name"), - description=metric.get("description"), - unit=metric.get("unit"), - exponential_histogram=pb2.ExponentialHistogram( - data_points=[], - aggregation_temporality=metric.get( - "exponential_histogram" - ).get("aggregation_temporality"), - ), - ) - data_points = metric.get("exponential_histogram").get( - "data_points" - ) - elif "gauge" in metric: - new_metric = pb2.Metric( - name=metric.get("name"), - description=metric.get("description"), - unit=metric.get("unit"), - gauge=pb2.Gauge( - data_points=[], - ), - ) - data_points = metric.get("gauge").get("data_points") - elif "summary" in metric: - new_metric = pb2.Metric( - name=metric.get("name"), - description=metric.get("description"), - unit=metric.get("unit"), - summary=pb2.Summary( - data_points=[], - ), - ) - data_points = metric.get("summary").get("data_points") - else: - _logger.warning( - "Tried to split and export an unsupported metric type. Skipping." - ) - continue - - for data_point in data_points: - if "sum" in metric: - new_metric.sum.data_points.append(data_point) - elif "histogram" in metric: - new_metric.histogram.data_points.append(data_point) - elif "exponential_histogram" in metric: - new_metric.exponential_histogram.data_points.append( - data_point - ) - elif "gauge" in metric: - new_metric.gauge.data_points.append(data_point) - elif "summary" in metric: - new_metric.summary.data_points.append(data_point) - - new_scope_metrics.metrics.append(new_metric) - new_resource_metrics.scope_metrics.append(new_scope_metrics) - split_resource_metrics_pb.append(new_resource_metrics) - return split_resource_metrics_pb - def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: if self._shutdown: _logger.warning("Exporter already shutdown, ignoring call") @@ -708,6 +548,162 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: return True +def _get_split_resource_metrics_pb2( + split_resource_metrics: List[Dict], +) -> List[pb2.ResourceMetrics]: + """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics. + Example input: + + ```python + [ + { + "resource": , + "schema_url": "http://foo-bar", + "scope_metrics": [ + "scope": , + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "apples", + "description": "number of apples purchased", + "sum": { + "aggregation_temporality": 1, + "is_monotonic": "false", + "data_points": [ + { + start_time_unix_nano: 1000 + time_unix_nano: 1001 + exemplars { + time_unix_nano: 1002 + span_id: "foo-span" + trace_id: "foo-trace" + as_int: 5 + } + as_int: 5 + } + ] + } + }, + ], + ], + }, + ] + ``` + + Args: + split_resource_metrics: A list of dict representations of ResourceMetrics, + ScopeMetrics, Metrics, and data points. + + Returns: + List[pb2.ResourceMetrics]: A list of pb2.ResourceMetrics objects containing + pb2.ScopeMetrics, pb2.Metrics, and data points + """ + split_resource_metrics_pb = [] + for resource_metrics in split_resource_metrics: + new_resource_metrics = pb2.ResourceMetrics( + resource=resource_metrics.get("resource"), + scope_metrics=[], + schema_url=resource_metrics.get("schema_url"), + ) + for scope_metrics in resource_metrics.get("scope_metrics", []): + new_scope_metrics = pb2.ScopeMetrics( + scope=scope_metrics.get("scope"), + metrics=[], + schema_url=scope_metrics.get("schema_url"), + ) + + for metric in scope_metrics.get("metrics", []): + new_metric = None + data_points = [] + + if "sum" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + sum=pb2.Sum( + data_points=[], + aggregation_temporality=metric.get("sum").get( + "aggregation_temporality" + ), + is_monotonic=metric.get("sum").get("is_monotonic"), + ), + ) + data_points = metric.get("sum").get("data_points") + elif "histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + histogram=pb2.Histogram( + data_points=[], + aggregation_temporality=metric.get( + "histogram" + ).get("aggregation_temporality"), + ), + ) + data_points = metric.get("histogram").get("data_points") + elif "exponential_histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + exponential_histogram=pb2.ExponentialHistogram( + data_points=[], + aggregation_temporality=metric.get( + "exponential_histogram" + ).get("aggregation_temporality"), + ), + ) + data_points = metric.get("exponential_histogram").get( + "data_points" + ) + elif "gauge" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + gauge=pb2.Gauge( + data_points=[], + ), + ) + data_points = metric.get("gauge").get("data_points") + elif "summary" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + summary=pb2.Summary( + data_points=[], + ), + ) + data_points = metric.get("summary").get("data_points") + else: + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." + ) + continue + + for data_point in data_points: + if "sum" in metric: + new_metric.sum.data_points.append(data_point) + elif "histogram" in metric: + new_metric.histogram.data_points.append(data_point) + elif "exponential_histogram" in metric: + new_metric.exponential_histogram.data_points.append( + data_point + ) + elif "gauge" in metric: + new_metric.gauge.data_points.append(data_point) + elif "summary" in metric: + new_metric.summary.data_points.append(data_point) + + new_scope_metrics.metrics.append(new_metric) + new_resource_metrics.scope_metrics.append(new_scope_metrics) + split_resource_metrics_pb.append(new_resource_metrics) + return split_resource_metrics_pb + + @deprecated( "Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead. Deprecated since version 1.18.0.", ) From fb6043b350e8cdff2f47bd1abfad844a0e46703b Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 14 Jul 2025 17:27:47 -0700 Subject: [PATCH 10/13] Update http metric exporter batch return to be like grpc --- .../exporter/otlp/proto/http/metric_exporter/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 55a95703922..34c0e47c7c1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -296,10 +296,10 @@ def export( ) if batch_result == MetricExportResult.FAILURE: - export_result = MetricExportResult.FAILURE - # Don't retry; move on to next batch + # If any batch fails, return failure immediately + return MetricExportResult.FAILURE - # Return last result after all batches are attempted + # Return SUCCESS only if all batches succeeded return export_result def _split_metrics_data( From 888861aee99bc4691c45d61e90282baa230f66c7 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Mon, 14 Jul 2025 17:29:06 -0700 Subject: [PATCH 11/13] Fix tests --- .../metrics/test_otlp_metrics_exporter.py | 283 +++++++++++++++++- 1 file changed, 270 insertions(+), 13 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 5089ab54d51..9f235a098fe 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -33,6 +33,7 @@ DEFAULT_METRICS_EXPORT_PATH, DEFAULT_TIMEOUT, OTLPMetricExporter, + _get_split_resource_metrics_pb2, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ from opentelemetry.proto.common.v1.common_pb2 import ( @@ -655,9 +656,7 @@ def test_get_split_resource_metrics_pb2_one_of_each(self): } ] - result = OTLPMetricExporter()._get_split_resource_metrics_pb2( - split_resource_metrics - ) + result = _get_split_resource_metrics_pb2(split_resource_metrics) self.assertEqual(len(result), 1) self.assertIsInstance(result[0], pb2.ResourceMetrics) self.assertEqual(result[0].schema_url, "http://foo-bar") @@ -755,9 +754,7 @@ def test_get_split_resource_metrics_pb2_multiples(self): }, ] - result = OTLPMetricExporter()._get_split_resource_metrics_pb2( - split_resource_metrics - ) + result = _get_split_resource_metrics_pb2(split_resource_metrics) self.assertEqual(len(result), 2) self.assertEqual(result[0].schema_url, "http://foo-bar-1") self.assertEqual(result[1].schema_url, "http://foo-bar-2") @@ -801,9 +798,7 @@ def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): ] with self.assertLogs(level="WARNING") as log: - result = OTLPMetricExporter()._get_split_resource_metrics_pb2( - split_resource_metrics - ) + result = _get_split_resource_metrics_pb2(split_resource_metrics) self.assertEqual(len(result), 1) self.assertIn( "Tried to split and export an unsupported metric type", @@ -813,14 +808,180 @@ def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): @patch.object(OTLPMetricExporter, "_export") @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") @patch( "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" ) - def test_export_retries_with_batching( + def test_export_retries_with_batching_success( + self, + mock_encode_metrics, + mock_time, + mock_random, + mock_export, + ): + mock_time.return_value = 0 + mock_random.uniform.return_value = 1 + mock_export.side_effect = [ + # Success + MagicMock(ok=True), + MagicMock(ok=True), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 2) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString(), 10), + call(batch_2.SerializeToString(), 10), + ] + ) + + @patch.object(OTLPMetricExporter, "_export") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching_failure_first( + self, + mock_encode_metrics, + mock_time, + mock_random, + mock_export, + ): + mock_time.return_value = 0 + mock_random.uniform.return_value = 1 + mock_export.side_effect = [ + # Non-retryable + MagicMock(ok=False, status_code=400, reason="bad request"), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.FAILURE) + self.assertEqual(mock_export.call_count, 1) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString(), 10), + ] + ) + + @patch.object(OTLPMetricExporter, "_export") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching_failure_last( self, mock_encode_metrics, - mock_sleep, mock_time, mock_random, mock_export, @@ -828,13 +989,109 @@ def test_export_retries_with_batching( mock_time.return_value = 0 mock_random.uniform.return_value = 1 mock_export.side_effect = [ + # Success + MagicMock(ok=True), # Non-retryable MagicMock(ok=False, status_code=400, reason="bad request"), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.FAILURE) + self.assertEqual(mock_export.call_count, 2) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString(), 10), + call(batch_2.SerializeToString(), 10), + ] + ) + + @patch.object(OTLPMetricExporter, "_export") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching_failure_retryable( + self, + mock_encode_metrics, + mock_time, + mock_random, + mock_export, + ): + mock_time.return_value = 0 + mock_random.uniform.return_value = 1 + mock_export.side_effect = [ + # Success + MagicMock(ok=True), # Retryable MagicMock( ok=False, status_code=500, reason="internal server error" ), - # Success + # Then success MagicMock(ok=True), ] mock_encode_metrics.return_value = pb2.MetricsData( From dbd639f2f6546f3b00aaaf2d64f4e8ae71054bea Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Tue, 15 Jul 2025 16:00:17 -0700 Subject: [PATCH 12/13] Actually return last batch result like grpc --- .../exporter/otlp/proto/http/metric_exporter/__init__.py | 8 ++------ .../tests/metrics/test_otlp_metrics_exporter.py | 6 ++++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 34c0e47c7c1..9e3ee64e444 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -290,16 +290,12 @@ def export( export_result = MetricExportResult.SUCCESS for split_metrics_data in split_metrics_batches: - batch_result = self._export_with_retries( + export_result = self._export_with_retries( split_metrics_data.SerializeToString(), deadline_sec, ) - if batch_result == MetricExportResult.FAILURE: - # If any batch fails, return failure immediately - return MetricExportResult.FAILURE - - # Return SUCCESS only if all batches succeeded + # Return export result of the last batch, like gRPC exporter return export_result def _split_metrics_data( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 9f235a098fe..ab497438421 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -918,6 +918,8 @@ def test_export_retries_with_batching_failure_first( mock_export.side_effect = [ # Non-retryable MagicMock(ok=False, status_code=400, reason="bad request"), + MagicMock(ok=True), + MagicMock(ok=True), ] mock_encode_metrics.return_value = pb2.MetricsData( resource_metrics=[ @@ -965,8 +967,8 @@ def test_export_retries_with_batching_failure_first( exporter = OTLPMetricExporter(max_export_batch_size=2) result = exporter.export("foo") - self.assertEqual(result, MetricExportResult.FAILURE) - self.assertEqual(mock_export.call_count, 1) + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 2) mock_export.assert_has_calls( [ call(batch_1.SerializeToString(), 10), From eed52e3c51c8d77a05fe1f3bbd13679af5d88b1f Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Tue, 15 Jul 2025 16:19:10 -0700 Subject: [PATCH 13/13] Mv _split_metrics_data out of class --- .../proto/http/metric_exporter/__init__.py | 452 +++++++++--------- .../metrics/test_otlp_metrics_exporter.py | 10 +- 2 files changed, 236 insertions(+), 226 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 9e3ee64e444..563bd4cd143 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -286,7 +286,9 @@ def export( ) # Else, export in batches of configured size - split_metrics_batches = list(self._split_metrics_data(serialized_data)) + split_metrics_batches = list( + _split_metrics_data(serialized_data, self._max_export_batch_size) + ) export_result = MetricExportResult.SUCCESS for split_metrics_data in split_metrics_batches: @@ -298,250 +300,254 @@ def export( # Return export result of the last batch, like gRPC exporter return export_result - def _split_metrics_data( - self, - metrics_data: pb2.MetricsData, - ) -> Iterable[pb2.MetricsData]: - """Splits metrics data into several MetricsData (copies protobuf originals), - based on configured data point max export batch size. + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring call") + return + self._session.close() + self._shutdown = True - Args: - metrics_data: metrics object based on HTTP protocol buffer definition + @property + def _exporting(self) -> str: + return "metrics" - Returns: - Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing - pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points - """ - batch_size: int = 0 - # Stores split metrics data as editable references - # used to write batched pb2 objects for export when finalized - split_resource_metrics = [] - - for resource_metrics in metrics_data.resource_metrics: - split_scope_metrics = [] - split_resource_metrics.append( + def force_flush(self, timeout_millis: float = 10_000) -> bool: + """Nothing is buffered in this exporter, so this method does nothing.""" + return True + + +def _split_metrics_data( + metrics_data: pb2.MetricsData, + max_export_batch_size: int | None = None, +) -> Iterable[pb2.MetricsData]: + """Splits metrics data into several MetricsData (copies protobuf originals), + based on configured data point max export batch size. + + Args: + metrics_data: metrics object based on HTTP protocol buffer definition + + Returns: + Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing + pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points + """ + if not max_export_batch_size: + return metrics_data + + batch_size: int = 0 + # Stores split metrics data as editable references + # used to write batched pb2 objects for export when finalized + split_resource_metrics = [] + + for resource_metrics in metrics_data.resource_metrics: + split_scope_metrics = [] + split_resource_metrics.append( + { + "resource": resource_metrics.resource, + "schema_url": resource_metrics.schema_url, + "scope_metrics": split_scope_metrics, + } + ) + + for scope_metrics in resource_metrics.scope_metrics: + split_metrics = [] + split_scope_metrics.append( { - "resource": resource_metrics.resource, - "schema_url": resource_metrics.schema_url, - "scope_metrics": split_scope_metrics, + "scope": scope_metrics.scope, + "schema_url": scope_metrics.schema_url, + "metrics": split_metrics, } ) - for scope_metrics in resource_metrics.scope_metrics: - split_metrics = [] - split_scope_metrics.append( - { - "scope": scope_metrics.scope, - "schema_url": scope_metrics.schema_url, - "metrics": split_metrics, - } - ) - - for metric in scope_metrics.metrics: - split_data_points = [] - - # protobuf specifies metrics types (e.g. Sum, Histogram) - # with different accessors for data points, etc - # We maintain these structures throughout batch calculation - current_data_points = [] - field_name = metric.WhichOneof("data") - if field_name == "sum": - split_metrics.append( - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "sum": { - "aggregation_temporality": metric.sum.aggregation_temporality, - "is_monotonic": metric.sum.is_monotonic, - "data_points": split_data_points, - }, - } - ) - current_data_points = metric.sum.data_points - elif field_name == "histogram": - split_metrics.append( - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "histogram": { - "aggregation_temporality": metric.histogram.aggregation_temporality, - "data_points": split_data_points, - }, - } - ) - current_data_points = metric.histogram.data_points - elif field_name == "exponential_histogram": - split_metrics.append( - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "exponential_histogram": { - "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, - "data_points": split_data_points, - }, - } - ) - current_data_points = ( - metric.exponential_histogram.data_points - ) - elif field_name == "gauge": - split_metrics.append( - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "gauge": { - "data_points": split_data_points, - }, - } - ) - current_data_points = metric.gauge.data_points - elif field_name == "summary": - split_metrics.append( - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "summary": { - "data_points": split_data_points, - }, - } - ) - else: - _logger.warning( - "Tried to split and export an unsupported metric type. Skipping." - ) - continue + for metric in scope_metrics.metrics: + split_data_points = [] + + # protobuf specifies metrics types (e.g. Sum, Histogram) + # with different accessors for data points, etc + # We maintain these structures throughout batch calculation + current_data_points = [] + field_name = metric.WhichOneof("data") + if field_name == "sum": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "sum": { + "aggregation_temporality": metric.sum.aggregation_temporality, + "is_monotonic": metric.sum.is_monotonic, + "data_points": split_data_points, + }, + } + ) + current_data_points = metric.sum.data_points + elif field_name == "histogram": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "histogram": { + "aggregation_temporality": metric.histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ) + current_data_points = metric.histogram.data_points + elif field_name == "exponential_histogram": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "exponential_histogram": { + "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ) + current_data_points = ( + metric.exponential_histogram.data_points + ) + elif field_name == "gauge": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "gauge": { + "data_points": split_data_points, + }, + } + ) + current_data_points = metric.gauge.data_points + elif field_name == "summary": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "summary": { + "data_points": split_data_points, + }, + } + ) + else: + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." + ) + continue - for data_point in current_data_points: - split_data_points.append(data_point) - batch_size += 1 + for data_point in current_data_points: + split_data_points.append(data_point) + batch_size += 1 - if batch_size >= self._max_export_batch_size: - yield pb2.MetricsData( - resource_metrics=_get_split_resource_metrics_pb2( - split_resource_metrics - ) + if batch_size >= max_export_batch_size: + yield pb2.MetricsData( + resource_metrics=_get_split_resource_metrics_pb2( + split_resource_metrics ) + ) - # Reset all the reference variables with current metrics_data position - # minus yielded data_points. Need to clear data_points and keep metric - # to avoid duplicate data_point export - batch_size = 0 - split_data_points = [] - - field_name = metric.WhichOneof("data") - if field_name == "sum": - split_metrics = [ - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "sum": { - "aggregation_temporality": metric.sum.aggregation_temporality, - "is_monotonic": metric.sum.is_monotonic, - "data_points": split_data_points, - }, - } - ] - elif field_name == "histogram": - split_metrics = [ - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "histogram": { - "aggregation_temporality": metric.histogram.aggregation_temporality, - "data_points": split_data_points, - }, - } - ] - elif field_name == "exponential_histogram": - split_metrics = [ - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "exponential_histogram": { - "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, - "data_points": split_data_points, - }, - } - ] - elif field_name == "gauge": - split_metrics = [ - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "gauge": { - "data_points": split_data_points, - }, - } - ] - elif field_name == "summary": - split_metrics = [ - { - "name": metric.name, - "description": metric.description, - "unit": metric.unit, - "summary": { - "data_points": split_data_points, - }, - } - ] + # Reset all the reference variables with current metrics_data position + # minus yielded data_points. Need to clear data_points and keep metric + # to avoid duplicate data_point export + batch_size = 0 + split_data_points = [] - split_scope_metrics = [ + field_name = metric.WhichOneof("data") + if field_name == "sum": + split_metrics = [ { - "scope": scope_metrics.scope, - "schema_url": scope_metrics.schema_url, - "metrics": split_metrics, + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "sum": { + "aggregation_temporality": metric.sum.aggregation_temporality, + "is_monotonic": metric.sum.is_monotonic, + "data_points": split_data_points, + }, } ] - split_resource_metrics = [ + elif field_name == "histogram": + split_metrics = [ { - "resource": resource_metrics.resource, - "schema_url": resource_metrics.schema_url, - "scope_metrics": split_scope_metrics, + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "histogram": { + "aggregation_temporality": metric.histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ] + elif field_name == "exponential_histogram": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "exponential_histogram": { + "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ] + elif field_name == "gauge": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "gauge": { + "data_points": split_data_points, + }, + } + ] + elif field_name == "summary": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "summary": { + "data_points": split_data_points, + }, } ] - if not split_data_points: - # If data_points is empty remove the whole metric - split_metrics.pop() - - if not split_metrics: - # If metrics is empty remove the whole scope_metrics - split_scope_metrics.pop() - - if not split_scope_metrics: - # If scope_metrics is empty remove the whole resource_metrics - split_resource_metrics.pop() + split_scope_metrics = [ + { + "scope": scope_metrics.scope, + "schema_url": scope_metrics.schema_url, + "metrics": split_metrics, + } + ] + split_resource_metrics = [ + { + "resource": resource_metrics.resource, + "schema_url": resource_metrics.schema_url, + "scope_metrics": split_scope_metrics, + } + ] - if batch_size > 0: - yield pb2.MetricsData( - resource_metrics=_get_split_resource_metrics_pb2( - split_resource_metrics - ) - ) + if not split_data_points: + # If data_points is empty remove the whole metric + split_metrics.pop() - def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring call") - return - self._session.close() - self._shutdown = True + if not split_metrics: + # If metrics is empty remove the whole scope_metrics + split_scope_metrics.pop() - @property - def _exporting(self) -> str: - return "metrics" + if not split_scope_metrics: + # If scope_metrics is empty remove the whole resource_metrics + split_resource_metrics.pop() - def force_flush(self, timeout_millis: float = 10_000) -> bool: - """Nothing is buffered in this exporter, so this method does nothing.""" - return True + if batch_size > 0: + yield pb2.MetricsData( + resource_metrics=_get_split_resource_metrics_pb2( + split_resource_metrics + ) + ) def _get_split_resource_metrics_pb2( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index ab497438421..38cf398d529 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -34,6 +34,7 @@ DEFAULT_TIMEOUT, OTLPMetricExporter, _get_split_resource_metrics_pb2, + _split_metrics_data, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ from opentelemetry.proto.common.v1.common_pb2 import ( @@ -368,8 +369,9 @@ def test_split_metrics_data_many_data_points(self): ) split_metrics_data: List[MetricsData] = list( # pylint: disable=protected-access - OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + _split_metrics_data( metrics_data=metrics_data, + max_export_batch_size=2, ) ) @@ -446,8 +448,9 @@ def test_split_metrics_data_nb_data_points_equal_batch_size(self): split_metrics_data: List[MetricsData] = list( # pylint: disable=protected-access - OTLPMetricExporter(max_export_batch_size=3)._split_metrics_data( + _split_metrics_data( metrics_data=metrics_data, + max_export_batch_size=3, ) ) @@ -537,8 +540,9 @@ def test_split_metrics_data_many_resources_scopes_metrics(self): split_metrics_data: List[MetricsData] = list( # pylint: disable=protected-access - OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + _split_metrics_data( metrics_data=metrics_data, + max_export_batch_size=2, ) )