Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- Added `Resource::get_ref(&self, key: &Key) -> Option<&Value>` to allow retrieving a reference to a resource value without cloning.
- **Breaking** Removed the following public hidden methods from the `SdkTracer` [#3227][3227]:
- `id_generator`, `should_sample`
- **Fix**: ObservableCounter and ObservableUpDownCounter now correctly report only data points from the current measurement cycle,
removing stale attribute combinations that are no longer observed. [#3213](https://github.com/open-telemetry/opentelemetry-rust/pull/3248)

[3227]: https://github.com/open-telemetry/opentelemetry-rust/pull/3227

Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ where
}

/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
/// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared.
/// This is used for synchronous instruments (Counter, Histogram, etc.) in Cumulative temporality mode,
/// where attribute sets persist across collection cycles and [`ValueMap`] is not cleared.
pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
Expand All @@ -179,7 +180,12 @@ where
}

/// Iterate through all attribute sets, populate `DataPoints` and reset.
/// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection.
/// This is used for:
/// - Synchronous instruments in Delta temporality mode
/// - Asynchronous instruments (Observable) in both Delta and Cumulative temporality modes
///
/// For asynchronous instruments, this removes stale attribute sets that were not observed
/// in the current callback, ensuring only currently active attributes are reported.
pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ impl<T: Number> PrecomputedSum<T> {
s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic;

// Use collect_and_reset to remove stale attributes (not observed in current callback)
// For cumulative, report absolute values (no delta calculation needed)
self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes,
value: aggr.value.get_value(),
exemplars: vec![],
Expand Down
17 changes: 3 additions & 14 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1457,17 +1457,12 @@ mod tests {
// Run this test with stdout enabled to see output.
// cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture

asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper("gauge");
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"gauge", true,
);
// TODO fix: all asynchronous instruments should not emit data points if not measured
// but these implementations are still buggy
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"counter", false,
"counter",
);
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"updown_counter",
false,
);
}

Expand Down Expand Up @@ -1782,7 +1777,6 @@ mod tests {

fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
instrument_name: &'static str,
should_not_emit: bool,
) {
let mut test_context = TestContext::new(Temporality::Cumulative);
let attributes = Arc::new([KeyValue::new("key1", "value1")]);
Expand Down Expand Up @@ -1844,12 +1838,7 @@ mod tests {

test_context.flush_metrics();

if should_not_emit {
test_context.check_no_metrics();
} else {
// Test that latest export has the same data as the previous one
assert_correct_export(&mut test_context, instrument_name);
}
test_context.check_no_metrics();

fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
match instrument_name {
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,8 @@ fn aggregate_fn<T: Number>(
}
Aggregation::Sum => {
let fns = match kind {
// TODO implement: observable instruments should not report data points on every collect
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
// Observable instruments use collect_and_reset to report only data points
// measured in the current callback, removing stale attributes
InstrumentKind::ObservableCounter => b.precomputed_sum(true),
InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
Expand Down