Commit 1fdeb12

Block-builder: adopt concurrent fetcher from ingest storage. (#12222)
Reuse the ingest-storage ConcurrentFetchers inside the block-builder to consume from Kafka faster when the `-ingest-storage.kafka.fetch-concurrency-max` flag is set.
1 parent 447ee42 commit 1fdeb12
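
At its core, the change routes the block-builder's consume loop through a small polling interface so the plain franz-go client and the ingest-storage ConcurrentFetchers are interchangeable. The following is a minimal sketch of that adapter for orientation only; `concurrentFetchers`, `choosePoller`, and the package name are illustrative stand-ins, while the real `fetchPoller`, `fetchWrapper`, and flag-gated selection appear in the pkg/blockbuilder/blockbuilder.go diff below.

```go
// Package blockbuildersketch illustrates, in simplified form, the polling
// abstraction this commit adds; it is not the committed code.
package blockbuildersketch

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// fetchPoller abstracts "give me the next batch of records" so the consume loop
// does not care whether fetches come from the plain kgo client or from the
// ingest-storage concurrent fetchers.
type fetchPoller interface {
	PollFetches(context.Context) kgo.Fetches
}

// concurrentFetchers is a stand-in for ingest.ConcurrentFetchers, whose
// PollFetches returns an extra context value; the wrapper below adapts it to
// the single-value interface.
type concurrentFetchers struct{}

func (c *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, context.Context) {
	return nil, ctx // placeholder: the real implementation returns buffered fetches.
}

type fetchWrapper struct {
	fetchers *concurrentFetchers
}

func (f *fetchWrapper) PollFetches(ctx context.Context) kgo.Fetches {
	fetch, _ := f.fetchers.PollFetches(ctx)
	return fetch
}

// choosePoller mirrors the selection in consumePartitionSection: the kgo client
// is the default, and the concurrent path is only taken when the fetch
// concurrency flag is set to a positive value.
func choosePoller(client *kgo.Client, cf *concurrentFetchers, fetchConcurrencyMax int) fetchPoller {
	if fetchConcurrencyMax > 0 {
		return &fetchWrapper{fetchers: cf}
	}
	return client
}
```

In the committed code the selection additionally creates the fetchers with a retry loop and registers them as the source for the reader metrics, as shown in the full diff.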

8 files changed: +414 -171 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@
 * [ENHANCEMENT] Querier: Include more information about inflight queries in the activity tracker. A querier logs this information after it restarts following a crash. #12526
 * [ENHANCEMENT] Ingester: Add experimental `-blocks-storage.tsdb.index-lookup-planning-comparison-portion` flag to enable mirrored chunk querier comparison between queries with and without index lookup planning. #12460
 * [ENHANCEMENT] Ruler: Add native histogram version of `cortex_ruler_sync_rules_duration_seconds`. #12628
+* [ENHANCEMENT] Block-builder: Implement concurrent consumption within a job when `-ingest-storage.kafka.fetch-concurrency-max` is given. #12222
 * [ENHANCEMENT] Query-frontend: Labels query optimizer is no longer experimental and is enabled by default. It can be disabled with `-query-frontend.labels-query-optimizer-enabled=false` CLI flag. #12606
 * [ENHANCEMENT] Distributor: Add value length to "label value too long" error. #12583
 * [ENHANCEMENT] Distributor: The metric `cortex_distributor_uncompressed_request_body_size_bytes` now differentiates by the handler serving the request. #12661

pkg/blockbuilder/blockbuilder.go

Lines changed: 132 additions & 27 deletions
@@ -20,9 +20,9 @@ import (
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/thanos-io/objstore"
 	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/plugin/kprom"
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 	"go.opentelemetry.io/otel"
-	"go.uber.org/atomic"
 	"google.golang.org/grpc"

 	"github.com/grafana/mimir/pkg/blockbuilder/schedulerpb"
@@ -49,11 +49,11 @@ type BlockBuilder struct {
 	schedulerClient schedulerpb.SchedulerClient
 	schedulerConn   *grpc.ClientConn

-	// the current job iteration number. For tests.
-	jobIteration atomic.Int64
-
 	blockBuilderMetrics   blockBuilderMetrics
 	tsdbBuilderMetrics    tsdbBuilderMetrics
+	readerMetrics         *ingest.ReaderMetrics
+	readerMetricsSource   swappableReaderMetricsSource
+	kpromMetrics          *kprom.Metrics
 	pusherConsumerMetrics *ingest.PusherConsumerMetrics
 }

@@ -74,13 +74,25 @@ func newWithSchedulerClient(
 	limits *validation.Overrides,
 	schedulerClient schedulerpb.SchedulerClient,
 ) (*BlockBuilder, error) {
+	kpm := ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder", reg)
+	readerMetricsSource := swappableReaderMetricsSource{&zeroReaderMetricsSource{}}
+
+	var readerMetrics *ingest.ReaderMetrics
+	if cfg.Kafka.FetchConcurrencyMax > 0 {
+		m := ingest.NewReaderMetrics(reg, readerMetricsSource, cfg.Kafka.Topic, kpm)
+		readerMetrics = &m
+	}
+
 	b := &BlockBuilder{
 		cfg:                   cfg,
 		logger:                logger,
 		register:              reg,
 		limits:                limits,
 		blockBuilderMetrics:   newBlockBuilderMetrics(reg),
 		tsdbBuilderMetrics:    newTSDBBBuilderMetrics(reg),
+		readerMetrics:         readerMetrics,
+		readerMetricsSource:   readerMetricsSource,
+		kpromMetrics:          kpm,
 		pusherConsumerMetrics: ingest.NewPusherConsumerMetrics(reg),
 	}

@@ -100,7 +112,6 @@ func newWithSchedulerClient(
 	}

 	b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
-
 	return b, nil
 }

@@ -146,7 +157,7 @@ func (b *BlockBuilder) starting(context.Context) (err error) {

 	b.kafkaClient, err = ingest.NewKafkaReaderClient(
 		b.cfg.Kafka,
-		ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder", b.register),
+		b.kpromMetrics,
 		b.logger,
 	)
 	if err != nil {
@@ -168,7 +179,6 @@ func (b *BlockBuilder) stopping(_ error) error {

 // running learns about the jobs from a block-builder-scheduler, and consumes one job at a time.
 func (b *BlockBuilder) running(ctx context.Context) error {
-
 	// Block-builder attempts to complete the current job when a shutdown
 	// request is received.
 	// To enable this, we create a child context whose cancellation signal is
@@ -200,7 +210,7 @@ func (b *BlockBuilder) running(ctx context.Context) error {
 		}

 		// Once we've gotten a job, we attempt to complete it even if the context is cancelled.
-		if _, err := b.consumeJob(graceCtx, key, spec); err != nil {
+		if err := b.consumeJob(graceCtx, key, spec); err != nil {
 			level.Error(b.logger).Log("msg", "failed to consume job", "job_id", key.Id, "epoch", key.Epoch, "err", err)

 			if err := b.schedulerClient.FailJob(key); err != nil {
@@ -214,13 +224,11 @@ func (b *BlockBuilder) running(ctx context.Context) error {
 		if err := b.schedulerClient.CompleteJob(key); err != nil {
 			level.Error(b.logger).Log("msg", "failed to complete job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
 		}
-
-		b.jobIteration.Inc()
 	}
 }

 // consumeJob performs block consumption from Kafka into object storage based on the given job spec.
-func (b *BlockBuilder) consumeJob(ctx context.Context, key schedulerpb.JobKey, spec schedulerpb.JobSpec) (lastOffset int64, err error) {
+func (b *BlockBuilder) consumeJob(ctx context.Context, key schedulerpb.JobKey, spec schedulerpb.JobSpec) (err error) {
 	defer func(start time.Time) {
 		success := "true"
 		if err != nil {
@@ -243,6 +251,84 @@ func (b *BlockBuilder) consumeJob(ctx context.Context, key schedulerpb.JobKey, s
 	return b.consumePartitionSection(ctx, logger, consumer, builder, spec.Partition, spec.StartOffset, spec.EndOffset)
 }

+type fetchPoller interface {
+	PollFetches(context.Context) kgo.Fetches
+}
+
+type fetchWrapper struct {
+	fetchers *ingest.ConcurrentFetchers
+}
+
+func (f *fetchWrapper) PollFetches(ctx context.Context) kgo.Fetches {
+	fetch, _ := f.fetchers.PollFetches(ctx)
+	return fetch
+}
+
+var _ fetchPoller = (*fetchWrapper)(nil)
+
+// swappableReaderMetricsSource is a ReaderMetricsSource that can be swapped out at runtime.
+type swappableReaderMetricsSource struct {
+	ingest.ReaderMetricsSource
+}
+
+func (s *swappableReaderMetricsSource) set(metricsSource ingest.ReaderMetricsSource) {
+	s.ReaderMetricsSource = metricsSource
+}
+
+type zeroReaderMetricsSource struct{}
+
+func (z *zeroReaderMetricsSource) BufferedBytes() int64           { return 0 }
+func (z *zeroReaderMetricsSource) BufferedRecords() int64         { return 0 }
+func (z *zeroReaderMetricsSource) EstimatedBytesPerRecord() int64 { return 0 }
+
+var _ ingest.ReaderMetricsSource = (*zeroReaderMetricsSource)(nil)
+
+// newFetchers creates a new concurrent fetcher, retrying until it succeeds or the context is cancelled.
+// The returned error is the last error encountered.
+func (b *BlockBuilder) newFetchers(ctx context.Context, logger log.Logger, partition int32, startOffset int64) (*ingest.ConcurrentFetchers, error) {
+	if b.readerMetrics == nil {
+		panic("readerMetrics should be non-nil when concurrent fetchers are used")
+	}
+
+	boff := backoff.New(ctx, backoff.Config{
+		MinBackoff: 100 * time.Millisecond,
+		MaxBackoff: 5 * time.Second,
+		MaxRetries: 10,
+	})
+
+	var lastError error
+
+	for boff.Ongoing() {
+		f, ferr := ingest.NewConcurrentFetchers(
+			ctx,
+			b.kafkaClient,
+			logger,
+			b.cfg.Kafka.Topic,
+			partition,
+			startOffset,
+			b.cfg.Kafka.FetchConcurrencyMax,
+			int32(b.cfg.Kafka.MaxBufferedBytes),
+			b.cfg.Kafka.UseCompressedBytesAsFetchMaxBytes,
+			b.cfg.Kafka.FetchMaxWait,
+			nil, // Don't need a reader since we've provided the start offset.
+			ingest.OnRangeErrorAbort,
+			nil, // We're aborting on range error, so we don't need an offset reader.
+			backoff.Config{
+				MinBackoff: 100 * time.Millisecond,
+				MaxBackoff: 1 * time.Second,
+			},
+			b.readerMetrics)
+		if ferr == nil {
+			return f, nil
+		}
+		level.Warn(b.logger).Log("msg", "failed to create concurrent fetcher, probably retrying...", "err", ferr)
+		lastError = ferr
+		boff.Wait()
+	}
+
+	return nil, lastError
+}
+
 // consumePartitionSection is for the use of scheduler-based architecture.
 // startOffset is inclusive, endOffset is exclusive, and must be valid offsets and not something in the future (endOffset can be technically 1 offset in the future).
 // All the records and samples between these offsets will be consumed and put into a block.
@@ -254,8 +340,8 @@ func (b *BlockBuilder) consumePartitionSection(
 	builder *TSDBBuilder,
 	partition int32,
 	startOffset, endOffset int64,
-) (lastConsumedOffset int64, retErr error) {
-	lastConsumedOffset = startOffset
+) (retErr error) {
+	lastConsumedOffset := startOffset
 	if startOffset >= endOffset {
 		level.Info(logger).Log("msg", "nothing to consume")
 		return
@@ -287,29 +373,49 @@ func (b *BlockBuilder) consumePartitionSection(
 	})
 	defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{b.cfg.Kafka.Topic: {partition}})

+	var fetchPoller fetchPoller = b.kafkaClient
+
+	if b.cfg.Kafka.FetchConcurrencyMax > 0 {
+		f, ferr := b.newFetchers(ctx, logger, partition, startOffset)
+		if ferr != nil {
+			return fmt.Errorf("creating concurrent fetcher: %w", ferr)
+		}
+
+		b.readerMetricsSource.set(f)
+
+		f.Start(ctx)
+		defer f.Stop()
+
+		fetchPoller = &fetchWrapper{f}
+	}
+
 	level.Info(logger).Log("msg", "start consuming", "partition", partition, "start_offset", startOffset, "end_offset", endOffset)

-	var (
-		firstRecOffset = int64(-1)
-		lastRecOffset  = int64(-1)
-	)
+	firstRecOffset := int64(-1)

-	for lastRecOffset < endOffset-1 {
+	for lastConsumedOffset < endOffset-1 {
 		if err := context.Cause(ctx); err != nil {
-			return 0, err
+			return err
 		}

 		// PollFetches can return a non-failed fetch with zero records. In such a case, with only the fetches at hands,
 		// we cannot tell if the consumer has already reached the latest end of the partition, i.e. no more records to consume,
 		// or there is more data in the backlog, and we must retry the poll. That's why the consumer loop above has to guard
 		// the iterations against the endOffset, so it retries the polling up until the expected end of the partition is reached.
-		fetches := b.kafkaClient.PollFetches(ctx)
+		fetches := fetchPoller.PollFetches(ctx)
+		var fetchErr error
 		fetches.EachError(func(_ string, _ int32, err error) {
 			if !errors.Is(err, context.Canceled) {
 				level.Error(logger).Log("msg", "failed to fetch records", "err", err)
 				b.blockBuilderMetrics.fetchErrors.WithLabelValues(fmt.Sprintf("%d", partition)).Inc()
+				if fetchErr == nil {
+					fetchErr = err
+				}
 			}
 		})
+		if fetchErr != nil {
+			return fmt.Errorf("poll fetches: %w", fetchErr)
+		}

 		recordsAll := func(fetches kgo.Fetches) iter.Seq[*kgo.Record] {
 			return func(yield func(*kgo.Record) bool) {
@@ -328,15 +434,14 @@ func (b *BlockBuilder) consumePartitionSection(

 		records := recordsAll(fetches)
 		for rec := range records {
-			lastRecOffset = rec.Offset
+			lastConsumedOffset = rec.Offset
 			if firstRecOffset == -1 {
-				firstRecOffset = lastRecOffset
+				firstRecOffset = lastConsumedOffset
 			}
 		}

-		err := consumer.Consume(ctx, records)
-		if err != nil {
-			return 0, fmt.Errorf("consume records in partition %d: %w", partition, err)
+		if err := consumer.Consume(ctx, records); err != nil {
+			return fmt.Errorf("consume records in partition %d: %w", partition, err)
 		}
 	}

@@ -348,12 +453,12 @@ func (b *BlockBuilder) consumePartitionSection(
 	var err error
 	blockMetas, err = builder.CompactAndUpload(ctx, b.uploadBlocks)
 	if err != nil {
-		return 0, err
+		return err
 	}

 	// TODO: figure out a way to track the blockCounts metrics.

-	return lastRecOffset, nil
+	return nil
 }

 func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error {

pkg/blockbuilder/blockbuilder_test.go

Lines changed: 0 additions & 1 deletion
@@ -130,7 +130,6 @@ func TestBlockBuilder(t *testing.T) {
 				labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
 			)
 		}
-
 	})
 	}
 }
