From fc7d10b1bc277adf5bfcb6a90572d37d9888c4de Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 11 Sep 2025 10:49:31 -0700 Subject: [PATCH 1/9] Add metric for number of series with discarded samples Signed-off-by: Essam Eldaly --- pkg/ingester/ingester.go | 7 + pkg/util/discardedseries/tracker.go | 164 +++++++++++++++++++++++ pkg/util/discardedseries/tracker_test.go | 139 +++++++++++++++++++ pkg/util/validation/validate.go | 17 +++ 4 files changed, 327 insertions(+) create mode 100644 pkg/util/discardedseries/tracker.go create mode 100644 pkg/util/discardedseries/tracker_test.go diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index faf7deb85b5..bd53fb0d978 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1259,22 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte switch cause := errors.Cause(err); { case errors.Is(cause, storage.ErrOutOfBounds): sampleOutOfBoundsCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, &copiedLabels) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrOutOfOrderSample): sampleOutOfOrderCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, &copiedLabels) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp): newValueForTimestampCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, &copiedLabels) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrTooOldSample): sampleTooOldCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, &copiedLabels) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, 
errMaxSeriesPerUserLimitExceeded): perUserSeriesLimitCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, &copiedLabels) updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels)) }) @@ -1287,12 +1292,14 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded): perMetricSeriesLimitCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, &copiedLabels) updateFirstPartial(func() error { return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels)) }) case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}): perLabelSetSeriesLimitCount++ + i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, &copiedLabels) // We only track per labelset discarded samples for throttling by labelset limit. reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit) updateFirstPartial(func() error { diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go new file mode 100644 index 00000000000..5a59995cb0c --- /dev/null +++ b/pkg/util/discardedseries/tracker.go @@ -0,0 +1,164 @@ +package discardedseries + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" +) + +const ( + vendMetricsInterval = 30 * time.Second +) + +type labelCounterStruct struct { + *sync.RWMutex + *labels.Labels + inCurrentCycle bool +} + +type seriesCounterStruct struct { + *sync.RWMutex + seriesCountMap map[uint64]*labelCounterStruct +} + +type userCounterStruct struct { + *sync.RWMutex + userSeriesMap map[string]*seriesCounterStruct +} + +type DiscardedSeriesTracker struct { + *sync.RWMutex + reasonUserMap map[string]*userCounterStruct + discardedSeriesGauge *prometheus.GaugeVec +} + +func 
NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *DiscardedSeriesTracker { + tracker := &DiscardedSeriesTracker{ + RWMutex: &sync.RWMutex{}, + reasonUserMap: make(map[string]*userCounterStruct), + discardedSeriesGauge: discardedSeriesGauge, + } + return tracker +} + +func (t *DiscardedSeriesTracker) Track(reason string, user string, labels *labels.Labels) { + series := labels.Hash() + t.RLock() + userCounter, ok := t.reasonUserMap[reason] + t.RUnlock() + if !ok { + t.Lock() + userCounter, ok = t.reasonUserMap[reason] + if !ok { + userCounter = &userCounterStruct{ + RWMutex: &sync.RWMutex{}, + userSeriesMap: make(map[string]*seriesCounterStruct), + } + t.reasonUserMap[reason] = userCounter + } + t.Unlock() + } + + userCounter.RLock() + seriesCounter, ok := userCounter.userSeriesMap[user] + userCounter.RUnlock() + if !ok { + userCounter.Lock() + seriesCounter, ok = userCounter.userSeriesMap[user] + if !ok { + seriesCounter = &seriesCounterStruct{ + RWMutex: &sync.RWMutex{}, + seriesCountMap: make(map[uint64]*labelCounterStruct), + } + userCounter.userSeriesMap[user] = seriesCounter + } + userCounter.Unlock() + } + + seriesCounter.RLock() + labelCounter, ok := seriesCounter.seriesCountMap[series] + seriesCounter.RUnlock() + if !ok { + seriesCounter.Lock() + labelCounter, ok = seriesCounter.seriesCountMap[series] + if !ok { + labelCounter = &labelCounterStruct{ + Labels: labels, + RWMutex: &sync.RWMutex{}, + inCurrentCycle: true, + } + seriesCounter.seriesCountMap[series] = labelCounter + } + seriesCounter.Unlock() + } + + labelCounter.Lock() + labelCounter.inCurrentCycle = true + labelCounter.Unlock() +} + +func (t *DiscardedSeriesTracker) UpdateMetrics() { + usersToDelete := make([]string, 0) + t.RLock() + for reason, userCounter := range t.reasonUserMap { + userCounter.RLock() + for user, seriesCounter := range userCounter.userSeriesMap { + seriesCounter.Lock() + for hash, labelCounter := range seriesCounter.seriesCountMap { + labelCounter.Lock() 
+ val := 0.0 + if labelCounter.inCurrentCycle { + val = 1.0 + } + t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.Labels.String()).Set(val) + if !labelCounter.inCurrentCycle { + delete(seriesCounter.seriesCountMap, hash) + } else { + labelCounter.inCurrentCycle = false + } + labelCounter.Unlock() + } + if len(seriesCounter.seriesCountMap) == 0 { + usersToDelete = append(usersToDelete, user) + } + seriesCounter.Unlock() + } + userCounter.RUnlock() + userCounter.Lock() + for _, user := range usersToDelete { + _, ok := userCounter.userSeriesMap[user] + if ok && userCounter.userSeriesMap[user].seriesCountMap != nil { + delete(userCounter.userSeriesMap, user) + } + } + userCounter.Unlock() + } + t.RUnlock() +} + +func (t *DiscardedSeriesTracker) StartDiscardedSeriesGoroutine() { + go func() { + ticker := time.NewTicker(vendMetricsInterval) + for range ticker.C { + t.UpdateMetrics() + } + }() +} + +// only used in testing +func (t *DiscardedSeriesTracker) getSeriesCount(reason string, user string) int { + count := -1 + if userCounter, ok := t.reasonUserMap[reason]; ok { + if seriesCounter, ok := userCounter.userSeriesMap[user]; ok { + count = 0 + for _, label := range seriesCounter.seriesCountMap { + if label.inCurrentCycle { + count++ + } + } + } + } + return count +} diff --git a/pkg/util/discardedseries/tracker_test.go b/pkg/util/discardedseries/tracker_test.go new file mode 100644 index 00000000000..5c74a2e1747 --- /dev/null +++ b/pkg/util/discardedseries/tracker_test.go @@ -0,0 +1,139 @@ +package discardedseries + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestLabelSetTracker(t *testing.T) { + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series_total", + Help: "The total number of series that include discarded samples.", 
+ }, + []string{"reason", "user", "labelset"}, + ) + + tracker := NewDiscardedSeriesTracker(gauge) + reason1 := "sample_out_of_bounds" + reason2 := "label_2" + reason3 := "unused_label" + user1 := "user1" + user2 := "user2" + series1 := labels.FromStrings("__name__", "1") + series2 := labels.FromStrings("__name__", "2") + + tracker.Track(reason1, user1, &series1) + tracker.Track(reason2, user1, &series1) + + tracker.Track(reason1, user2, &series1) + tracker.Track(reason1, user2, &series1) + tracker.Track(reason1, user2, &series1) + tracker.Track(reason1, user2, &series2) + + require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 1) + require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + + compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), 2) + require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + + compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) + + tracker.UpdateMetrics() + + tracker.Track(reason1, user1, &series1) + tracker.Track(reason1, user1, &series1) + + require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) + 
require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + + compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 1) + compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 1) + compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + + compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 1) + compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 1) + compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(reason1, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user1), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + + compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 1) + compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + + compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) + 
compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(reason1, user1), -1) + require.Equal(t, tracker.getSeriesCount(reason2, user1), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + + compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + + require.Equal(t, tracker.getSeriesCount(reason1, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + + compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series1, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) +} + +func compareSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, reason string, user string, labels *labels.Labels, val int) { + gauge, _ := gaugeVec.GetMetricWithLabelValues(reason, user, labels.String()) + require.Equal(t, testutil.ToFloat64(gauge), float64(val)) +} diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 7f6b09c231f..27314e4b481 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -17,6 +17,7 @@ import ( 
"github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/discardedseries" "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/labelset" ) @@ -87,6 +88,8 @@ type ValidateMetrics struct { DiscardedSamplesPerLabelSet *prometheus.CounterVec LabelSetTracker *labelset.LabelSetTracker + DiscardedSeries *prometheus.GaugeVec + DiscardedSeriesTracker *discardedseries.DiscardedSeriesTracker } func registerCollector(r prometheus.Registerer, c prometheus.Collector) { @@ -145,6 +148,14 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { NativeHistogramMinResetDuration: 1 * time.Hour, }, []string{"user"}) registerCollector(r, labelSizeBytes) + discardedSeries := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series", + Help: "The number of series that include discarded samples.", + }, + []string{discardReasonLabel, "user", "labelset"}, + ) + registerCollector(r, discardedSeries) m := &ValidateMetrics{ DiscardedSamples: discardedSamples, @@ -154,7 +165,10 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { HistogramSamplesReducedResolution: histogramSamplesReducedResolution, LabelSizeBytes: labelSizeBytes, LabelSetTracker: labelset.NewLabelSetTracker(), + DiscardedSeries: discardedSeries, + DiscardedSeriesTracker: discardedseries.NewDiscardedSeriesTracker(discardedSeries), } + m.DiscardedSeriesTracker.StartDiscardedSeriesGoroutine() return m } @@ -434,4 +448,7 @@ func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID str if err := util.DeleteMatchingLabels(validateMetrics.LabelSizeBytes, filter); err != nil { level.Warn(log).Log("msg", "failed to remove cortex_label_size_bytes metric for user", "user", userID, "err", err) } + if err := util.DeleteMatchingLabels(validateMetrics.DiscardedSeries, filter); err != nil { + level.Warn(log).Log("msg", "failed to remove 
cortex_discarded_series metric for user", "user", userID, "err", err) + } } From 757d2258fdcf50bfc72aa8fce76f4ffc9cf32203 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 11 Sep 2025 13:31:33 -0700 Subject: [PATCH 2/9] Add cleanup for metric Signed-off-by: Essam Eldaly --- pkg/util/discardedseries/tracker.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go index 5a59995cb0c..357b3353518 100644 --- a/pkg/util/discardedseries/tracker.go +++ b/pkg/util/discardedseries/tracker.go @@ -108,15 +108,12 @@ func (t *DiscardedSeriesTracker) UpdateMetrics() { seriesCounter.Lock() for hash, labelCounter := range seriesCounter.seriesCountMap { labelCounter.Lock() - val := 0.0 if labelCounter.inCurrentCycle { - val = 1.0 - } - t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.Labels.String()).Set(val) - if !labelCounter.inCurrentCycle { - delete(seriesCounter.seriesCountMap, hash) - } else { + t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.Labels.String()).Set(1.0) labelCounter.inCurrentCycle = false + } else { + t.discardedSeriesGauge.DeleteLabelValues(reason, user, labelCounter.Labels.String()) + delete(seriesCounter.seriesCountMap, hash) } labelCounter.Unlock() } From 4670c02d6cb0ae72c9413c8e55a492f14dcc0cc3 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 11 Sep 2025 14:38:26 -0700 Subject: [PATCH 3/9] Update changelog Signed-off-by: Essam Eldaly --- CHANGELOG.md | 1 + pkg/util/discardedseries/tracker.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61eee8a099e..5a34f16f41e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ * [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976 * [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. 
#6986 * [ENHANCEMENT] Add source metadata to requests(api vs ruler) #6947 +* [ENHANCEMENT] Add new metric `cortex_discarded_series` to track number of series that have a discarded sample. #6995 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go index 357b3353518..afb5eff126e 100644 --- a/pkg/util/discardedseries/tracker.go +++ b/pkg/util/discardedseries/tracker.go @@ -109,10 +109,10 @@ func (t *DiscardedSeriesTracker) UpdateMetrics() { for hash, labelCounter := range seriesCounter.seriesCountMap { labelCounter.Lock() if labelCounter.inCurrentCycle { - t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.Labels.String()).Set(1.0) + t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.String()).Set(1.0) labelCounter.inCurrentCycle = false } else { - t.discardedSeriesGauge.DeleteLabelValues(reason, user, labelCounter.Labels.String()) + t.discardedSeriesGauge.DeleteLabelValues(reason, user, labelCounter.String()) delete(seriesCounter.seriesCountMap, hash) } labelCounter.Unlock() From a031ab9d3b386a20a458974222f0dabfe017405d Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 15 Sep 2025 13:15:54 -0700 Subject: [PATCH 4/9] Rename metric to include per labelset Signed-off-by: Essam Eldaly --- pkg/util/validation/validate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 27314e4b481..a7bd4368934 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -150,7 +150,7 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { registerCollector(r, labelSizeBytes) discardedSeries := prometheus.NewGaugeVec( 
prometheus.GaugeOpts{ - Name: "cortex_discarded_series", + Name: "cortex_discarded_series_per_labelset", Help: "The number of series that include discarded samples.", }, []string{discardReasonLabel, "user", "labelset"}, From 0ee70d422ab25c943aa269d7c529b8e92e12a42f Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 15 Sep 2025 15:05:32 -0700 Subject: [PATCH 5/9] Rename variables and help text Signed-off-by: Essam Eldaly --- pkg/util/discardedseries/tracker.go | 2 +- pkg/util/discardedseries/tracker_test.go | 4 ++-- pkg/util/validation/validate.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go index afb5eff126e..3433c7a44d7 100644 --- a/pkg/util/discardedseries/tracker.go +++ b/pkg/util/discardedseries/tracker.go @@ -135,7 +135,7 @@ func (t *DiscardedSeriesTracker) UpdateMetrics() { t.RUnlock() } -func (t *DiscardedSeriesTracker) StartDiscardedSeriesGoroutine() { +func (t *DiscardedSeriesTracker) StartVendDiscardedSeriesMetricGoroutine() { go func() { ticker := time.NewTicker(vendMetricsInterval) for range ticker.C { diff --git a/pkg/util/discardedseries/tracker_test.go b/pkg/util/discardedseries/tracker_test.go index 5c74a2e1747..40c3b297564 100644 --- a/pkg/util/discardedseries/tracker_test.go +++ b/pkg/util/discardedseries/tracker_test.go @@ -12,8 +12,8 @@ import ( func TestLabelSetTracker(t *testing.T) { gauge := prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cortex_discarded_series_total", - Help: "The total number of series that include discarded samples.", + Name: "cortex_discarded_series_per_labelset", + Help: "The number of series that include discarded samples for each labelset.", }, []string{"reason", "user", "labelset"}, ) diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index a7bd4368934..e9b9d59224d 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -151,7 +151,7 @@ func 
NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { discardedSeries := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cortex_discarded_series_per_labelset", - Help: "The number of series that include discarded samples.", + Help: "The number of series that include discarded samples for each labelset.", }, []string{discardReasonLabel, "user", "labelset"}, ) @@ -168,7 +168,7 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { DiscardedSeries: discardedSeries, DiscardedSeriesTracker: discardedseries.NewDiscardedSeriesTracker(discardedSeries), } - m.DiscardedSeriesTracker.StartDiscardedSeriesGoroutine() + m.DiscardedSeriesTracker.StartVendDiscardedSeriesMetricGoroutine() return m } From 9650f34f4849481b4937dc62c2980355b95c58cd Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 15 Sep 2025 16:53:24 -0700 Subject: [PATCH 6/9] Test fix Signed-off-by: Essam Eldaly --- pkg/util/discardedseries/tracker.go | 3 +-- pkg/util/discardedseries/tracker_test.go | 34 ++++++++++++------------ 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go index 3433c7a44d7..1190a763832 100644 --- a/pkg/util/discardedseries/tracker.go +++ b/pkg/util/discardedseries/tracker.go @@ -146,10 +146,9 @@ func (t *DiscardedSeriesTracker) StartVendDiscardedSeriesMetricGoroutine() { // only used in testing func (t *DiscardedSeriesTracker) getSeriesCount(reason string, user string) int { - count := -1 + count := 0 if userCounter, ok := t.reasonUserMap[reason]; ok { if seriesCounter, ok := userCounter.userSeriesMap[user]; ok { - count = 0 for _, label := range seriesCounter.seriesCountMap { if label.inCurrentCycle { count++ diff --git a/pkg/util/discardedseries/tracker_test.go b/pkg/util/discardedseries/tracker_test.go index 40c3b297564..eb2925ebd8b 100644 --- a/pkg/util/discardedseries/tracker_test.go +++ b/pkg/util/discardedseries/tracker_test.go @@ -37,7 +37,7 @@ func 
TestLabelSetTracker(t *testing.T) { require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) require.Equal(t, tracker.getSeriesCount(reason2, user1), 1) - require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 0) compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) @@ -47,8 +47,8 @@ func TestLabelSetTracker(t *testing.T) { compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) require.Equal(t, tracker.getSeriesCount(reason1, user2), 2) - require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) - require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) @@ -64,7 +64,7 @@ func TestLabelSetTracker(t *testing.T) { require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) - require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 1) compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) @@ -74,8 +74,8 @@ func TestLabelSetTracker(t *testing.T) { compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) - require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) - require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 1) compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 1) @@ -87,8 +87,8 @@ func 
TestLabelSetTracker(t *testing.T) { tracker.UpdateMetrics() require.Equal(t, tracker.getSeriesCount(reason1, user1), 0) - require.Equal(t, tracker.getSeriesCount(reason2, user1), -1) - require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 1) compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) @@ -97,9 +97,9 @@ func TestLabelSetTracker(t *testing.T) { compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) - require.Equal(t, tracker.getSeriesCount(reason1, user2), -1) - require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) - require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) @@ -110,9 +110,9 @@ func TestLabelSetTracker(t *testing.T) { tracker.UpdateMetrics() - require.Equal(t, tracker.getSeriesCount(reason1, user1), -1) - require.Equal(t, tracker.getSeriesCount(reason2, user1), -1) - require.Equal(t, tracker.getSeriesCount(reason3, user1), -1) + require.Equal(t, tracker.getSeriesCount(reason1, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 0) compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) @@ -121,9 +121,9 @@ func TestLabelSetTracker(t *testing.T) { compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) - require.Equal(t, 
tracker.getSeriesCount(reason1, user2), -1) - require.Equal(t, tracker.getSeriesCount(reason2, user2), -1) - require.Equal(t, tracker.getSeriesCount(reason3, user2), -1) + require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) + require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) From 7f40b8ce50462ee5e801eca79f41a86bdcee7680 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 18 Sep 2025 16:19:16 -0700 Subject: [PATCH 7/9] Separate and fix per labelset tracker Signed-off-by: Essam Eldaly --- pkg/ingester/ingester.go | 17 +- pkg/util/discardedseries/tracker.go | 191 +++++++++++++++++------ pkg/util/discardedseries/tracker_test.go | 98 +++++------- pkg/util/validation/validate.go | 23 ++- 4 files changed, 205 insertions(+), 124 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index bd53fb0d978..1a857f41485 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1259,27 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte switch cause := errors.Cause(err); { case errors.Is(cause, storage.ErrOutOfBounds): sampleOutOfBoundsCount++ - i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, &copiedLabels) + go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrOutOfOrderSample): sampleOutOfOrderCount++ - i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, &copiedLabels) + go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case 
errors.Is(cause, storage.ErrDuplicateSampleForTimestamp): newValueForTimestampCount++ - i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, &copiedLabels) + go i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrTooOldSample): sampleTooOldCount++ - i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, &copiedLabels) + go i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, errMaxSeriesPerUserLimitExceeded): perUserSeriesLimitCount++ - i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, &copiedLabels) + go i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels)) }) @@ -1292,14 +1292,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded): perMetricSeriesLimitCount++ - i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, &copiedLabels) + go i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels)) }) case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}): perLabelSetSeriesLimitCount++ - i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, &copiedLabels) + go i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash()) + for _, 
matchedLabelset := range matchedLabelSetLimits { + go i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id) + } // We only track per labelset discarded samples for throttling by labelset limit. reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit) updateFirstPartial(func() error { diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go index 1190a763832..4a53fbca8eb 100644 --- a/pkg/util/discardedseries/tracker.go +++ b/pkg/util/discardedseries/tracker.go @@ -5,22 +5,17 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" ) const ( - vendMetricsInterval = 30 * time.Second + vendMetricsInterval = 30 * time.Second + perLabelsetSeriesLimit = "per_labelset_series_limit" ) -type labelCounterStruct struct { - *sync.RWMutex - *labels.Labels - inCurrentCycle bool -} - type seriesCounterStruct struct { *sync.RWMutex - seriesCountMap map[uint64]*labelCounterStruct + seriesCountMap map[uint64]struct{} + labelsetId string } type userCounterStruct struct { @@ -28,12 +23,23 @@ type userCounterStruct struct { userSeriesMap map[string]*seriesCounterStruct } +type labelsetCounterStruct struct { + *sync.RWMutex + labelsetSeriesMap map[uint64]*seriesCounterStruct +} + type DiscardedSeriesTracker struct { *sync.RWMutex reasonUserMap map[string]*userCounterStruct discardedSeriesGauge *prometheus.GaugeVec } +type DiscardedSeriesPerLabelsetTracker struct { + *sync.RWMutex + userLabelsetMap map[string]*labelsetCounterStruct + discardedSeriesPerLabelsetGauge *prometheus.GaugeVec +} + func NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *DiscardedSeriesTracker { tracker := &DiscardedSeriesTracker{ RWMutex: &sync.RWMutex{}, @@ -43,8 +49,7 @@ func NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *Disca return tracker } -func (t *DiscardedSeriesTracker) Track(reason 
string, user string, labels *labels.Labels) { - series := labels.Hash() +func (t *DiscardedSeriesTracker) Track(reason string, user string, series uint64) { t.RLock() userCounter, ok := t.reasonUserMap[reason] t.RUnlock() @@ -70,7 +75,7 @@ func (t *DiscardedSeriesTracker) Track(reason string, user string, labels *label if !ok { seriesCounter = &seriesCounterStruct{ RWMutex: &sync.RWMutex{}, - seriesCountMap: make(map[uint64]*labelCounterStruct), + seriesCountMap: make(map[uint64]struct{}), } userCounter.userSeriesMap[user] = seriesCounter } @@ -78,25 +83,16 @@ func (t *DiscardedSeriesTracker) Track(reason string, user string, labels *label } seriesCounter.RLock() - labelCounter, ok := seriesCounter.seriesCountMap[series] + _, ok = seriesCounter.seriesCountMap[series] seriesCounter.RUnlock() if !ok { seriesCounter.Lock() - labelCounter, ok = seriesCounter.seriesCountMap[series] + _, ok = seriesCounter.seriesCountMap[series] if !ok { - labelCounter = &labelCounterStruct{ - Labels: labels, - RWMutex: &sync.RWMutex{}, - inCurrentCycle: true, - } - seriesCounter.seriesCountMap[series] = labelCounter + seriesCounter.seriesCountMap[series] = struct{}{} } seriesCounter.Unlock() } - - labelCounter.Lock() - labelCounter.inCurrentCycle = true - labelCounter.Unlock() } func (t *DiscardedSeriesTracker) UpdateMetrics() { @@ -106,31 +102,25 @@ func (t *DiscardedSeriesTracker) UpdateMetrics() { userCounter.RLock() for user, seriesCounter := range userCounter.userSeriesMap { seriesCounter.Lock() - for hash, labelCounter := range seriesCounter.seriesCountMap { - labelCounter.Lock() - if labelCounter.inCurrentCycle { - t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.String()).Set(1.0) - labelCounter.inCurrentCycle = false - } else { - t.discardedSeriesGauge.DeleteLabelValues(reason, user, labelCounter.String()) - delete(seriesCounter.seriesCountMap, hash) - } - labelCounter.Unlock() - } - if len(seriesCounter.seriesCountMap) == 0 { + count := 
len(seriesCounter.seriesCountMap) + t.discardedSeriesGauge.WithLabelValues(reason, user).Set(float64(count)) + clear(seriesCounter.seriesCountMap) + if count == 0 { usersToDelete = append(usersToDelete, user) } seriesCounter.Unlock() } userCounter.RUnlock() - userCounter.Lock() - for _, user := range usersToDelete { - _, ok := userCounter.userSeriesMap[user] - if ok && userCounter.userSeriesMap[user].seriesCountMap != nil { - delete(userCounter.userSeriesMap, user) + if len(usersToDelete) > 0 { + userCounter.Lock() + for _, user := range usersToDelete { + if _, ok := userCounter.userSeriesMap[user]; ok { + t.discardedSeriesGauge.DeleteLabelValues(reason, user) + delete(userCounter.userSeriesMap, user) + } } + userCounter.Unlock() } - userCounter.Unlock() } t.RUnlock() } @@ -144,17 +134,118 @@ func (t *DiscardedSeriesTracker) StartVendDiscardedSeriesMetricGoroutine() { }() } -// only used in testing +func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker { + tracker := &DiscardedSeriesPerLabelsetTracker{ + RWMutex: &sync.RWMutex{}, + userLabelsetMap: make(map[string]*labelsetCounterStruct), + discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge, + } + return tracker +} + +func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) { + t.RLock() + labelsetCounter, ok := t.userLabelsetMap[user] + t.RUnlock() + if !ok { + t.Lock() + labelsetCounter, ok = t.userLabelsetMap[user] + if !ok { + labelsetCounter = &labelsetCounterStruct{ + RWMutex: &sync.RWMutex{}, + labelsetSeriesMap: make(map[uint64]*seriesCounterStruct), + } + t.userLabelsetMap[user] = labelsetCounter + } + t.Unlock() + } + + labelsetCounter.RLock() + seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] + labelsetCounter.RUnlock() + if !ok { + labelsetCounter.Lock() + seriesCounter, ok = 
labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] + if !ok { + seriesCounter = &seriesCounterStruct{ + RWMutex: &sync.RWMutex{}, + seriesCountMap: make(map[uint64]struct{}), + labelsetId: matchedLabelsetId, + } + labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter + } + labelsetCounter.Unlock() + } + + seriesCounter.RLock() + _, ok = seriesCounter.seriesCountMap[series] + seriesCounter.RUnlock() + if !ok { + seriesCounter.Lock() + _, ok = seriesCounter.seriesCountMap[series] + if !ok { + seriesCounter.seriesCountMap[series] = struct{}{} + } + seriesCounter.Unlock() + } +} + +func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() { + usersToDelete := make([]string, 0) + labelsetsToDelete := make([]uint64, 0) + t.RLock() + for user, labelsetCounter := range t.userLabelsetMap { + labelsetCounter.RLock() + if len(labelsetCounter.labelsetSeriesMap) == 0 { + usersToDelete = append(usersToDelete, user) + } + for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap { + seriesCounter.Lock() + count := len(seriesCounter.seriesCountMap) + t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count)) + clear(seriesCounter.seriesCountMap) + if count == 0 { + labelsetsToDelete = append(labelsetsToDelete, labelsetHash) + } + seriesCounter.Unlock() + } + labelsetCounter.RUnlock() + if len(labelsetsToDelete) > 0 { + labelsetCounter.Lock() + for _, labelsetHash := range labelsetsToDelete { + if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok { + labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId + t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId) + delete(labelsetCounter.labelsetSeriesMap, labelsetHash) + } + } + labelsetCounter.Unlock() + } + } + t.RUnlock() + if len(usersToDelete) > 0 { + t.Lock() + for _, user := range usersToDelete { + delete(t.userLabelsetMap, user) + } + t.Unlock() + } +} + 
+func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() { + go func() { + ticker := time.NewTicker(vendMetricsInterval) + for range ticker.C { + t.UpdateMetrics() + } + }() +} + func (t *DiscardedSeriesTracker) getSeriesCount(reason string, user string) int { - count := 0 if userCounter, ok := t.reasonUserMap[reason]; ok { if seriesCounter, ok := userCounter.userSeriesMap[user]; ok { - for _, label := range seriesCounter.seriesCountMap { - if label.inCurrentCycle { - count++ - } - } + return len(seriesCounter.seriesCountMap) } } - return count + return 0 } diff --git a/pkg/util/discardedseries/tracker_test.go b/pkg/util/discardedseries/tracker_test.go index eb2925ebd8b..1409ce9dadc 100644 --- a/pkg/util/discardedseries/tracker_test.go +++ b/pkg/util/discardedseries/tracker_test.go @@ -12,10 +12,10 @@ import ( func TestLabelSetTracker(t *testing.T) { gauge := prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cortex_discarded_series_per_labelset", - Help: "The number of series that include discarded samples for each labelset.", + Name: "cortex_discarded_series", + Help: "The number of series that include discarded samples.", }, - []string{"reason", "user", "labelset"}, + []string{"reason", "user"}, ) tracker := NewDiscardedSeriesTracker(gauge) @@ -27,62 +27,50 @@ func TestLabelSetTracker(t *testing.T) { series1 := labels.FromStrings("__name__", "1") series2 := labels.FromStrings("__name__", "2") - tracker.Track(reason1, user1, &series1) - tracker.Track(reason2, user1, &series1) + tracker.Track(reason1, user1, series1.Hash()) + tracker.Track(reason2, user1, series1.Hash()) - tracker.Track(reason1, user2, &series1) - tracker.Track(reason1, user2, &series1) - tracker.Track(reason1, user2, &series1) - tracker.Track(reason1, user2, &series2) + tracker.Track(reason1, user2, series1.Hash()) + tracker.Track(reason1, user2, series1.Hash()) + tracker.Track(reason1, user2, series1.Hash()) + tracker.Track(reason1, user2, series2.Hash()) 
require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) require.Equal(t, tracker.getSeriesCount(reason2, user1), 1) require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) - compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) - compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user1, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) require.Equal(t, tracker.getSeriesCount(reason1, user2), 2) require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) - compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) - compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) tracker.UpdateMetrics() - tracker.Track(reason1, user1, &series1) - tracker.Track(reason1, user1, &series1) + tracker.Track(reason1, user1, series1.Hash()) + tracker.Track(reason1, user1, series1.Hash()) require.Equal(t, tracker.getSeriesCount(reason1, user1), 1) require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) - compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 1) - compareSeriesVendedCount(t, gauge, reason1, user1, 
&series2, 0) - compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 1) - compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user1, 1) + compareSeriesVendedCount(t, gauge, reason2, user1, 1) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) - compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 1) - compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 1) - compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user2, 2) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) tracker.UpdateMetrics() @@ -90,23 +78,17 @@ func TestLabelSetTracker(t *testing.T) { require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) - compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 1) - compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) - compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user1, 1) + compareSeriesVendedCount(t, gauge, reason2, user1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) require.Equal(t, 
tracker.getSeriesCount(reason1, user2), 0) require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) - compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) - compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) tracker.UpdateMetrics() @@ -114,26 +96,20 @@ func TestLabelSetTracker(t *testing.T) { require.Equal(t, tracker.getSeriesCount(reason2, user1), 0) require.Equal(t, tracker.getSeriesCount(reason3, user1), 0) - compareSeriesVendedCount(t, gauge, reason1, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason1, user1, &series2, 0) - compareSeriesVendedCount(t, gauge, reason2, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason2, user1, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user1, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user1, 0) + compareSeriesVendedCount(t, gauge, reason2, user1, 0) + compareSeriesVendedCount(t, gauge, reason3, user1, 0) require.Equal(t, tracker.getSeriesCount(reason1, user2), 0) require.Equal(t, tracker.getSeriesCount(reason2, user2), 0) require.Equal(t, tracker.getSeriesCount(reason3, user2), 0) - compareSeriesVendedCount(t, gauge, reason1, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason1, user2, &series2, 0) - compareSeriesVendedCount(t, gauge, reason2, user2, &series1, 0) - compareSeriesVendedCount(t, gauge, reason2, user2, &series2, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, 
&series1, 0) - compareSeriesVendedCount(t, gauge, reason3, user2, &series2, 0) + compareSeriesVendedCount(t, gauge, reason1, user2, 0) + compareSeriesVendedCount(t, gauge, reason2, user2, 0) + compareSeriesVendedCount(t, gauge, reason3, user2, 0) } -func compareSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, reason string, user string, labels *labels.Labels, val int) { - gauge, _ := gaugeVec.GetMetricWithLabelValues(reason, user, labels.String()) +func compareSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, reason string, user string, val int) { + gauge, _ := gaugeVec.GetMetricWithLabelValues(reason, user) require.Equal(t, testutil.ToFloat64(gauge), float64(val)) } diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index e9b9d59224d..4dbb0b17148 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -88,8 +88,11 @@ type ValidateMetrics struct { DiscardedSamplesPerLabelSet *prometheus.CounterVec LabelSetTracker *labelset.LabelSetTracker - DiscardedSeries *prometheus.GaugeVec - DiscardedSeriesTracker *discardedseries.DiscardedSeriesTracker + + DiscardedSeries *prometheus.GaugeVec + DiscardedSeriesPerLabelset *prometheus.GaugeVec + DiscardedSeriesTracker *discardedseries.DiscardedSeriesTracker + DiscardedSeriesPerLabelsetTracker *discardedseries.DiscardedSeriesPerLabelsetTracker } func registerCollector(r prometheus.Registerer, c prometheus.Collector) { @@ -149,13 +152,21 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { }, []string{"user"}) registerCollector(r, labelSizeBytes) discardedSeries := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series", + Help: "The number of series that include discarded samples.", + }, + []string{discardReasonLabel, "user"}, + ) + registerCollector(r, discardedSeries) + discardedSeriesPerLabelset := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cortex_discarded_series_per_labelset", Help: "The 
number of series that include discarded samples for each labelset.", }, []string{discardReasonLabel, "user", "labelset"}, ) - registerCollector(r, discardedSeries) + registerCollector(r, discardedSeriesPerLabelset) m := &ValidateMetrics{ DiscardedSamples: discardedSamples, @@ -166,9 +177,12 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics { LabelSizeBytes: labelSizeBytes, LabelSetTracker: labelset.NewLabelSetTracker(), DiscardedSeries: discardedSeries, + DiscardedSeriesPerLabelset: discardedSeriesPerLabelset, DiscardedSeriesTracker: discardedseries.NewDiscardedSeriesTracker(discardedSeries), + DiscardedSeriesPerLabelsetTracker: discardedseries.NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelset), } m.DiscardedSeriesTracker.StartVendDiscardedSeriesMetricGoroutine() + m.DiscardedSeriesPerLabelsetTracker.StartVendDiscardedSeriesMetricGoroutine() return m } @@ -448,7 +462,4 @@ func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID str if err := util.DeleteMatchingLabels(validateMetrics.LabelSizeBytes, filter); err != nil { level.Warn(log).Log("msg", "failed to remove cortex_label_size_bytes metric for user", "user", userID, "err", err) } - if err := util.DeleteMatchingLabels(validateMetrics.DiscardedSeries, filter); err != nil { - level.Warn(log).Log("msg", "failed to remove cortex_discarded_series metric for user", "user", userID, "err", err) - } } From 70ae05479f57c6c0c898b3489b572075ef678be9 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Fri, 19 Sep 2025 10:12:53 -0700 Subject: [PATCH 8/9] Add per labelset tests Signed-off-by: Essam Eldaly --- .../discardedseries/perlabelset_tracker.go | 140 ++++++++++++++++++ .../perlabelset_tracker_test.go | 118 +++++++++++++++ pkg/util/discardedseries/tracker.go | 122 +-------------- pkg/util/discardedseries/tracker_test.go | 2 +- 4 files changed, 261 insertions(+), 121 deletions(-) create mode 100644 pkg/util/discardedseries/perlabelset_tracker.go create mode 100644 
pkg/util/discardedseries/perlabelset_tracker_test.go diff --git a/pkg/util/discardedseries/perlabelset_tracker.go b/pkg/util/discardedseries/perlabelset_tracker.go new file mode 100644 index 00000000000..16a86c42db9 --- /dev/null +++ b/pkg/util/discardedseries/perlabelset_tracker.go @@ -0,0 +1,140 @@ +package discardedseries + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + perLabelsetSeriesLimit = "per_labelset_series_limit" +) + +type labelsetCounterStruct struct { + *sync.RWMutex + labelsetSeriesMap map[uint64]*seriesCounterStruct +} + +type DiscardedSeriesPerLabelsetTracker struct { + *sync.RWMutex + userLabelsetMap map[string]*labelsetCounterStruct + discardedSeriesPerLabelsetGauge *prometheus.GaugeVec +} + +func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker { + tracker := &DiscardedSeriesPerLabelsetTracker{ + RWMutex: &sync.RWMutex{}, + userLabelsetMap: make(map[string]*labelsetCounterStruct), + discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge, + } + return tracker +} + +func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) { + t.RLock() + labelsetCounter, ok := t.userLabelsetMap[user] + t.RUnlock() + if !ok { + t.Lock() + labelsetCounter, ok = t.userLabelsetMap[user] + if !ok { + labelsetCounter = &labelsetCounterStruct{ + RWMutex: &sync.RWMutex{}, + labelsetSeriesMap: make(map[uint64]*seriesCounterStruct), + } + t.userLabelsetMap[user] = labelsetCounter + } + t.Unlock() + } + + labelsetCounter.RLock() + seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] + labelsetCounter.RUnlock() + if !ok { + labelsetCounter.Lock() + seriesCounter, ok = labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] + if !ok { + seriesCounter = &seriesCounterStruct{ + RWMutex: &sync.RWMutex{}, + seriesCountMap: make(map[uint64]struct{}), 
+ labelsetId: matchedLabelsetId, + } + labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter + } + labelsetCounter.Unlock() + } + + seriesCounter.RLock() + _, ok = seriesCounter.seriesCountMap[series] + seriesCounter.RUnlock() + if !ok { + seriesCounter.Lock() + _, ok = seriesCounter.seriesCountMap[series] + if !ok { + seriesCounter.seriesCountMap[series] = struct{}{} + } + seriesCounter.Unlock() + } +} + +func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() { + usersToDelete := make([]string, 0) + labelsetsToDelete := make([]uint64, 0) + t.RLock() + for user, labelsetCounter := range t.userLabelsetMap { + labelsetCounter.RLock() + if len(labelsetCounter.labelsetSeriesMap) == 0 { + usersToDelete = append(usersToDelete, user) + } + for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap { + seriesCounter.Lock() + count := len(seriesCounter.seriesCountMap) + t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count)) + clear(seriesCounter.seriesCountMap) + if count == 0 { + labelsetsToDelete = append(labelsetsToDelete, labelsetHash) + } + seriesCounter.Unlock() + } + labelsetCounter.RUnlock() + if len(labelsetsToDelete) > 0 { + labelsetCounter.Lock() + for _, labelsetHash := range labelsetsToDelete { + if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok { + labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId + t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId) + delete(labelsetCounter.labelsetSeriesMap, labelsetHash) + } + } + labelsetCounter.Unlock() + } + } + t.RUnlock() + if len(usersToDelete) > 0 { + t.Lock() + for _, user := range usersToDelete { + delete(t.userLabelsetMap, user) + } + t.Unlock() + } +} + +func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() { + go func() { + ticker := time.NewTicker(vendMetricsInterval) + for range ticker.C { + 
t.UpdateMetrics() + } + }() +} + +// only used in testing +func (t *DiscardedSeriesPerLabelsetTracker) getSeriesCount(user string, labelsetLimitHash uint64) int { + if labelsetCounter, ok := t.userLabelsetMap[user]; ok { + if seriesCounter, ok := labelsetCounter.labelsetSeriesMap[labelsetLimitHash]; ok { + return len(seriesCounter.seriesCountMap) + } + } + return 0 +} diff --git a/pkg/util/discardedseries/perlabelset_tracker_test.go b/pkg/util/discardedseries/perlabelset_tracker_test.go new file mode 100644 index 00000000000..849f987fb11 --- /dev/null +++ b/pkg/util/discardedseries/perlabelset_tracker_test.go @@ -0,0 +1,118 @@ +package discardedseries + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestPerLabelsetDiscardedSeriesTracker(t *testing.T) { + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cortex_discarded_series_per_labelset", + Help: "The number of series that include discarded samples for each labelset.", + }, + []string{"reason", "user", "labelset"}, + ) + + tracker := NewDiscardedSeriesPerLabelsetTracker(gauge) + user1 := "user1" + user2 := "user2" + series1 := labels.FromStrings("__name__", "1") + series2 := labels.FromStrings("__name__", "2") + labelset1 := uint64(10) + labelset2 := uint64(20) + labelset3 := uint64(30) + labelsetId1 := "ten" + labelsetId2 := "twenty" + labelsetId3 := "thirty" + + tracker.Track(user1, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user1, series1.Hash(), labelset2, labelsetId2) + + tracker.Track(user2, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user2, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user2, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user2, series2.Hash(), labelset1, labelsetId1) + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1) + 
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 1) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 2) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) + + tracker.UpdateMetrics() + + tracker.Track(user1, series1.Hash(), labelset1, labelsetId1) + tracker.Track(user1, series1.Hash(), labelset1, labelsetId1) + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1) + require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 1) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 2) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, 
user1, labelsetId1, 1) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) + + tracker.UpdateMetrics() + + require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0) + + require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0) + require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0) + + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0) + comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0) +} + +func comparePerLabelsetSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, user string, labelsetLimitId string, val int) { + gauge, _ := gaugeVec.GetMetricWithLabelValues("per_labelset_series_limit", user, labelsetLimitId) + require.Equal(t, testutil.ToFloat64(gauge), float64(val)) +} diff --git a/pkg/util/discardedseries/tracker.go b/pkg/util/discardedseries/tracker.go index 4a53fbca8eb..82f6f33c6d7 100644 --- a/pkg/util/discardedseries/tracker.go +++ b/pkg/util/discardedseries/tracker.go @@ -8,8 +8,7 @@ import ( ) const ( - vendMetricsInterval = 30 * 
time.Second - perLabelsetSeriesLimit = "per_labelset_series_limit" + vendMetricsInterval = 30 * time.Second ) type seriesCounterStruct struct { @@ -23,23 +22,12 @@ type userCounterStruct struct { userSeriesMap map[string]*seriesCounterStruct } -type labelsetCounterStruct struct { - *sync.RWMutex - labelsetSeriesMap map[uint64]*seriesCounterStruct -} - type DiscardedSeriesTracker struct { *sync.RWMutex reasonUserMap map[string]*userCounterStruct discardedSeriesGauge *prometheus.GaugeVec } -type DiscardedSeriesPerLabelsetTracker struct { - *sync.RWMutex - userLabelsetMap map[string]*labelsetCounterStruct - discardedSeriesPerLabelsetGauge *prometheus.GaugeVec -} - func NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *DiscardedSeriesTracker { tracker := &DiscardedSeriesTracker{ RWMutex: &sync.RWMutex{}, @@ -134,113 +122,7 @@ func (t *DiscardedSeriesTracker) StartVendDiscardedSeriesMetricGoroutine() { }() } -func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker { - tracker := &DiscardedSeriesPerLabelsetTracker{ - RWMutex: &sync.RWMutex{}, - userLabelsetMap: make(map[string]*labelsetCounterStruct), - discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge, - } - return tracker -} - -func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) { - t.RLock() - labelsetCounter, ok := t.userLabelsetMap[user] - t.RUnlock() - if !ok { - t.Lock() - labelsetCounter, ok = t.userLabelsetMap[user] - if !ok { - labelsetCounter = &labelsetCounterStruct{ - RWMutex: &sync.RWMutex{}, - labelsetSeriesMap: make(map[uint64]*seriesCounterStruct), - } - t.userLabelsetMap[user] = labelsetCounter - } - t.Unlock() - } - - labelsetCounter.RLock() - seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] - labelsetCounter.RUnlock() - if !ok { - labelsetCounter.Lock() - seriesCounter, ok = 
labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] - if !ok { - seriesCounter = &seriesCounterStruct{ - RWMutex: &sync.RWMutex{}, - seriesCountMap: make(map[uint64]struct{}), - labelsetId: matchedLabelsetId, - } - labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter - } - labelsetCounter.Unlock() - } - - seriesCounter.RLock() - _, ok = seriesCounter.seriesCountMap[series] - seriesCounter.RUnlock() - if !ok { - seriesCounter.Lock() - _, ok = seriesCounter.seriesCountMap[series] - if !ok { - seriesCounter.seriesCountMap[series] = struct{}{} - } - seriesCounter.Unlock() - } -} - -func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() { - usersToDelete := make([]string, 0) - labelsetsToDelete := make([]uint64, 0) - t.RLock() - for user, labelsetCounter := range t.userLabelsetMap { - labelsetCounter.RLock() - if len(labelsetCounter.labelsetSeriesMap) == 0 { - usersToDelete = append(usersToDelete, user) - } - for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap { - seriesCounter.Lock() - count := len(seriesCounter.seriesCountMap) - t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count)) - clear(seriesCounter.seriesCountMap) - if count == 0 { - labelsetsToDelete = append(labelsetsToDelete, labelsetHash) - } - seriesCounter.Unlock() - } - labelsetCounter.RUnlock() - if len(labelsetsToDelete) > 0 { - labelsetCounter.Lock() - for _, labelsetHash := range labelsetsToDelete { - if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok { - labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId - t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId) - delete(labelsetCounter.labelsetSeriesMap, labelsetHash) - } - } - labelsetCounter.Unlock() - } - } - t.RUnlock() - if len(usersToDelete) > 0 { - t.Lock() - for _, user := range usersToDelete { - delete(t.userLabelsetMap, user) - } - t.Unlock() - } -} - 
-func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() { - go func() { - ticker := time.NewTicker(vendMetricsInterval) - for range ticker.C { - t.UpdateMetrics() - } - }() -} - +// only used in testing func (t *DiscardedSeriesTracker) getSeriesCount(reason string, user string) int { if userCounter, ok := t.reasonUserMap[reason]; ok { if seriesCounter, ok := userCounter.userSeriesMap[user]; ok { diff --git a/pkg/util/discardedseries/tracker_test.go b/pkg/util/discardedseries/tracker_test.go index 1409ce9dadc..8893907a09f 100644 --- a/pkg/util/discardedseries/tracker_test.go +++ b/pkg/util/discardedseries/tracker_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestLabelSetTracker(t *testing.T) { +func TestDiscardedSeriesTracker(t *testing.T) { gauge := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cortex_discarded_series", From 0736c564f1abd5f1a2496f5737cd57b52e83fbee Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 22 Sep 2025 14:56:57 -0700 Subject: [PATCH 9/9] Update changelog and remove unneeded async calls Signed-off-by: Essam Eldaly --- CHANGELOG.md | 2 +- pkg/ingester/ingester.go | 16 ++++++++-------- pkg/util/discardedseries/perlabelset_tracker.go | 1 + 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a34f16f41e..d8981e94416 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,7 +77,7 @@ * [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976 * [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986 * [ENHANCEMENT] Add source metadata to requests(api vs ruler) #6947 -* [ENHANCEMENT] Add new metric `cortex_discarded_series` to track number of series that have a discarded sample. #6995 +* [ENHANCEMENT] Add new metric `cortex_discarded_series` and `cortex_discarded_series_per_labelset` to track number of series that have a discarded sample. 
#6995 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 1a857f41485..504820adb67 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1259,27 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte switch cause := errors.Cause(err); { case errors.Is(cause, storage.ErrOutOfBounds): sampleOutOfBoundsCount++ - go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash()) + i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrOutOfOrderSample): sampleOutOfOrderCount++ - go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash()) + i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp): newValueForTimestampCount++ - go i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash()) + i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, storage.ErrTooOldSample): sampleTooOldCount++ - go i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash()) + i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash()) 
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) }) case errors.Is(cause, errMaxSeriesPerUserLimitExceeded): perUserSeriesLimitCount++ - go i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash()) + i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels)) }) @@ -1292,16 +1292,16 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded): perMetricSeriesLimitCount++ - go i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash()) + i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash()) updateFirstPartial(func() error { return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels)) }) case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}): perLabelSetSeriesLimitCount++ - go i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash()) + i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash()) for _, matchedLabelset := range matchedLabelSetLimits { - go i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id) + i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id) } // We only track per labelset discarded samples for throttling by labelset limit. 
reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit) diff --git a/pkg/util/discardedseries/perlabelset_tracker.go b/pkg/util/discardedseries/perlabelset_tracker.go index 16a86c42db9..d2098117816 100644 --- a/pkg/util/discardedseries/perlabelset_tracker.go +++ b/pkg/util/discardedseries/perlabelset_tracker.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// TODO: if the per-labelset series limit ever maps to more than one discard reason, remove the hardcoded reason below and add an extra per-reason map const ( perLabelsetSeriesLimit = "per_labelset_series_limit" )