Skip to content

Commit 4552f4a

Browse files
committed
expose projection hints in parquet queryable
Signed-off-by: yeya24 <benye@amazon.com>
1 parent f26509f commit 4552f4a

File tree

12 files changed

+646
-46
lines changed

12 files changed

+646
-46
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`.
99
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
1010
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
11+
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
1112
* [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125
1213
* [ENHANCEMENT] Alertmanager: Upgrade alertmanager to 0.29.0 and add a new incidentIO integration. #7092
1314
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098

docs/blocks-storage/querier.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,18 @@ querier:
310310
# queryable.
311311
# CLI flag: -querier.parquet-queryable-fallback-disabled
312312
[parquet_queryable_fallback_disabled: <boolean> | default = false]
313+
314+
# [Experimental] If true, parquet queryable will honor projection hints and
315+
# only materialize requested labels. Projection is only applied when all
316+
# queried blocks are parquet blocks and not querying ingesters.
317+
# CLI flag: -querier.parquet-queryable-honor-projection-hints
318+
[parquet_queryable_honor_projection_hints: <boolean> | default = false]
319+
320+
# [Experimental] Time buffer to use when checking if query overlaps with
321+
# ingester data. Projection hints are disabled if query time range overlaps
322+
# with (now - query-ingesters-within - buffer).
323+
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
324+
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
313325
```
314326
315327
### `blocks_storage_config`

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4827,6 +4827,18 @@ thanos_engine:
48274827
# need to make sure Parquet files are created before it is queryable.
48284828
# CLI flag: -querier.parquet-queryable-fallback-disabled
48294829
[parquet_queryable_fallback_disabled: <boolean> | default = false]
4830+
4831+
# [Experimental] If true, parquet queryable will honor projection hints and only
4832+
# materialize requested labels. Projection is only applied when all queried
4833+
# blocks are parquet blocks and not querying ingesters.
4834+
# CLI flag: -querier.parquet-queryable-honor-projection-hints
4835+
[parquet_queryable_honor_projection_hints: <boolean> | default = false]
4836+
4837+
# [Experimental] Time buffer to use when checking if query overlaps with
4838+
# ingester data. Projection hints are disabled if query time range overlaps with
4839+
# (now - query-ingesters-within - buffer).
4840+
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
4841+
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
48304842
```
48314843
48324844
### `query_frontend_config`

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ require (
8787
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
8888
github.com/oklog/ulid/v2 v2.1.1
8989
github.com/parquet-go/parquet-go v0.25.1
90-
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71
90+
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94
9191
github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a
9292
github.com/prometheus/procfs v0.16.1
9393
github.com/sercand/kuberesolver/v5 v5.1.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
16341634
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
16351635
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
16361636
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
1637-
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk=
1638-
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
1637+
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8=
1638+
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
16391639
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
16401640
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
16411641
github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc=

integration/parquet_querier_test.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77
"fmt"
88
"math/rand"
99
"path/filepath"
10+
"slices"
1011
"strconv"
1112
"testing"
1213
"time"
1314

1415
"github.com/cortexproject/promqlsmith"
16+
"github.com/prometheus/common/model"
1517
"github.com/prometheus/prometheus/model/labels"
1618
"github.com/stretchr/testify/require"
1719
"github.com/thanos-io/objstore"
@@ -23,7 +25,9 @@ import (
2325
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
2426
"github.com/cortexproject/cortex/integration/e2ecortex"
2527
"github.com/cortexproject/cortex/pkg/storage/bucket"
28+
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
2629
"github.com/cortexproject/cortex/pkg/storage/tsdb"
30+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2731
"github.com/cortexproject/cortex/pkg/util/log"
2832
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
2933
)
@@ -176,3 +180,213 @@ func TestParquetFuzz(t *testing.T) {
176180
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
177181
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
178182
}
183+
184+
func TestParquetProjectionPushdownFuzz(t *testing.T) {
185+
s, err := e2e.NewScenario(networkName)
186+
require.NoError(t, err)
187+
defer s.Close()
188+
189+
consul := e2edb.NewConsulWithName("consul")
190+
memcached := e2ecache.NewMemcached()
191+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
192+
193+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
194+
flags := mergeFlags(
195+
baseFlags,
196+
map[string]string{
197+
"-target": "all,parquet-converter",
198+
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
199+
"-blocks-storage.tsdb.ship-interval": "1s",
200+
"-blocks-storage.bucket-store.sync-interval": "1s",
201+
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
202+
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
203+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
204+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
205+
"-querier.query-store-for-labels-enabled": "true",
206+
// compactor
207+
"-compactor.cleanup-interval": "1s",
208+
// Ingester.
209+
"-ring.store": "consul",
210+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
211+
// Distributor.
212+
"-distributor.replication-factor": "1",
213+
// Store-gateway.
214+
"-store-gateway.sharding-enabled": "false",
215+
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
216+
// alert manager
217+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
218+
// parquet-converter
219+
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
220+
"-parquet-converter.conversion-interval": "1s",
221+
"-parquet-converter.enabled": "true",
222+
// Querier - Enable Thanos engine with projection optimizer
223+
"-querier.thanos-engine": "true",
224+
"-querier.optimizers": "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection", // Enable all optimizers including projection
225+
"-querier.enable-parquet-queryable": "true",
226+
"-querier.parquet-queryable-honor-projection-hints": "true", // Honor projection hints
227+
// Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters
228+
// Since test queries are 24-48h old, they won't query ingesters and projection will be enabled
229+
"-querier.query-ingesters-within": "2h",
230+
// Enable cache for parquet labels and chunks
231+
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
232+
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
233+
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
234+
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
235+
},
236+
)
237+
238+
// make alert manager config dir
239+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
240+
241+
ctx := context.Background()
242+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
243+
dir := filepath.Join(s.SharedDir(), "data")
244+
numSeries := 20
245+
numSamples := 100
246+
lbls := make([]labels.Labels, 0, numSeries)
247+
scrapeInterval := time.Minute
248+
statusCodes := []string{"200", "400", "404", "500", "502"}
249+
methods := []string{"GET", "POST", "PUT", "DELETE"}
250+
now := time.Now()
251+
// Make sure query time is old enough to not overlap with ingesters.
252+
start := now.Add(-time.Hour * 72)
253+
end := now.Add(-time.Hour * 48)
254+
255+
// Create series with multiple labels
256+
for i := range numSeries {
257+
lbls = append(lbls, labels.FromStrings(
258+
labels.MetricName, "http_requests_total",
259+
"job", "api-server",
260+
"instance", fmt.Sprintf("instance-%d", i%5),
261+
"status_code", statusCodes[i%len(statusCodes)],
262+
"method", methods[i%len(methods)],
263+
"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
264+
"cluster", "test-cluster",
265+
))
266+
}
267+
268+
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
269+
require.NoError(t, err)
270+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
271+
require.NoError(t, s.StartAndWaitReady(minio))
272+
273+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
274+
require.NoError(t, s.StartAndWaitReady(cortex))
275+
276+
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
277+
require.NoError(t, err)
278+
bkt := storage.GetBucket()
279+
userBucket := bucket.NewUserBucketClient("user-1", bkt, nil)
280+
281+
err = block.Upload(ctx, log.Logger, userBucket, filepath.Join(dir, id.String()), metadata.NoneFunc)
282+
require.NoError(t, err)
283+
284+
// Wait until we convert the blocks to parquet AND bucket index is updated
285+
cortex_testutil.Poll(t, 300*time.Second, true, func() interface{} {
286+
// Check if parquet marker exists
287+
markerFound := false
288+
err := userBucket.Iter(context.Background(), "", func(name string) error {
289+
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
290+
markerFound = true
291+
}
292+
return nil
293+
}, objstore.WithRecursiveIter())
294+
if err != nil || !markerFound {
295+
return false
296+
}
297+
298+
// Check if bucket index exists AND contains the parquet block metadata
299+
idx, err := bucketindex.ReadIndex(ctx, bkt, "user-1", nil, log.Logger)
300+
if err != nil {
301+
return false
302+
}
303+
304+
// Verify the block is in the bucket index with parquet metadata
305+
for _, b := range idx.Blocks {
306+
if b.ID == id && b.Parquet != nil {
307+
require.True(t, b.Parquet.Version == cortex_parquet.CurrentVersion)
308+
return true
309+
}
310+
}
311+
return false
312+
})
313+
314+
c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
315+
require.NoError(t, err)
316+
317+
testCases := []struct {
318+
name string
319+
query string
320+
expectedLabels []string // Labels that should be present in result
321+
}{
322+
{
323+
name: "vector selector query should not use projection",
324+
query: `http_requests_total`,
325+
expectedLabels: []string{"__name__", "job", "instance", "status_code", "method", "path", "cluster"},
326+
},
327+
{
328+
name: "simple_sum_by_job",
329+
query: `sum by (job) (http_requests_total)`,
330+
expectedLabels: []string{"job"},
331+
},
332+
{
333+
name: "rate_with_aggregation",
334+
query: `sum by (method) (rate(http_requests_total[5m]))`,
335+
expectedLabels: []string{"method"},
336+
},
337+
{
338+
name: "multiple_grouping_labels",
339+
query: `sum by (job, status_code) (http_requests_total)`,
340+
expectedLabels: []string{"job", "status_code"},
341+
},
342+
{
343+
name: "aggregation without query",
344+
query: `sum without (instance, method) (http_requests_total)`,
345+
expectedLabels: []string{"job", "status_code", "path", "cluster"},
346+
},
347+
}
348+
349+
for _, tc := range testCases {
350+
t.Run(tc.name, func(t *testing.T) {
351+
t.Logf("Testing: %s", tc.query)
352+
353+
// Execute instant query
354+
result, err := c.Query(tc.query, end)
355+
require.NoError(t, err)
356+
require.NotNil(t, result)
357+
358+
// Verify we got results
359+
vector, ok := result.(model.Vector)
360+
require.True(t, ok, "result should be a vector")
361+
require.NotEmpty(t, vector, "query should return results")
362+
363+
t.Logf("Query returned %d series", len(vector))
364+
365+
// Verify projection worked: series should only have the expected labels
366+
for _, sample := range vector {
367+
actualLabels := make(map[string]struct{})
368+
for label := range sample.Metric {
369+
actualLabels[string(label)] = struct{}{}
370+
}
371+
372+
// Check that all expected labels are present
373+
for _, expectedLabel := range tc.expectedLabels {
374+
_, ok := actualLabels[expectedLabel]
375+
require.True(t, ok,
376+
"series should have %s label", expectedLabel)
377+
}
378+
379+
// Check that no unexpected labels are present
380+
for lbl := range actualLabels {
381+
if !slices.Contains(tc.expectedLabels, lbl) {
382+
require.Fail(t, "series should not have unexpected label: %s", lbl)
383+
}
384+
}
385+
}
386+
})
387+
}
388+
389+
// Verify that parquet blocks were queried
390+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
391+
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
392+
}

0 commit comments

Comments
 (0)