@@ -2,10 +2,12 @@ package querier
22
33import (
44 "context"
5+ "fmt"
56 "time"
67
78 "github.com/go-kit/log"
89 "github.com/go-kit/log/level"
10+ lru "github.com/hashicorp/golang-lru/v2"
911 "github.com/parquet-go/parquet-go"
1012 "github.com/pkg/errors"
1113 "github.com/prometheus-community/parquet-common/schema"
@@ -87,6 +89,11 @@ func NewParquetQueryable(
8789 return nil , err
8890 }
8991
92+ cache , err := newCache [* parquet_storage.ParquetShard ]("parquet-shards" , config .ParquetQueryableShardCacheSize , newCacheMetrics (reg ))
93+ if err != nil {
94+ return nil , err
95+ }
96+
9097 cDecoder := schema .NewPrometheusParquetChunksDecoder (chunkenc .NewPool ())
9198
9299 parquetQueryable , err := search .NewParquetQueryable (cDecoder , func (ctx context.Context , mint , maxt int64 ) ([]* parquet_storage.ParquetShard , error ) {
@@ -106,20 +113,30 @@ func NewParquetQueryable(
106113
107114 for i , block := range blocks {
108115 errGroup .Go (func () error {
109- // we always only have 1 shard - shard 0
110- shard , err := parquet_storage .OpenParquetShard (ctx ,
111- userBkt ,
112- block .ID .String (),
113- 0 ,
114- parquet_storage .WithFileOptions (
115- parquet .SkipMagicBytes (true ),
116- parquet .ReadBufferSize (100 * 1024 ),
117- parquet .SkipBloomFilters (true ),
118- ),
119- parquet_storage .WithOptimisticReader (true ),
120- )
116+ cacheKey := fmt .Sprintf ("%v-%v" , userID , block .ID )
117+ shard := cache .Get (cacheKey )
118+ if shard == nil {
119+ // we always only have 1 shard - shard 0
120+ // Use context.Background() here as the file can be cached and live after the request ends.
121+ shard , err = parquet_storage .OpenParquetShard (context .Background (),
122+ userBkt ,
123+ block .ID .String (),
124+ 0 ,
125+ parquet_storage .WithFileOptions (
126+ parquet .SkipMagicBytes (true ),
127+ parquet .ReadBufferSize (100 * 1024 ),
128+ parquet .SkipBloomFilters (true ),
129+ ),
130+ parquet_storage .WithOptimisticReader (true ),
131+ )
132+ if err != nil {
133+ return err
134+ }
135+ cache .Set (cacheKey , shard )
136+ }
137+
121138 shards [i ] = shard
122- return err
139+ return nil
123140 })
124141 }
125142
@@ -401,3 +418,89 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i
401418
402419 return remaining , parquetBlocks , nil
403420}
421+
422+ type cacheInterface [T any ] interface {
423+ Get (path string ) T
424+ Set (path string , reader T )
425+ }
426+
427+ type cacheMetrics struct {
428+ hits * prometheus.CounterVec
429+ misses * prometheus.CounterVec
430+ evictions * prometheus.CounterVec
431+ size * prometheus.GaugeVec
432+ }
433+
434+ func newCacheMetrics (reg prometheus.Registerer ) * cacheMetrics {
435+ return & cacheMetrics {
436+ hits : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
437+ Name : "cortex_parquet_queryable_cache_hits_total" ,
438+ Help : "Total number of parquet cache hits" ,
439+ }, []string {"name" }),
440+ misses : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
441+ Name : "cortex_parquet_queryable_cache_misses_total" ,
442+ Help : "Total number of parquet cache misses" ,
443+ }, []string {"name" }),
444+ evictions : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
445+ Name : "cortex_parquet_queryable_cache_evictions_total" ,
446+ Help : "Total number of parquet cache evictions" ,
447+ }, []string {"name" }),
448+ size : promauto .With (reg ).NewGaugeVec (prometheus.GaugeOpts {
449+ Name : "cortex_parquet_queryable_cache_item_count" ,
450+ Help : "Current number of cached parquet items" ,
451+ }, []string {"name" }),
452+ }
453+ }
454+
455+ type Cache [T any ] struct {
456+ cache * lru.Cache [string , T ]
457+ name string
458+ metrics * cacheMetrics
459+ }
460+
461+ func newCache [T any ](name string , size int , metrics * cacheMetrics ) (cacheInterface [T ], error ) {
462+ if size <= 0 {
463+ return & noopCache [T ]{}, nil
464+ }
465+ cache , err := lru .NewWithEvict (size , func (key string , value T ) {
466+ metrics .evictions .WithLabelValues (name ).Inc ()
467+ metrics .size .WithLabelValues (name ).Dec ()
468+ })
469+ if err != nil {
470+ return nil , err
471+ }
472+
473+ return & Cache [T ]{
474+ cache : cache ,
475+ name : name ,
476+ metrics : metrics ,
477+ }, nil
478+ }
479+
480+ func (c * Cache [T ]) Get (path string ) (r T ) {
481+ if reader , ok := c .cache .Get (path ); ok {
482+ c .metrics .hits .WithLabelValues (c .name ).Inc ()
483+ return reader
484+ }
485+ c .metrics .misses .WithLabelValues (c .name ).Inc ()
486+ return
487+ }
488+
489+ func (c * Cache [T ]) Set (path string , reader T ) {
490+ if ! c .cache .Contains (path ) {
491+ c .metrics .size .WithLabelValues (c .name ).Inc ()
492+ }
493+ c .metrics .misses .WithLabelValues (c .name ).Inc ()
494+ c .cache .Add (path , reader )
495+ }
496+
497+ type noopCache [T any ] struct {
498+ }
499+
500+ func (n noopCache [T ]) Get (_ string ) (r T ) {
501+ return
502+ }
503+
504+ func (n noopCache [T ]) Set (_ string , _ T ) {
505+
506+ }
0 commit comments