Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [ENHANCEMENT] Distributor: Add `cortex_distributor_ingester_push_timeouts_total` metric to track the number of push requests to ingesters that were canceled due to timeout. #7155
* [ENHANCEMENT] StoreGateway: Add tracing to parquet mode. #7125
* [ENHANCEMENT] Alertmanager: Upgrade alertmanager to 0.29.0 and add a new incidentIO integration. #7092
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098
Expand Down
47 changes: 29 additions & 18 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,25 @@ type Distributor struct {
inflightClientRequests atomic.Int64

// Metrics
queryDuration *instrument.HistogramCollector
receivedSamples *prometheus.CounterVec
receivedSamplesPerLabelSet *prometheus.CounterVec
receivedExemplars *prometheus.CounterVec
receivedMetadata *prometheus.CounterVec
incomingSamples *prometheus.CounterVec
incomingExemplars *prometheus.CounterVec
incomingMetadata *prometheus.CounterVec
nonHASamples *prometheus.CounterVec
dedupedSamples *prometheus.CounterVec
labelsHistogram prometheus.Histogram
ingesterAppends *prometheus.CounterVec
ingesterAppendFailures *prometheus.CounterVec
ingesterQueries *prometheus.CounterVec
ingesterQueryFailures *prometheus.CounterVec
ingesterPartialDataQueries prometheus.Counter
replicationFactor prometheus.Gauge
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
queryDuration *instrument.HistogramCollector
receivedSamples *prometheus.CounterVec
receivedSamplesPerLabelSet *prometheus.CounterVec
receivedExemplars *prometheus.CounterVec
receivedMetadata *prometheus.CounterVec
incomingSamples *prometheus.CounterVec
incomingExemplars *prometheus.CounterVec
incomingMetadata *prometheus.CounterVec
nonHASamples *prometheus.CounterVec
dedupedSamples *prometheus.CounterVec
labelsHistogram prometheus.Histogram
ingesterAppends *prometheus.CounterVec
ingesterAppendFailures *prometheus.CounterVec
ingesterQueries *prometheus.CounterVec
ingesterQueryFailures *prometheus.CounterVec
ingesterPartialDataQueries prometheus.Counter
replicationFactor prometheus.Gauge
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
distributorIngesterPushTimeoutPerUser *prometheus.CounterVec

validateMetrics *validation.ValidateMetrics

Expand Down Expand Up @@ -409,6 +410,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),
distributorIngesterPushTimeoutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_ingester_push_timeouts_total",
Help: "The total number of push requests to ingesters that were canceled due to timeout.",
}, []string{"user"}),

validateMetrics: validation.NewValidateMetrics(reg),
asyncExecutor: util.NewNoOpExecutor(),
Expand Down Expand Up @@ -932,6 +937,12 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s

// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
defer func() {
if errors.Is(localCtx.Err(), context.DeadlineExceeded) {
d.distributorIngesterPushTimeoutPerUser.WithLabelValues(userID).Inc()
}
}()

localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
Expand Down
38 changes: 38 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3079,6 +3079,7 @@ type prepConfig struct {
tokens [][]uint32
useStreamPush bool
nameValidationScheme model.ValidationScheme
remoteTimeout time.Duration
}

type prepState struct {
Expand Down Expand Up @@ -3199,6 +3200,10 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
distributorCfg.NameValidationScheme = cfg.nameValidationScheme
}

if cfg.remoteTimeout > 0 {
distributorCfg.RemoteTimeout = cfg.remoteTimeout
}

if cfg.shuffleShardEnabled {
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
distributorCfg.ShuffleShardingLookbackPeriod = time.Hour
Expand Down Expand Up @@ -4520,3 +4525,36 @@ func TestFindHALabels(t *testing.T) {
assert.Equal(t, c.expected.replica, replica)
}
}

// TestDistributor_BatchTimeoutMetric pushes several write requests through a
// single distributor configured with a 1ns remote timeout and then checks that
// cortex_distributor_ingester_push_timeouts_total reports one timeout per push
// for the tenant that issued them.
func TestDistributor_BatchTimeoutMetric(t *testing.T) {
	t.Parallel()

	// Number of pushes issued; the expected counter value below must match it.
	const pushCount = 5

	var defaults validation.Limits
	flagext.DefaultValues(&defaults)

	// The 1ns remote timeout is there to produce a timeout on every push.
	ds, _, registries, _ := prepare(t, prepConfig{
		numIngesters:    3,
		happyIngesters:  3,
		numDistributors: 1,
		limits:          &defaults,
		remoteTimeout:   time.Nanosecond,
	})
	d, reg := ds[0], registries[0]

	ctx := user.InjectOrgID(context.Background(), "user-1")

	// Each Push call is required to return without error; only the metric is
	// expected to reflect the timeouts.
	for i := 0; i < pushCount; i++ {
		_, err := d.Push(ctx, makeWriteRequest(0, 30, 0, 0))
		require.NoError(t, err)
	}

	expected := `
# HELP cortex_distributor_ingester_push_timeouts_total The total number of push requests to ingesters that were canceled due to timeout.
# TYPE cortex_distributor_ingester_push_timeouts_total counter
cortex_distributor_ingester_push_timeouts_total{user="user-1"} 5
`
	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expected), "cortex_distributor_ingester_push_timeouts_total"))
}
Loading