diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b80ef9ec3..570080816b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Overwrite logging.config.fileConfig and logging.config.dictConfig to ensure the OTLP `LogHandler` remains attached to the root logger. Fix a bug that can cause a deadlock to occur over `logging._lock` in some cases ([#4636](https://github.com/open-telemetry/opentelemetry-python/pull/4636)). +- Filter duplicate logs out of some internal `logger`'s logs on the export logs path that might otherwise endlessly log or cause a recursion depth exceeded issue in cases where logging itself results in an exception. + ([#4695](https://github.com/open-telemetry/opentelemetry-python/pull/4695)). - Update OTLP gRPC/HTTP exporters: calling shutdown will now interrupt exporters that are sleeping before a retry attempt, and cause them to return failure immediately. diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 6791062d5d..e61ed8baee 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -59,6 +59,7 @@ KeyValue, ) from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401 +from opentelemetry.sdk._shared_internal import DuplicateFilter from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -87,6 +88,8 @@ ) _MAX_RETRYS = 6 logger = getLogger(__name__) +# This prevents logs generated when a log fails to be written to generate another log which fails to be written etc. etc. +logger.addFilter(DuplicateFilter()) SDKDataT = TypeVar("SDKDataT") ResourceDataT = TypeVar("ResourceDataT") TypingResourceT = TypeVar("TypingResourceT") diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 4c8b4de600..eb4d25bb10 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -38,6 +38,7 @@ LogExporter, LogExportResult, ) +from opentelemetry.sdk._shared_internal import DuplicateFilter from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, @@ -57,6 +58,8 @@ from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) +# This prevents logs generated when a log fails to be written to generate another log which fails to be written etc. etc. +_logger.addFilter(DuplicateFilter()) DEFAULT_COMPRESSION = Compression.NoCompression diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index ec629221b8..411f92aec1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -27,7 +27,7 @@ set_value, ) from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor -from opentelemetry.sdk._shared_internal import BatchProcessor +from opentelemetry.sdk._shared_internal import BatchProcessor, DuplicateFilter from opentelemetry.sdk.environment_variables import ( OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, @@ -43,6 +43,7 @@ "Unable to parse value for %s as integer. Defaulting to %s." ) _logger = logging.getLogger(__name__) +_logger.addFilter(DuplicateFilter()) class LogExportResult(enum.Enum): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index aec04e80ea..235a6737c0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -39,6 +39,26 @@ from opentelemetry.util._once import Once +class DuplicateFilter(logging.Filter): + """Filter that can be applied to internal `logger`'s. + + Currently applied to `logger`s on the export logs path that could otherwise cause endless logging of errors or a + recursion depth exceeded issue in cases where logging itself results in an exception.""" + + def filter(self, record): + current_log = ( + record.module, + record.levelno, + record.msg, + time.time() // 60, + ) + if current_log != getattr(self, "last_log", None): + self.last_log = current_log # pylint: disable=attribute-defined-outside-init + return True + # False means python's `logging` module will no longer process this log. + return False + + class BatchExportStrategy(enum.Enum): EXPORT_ALL = 0 EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1 @@ -89,6 +109,7 @@ def __init__( daemon=True, ) self._logger = logging.getLogger(__name__) + self._logger.addFilter(DuplicateFilter()) self._exporting = exporting self._shutdown = False diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index 541d27c880..f07ebc5ae7 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -14,6 +14,7 @@ # pylint: disable=protected-access import gc +import logging import multiprocessing import os import threading @@ -33,6 +34,9 @@ from opentelemetry.sdk._logs.export import ( BatchLogRecordProcessor, ) +from opentelemetry.sdk._shared_internal import ( + DuplicateFilter, +) from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.util.instrumentation import InstrumentationScope @@ -56,6 +60,7 @@ def __init__(self, export_sleep: int): self.num_export_calls = 0 self.export_sleep = export_sleep self._shutdown = False + self.sleep_interrupted = False self.export_sleep_event = threading.Event() def export(self, _: list[Any]): @@ -65,6 +70,7 @@ def export(self, _: list[Any]): sleep_interrupted = self.export_sleep_event.wait(self.export_sleep) if sleep_interrupted: + self.sleep_interrupted = True raise ValueError("Did not get to finish !") def shutdown(self): @@ -219,7 +225,7 @@ def test_record_processor_is_garbage_collected( assert weak_ref() is None def test_shutdown_allows_1_export_to_finish( - self, batch_processor_class, telemetry, caplog + self, batch_processor_class, telemetry ): # This exporter throws an exception if it's export sleep cannot finish. exporter = MockExporterForTesting(export_sleep=2) @@ -244,5 +250,15 @@ def test_shutdown_allows_1_export_to_finish( time.sleep(0.1) assert processor._batch_processor._worker_thread.is_alive() is False # Expect the second call to be interrupted by shutdown, and the third call to never be made. - assert "Exception while exporting" in caplog.text + assert exporter.sleep_interrupted is True assert 2 == exporter.num_export_calls + + +class TestCommonFuncs(unittest.TestCase): + def test_duplicate_logs_filter_works(self): + test_logger = logging.getLogger("testLogger") + test_logger.addFilter(DuplicateFilter()) + with self.assertLogs("testLogger") as cm: + test_logger.info("message") + test_logger.info("message") + self.assertEqual(len(cm.output), 1)