diff --git a/CHANGELOG.md b/CHANGELOG.md index d1435b3b5c8..35184b491b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -279,6 +279,7 @@ * [BUGFIX] Distributor: Fix metric metadata of type Unknown being silently dropped from RW2 requests. #12461 * [BUGFIX] Distributor: Preserve inconsistent metric metadata in Remote Write 1.0 to 2.0 conversion. Previously, when converting RW1.0 requests with multiple different metadata for the same series, only the first metadata was kept. Now all inconsistent metadata are preserved to match Prometheus behavior. This only affects experimental Remote Write 2.0. #12541 #12804 * [BUGFIX] Ruler: Fix ruler remotequerier request body consumption on retries. #12514 +* [BUGFIX] Ingester: (Ingest storage) Fix fetcher potentially requesting more bytes from Kafka than its configured limit when bytes-per-record estimation is incorrect. #13051 * [BUGFIX] Block-builder: Fix a bug where a consumption error can cause a job to stay assigned to a worker for the remainder of its lifetime. #12522 * [BUGFIX] Querier: Fix possible panic when evaluating a nested subquery where the parent has no steps. #12524 * [BUGFIX] Querier: Fix bug where the pruning toggles AST optimization pass doesn't work in the query planner. #12783 diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 06aa904cbb0..dbc5252490b 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -76,19 +76,19 @@ type fetchWant struct { startOffset int64 // inclusive endOffset int64 // exclusive estimatedBytesPerRecord int - targetMaxBytes int + maxBytesLimit int32 // result should be closed when there are no more fetches for this partition. It is ok to send multiple times on the channel. result chan fetchResult } -func fetchWantFrom(offset int64, targetMaxBytes, estimatedBytesPerRecord int) fetchWant { +func fetchWantFrom(offset int64, maxBytesLimit int32, estimatedBytesPerRecord int) fetchWant { estimatedBytesPerRecord = max(estimatedBytesPerRecord, 1) - estimatedNumberOfRecords := max(1, targetMaxBytes/estimatedBytesPerRecord) + estimatedNumberOfRecords := max(1, int(maxBytesLimit)/estimatedBytesPerRecord) return fetchWant{ startOffset: offset, endOffset: offset + int64(estimatedNumberOfRecords), - targetMaxBytes: targetMaxBytes, + maxBytesLimit: maxBytesLimit, estimatedBytesPerRecord: estimatedBytesPerRecord, result: make(chan fetchResult), } @@ -96,7 +96,7 @@ func fetchWantFrom(offset int64, targetMaxBytes, estimatedBytesPerRecord int) fe // Next returns the fetchWant for the next numRecords starting from the last known offset. func (w fetchWant) Next() fetchWant { - n := fetchWantFrom(w.endOffset, w.targetMaxBytes, w.estimatedBytesPerRecord) + n := fetchWantFrom(w.endOffset, w.maxBytesLimit, w.estimatedBytesPerRecord) n.estimatedBytesPerRecord = w.estimatedBytesPerRecord return n } @@ -108,10 +108,11 @@ func (w fetchWant) MaxBytes() int32 { if fetchBytes > math.MaxInt32 || fetchBytes < 0 { // This shouldn't happen because w should have been trimmed before sending the request. // But we definitely don't want to request negative bytes by casting to int32, so add this safeguard. - return math.MaxInt32 + fetchBytes = math.MaxInt32 } - fetchBytes = max(forcedMinValueForMaxBytes, fetchBytes) - return int32(fetchBytes) + maxBytes := max(forcedMinValueForMaxBytes, int32(fetchBytes)) + maxBytes = min(w.maxBytesLimit, maxBytes) + return maxBytes } // UpdateBytesPerRecord updates the expected bytes per record based on the results of the last fetch and trims the fetchWant if MaxBytes() would now exceed math.MaxInt32. @@ -843,8 +844,8 @@ func (w *inflightFetchWants) removeNextResult() { } func (r *ConcurrentFetchers) start(ctx context.Context, startOffset int64, concurrency int) { - targetBytesPerFetcher := int(r.maxBufferedBytesLimit) / concurrency - level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "bytes_per_fetch_request", targetBytesPerFetcher) + maxBytesPerFetcher := r.maxBufferedBytesLimit / int32(concurrency) + level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "max_bytes_per_fetch_request", maxBytesPerFetcher) // HWM is updated by the fetchers. A value of 0 is the same as there not being any produced records. // A value of 0 doesn't prevent progress because we ensure there is at least one inflight fetchWant. @@ -864,7 +865,7 @@ func (r *ConcurrentFetchers) start(ctx context.Context, startOffset int64, concu var ( // nextFetch is the next records fetch operation we want to issue to one of the running workers. // It contains the offset range to fetch and a channel where the result should be written to. - nextFetch = fetchWantFrom(startOffset, targetBytesPerFetcher, initialBytesPerRecord) + nextFetch = fetchWantFrom(startOffset, maxBytesPerFetcher, initialBytesPerRecord) // inflight is the list of all fetchWants that are currently in flight. inflight = inflightFetchWants{bytes: r.bufferedFetchedBytes} diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 68765a80abd..df17fb6c754 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -930,10 +930,11 @@ func TestConcurrentFetchers(t *testing.T) { t.Parallel() const ( - topicName = "test-topic" - partitionID = 1 - concurrency = 30 - maxInflightBytes = 5_000_000 + topicName = "test-topic" + partitionID = 1 + concurrency = 30 + maxInflightBytes = 5_000_000 + perFetcherMaxBytes = maxInflightBytes / concurrency largeRecordsCount = 100 largeRecordSize = 100_000 @@ -969,7 +970,7 @@ func TestConcurrentFetchers(t *testing.T) { consumedRecords := fetches.NumRecords() pollFetchesAndAssertNoRecords(t, fetchers) - t.Log("Consumed all large records") + t.Logf("Consumed %d large records", fetches.NumRecords()) // Produce small records smallValue := bytes.Repeat([]byte{'b'}, smallRecordSize) @@ -982,19 +983,19 @@ func TestConcurrentFetchers(t *testing.T) { // Consume half of the small records. This should be enough to stabilize the records size estimation. fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second) consumedRecords += fetches.NumRecords() - t.Log("Consumed half of the small records") + t.Logf("Consumed %d of the small records", fetches.NumRecords()) // Assert that the buffer is well utilized. waitForStableBufferedRecords(t, fetchers) t.Log("Buffered records stabilized") assert.LessOrEqualf(t, fetchers.BufferedBytes(), int64(maxInflightBytes), "Should not buffer more than %d bytes of small records", maxInflightBytes) - assert.GreaterOrEqual(t, fetchers.BufferedBytes(), int64(maxInflightBytes/2), "Should still buffer a decent number of records") + assert.GreaterOrEqual(t, fetchers.BufferedBytes(), int64(perFetcherMaxBytes), "At least one fetcher's worth of bytes should be buffered") // Consume the rest of the small records. fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second) consumedRecords += fetches.NumRecords() - t.Log("Consumed rest of the small records") + t.Logf("Consumed %d more of the small records", fetches.NumRecords()) // Verify we received correct number of records const totalProducedRecords = largeRecordsCount + smallRecordsCount @@ -1077,7 +1078,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) { startOffset: 1, endOffset: 5, estimatedBytesPerRecord: 100, - targetMaxBytes: 1000000, + maxBytesLimit: 1000000, } res := fetchers.fetchSingle(ctx, fw) @@ -1097,7 +1098,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) { startOffset: 1, endOffset: 5, estimatedBytesPerRecord: 100, - targetMaxBytes: 1000000, + maxBytesLimit: 1000000, } res := fetchers.fetchSingle(ctx, fw) @@ -1116,7 +1117,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) { startOffset: 1, endOffset: 5, estimatedBytesPerRecord: 100, - targetMaxBytes: 1000000, + maxBytesLimit: 1000000, } res := fetchers.fetchSingle(ctx, fw) @@ -1148,7 +1149,7 @@ func TestConcurrentFetchers_fetchSingle(t *testing.T) { startOffset: 1, endOffset: 5, estimatedBytesPerRecord: 100, - targetMaxBytes: 1000000, + maxBytesLimit: 1000000, } res := fetchers.fetchSingle(ctx, fw) @@ -1348,34 +1349,33 @@ func longPollFetches(fetchers *ConcurrentFetchers, minRecords int, timeout time. // we may have to call it multiple times to process all buffered records that need to be // discarded. func pollFetchesAndAssertNoRecords(t *testing.T, fetchers *ConcurrentFetchers) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - // If there are no buffered records, we can skip the polling at all. if fetchers.BufferedRecords() == 0 { return } - for { + // Poll with a short timeout for each call, but keep polling as long as there are buffered records. + // This handles the case where with high concurrency, some fetches contain only duplicate records + // and take time to be polled and discarded. + const pollTimeout = 100 * time.Millisecond + const maxAttempts = 50 // 50 * 100ms = 5s total + + for attempt := 0; attempt < maxAttempts && fetchers.BufferedRecords() > 0; attempt++ { + ctx, cancel := context.WithTimeout(context.Background(), pollTimeout) fetches, returnCtx := fetchers.PollFetches(ctx) + cancel() + + // If context timed out and there are still buffered records, continue trying if errors.Is(returnCtx.Err(), context.DeadlineExceeded) { - break + continue } // We always expect that PollFetches() returns zero records. assert.Len(t, fetches.Records(), 0) - - // If there are no buffered records, we're good. We can end the assertion. - if fetchers.BufferedRecords() == 0 { - return - } } - // We stopped polling fetches. We have to make sure there are no buffered records. - if !assert.Zero(t, fetchers.BufferedRecords(), "expected there aren't any buffered records") { - fetches, _ := fetchers.PollFetches(ctx) - t.Logf("%#v", fetches) - } + // After all attempts, verify there are no buffered records remaining. + assert.Zero(t, fetchers.BufferedRecords(), "expected there aren't any buffered records after polling") } type waiterFunc func() @@ -1396,6 +1396,7 @@ func TestFetchWant_MaxBytes(t *testing.T) { startOffset: 0, endOffset: 10, estimatedBytesPerRecord: 100, + maxBytesLimit: math.MaxInt32, }, expected: 1_000_000, // minimum fetch size }, @@ -1404,6 +1405,7 @@ func TestFetchWant_MaxBytes(t *testing.T) { startOffset: 0, endOffset: 1000, estimatedBytesPerRecord: 1000, + maxBytesLimit: math.MaxInt32, }, expected: 1_050_000, // 1000 * 1000 * 1.05 }, @@ -1412,6 +1414,7 @@ func TestFetchWant_MaxBytes(t *testing.T) { startOffset: 0, endOffset: 2 << 31, estimatedBytesPerRecord: 2 << 30, + maxBytesLimit: math.MaxInt32, }, expected: math.MaxInt32, }, @@ -1420,9 +1423,19 @@ func TestFetchWant_MaxBytes(t *testing.T) { startOffset: 0, endOffset: math.MaxInt64, estimatedBytesPerRecord: math.MaxInt32, + maxBytesLimit: math.MaxInt32, }, expected: math.MaxInt32, }, + "capped by maxBytesLimit": { + fw: fetchWant{ + startOffset: 0, + endOffset: 1000, + estimatedBytesPerRecord: 10000, + maxBytesLimit: 5_000_000, + }, + expected: 5_000_000, // capped by maxBytesLimit even though calculation would be 10_500_000 + }, } for name, tc := range testCases { diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 5513b088c6a..16931464100 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -1987,18 +1987,16 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi "with fetch concurrency": { concurrencyVariant: []readerTestCfgOpt{withFetchConcurrency(2)}, expectedBufferedRecords: 1, - // only one fetcher is active because we don't fetch over the HWM. - // That fetcher should fetch 1MB. It's padded with 5% to account for underestimation of the record size. - // The expected buffered bytes are 1.05MB - expectedBufferedBytes: 1_050_000, + // only one fetcher is active because we don't fetch over the HWM. That fetcher should fetch 1MB. + // The 5% padding is overridden because 1MB+5% is beyond the limit for the individual fetcher. + expectedBufferedBytes: 1_000_000, expectedBufferedRecordsFromClient: 0, }, "with higher fetch concurrency": { concurrencyVariant: []readerTestCfgOpt{withFetchConcurrency(4)}, expectedBufferedRecords: 1, // There is one fetcher fetching. That fetcher should fetch 500KB. - // But we clamp the MaxBytes to at least 1MB, so that we don't underfetch when the absolute volume of data is low. - expectedBufferedBytes: 1_000_000, + expectedBufferedBytes: 500_000, expectedBufferedRecordsFromClient: 0, }, }