1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -279,6 +279,7 @@
* [BUGFIX] Distributor: Fix metric metadata of type Unknown being silently dropped from RW2 requests. #12461
* [BUGFIX] Distributor: Preserve inconsistent metric metadata in Remote Write 1.0 to 2.0 conversion. Previously, when converting RW1.0 requests with multiple different metadata for the same series, only the first metadata was kept. Now all inconsistent metadata are preserved to match Prometheus behavior. This only affects experimental Remote Write 2.0. #12541 #12804
* [BUGFIX] Ruler: Fix ruler remotequerier request body consumption on retries. #12514
+ * [BUGFIX] Ingester: (Ingest storage) Fix fetcher potentially requesting more bytes from Kafka than its configured limit when bytes-per-record estimation is incorrect. #13051
* [BUGFIX] Block-builder: Fix a bug where a consumption error can cause a job to stay assigned to a worker for the remainder of its lifetime. #12522
* [BUGFIX] Querier: Fix possible panic when evaluating a nested subquery where the parent has no steps. #12524
* [BUGFIX] Querier: Fix bug where the pruning toggles AST optimization pass doesn't work in the query planner. #12783
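For context on the fix below: a too-low bytes-per-record estimate makes a fetchWant span many records, and once the estimate corrects, the padded request can far exceed the per-fetcher budget. A minimal sketch of that arithmetic, reusing the 5% padding and the 5MB/30-fetcher numbers from the tests in this PR; the maxBytes helper and its signature are illustrative stand-ins, not Mimir's actual code:

```go
package main

import (
	"fmt"
	"math"
)

// maxBytes is a simplified stand-in for fetchWant.MaxBytes(): records times
// estimated bytes per record, padded by 5%, optionally clamped to the limit.
// It omits the 1MB floor (forcedMinValueForMaxBytes) that the real code applies.
func maxBytes(numRecords, bytesPerRecord int, limit int32, clamp bool) int32 {
	fetchBytes := int(float64(numRecords*bytesPerRecord) * 1.05)
	if fetchBytes > math.MaxInt32 || fetchBytes < 0 {
		fetchBytes = math.MaxInt32
	}
	if clamp {
		return min(limit, int32(fetchBytes)) // behavior after this PR
	}
	return int32(fetchBytes) // before this PR the limit was not applied here
}

func main() {
	limit := int32(5_000_000 / 30) // 5MB budget split across 30 fetchers
	// A want sized while records were estimated at ~10B spans 16_666 records;
	// if they turn out to be 100KB each, the padded request is huge:
	fmt.Println(maxBytes(16_666, 100_000, limit, false)) // ≈ 1_749_930_000 bytes
	fmt.Println(maxBytes(16_666, 100_000, limit, true))  // 166_666 bytes
}
```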
23 changes: 12 additions & 11 deletions pkg/storage/ingest/fetcher.go
@@ -76,27 +76,27 @@ type fetchWant struct {
startOffset int64 // inclusive
endOffset int64 // exclusive
estimatedBytesPerRecord int
- targetMaxBytes int
+ maxBytesLimit int32

// result should be closed when there are no more fetches for this partition. It is ok to send multiple times on the channel.
result chan fetchResult
}

- func fetchWantFrom(offset int64, targetMaxBytes, estimatedBytesPerRecord int) fetchWant {
+ func fetchWantFrom(offset int64, maxBytesLimit int32, estimatedBytesPerRecord int) fetchWant {
estimatedBytesPerRecord = max(estimatedBytesPerRecord, 1)
- estimatedNumberOfRecords := max(1, targetMaxBytes/estimatedBytesPerRecord)
+ estimatedNumberOfRecords := max(1, int(maxBytesLimit)/estimatedBytesPerRecord)
return fetchWant{
startOffset: offset,
endOffset: offset + int64(estimatedNumberOfRecords),
- targetMaxBytes: targetMaxBytes,
+ maxBytesLimit: maxBytesLimit,
estimatedBytesPerRecord: estimatedBytesPerRecord,
result: make(chan fetchResult),
}
}

// Next returns the fetchWant for the next numRecords starting from the last known offset.
func (w fetchWant) Next() fetchWant {
- n := fetchWantFrom(w.endOffset, w.targetMaxBytes, w.estimatedBytesPerRecord)
+ n := fetchWantFrom(w.endOffset, w.maxBytesLimit, w.estimatedBytesPerRecord)
n.estimatedBytesPerRecord = w.estimatedBytesPerRecord
return n
}
@@ -108,10 +108,11 @@ func (w fetchWant) MaxBytes() int32 {
if fetchBytes > math.MaxInt32 || fetchBytes < 0 {
// This shouldn't happen because w should have been trimmed before sending the request.
// But we definitely don't want to request negative bytes by casting to int32, so add this safeguard.
- return math.MaxInt32
+ fetchBytes = math.MaxInt32
}
- fetchBytes = max(forcedMinValueForMaxBytes, fetchBytes)
- return int32(fetchBytes)
+ maxBytes := max(forcedMinValueForMaxBytes, int32(fetchBytes))
+ maxBytes = min(w.maxBytesLimit, maxBytes)
+ return maxBytes
}

// UpdateBytesPerRecord updates the expected bytes per record based on the results of the last fetch and trims the fetchWant if MaxBytes() would now exceed math.MaxInt32.
@@ -843,8 +844,8 @@ func (w *inflightFetchWants) removeNextResult() {
}

func (r *ConcurrentFetchers) start(ctx context.Context, startOffset int64, concurrency int) {
- targetBytesPerFetcher := int(r.maxBufferedBytesLimit) / concurrency
- level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "bytes_per_fetch_request", targetBytesPerFetcher)
+ maxBytesPerFetcher := r.maxBufferedBytesLimit / int32(concurrency)
+ level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "max_bytes_per_fetch_request", maxBytesPerFetcher)

// HWM is updated by the fetchers. A value of 0 is the same as there not being any produced records.
// A value of 0 doesn't prevent progress because we ensure there is at least one inflight fetchWant.
@@ -864,7 +865,7 @@ func (r *ConcurrentFetchers) start(ctx context.Context, startOffset int64, concu
var (
// nextFetch is the next records fetch operation we want to issue to one of the running workers.
// It contains the offset range to fetch and a channel where the result should be written to.
- nextFetch = fetchWantFrom(startOffset, targetBytesPerFetcher, initialBytesPerRecord)
+ nextFetch = fetchWantFrom(startOffset, maxBytesPerFetcher, initialBytesPerRecord)

// inflight is the list of all fetchWants that are currently in flight.
inflight = inflightFetchWants{bytes: r.bufferedFetchedBytes}
67 changes: 40 additions & 27 deletions pkg/storage/ingest/fetcher_test.go
@@ -930,10 +930,11 @@ func TestConcurrentFetchers(t *testing.T) {
t.Parallel()

const (
topicName = "test-topic"
partitionID = 1
concurrency = 30
maxInflightBytes = 5_000_000
topicName = "test-topic"
partitionID = 1
concurrency = 30
maxInflightBytes = 5_000_000
perFetcherMaxBytes = maxInflightBytes / concurrency

largeRecordsCount = 100
largeRecordSize = 100_000
@@ -969,7 +970,7 @@
consumedRecords := fetches.NumRecords()

pollFetchesAndAssertNoRecords(t, fetchers)
t.Log("Consumed all large records")
t.Logf("Consumed %d large records", fetches.NumRecords())

// Produce small records
smallValue := bytes.Repeat([]byte{'b'}, smallRecordSize)
@@ -982,19 +983,19 @@
// Consume half of the small records. This should be enough to stabilize the records size estimation.
fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second)
consumedRecords += fetches.NumRecords()
t.Log("Consumed half of the small records")
t.Logf("Consumed %d of the small records", fetches.NumRecords())

// Assert that the buffer is well utilized.
waitForStableBufferedRecords(t, fetchers)
t.Log("Buffered records stabilized")

assert.LessOrEqualf(t, fetchers.BufferedBytes(), int64(maxInflightBytes), "Should not buffer more than %d bytes of small records", maxInflightBytes)
- assert.GreaterOrEqual(t, fetchers.BufferedBytes(), int64(maxInflightBytes/2), "Should still buffer a decent number of records")
+ assert.GreaterOrEqual(t, fetchers.BufferedBytes(), int64(perFetcherMaxBytes), "At least one fetcher's worth of bytes should be buffered")

// Consume the rest of the small records.
fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second)
consumedRecords += fetches.NumRecords()
t.Log("Consumed rest of the small records")
t.Logf("Consumed %d more of the small records", fetches.NumRecords())

// Verify we received correct number of records
const totalProducedRecords = largeRecordsCount + smallRecordsCount
@@ -1077,7 +1078,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) {
startOffset: 1,
endOffset: 5,
estimatedBytesPerRecord: 100,
- targetMaxBytes: 1000000,
+ maxBytesLimit: 1000000,
}
res := fetchers.fetchSingle(ctx, fw)

@@ -1097,7 +1098,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) {
startOffset: 1,
endOffset: 5,
estimatedBytesPerRecord: 100,
- targetMaxBytes: 1000000,
+ maxBytesLimit: 1000000,
}
res := fetchers.fetchSingle(ctx, fw)

@@ -1116,7 +1117,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) {
startOffset: 1,
endOffset: 5,
estimatedBytesPerRecord: 100,
- targetMaxBytes: 1000000,
+ maxBytesLimit: 1000000,
}
res := fetchers.fetchSingle(ctx, fw)

@@ -1148,7 +1149,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) {
startOffset: 1,
endOffset: 5,
estimatedBytesPerRecord: 100,
- targetMaxBytes: 1000000,
+ maxBytesLimit: 1000000,
}
res := fetchers.fetchSingle(ctx, fw)

@@ -1348,34 +1349,33 @@ func longPollFetches(fetchers *ConcurrentFetchers, minRecords int, timeout time.
// we may have to call it multiple times to process all buffered records that need to be
// discarded.
func pollFetchesAndAssertNoRecords(t *testing.T, fetchers *ConcurrentFetchers) {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()

// If there are no buffered records, we can skip the polling at all.
if fetchers.BufferedRecords() == 0 {
return
}

- for {
+ // Poll with a short timeout for each call, but keep polling as long as there are buffered records.
+ // This handles the case where with high concurrency, some fetches contain only duplicate records
+ // and take time to be polled and discarded.
+ const pollTimeout = 100 * time.Millisecond
+ const maxAttempts = 50 // 50 * 100ms = 5s total
+
+ for attempt := 0; attempt < maxAttempts && fetchers.BufferedRecords() > 0; attempt++ {
+ ctx, cancel := context.WithTimeout(context.Background(), pollTimeout)
fetches, returnCtx := fetchers.PollFetches(ctx)
+ cancel()

+ // If context timed out and there are still buffered records, continue trying
if errors.Is(returnCtx.Err(), context.DeadlineExceeded) {
- break
+ continue
}

// We always expect that PollFetches() returns zero records.
assert.Len(t, fetches.Records(), 0)

- // If there are no buffered records, we're good. We can end the assertion.
- if fetchers.BufferedRecords() == 0 {
- return
- }
}

- // We stopped polling fetches. We have to make sure there are no buffered records.
- if !assert.Zero(t, fetchers.BufferedRecords(), "expected there aren't any buffered records") {
- fetches, _ := fetchers.PollFetches(ctx)
- t.Logf("%#v", fetches)
- }
+ // After all attempts, verify there are no buffered records remaining.
+ assert.Zero(t, fetchers.BufferedRecords(), "expected there aren't any buffered records after polling")
}

type waiterFunc func()
@@ -1396,6 +1396,7 @@ func TestFetchWant_MaxBytes(t *testing.T) {
startOffset: 0,
endOffset: 10,
estimatedBytesPerRecord: 100,
+ maxBytesLimit: math.MaxInt32,
},
expected: 1_000_000, // minimum fetch size
},
@@ -1404,6 +1405,7 @@
startOffset: 0,
endOffset: 1000,
estimatedBytesPerRecord: 1000,
+ maxBytesLimit: math.MaxInt32,
},
expected: 1_050_000, // 1000 * 1000 * 1.05
},
@@ -1412,6 +1414,7 @@
startOffset: 0,
endOffset: 2 << 31,
estimatedBytesPerRecord: 2 << 30,
+ maxBytesLimit: math.MaxInt32,
},
expected: math.MaxInt32,
},
@@ -1420,9 +1423,19 @@
startOffset: 0,
endOffset: math.MaxInt64,
estimatedBytesPerRecord: math.MaxInt32,
+ maxBytesLimit: math.MaxInt32,
},
expected: math.MaxInt32,
},
"capped by maxBytesLimit": {
fw: fetchWant{
startOffset: 0,
endOffset: 1000,
estimatedBytesPerRecord: 10000,
maxBytesLimit: 5_000_000,
},
expected: 5_000_000, // capped by maxBytesLimit even though calculation would be 10_500_000
},
}

for name, tc := range testCases {
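The rewritten pollFetchesAndAssertNoRecords above replaces one long poll with bounded short-timeout retries, so fetches that contain only duplicates can be discarded across several rounds. A self-contained sketch of the same drain pattern; the poll and buffered callbacks are hypothetical stand-ins for the fetchers API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// drainWithRetries mirrors the test helper's structure: poll with a short
// per-call timeout, retrying while anything remains buffered, for at most
// maxAttempts rounds (50 * 100ms = 5s total, as in the test above).
func drainWithRetries(poll func(context.Context), buffered func() int) bool {
	const pollTimeout = 100 * time.Millisecond
	const maxAttempts = 50

	for attempt := 0; attempt < maxAttempts && buffered() > 0; attempt++ {
		ctx, cancel := context.WithTimeout(context.Background(), pollTimeout)
		poll(ctx) // each round may only discard duplicates and return nothing
		cancel()
	}
	return buffered() == 0
}

func main() {
	remaining := 3 // pretend three buffered fetches need draining
	drained := drainWithRetries(
		func(ctx context.Context) { remaining-- },
		func() int { return remaining },
	)
	fmt.Println("drained:", drained) // true after three quick rounds
}
```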
10 changes: 4 additions & 6 deletions pkg/storage/ingest/reader_test.go
@@ -1987,18 +1987,16 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi
"with fetch concurrency": {
concurrencyVariant: []readerTestCfgOpt{withFetchConcurrency(2)},
expectedBufferedRecords: 1,
- // only one fetcher is active because we don't fetch over the HWM.
- // That fetcher should fetch 1MB. It's padded with 5% to account for underestimation of the record size.
- // The expected buffered bytes are 1.05MB
- expectedBufferedBytes: 1_050_000,
+ // only one fetcher is active because we don't fetch over the HWM. That fetcher should fetch 1MB.
+ // The 5% padding is overridden because 1MB+5% is beyond the limit for the individual fetcher.
+ expectedBufferedBytes: 1_000_000,
expectedBufferedRecordsFromClient: 0,
},
"with higher fetch concurrency": {
concurrencyVariant: []readerTestCfgOpt{withFetchConcurrency(4)},
expectedBufferedRecords: 1,
// There is one fetcher fetching. That fetcher should fetch 500KB.
- // But we clamp the MaxBytes to at least 1MB, so that we don't underfetch when the absolute volume of data is low.
- expectedBufferedBytes: 1_000_000,
+ expectedBufferedBytes: 500_000,
expectedBufferedRecordsFromClient: 0,
},
}
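As a cross-check of the two updated expectations, here is the arithmetic the comments describe, under the assumption of a 2MB total buffered-bytes budget (implied by 1MB per fetcher at concurrency 2 and 500KB at concurrency 4); the formula approximates MaxBytes() with its 5% padding, 1MB floor, and the new per-fetcher clamp:

```go
package main

import "fmt"

func main() {
	const totalBudget = int32(2_000_000) // assumed: implied by the two variants above
	const floor = int32(1_000_000)       // forcedMinValueForMaxBytes

	for _, concurrency := range []int32{2, 4} {
		perFetcher := totalBudget / concurrency
		padded := int32(float64(perFetcher) * 1.05)
		// Before this PR: max(floor, padded) -> 1_050_000 and 1_000_000 bytes.
		// After this PR the per-fetcher limit caps the request:
		request := min(perFetcher, max(floor, padded))
		fmt.Printf("concurrency=%d: request=%d bytes\n", concurrency, request)
		// concurrency=2: 1_000_000 (the 5% padding is trimmed by the limit)
		// concurrency=4: 500_000 (the 1MB floor is overridden by the limit)
	}
}
```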