Skip to content

Commit c673f45

Browse files
committed
Add distributor ingester push timeouts total metric
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent bb7ba54 commit c673f45

File tree

3 files changed

+68
-18
lines changed

3 files changed

+68
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
66
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
7+
* [ENHANCEMENT] Distributor: Add `cortex_distributor_ingester_push_timeouts_total` metric to track the number of push requests to ingesters that were canceled due to timeout. #7155
78
* [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125
89
* [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092
910
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098

pkg/distributor/distributor.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,24 +111,25 @@ type Distributor struct {
111111
inflightClientRequests atomic.Int64
112112

113113
// Metrics
114-
queryDuration *instrument.HistogramCollector
115-
receivedSamples *prometheus.CounterVec
116-
receivedSamplesPerLabelSet *prometheus.CounterVec
117-
receivedExemplars *prometheus.CounterVec
118-
receivedMetadata *prometheus.CounterVec
119-
incomingSamples *prometheus.CounterVec
120-
incomingExemplars *prometheus.CounterVec
121-
incomingMetadata *prometheus.CounterVec
122-
nonHASamples *prometheus.CounterVec
123-
dedupedSamples *prometheus.CounterVec
124-
labelsHistogram prometheus.Histogram
125-
ingesterAppends *prometheus.CounterVec
126-
ingesterAppendFailures *prometheus.CounterVec
127-
ingesterQueries *prometheus.CounterVec
128-
ingesterQueryFailures *prometheus.CounterVec
129-
ingesterPartialDataQueries prometheus.Counter
130-
replicationFactor prometheus.Gauge
131-
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
114+
queryDuration *instrument.HistogramCollector
115+
receivedSamples *prometheus.CounterVec
116+
receivedSamplesPerLabelSet *prometheus.CounterVec
117+
receivedExemplars *prometheus.CounterVec
118+
receivedMetadata *prometheus.CounterVec
119+
incomingSamples *prometheus.CounterVec
120+
incomingExemplars *prometheus.CounterVec
121+
incomingMetadata *prometheus.CounterVec
122+
nonHASamples *prometheus.CounterVec
123+
dedupedSamples *prometheus.CounterVec
124+
labelsHistogram prometheus.Histogram
125+
ingesterAppends *prometheus.CounterVec
126+
ingesterAppendFailures *prometheus.CounterVec
127+
ingesterQueries *prometheus.CounterVec
128+
ingesterQueryFailures *prometheus.CounterVec
129+
ingesterPartialDataQueries prometheus.Counter
130+
replicationFactor prometheus.Gauge
131+
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
132+
distributorIngesterPushTimeoutPerUser *prometheus.CounterVec
132133

133134
validateMetrics *validation.ValidateMetrics
134135

@@ -409,6 +410,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
409410
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
410411
Help: "Unix timestamp of latest received sample per user.",
411412
}, []string{"user"}),
413+
distributorIngesterPushTimeoutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
414+
Name: "cortex_distributor_ingester_push_timeouts_total",
415+
Help: "The total number of push requests to ingesters that were canceled due to timeout.",
416+
}, []string{"user"}),
412417

413418
validateMetrics: validation.NewValidateMetrics(reg),
414419
asyncExecutor: util.NewNoOpExecutor(),
@@ -932,6 +937,12 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s
932937

933938
// Use a background context to make sure all ingesters get samples even if we return early
934939
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
940+
defer func() {
941+
if errors.Is(localCtx.Err(), context.DeadlineExceeded) {
942+
d.distributorIngesterPushTimeoutPerUser.WithLabelValues(userID).Inc()
943+
}
944+
}()
945+
935946
localCtx = user.InjectOrgID(localCtx, userID)
936947
if sp := opentracing.SpanFromContext(ctx); sp != nil {
937948
localCtx = opentracing.ContextWithSpan(localCtx, sp)

pkg/distributor/distributor_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3079,6 +3079,7 @@ type prepConfig struct {
30793079
tokens [][]uint32
30803080
useStreamPush bool
30813081
nameValidationScheme model.ValidationScheme
3082+
remoteTimeout time.Duration
30823083
}
30833084

30843085
type prepState struct {
@@ -3199,6 +3200,10 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
31993200
distributorCfg.NameValidationScheme = cfg.nameValidationScheme
32003201
}
32013202

3203+
if cfg.remoteTimeout > 0 {
3204+
distributorCfg.RemoteTimeout = cfg.remoteTimeout
3205+
}
3206+
32023207
if cfg.shuffleShardEnabled {
32033208
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
32043209
distributorCfg.ShuffleShardingLookbackPeriod = time.Hour
@@ -4520,3 +4525,36 @@ func TestFindHALabels(t *testing.T) {
45204525
assert.Equal(t, c.expected.replica, replica)
45214526
}
45224527
}
4528+
4529+
func TestDistributor_BatchTimeoutMetric(t *testing.T) {
4530+
t.Parallel()
4531+
4532+
limits := &validation.Limits{}
4533+
flagext.DefaultValues(limits)
4534+
4535+
distributors, _, regs, _ := prepare(t, prepConfig{
4536+
numIngesters: 3,
4537+
happyIngesters: 3,
4538+
numDistributors: 1,
4539+
limits: limits,
4540+
remoteTimeout: time.Nanosecond, // To produce timeout
4541+
})
4542+
4543+
distributor := distributors[0]
4544+
reg := regs[0]
4545+
4546+
ctx := context.Background()
4547+
ctx = user.InjectOrgID(context.Background(), "user-1")
4548+
4549+
for range 5 {
4550+
request := makeWriteRequest(0, 30, 0, 0)
4551+
_, err := distributor.Push(ctx, request)
4552+
require.NoError(t, err)
4553+
}
4554+
4555+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
4556+
# HELP cortex_distributor_ingester_push_timeouts_total The total number of push requests to ingesters that were canceled due to timeout.
4557+
# TYPE cortex_distributor_ingester_push_timeouts_total counter
4558+
cortex_distributor_ingester_push_timeouts_total{user="user-1"} 5
4559+
`), "cortex_distributor_ingester_push_timeouts_total"))
4560+
}

0 commit comments

Comments
 (0)