1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -77,6 +77,7 @@
* [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
* [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986
* [ENHANCEMENT] Add source metadata to requests (api vs ruler). #6947
* [ENHANCEMENT] Add new metric `cortex_discarded_series` to track the number of series with discarded samples. #6995
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
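For orientation, here is a minimal sketch of how the new `cortex_discarded_series` gauge could be declared with prometheus/client_golang. Only the metric name comes from the CHANGELOG entry above; the label names ("reason", "user") are an assumption modelled on the per-labelset variant defined in this PR's tests, not the PR's actual definition.

package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
    // Hypothetical declaration: the label names are assumed, only the metric
    // name is taken from the CHANGELOG entry above.
    discardedSeries := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "cortex_discarded_series",
            Help: "The number of series that include discarded samples.",
        },
        []string{"reason", "user"},
    )
    prometheus.MustRegister(discardedSeries)
}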
10 changes: 10 additions & 0 deletions pkg/ingester/ingester.go
@@ -1259,22 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
switch cause := errors.Cause(err); {
case errors.Is(cause, storage.ErrOutOfBounds):
sampleOutOfBoundsCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrOutOfOrderSample):
sampleOutOfOrderCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
newValueForTimestampCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrTooOldSample):
sampleTooOldCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
perUserSeriesLimitCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash())
updateFirstPartial(func() error {
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
})
@@ -1287,12 +1292,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash())
updateFirstPartial(func() error {
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels))
})

case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
perLabelSetSeriesLimitCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash())
for _, matchedLabelset := range matchedLabelSetLimits {
go i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id)
}
// We only track per labelset discarded samples for throttling by labelset limit.
reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit)
updateFirstPartial(func() error {
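The per-user `DiscardedSeriesTracker` called in the hunks above is not part of this diff. As a rough sketch of the pattern those calls assume, a tracker keyed by (reason, user) that dedupes series hashes and periodically vends a gauge could look like the following; it is simplified to a single mutex, and all names here are illustrative rather than the PR's actual types.

package discardedseries

import (
    "sync"

    "github.com/prometheus/client_golang/prometheus"
)

type reasonUser struct {
    reason, user string
}

// simpleDiscardedSeriesTracker is an illustrative stand-in for the per-user
// DiscardedSeriesTracker referenced in ingester.go; it is not the PR's code.
type simpleDiscardedSeriesTracker struct {
    mtx    sync.Mutex
    series map[reasonUser]map[uint64]struct{}
    gauge  *prometheus.GaugeVec // labels assumed to be: reason, user
}

func newSimpleDiscardedSeriesTracker(g *prometheus.GaugeVec) *simpleDiscardedSeriesTracker {
    return &simpleDiscardedSeriesTracker{
        series: make(map[reasonUser]map[uint64]struct{}),
        gauge:  g,
    }
}

// Track records that the series with the given hash had a sample discarded
// for the given reason and user; repeated hashes are counted once.
func (t *simpleDiscardedSeriesTracker) Track(reason, user string, seriesHash uint64) {
    key := reasonUser{reason: reason, user: user}
    t.mtx.Lock()
    defer t.mtx.Unlock()
    if _, ok := t.series[key]; !ok {
        t.series[key] = make(map[uint64]struct{})
    }
    t.series[key][seriesHash] = struct{}{}
}

// updateMetrics publishes the per-interval counts and resets them, mirroring
// the vend-and-clear cycle of the per-labelset tracker below.
func (t *simpleDiscardedSeriesTracker) updateMetrics() {
    t.mtx.Lock()
    defer t.mtx.Unlock()
    for key, hashes := range t.series {
        t.gauge.WithLabelValues(key.reason, key.user).Set(float64(len(hashes)))
        clear(hashes)
    }
}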
140 changes: 140 additions & 0 deletions pkg/util/discardedseries/perlabelset_tracker.go
@@ -0,0 +1,140 @@
package discardedseries

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
perLabelsetSeriesLimit = "per_labelset_series_limit"
)

type labelsetCounterStruct struct {
*sync.RWMutex
labelsetSeriesMap map[uint64]*seriesCounterStruct
}

type DiscardedSeriesPerLabelsetTracker struct {
*sync.RWMutex
userLabelsetMap map[string]*labelsetCounterStruct
discardedSeriesPerLabelsetGauge *prometheus.GaugeVec
}

func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker {
tracker := &DiscardedSeriesPerLabelsetTracker{
RWMutex: &sync.RWMutex{},
userLabelsetMap: make(map[string]*labelsetCounterStruct),
discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge,
}
return tracker
}

func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) {
t.RLock()
labelsetCounter, ok := t.userLabelsetMap[user]
t.RUnlock()
if !ok {
t.Lock()
labelsetCounter, ok = t.userLabelsetMap[user]
if !ok {
labelsetCounter = &labelsetCounterStruct{
RWMutex: &sync.RWMutex{},
labelsetSeriesMap: make(map[uint64]*seriesCounterStruct),
}
t.userLabelsetMap[user] = labelsetCounter
}
t.Unlock()
}

labelsetCounter.RLock()
seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
labelsetCounter.RUnlock()
if !ok {
labelsetCounter.Lock()
seriesCounter, ok = labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
if !ok {
seriesCounter = &seriesCounterStruct{
RWMutex: &sync.RWMutex{},
seriesCountMap: make(map[uint64]struct{}),
labelsetId: matchedLabelsetId,
}
labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter
}
labelsetCounter.Unlock()
}

seriesCounter.RLock()
_, ok = seriesCounter.seriesCountMap[series]
seriesCounter.RUnlock()
if !ok {
seriesCounter.Lock()
_, ok = seriesCounter.seriesCountMap[series]
if !ok {
seriesCounter.seriesCountMap[series] = struct{}{}
}
seriesCounter.Unlock()
}
}

func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() {
usersToDelete := make([]string, 0)
t.RLock()
for user, labelsetCounter := range t.userLabelsetMap {
// collect deletions per user so labelset hashes queued under one tenant
// are not carried over into the next tenant's cleanup pass
labelsetsToDelete := make([]uint64, 0)
labelsetCounter.RLock()
if len(labelsetCounter.labelsetSeriesMap) == 0 {
usersToDelete = append(usersToDelete, user)
}
for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap {
seriesCounter.Lock()
count := len(seriesCounter.seriesCountMap)
t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count))
clear(seriesCounter.seriesCountMap)
if count == 0 {
labelsetsToDelete = append(labelsetsToDelete, labelsetHash)
}
seriesCounter.Unlock()
}
labelsetCounter.RUnlock()
if len(labelsetsToDelete) > 0 {
labelsetCounter.Lock()
for _, labelsetHash := range labelsetsToDelete {
if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok {
labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId
t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId)
delete(labelsetCounter.labelsetSeriesMap, labelsetHash)
}
}
labelsetCounter.Unlock()
}
}
t.RUnlock()
if len(usersToDelete) > 0 {
t.Lock()
for _, user := range usersToDelete {
delete(t.userLabelsetMap, user)
}
t.Unlock()
}
}

func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() {
go func() {
ticker := time.NewTicker(vendMetricsInterval)
for range ticker.C {
t.UpdateMetrics()
}
}()
}

// getSeriesCount is only used in testing.
func (t *DiscardedSeriesPerLabelsetTracker) getSeriesCount(user string, labelsetLimitHash uint64) int {
if labelsetCounter, ok := t.userLabelsetMap[user]; ok {
if seriesCounter, ok := labelsetCounter.labelsetSeriesMap[labelsetLimitHash]; ok {
return len(seriesCounter.seriesCountMap)
}
}
return 0
}
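A minimal usage sketch for wiring the tracker up (not part of the PR): the gauge mirrors the one declared in perlabelset_tracker_test.go below, while the function name and the Track arguments are placeholders standing in for the ingester's rejection path.

package discardedseries

import "github.com/prometheus/client_golang/prometheus"

// exampleWiring shows, under assumed names and placeholder values, how the
// tracker could be constructed, registered, and fed from a rejection path.
func exampleWiring(reg prometheus.Registerer) *DiscardedSeriesPerLabelsetTracker {
    gauge := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "cortex_discarded_series_per_labelset",
            Help: "The number of series that include discarded samples for each labelset.",
        },
        []string{"reason", "user", "labelset"},
    )
    reg.MustRegister(gauge)

    tracker := NewDiscardedSeriesPerLabelsetTracker(gauge)
    // Start the background loop that periodically vends counts to the gauge
    // and clears the per-interval state.
    tracker.StartVendDiscardedSeriesMetricGoroutine()

    // On a rejected sample: tenant, series hash, matched labelset limit hash,
    // and the limit's ID (all placeholder values here).
    tracker.Track("user-1", uint64(12345), uint64(67890), "limit-a")
    return tracker
}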
118 changes: 118 additions & 0 deletions pkg/util/discardedseries/perlabelset_tracker_test.go
@@ -0,0 +1,118 @@
package discardedseries

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

func TestPerLabelsetDiscardedSeriesTracker(t *testing.T) {
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cortex_discarded_series_per_labelset",
Help: "The number of series that include discarded samples for each labelset.",
},
[]string{"reason", "user", "labelset"},
)

tracker := NewDiscardedSeriesPerLabelsetTracker(gauge)
user1 := "user1"
user2 := "user2"
series1 := labels.FromStrings("__name__", "1")
series2 := labels.FromStrings("__name__", "2")
labelset1 := uint64(10)
labelset2 := uint64(20)
labelset3 := uint64(30)
labelsetId1 := "ten"
labelsetId2 := "twenty"
labelsetId3 := "thirty"

tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user1, series1.Hash(), labelset2, labelsetId2)

tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user2, series2.Hash(), labelset1, labelsetId1)

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 1)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 2)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

tracker.UpdateMetrics()

tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 1)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 2)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

tracker.UpdateMetrics()

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

tracker.UpdateMetrics()

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)
}

func comparePerLabelsetSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, user string, labelsetLimitId string, val int) {
gauge, _ := gaugeVec.GetMetricWithLabelValues("per_labelset_series_limit", user, labelsetLimitId)
require.Equal(t, testutil.ToFloat64(gauge), float64(val))
}