diff --git a/CHANGELOG.md b/CHANGELOG.md index dfc9179da3..784e95dee1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter + ([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576)) + ## Version 1.36.0/0.57b0 (2025-07-29) - Add missing Prometheus exporter documentation @@ -16,7 +19,6 @@ the OTLP `LogHandler` remains attached to the root logger. Fix a bug that can cause a deadlock to occur over `logging._lock` in some cases ([#4636](https://github.com/open-telemetry/opentelemetry-python/pull/4636)). - otlp-http-exporter: set default value for param `timeout_sec` in `_export` method ([#4691](https://github.com/open-telemetry/opentelemetry-python/pull/4691)) - - Update OTLP gRPC/HTTP exporters: calling shutdown will now interrupt exporters that are sleeping before a retry attempt, and cause them to return failure immediately. 
Update BatchSpan/LogRecordProcessors: shutdown will now complete after 30 seconds of trying to finish diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 3b7079f7fc..b09f1ce3fe 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -24,10 +24,9 @@ Any, Callable, Dict, + Iterable, List, - Mapping, Optional, - Sequence, ) import requests @@ -120,7 +119,29 @@ def __init__( preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, + max_export_batch_size: int | None = None, ): + """OTLP HTTP metrics exporter + + Args: + endpoint: Target URL to which the exporter is going to send metrics + certificate_file: Path to the certificate file to use for any TLS + client_key_file: Path to the client key file to use for any TLS + client_certificate_file: Path to the client certificate file to use for any TLS + headers: Headers to be sent with HTTP requests at export + timeout: Timeout in seconds for export + compression: Compression to use; one of none, gzip, deflate + session: Requests session to use at export + preferred_temporality: Map of preferred temporality for each metric type. + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what + preferred temporality is. + preferred_aggregation: Map of preferred aggregation for each metric type. + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what + preferred aggregation is. + max_export_batch_size: Maximum number of data points to export in a single request. 
+ If not set there is no limit to the number of data points in a request. + If it is set and the number of data points exceeds the max, the request will be split. + """ self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, @@ -170,6 +191,7 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._max_export_batch_size: int | None = max_export_batch_size self._shutdown = False def _export( self, @@ -209,21 +231,25 @@ def _export( ) return resp - def export( + def _export_with_retries( self, - metrics_data: MetricsData, - timeout_millis: Optional[float] = 10000, - **kwargs, + serialized_data: bytes, + deadline_sec: float, ) -> MetricExportResult: - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring batch") - return MetricExportResult.FAILURE - serialized_data = encode_metrics(metrics_data).SerializeToString() - deadline_sec = time() + self._timeout + """Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out. + + Args: + serialized_data: serialized metrics data to export + deadline_sec: timestamp deadline for the export + + Returns: + MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise + """ for retry_num in range(_MAX_RETRYS): resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: return MetricExportResult.SUCCESS + # multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff. 
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) if ( @@ -238,6 +264,7 @@ def export( resp.text, ) return MetricExportResult.FAILURE + _logger.warning( "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", resp.reason, @@ -249,6 +276,40 @@ def export( break return MetricExportResult.FAILURE + def export( + self, + metrics_data: MetricsData, + timeout_millis: Optional[float] = 10000, + **kwargs, + ) -> MetricExportResult: + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return MetricExportResult.FAILURE + + serialized_data = encode_metrics(metrics_data) + deadline_sec = time() + self._timeout + + # If no batch size configured, export as single batch with retries as configured + if self._max_export_batch_size is None: + return self._export_with_retries( + serialized_data.SerializeToString(), deadline_sec + ) + + # Else, export in batches of configured size + split_metrics_batches = list( + _split_metrics_data(serialized_data, self._max_export_batch_size) + ) + export_result = MetricExportResult.SUCCESS + + for split_metrics_data in split_metrics_batches: + export_result = self._export_with_retries( + split_metrics_data.SerializeToString(), + deadline_sec, + ) + + # Return export result of the last batch, like gRPC exporter + return export_result + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: if self._shutdown: _logger.warning("Exporter already shutdown, ignoring call") @@ -266,6 +327,396 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: return True +def _split_metrics_data( + metrics_data: pb2.MetricsData, + max_export_batch_size: int | None = None, +) -> Iterable[pb2.MetricsData]: + """Splits metrics data into several MetricsData (copies protobuf originals), + based on configured data point max export batch size. 
+ + Args: + metrics_data: metrics object based on HTTP protocol buffer definition + + Returns: + Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing + pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points + """ + if not max_export_batch_size: + return metrics_data + + batch_size: int = 0 + # Stores split metrics data as editable references + # used to write batched pb2 objects for export when finalized + split_resource_metrics = [] + + for resource_metrics in metrics_data.resource_metrics: + split_scope_metrics = [] + split_resource_metrics.append( + { + "resource": resource_metrics.resource, + "schema_url": resource_metrics.schema_url, + "scope_metrics": split_scope_metrics, + } + ) + + for scope_metrics in resource_metrics.scope_metrics: + split_metrics = [] + split_scope_metrics.append( + { + "scope": scope_metrics.scope, + "schema_url": scope_metrics.schema_url, + "metrics": split_metrics, + } + ) + + for metric in scope_metrics.metrics: + split_data_points = [] + + # protobuf specifies metrics types (e.g. 
Sum, Histogram) + # with different accessors for data points, etc + # We maintain these structures throughout batch calculation + current_data_points = [] + field_name = metric.WhichOneof("data") + if field_name == "sum": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "sum": { + "aggregation_temporality": metric.sum.aggregation_temporality, + "is_monotonic": metric.sum.is_monotonic, + "data_points": split_data_points, + }, + } + ) + current_data_points = metric.sum.data_points + elif field_name == "histogram": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "histogram": { + "aggregation_temporality": metric.histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ) + current_data_points = metric.histogram.data_points + elif field_name == "exponential_histogram": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "exponential_histogram": { + "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ) + current_data_points = ( + metric.exponential_histogram.data_points + ) + elif field_name == "gauge": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "gauge": { + "data_points": split_data_points, + }, + } + ) + current_data_points = metric.gauge.data_points + elif field_name == "summary": + split_metrics.append( + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "summary": { + "data_points": split_data_points, + }, + } + ) + else: + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." 
+ ) + continue + + for data_point in current_data_points: + split_data_points.append(data_point) + batch_size += 1 + + if batch_size >= max_export_batch_size: + yield pb2.MetricsData( + resource_metrics=_get_split_resource_metrics_pb2( + split_resource_metrics + ) + ) + + # Reset all the reference variables with current metrics_data position + # minus yielded data_points. Need to clear data_points and keep metric + # to avoid duplicate data_point export + batch_size = 0 + split_data_points = [] + + field_name = metric.WhichOneof("data") + if field_name == "sum": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "sum": { + "aggregation_temporality": metric.sum.aggregation_temporality, + "is_monotonic": metric.sum.is_monotonic, + "data_points": split_data_points, + }, + } + ] + elif field_name == "histogram": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "histogram": { + "aggregation_temporality": metric.histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ] + elif field_name == "exponential_histogram": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "exponential_histogram": { + "aggregation_temporality": metric.exponential_histogram.aggregation_temporality, + "data_points": split_data_points, + }, + } + ] + elif field_name == "gauge": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "gauge": { + "data_points": split_data_points, + }, + } + ] + elif field_name == "summary": + split_metrics = [ + { + "name": metric.name, + "description": metric.description, + "unit": metric.unit, + "summary": { + "data_points": split_data_points, + }, + } + ] + + split_scope_metrics = [ + { + "scope": scope_metrics.scope, + "schema_url": scope_metrics.schema_url, + "metrics": split_metrics, + } + ] + 
split_resource_metrics = [ + { + "resource": resource_metrics.resource, + "schema_url": resource_metrics.schema_url, + "scope_metrics": split_scope_metrics, + } + ] + + if not split_data_points: + # If data_points is empty remove the whole metric + split_metrics.pop() + + if not split_metrics: + # If metrics is empty remove the whole scope_metrics + split_scope_metrics.pop() + + if not split_scope_metrics: + # If scope_metrics is empty remove the whole resource_metrics + split_resource_metrics.pop() + + if batch_size > 0: + yield pb2.MetricsData( + resource_metrics=_get_split_resource_metrics_pb2( + split_resource_metrics + ) + ) + + +def _get_split_resource_metrics_pb2( + split_resource_metrics: List[Dict], +) -> List[pb2.ResourceMetrics]: + """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics. + Example input: + + ```python + [ + { + "resource": , + "schema_url": "http://foo-bar", + "scope_metrics": [ + "scope": , + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "apples", + "description": "number of apples purchased", + "sum": { + "aggregation_temporality": 1, + "is_monotonic": "false", + "data_points": [ + { + start_time_unix_nano: 1000 + time_unix_nano: 1001 + exemplars { + time_unix_nano: 1002 + span_id: "foo-span" + trace_id: "foo-trace" + as_int: 5 + } + as_int: 5 + } + ] + } + }, + ], + ], + }, + ] + ``` + + Args: + split_resource_metrics: A list of dict representations of ResourceMetrics, + ScopeMetrics, Metrics, and data points. 
+ + Returns: + List[pb2.ResourceMetrics]: A list of pb2.ResourceMetrics objects containing + pb2.ScopeMetrics, pb2.Metrics, and data points + """ + split_resource_metrics_pb = [] + for resource_metrics in split_resource_metrics: + new_resource_metrics = pb2.ResourceMetrics( + resource=resource_metrics.get("resource"), + scope_metrics=[], + schema_url=resource_metrics.get("schema_url"), + ) + for scope_metrics in resource_metrics.get("scope_metrics", []): + new_scope_metrics = pb2.ScopeMetrics( + scope=scope_metrics.get("scope"), + metrics=[], + schema_url=scope_metrics.get("schema_url"), + ) + + for metric in scope_metrics.get("metrics", []): + new_metric = None + data_points = [] + + if "sum" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + sum=pb2.Sum( + data_points=[], + aggregation_temporality=metric.get("sum").get( + "aggregation_temporality" + ), + is_monotonic=metric.get("sum").get("is_monotonic"), + ), + ) + data_points = metric.get("sum").get("data_points") + elif "histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + histogram=pb2.Histogram( + data_points=[], + aggregation_temporality=metric.get( + "histogram" + ).get("aggregation_temporality"), + ), + ) + data_points = metric.get("histogram").get("data_points") + elif "exponential_histogram" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + exponential_histogram=pb2.ExponentialHistogram( + data_points=[], + aggregation_temporality=metric.get( + "exponential_histogram" + ).get("aggregation_temporality"), + ), + ) + data_points = metric.get("exponential_histogram").get( + "data_points" + ) + elif "gauge" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + 
gauge=pb2.Gauge( + data_points=[], + ), + ) + data_points = metric.get("gauge").get("data_points") + elif "summary" in metric: + new_metric = pb2.Metric( + name=metric.get("name"), + description=metric.get("description"), + unit=metric.get("unit"), + summary=pb2.Summary( + data_points=[], + ), + ) + data_points = metric.get("summary").get("data_points") + else: + _logger.warning( + "Tried to split and export an unsupported metric type. Skipping." + ) + continue + + for data_point in data_points: + if "sum" in metric: + new_metric.sum.data_points.append(data_point) + elif "histogram" in metric: + new_metric.histogram.data_points.append(data_point) + elif "exponential_histogram" in metric: + new_metric.exponential_histogram.data_points.append( + data_point + ) + elif "gauge" in metric: + new_metric.gauge.data_points.append(data_point) + elif "summary" in metric: + new_metric.summary.data_points.append(data_point) + + new_scope_metrics.metrics.append(new_metric) + new_resource_metrics.scope_metrics.append(new_scope_metrics) + split_resource_metrics_pb.append(new_resource_metrics) + return split_resource_metrics_pb + + @deprecated( "Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead. Deprecated since version 1.18.0.", ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 815761397e..47f3234182 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+# pylint: disable=too-many-lines import threading import time from logging import WARNING from os import environ +from typing import List from unittest import TestCase -from unittest.mock import ANY, MagicMock, Mock, patch +from unittest.mock import ANY, MagicMock, Mock, call, patch from requests import Session from requests.models import Response @@ -32,8 +34,18 @@ DEFAULT_METRICS_EXPORT_PATH, DEFAULT_TIMEOUT, OTLPMetricExporter, + _get_split_resource_metrics_pb2, + _split_metrics_data, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ +from opentelemetry.proto.common.v1.common_pb2 import ( + InstrumentationScope, + KeyValue, +) +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as Pb2Resource, +) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -87,6 +99,7 @@ # pylint: disable=protected-access class TestOTLPMetricExporter(TestCase): + # pylint: disable=too-many-public-methods def setUp(self): self.metrics = { "sum_int": MetricsData( @@ -332,6 +345,838 @@ def test_serialization(self, mock_post): cert=exporter._client_cert, ) + def test_split_metrics_data_many_data_points(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + _split_metrics_data( + metrics_data=metrics_data, + max_export_batch_size=2, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), 
+ ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_nb_data_points_equal_batch_size(self): + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + _split_metrics_data( + metrics_data=metrics_data, + max_export_batch_size=3, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_split_metrics_data_many_resources_scopes_metrics(self): + # GIVEN + metrics_data = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + 
_split_metrics_data( + metrics_data=metrics_data, + max_export_batch_size=2, + ) + ) + + self.assertEqual( + [ + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ), + pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ), + ], + split_metrics_data, + ) + + def test_get_split_resource_metrics_pb2_one_of_each(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo-scope", version="1.0.0" + ), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "foo-metric", + "description": "foo-description", + "unit": "foo-unit", + "sum": { + "aggregation_temporality": 1, + "is_monotonic": True, + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + } + ] + + result = _get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], pb2.ResourceMetrics) + self.assertEqual(result[0].schema_url, "http://foo-bar") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope") + 
self.assertEqual(len(result[0].scope_metrics[0].metrics), 1) + self.assertEqual( + result[0].scope_metrics[0].metrics[0].name, "foo-metric" + ) + self.assertEqual( + result[0].scope_metrics[0].metrics[0].sum.is_monotonic, True + ) + + def test_get_split_resource_metrics_pb2_multiples(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo1", value={"string_value": "bar2"}) + ], + ), + "schema_url": "http://foo-bar-1", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo-scope-1", version="1.0.0" + ), + "schema_url": "http://foo-baz-1", + "metrics": [ + { + "name": "foo-metric-1", + "description": "foo-description-1", + "unit": "foo-unit-1", + "gauge": { + "data_points": [ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + as_double=42.42, + ) + ], + }, + } + ], + } + ], + }, + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo2", value={"string_value": "bar2"}) + ], + ), + "schema_url": "http://foo-bar-2", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo-scope-2", version="2.0.0" + ), + "schema_url": "http://foo-baz-2", + "metrics": [ + { + "name": "foo-metric-2", + "description": "foo-description-2", + "unit": "foo-unit-2", + "histogram": { + "aggregation_temporality": 2, + "data_points": [ + pb2.HistogramDataPoint( + attributes=[ + KeyValue( + key="dp_key", + value={ + "string_value": "dp_value" + }, + ) + ], + start_time_unix_nano=12345, + time_unix_nano=12350, + ) + ], + }, + } + ], + } + ], + }, + ] + + result = _get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].schema_url, "http://foo-bar-1") + self.assertEqual(result[1].schema_url, "http://foo-bar-2") + self.assertEqual(len(result[0].scope_metrics), 1) + self.assertEqual(len(result[1].scope_metrics), 1) + 
self.assertEqual(result[0].scope_metrics[0].scope.name, "foo-scope-1") + self.assertEqual(result[1].scope_metrics[0].scope.name, "foo-scope-2") + self.assertEqual( + result[0].scope_metrics[0].metrics[0].name, "foo-metric-1" + ) + self.assertEqual( + result[1].scope_metrics[0].metrics[0].name, "foo-metric-2" + ) + + def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): + split_resource_metrics = [ + { + "resource": Pb2Resource( + attributes=[ + KeyValue(key="foo", value={"string_value": "bar"}) + ], + ), + "schema_url": "http://foo-bar", + "scope_metrics": [ + { + "scope": InstrumentationScope( + name="foo", version="1.0.0" + ), + "schema_url": "http://foo-baz", + "metrics": [ + { + "name": "unsupported-metric", + "description": "foo-bar", + "unit": "foo-bar", + "unsupported_metric_type": {}, + } + ], + } + ], + } + ] + + with self.assertLogs(level="WARNING") as log: + result = _get_split_resource_metrics_pb2(split_resource_metrics) + self.assertEqual(len(result), 1) + self.assertIn( + "Tried to split and export an unsupported metric type", + log.output[0], + ) + + @patch.object(OTLPMetricExporter, "_export") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching_success( + self, + mock_encode_metrics, + mock_time, + mock_random, + mock_export, + ): + mock_time.return_value = 0 + mock_random.uniform.return_value = 1 + mock_export.side_effect = [ + # Success + MagicMock(ok=True), + MagicMock(ok=True), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = 
pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 2) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString(), 10), + call(batch_2.SerializeToString(), 10), + ] + ) + + @patch.object(OTLPMetricExporter, "_export") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching_failure_first( + self, + mock_encode_metrics, + mock_time, + mock_random, + mock_export, + ): + mock_time.return_value = 0 + mock_random.uniform.return_value = 1 + mock_export.side_effect = [ + # Non-retryable + MagicMock(ok=False, status_code=400, reason="bad request"), + MagicMock(ok=True), + MagicMock(ok=True), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + 
_number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 2) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString(), 10), + ] + ) + + @patch.object(OTLPMetricExporter, "_export") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching_failure_last( + self, + mock_encode_metrics, + mock_time, + mock_random, + mock_export, + ): + mock_time.return_value = 0 + mock_random.uniform.return_value = 1 + mock_export.side_effect = [ + # Success + MagicMock(ok=True), + # Non-retryable + MagicMock(ok=False, status_code=400, reason="bad request"), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.FAILURE) + self.assertEqual(mock_export.call_count, 2) + 
mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString(), 10), + call(batch_2.SerializeToString(), 10), + ] + ) + + @patch.object(OTLPMetricExporter, "_export") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") + @patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" + ) + def test_export_retries_with_batching_failure_retryable( + self, + mock_encode_metrics, + mock_time, + mock_random, + mock_export, + ): + mock_time.return_value = 0 + mock_random.uniform.return_value = 1 + mock_export.side_effect = [ + # Success + MagicMock(ok=True), + # Retryable + MagicMock( + ok=False, status_code=500, reason="internal server error" + ), + # Then success + MagicMock(ok=True), + ] + mock_encode_metrics.return_value = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + batch_1 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + _number_data_point(12), + ], + ), + ], + ), + ], + ), + ] + ) + batch_2 = pb2.MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + ] + ) + + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export("foo") + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_export.call_count, 3) + mock_export.assert_has_calls( + [ + call(batch_1.SerializeToString(), 10), + call(batch_2.SerializeToString(), 10), + call(batch_2.SerializeToString(), 10), + ] + 
) + def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter() @@ -573,3 +1418,44 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + + +def _resource_metrics( + index: int, scope_metrics: List[pb2.ScopeMetrics] +) -> pb2.ResourceMetrics: + return pb2.ResourceMetrics( + resource={ + "attributes": [KeyValue(key="a", value={"int_value": index})], + }, + schema_url=f"resource_url_{index}", + scope_metrics=scope_metrics, + ) + + +def _scope_metrics(index: int, metrics: List[pb2.Metric]) -> pb2.ScopeMetrics: + return pb2.ScopeMetrics( + scope=InstrumentationScope(name=f"scope_{index}"), + schema_url=f"scope_url_{index}", + metrics=metrics, + ) + + +def _gauge(index: int, data_points: List[pb2.NumberDataPoint]) -> pb2.Metric: + return pb2.Metric( + name=f"gauge_{index}", + description="description", + unit="unit", + gauge=pb2.Gauge(data_points=data_points), + ) + + +def _number_data_point(value: int) -> pb2.NumberDataPoint: + return pb2.NumberDataPoint( + attributes=[ + KeyValue(key="a", value={"int_value": 1}), + KeyValue(key="b", value={"bool_value": True}), + ], + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + as_int=value, + )