Skip to content

Commit b8abe5e

Browse files
committed
feature(metastore): query metastore before building physical plan
1 parent 9bd5d6a commit b8abe5e

File tree

6 files changed

+642
-11
lines changed

6 files changed

+642
-11
lines changed

pkg/engine/engine.go

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,16 @@ type ExecutorConfig struct {
5858

5959
// RangeConfig determines how to optimize range reads in the V2 engine.
6060
RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."`
61+
62+
// MetaqueriesEnabled toggles the metaquery planning stage.
63+
MetaqueriesEnabled bool `yaml:"metaqueries_enabled" category:"experimental"`
6164
}
6265

6366
// RegisterFlagsWithPrefix registers all executor flags on f, prepending prefix
// to each flag name so the config can be nested under different CLI namespaces.
func (cfg *ExecutorConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.IntVar(&cfg.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.")
	// Fixed help-text grammar: "are be prefetched" -> "are prefetched".
	f.IntVar(&cfg.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are prefetched in parallel.")
	cfg.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
	// NOTE(review): flag name "metaquery-enable" does not mirror the yaml key
	// "metaqueries_enabled"; renaming would break existing command lines, so
	// it is left as-is — confirm the naming is intentional.
	f.BoolVar(&cfg.MetaqueriesEnabled, prefix+"metaquery-enable", false, "Experimental: Enable the metaquery planning stage that precomputes catalog lookups.")
}
6872

6973
// Params holds parameters for constructing a new [Engine].
@@ -107,6 +111,13 @@ type Engine struct {
107111
limits logql.Limits // Limits to apply to engine queries.
108112

109113
metastore metastore.Metastore
114+
// metaqueriesEnabled gates the metaquery planning stage.
115+
metaqueriesEnabled bool
116+
metaqueryRunner physical.MetaqueryRunner
117+
}
118+
119+
func (e *Engine) metaqueriesActive() bool {
120+
return e.metaqueriesEnabled && e.metaqueryRunner != nil
110121
}
111122

112123
// New creates a new Engine.
@@ -120,9 +131,10 @@ func New(params Params) (*Engine, error) {
120131
metrics: newMetrics(params.Registerer),
121132
rangeConfig: params.Config.RangeConfig,
122133

123-
scheduler: params.Scheduler,
124-
bucket: bucket.NewXCapBucket(params.Bucket),
125-
limits: params.Limits,
134+
scheduler: params.Scheduler,
135+
bucket: bucket.NewXCapBucket(params.Bucket),
136+
limits: params.Limits,
137+
metaqueriesEnabled: params.Config.MetaqueriesEnabled,
126138
}
127139

128140
if e.bucket != nil {
@@ -133,6 +145,12 @@ func New(params Params) (*Engine, error) {
133145
e.metastore = metastore.NewObjectMetastore(indexBucket, e.logger, params.Registerer)
134146
}
135147

148+
if e.metaqueriesEnabled && e.metastore != nil {
149+
e.metaqueryRunner = &physical.LocalMetaqueryRunner{Metastore: e.metastore}
150+
} else {
151+
e.metaqueriesEnabled = false
152+
}
153+
136154
return e, nil
137155
}
138156

@@ -305,9 +323,31 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, logger log.Logger, param
305323
region := xcap.RegionFromContext(ctx)
306324
timer := prometheus.NewTimer(e.metrics.physicalPlanning)
307325

308-
// TODO(rfratto): To improve the performance of the physical planner, we
309-
// may want to parallelize metastore lookups across scheduled tasks as well.
310-
catalog := physical.NewMetastoreCatalog(ctx, e.metastore)
326+
var (
327+
catalog physical.Catalog
328+
metaDuration time.Duration
329+
metaRequests int
330+
)
331+
if e.metaqueriesActive() {
332+
// run all the metastore lookups at this point and prepare a metastore catalog that already has all the answers
333+
var err error
334+
catalog, metaDuration, metaRequests, err = e.prepareCatalogWithMetaqueries(ctx, params, logicalPlan)
335+
if err != nil {
336+
level.Warn(logger).Log("msg", "failed to prepare metaqueries", "err", err)
337+
region.RecordError(err)
338+
return nil, 0, ErrPlanningFailed
339+
}
340+
e.metrics.metaqueryPlanning.Observe(metaDuration.Seconds())
341+
level.Info(logger).Log("msg", "finished metaquery planning", "duration", metaDuration.String(), "requests", metaRequests)
342+
region.AddEvent("finished metaquery planning",
343+
attribute.Stringer("duration", metaDuration),
344+
attribute.Int("requests", metaRequests),
345+
)
346+
} else {
347+
// TODO(rfratto): To improve the performance of the physical planner, we
348+
// may want to parallelize metastore lookups across scheduled tasks as well.
349+
catalog = physical.NewMetastoreCatalog(ctx, e.metastore)
350+
}
311351

312352
// TODO(rfratto): It feels strange that we need to past the start/end time
313353
// to the physical planner. Isn't it already represented by the logical
@@ -341,6 +381,32 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, logger log.Logger, param
341381
return physicalPlan, duration, nil
342382
}
343383

384+
func (e *Engine) prepareCatalogWithMetaqueries(ctx context.Context, params logql.Params, logicalPlan *logical.Plan) (physical.Catalog, time.Duration, int, error) {
385+
start := time.Now()
386+
387+
collector := physical.NewMetaqueryCollectorCatalog()
388+
collectorPlanner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), collector)
389+
if _, err := collectorPlanner.Build(logicalPlan); err != nil {
390+
return nil, 0, 0, err
391+
}
392+
393+
requests := collector.Requests()
394+
395+
prepared := physical.NewMetaqueryPreparedCatalog()
396+
for _, req := range requests {
397+
result, err := e.metaqueryRunner.Run(ctx, req)
398+
if err != nil {
399+
return nil, 0, 0, fmt.Errorf("executing metaquery: %w", err)
400+
}
401+
err = prepared.Store(req, result)
402+
if err != nil {
403+
return nil, 0, 0, fmt.Errorf("storing metaquery result: %w", err)
404+
}
405+
}
406+
407+
return prepared, time.Since(start), len(requests), nil
408+
}
409+
344410
// buildWorkflow builds a workflow from the given physical plan.
345411
func (e *Engine) buildWorkflow(ctx context.Context, logger log.Logger, physicalPlan *physical.Plan) (*workflow.Workflow, time.Duration, error) {
346412
tenantID, err := user.ExtractOrgID(ctx)

pkg/engine/engine_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package engine
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
11+
"github.com/grafana/loki/v3/pkg/engine/internal/planner/logical"
12+
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
13+
"github.com/grafana/loki/v3/pkg/engine/internal/types"
14+
"github.com/grafana/loki/v3/pkg/logproto"
15+
"github.com/grafana/loki/v3/pkg/logql"
16+
)
17+
18+
func TestPrepareCatalogWithMetaqueries(t *testing.T) {
19+
t.Parallel()
20+
21+
start := time.Unix(0, 0)
22+
end := start.Add(time.Minute)
23+
24+
params, err := logql.NewLiteralParams("{foo=\"bar\"}", start, end, 0, 0, logproto.BACKWARD, 0, nil, nil)
25+
require.NoError(t, err)
26+
27+
makeTable := buildTestMakeTable()
28+
plan := &logical.Plan{
29+
Instructions: []logical.Instruction{
30+
makeTable,
31+
&logical.Return{Value: makeTable},
32+
},
33+
}
34+
35+
stub := &metaqueryRunnerStub{
36+
sections: []*metastore.DataobjSectionDescriptor{
37+
{
38+
SectionKey: metastore.SectionKey{ObjectPath: "obj", SectionIdx: 1},
39+
StreamIDs: []int64{1},
40+
Start: start,
41+
End: end,
42+
},
43+
},
44+
}
45+
46+
e := &Engine{
47+
metaqueriesEnabled: true,
48+
metaqueryRunner: stub,
49+
}
50+
51+
catalog, _, requestCount, err := e.prepareCatalogWithMetaqueries(context.Background(), params, plan)
52+
require.NoError(t, err)
53+
require.Equal(t, 1, requestCount)
54+
require.Len(t, stub.requests, 1)
55+
56+
// Running the planner with the prepared catalog should succeed because the metaqueries were resolved.
57+
planner := physical.NewPlanner(physical.NewContext(start, end), catalog)
58+
_, err = planner.Build(plan)
59+
require.NoError(t, err)
60+
}
61+
62+
func buildTestMakeTable() *logical.MakeTable {
63+
selector := &logical.BinOp{
64+
Left: logical.NewColumnRef("__name__", types.ColumnTypeLabel),
65+
Right: logical.NewLiteral("value"),
66+
Op: types.BinaryOpEq,
67+
}
68+
return &logical.MakeTable{
69+
Selector: selector,
70+
Shard: logical.NewShard(0, 1),
71+
}
72+
}
73+
74+
type metaqueryRunnerStub struct {
75+
sections []*metastore.DataobjSectionDescriptor
76+
requests []physical.MetaqueryRequest
77+
}
78+
79+
func (m *metaqueryRunnerStub) Run(_ context.Context, req physical.MetaqueryRequest) (physical.MetaqueryResponse, error) {
80+
m.requests = append(m.requests, req)
81+
return physical.MetaqueryResponse{Kind: physical.MetaqueryRequestKindSections, Sections: m.sections}, nil
82+
}

0 commit comments

Comments
 (0)