diff --git a/iac/provider-gcp/nomad/clean-nfs-cache.tf b/iac/provider-gcp/nomad/clean-nfs-cache.tf index 3816788609..326db315f4 100644 --- a/iac/provider-gcp/nomad/clean-nfs-cache.tf +++ b/iac/provider-gcp/nomad/clean-nfs-cache.tf @@ -15,15 +15,16 @@ resource "nomad_job" "clean_nfs_cache" { count = var.shared_chunk_cache_path != "" ? 1 : 0 jobspec = templatefile("${path.module}/jobs/clean-nfs-cache.hcl", { - node_pool = var.orchestrator_node_pool - bucket_name = var.fc_env_pipeline_bucket_name - environment = var.environment - clean_nfs_cache_checksum = data.external.filestore_cleanup_checksum.result.hex - nfs_cache_mount_path = var.shared_chunk_cache_path - max_disk_usage_target = var.filestore_cache_cleanup_disk_usage_target - dry_run = var.filestore_cache_cleanup_dry_run - deletions_per_loop = var.filestore_cache_cleanup_deletions_per_loop - files_per_loop = var.filestore_cache_cleanup_files_per_loop - otel_collector_endpoint = data.google_secret_manager_secret_version.grafana_logs_url.secret_data + node_pool = var.orchestrator_node_pool + bucket_name = var.fc_env_pipeline_bucket_name + environment = var.environment + clean_nfs_cache_checksum = data.external.filestore_cleanup_checksum.result.hex + nfs_cache_mount_path = var.shared_chunk_cache_path + max_disk_usage_target = var.filestore_cache_cleanup_disk_usage_target + dry_run = var.filestore_cache_cleanup_dry_run + deletions_per_loop = var.filestore_cache_cleanup_deletions_per_loop + files_per_loop = var.filestore_cache_cleanup_files_per_loop + otel_collector_grpc_endpoint = "localhost:${var.otel_collector_grpc_port}" + launch_darkly_api_key = trimspace(data.google_secret_manager_secret_version.launch_darkly_api_key.secret_data) }) } diff --git a/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl b/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl index 50eed641df..b79d613ad9 100644 --- a/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl +++ b/iac/provider-gcp/nomad/jobs/clean-nfs-cache.hcl @@ -19,8 +19,15 @@ job 
"filestore-cleanup" { task "filestore-cleanup" { driver = "raw_exec" + resources { + memory = 2048 // in MB + } + env { NODE_ID = "$${node.unique.name}" + %{ if launch_darkly_api_key != "" } + LAUNCH_DARKLY_API_KEY = "${launch_darkly_api_key}" + %{ endif } } config { @@ -30,9 +37,7 @@ job "filestore-cleanup" { "--disk-usage-target-percent=${max_disk_usage_target}", "--files-per-loop=${files_per_loop}", "--deletions-per-loop=${deletions_per_loop}", - %{ if otel_collector_endpoint != "" } - "--otel-collector-endpoint=${otel_collector_endpoint}", - %{ endif } + "--otel-collector-endpoint=${otel_collector_grpc_endpoint}", "${nfs_cache_mount_path}", ] } diff --git a/packages/orchestrator/cmd/clean-nfs-cache/compare_test.go b/packages/orchestrator/cmd/clean-nfs-cache/compare_test.go new file mode 100644 index 0000000000..45c048fb3b --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/compare_test.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/orchestrator/cmd/clean-nfs-cache/ex" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +func TestCompare(t *testing.T) { + var ( + testFileSize = 7317 + NDirs = 500 + NFiles = 10000 + PercentClean = 13 + ) + + if testing.Short() { + NDirs = 5 + NFiles = 100 + } + + ctx := context.Background() + targetBytesToDelete := uint64(NFiles*testFileSize*PercentClean/100) + 1 + + printSummary := func(start time.Time, atimes []time.Duration, deletedBytes uint64) { + mean, sd := standardDeviation(atimes) + t.Logf("Cleaned %d out of %d bytes in %v; file age %v (%v)", + deletedBytes, targetBytesToDelete, time.Since(start), mean.Round(time.Hour), sd.Round(time.Minute)) + } + + for _, nScan := range []int{1, 4, 16, 1024} { + for _, nDel := range []int{1, 2, 8, 1024} { + for _, nStat := range []int{1, 4, 16, 1024} { + t.Run(fmt.Sprintf("Scan%v-Del%v-Stat%v", nScan, nDel, nStat), func(t *testing.T) { + 
path := t.TempDir() + ex.CreateTestDir(path, NDirs, NFiles, testFileSize) + t.Cleanup(func() { + os.RemoveAll(path) + }) + start := time.Now() + c := ex.NewCleaner(ex.Options{ + Path: path, + DeleteN: NFiles / 100, + BatchN: NFiles / 10, + DryRun: false, + MaxConcurrentStat: nStat, + MaxConcurrentScan: nScan, + MaxConcurrentDelete: nDel, + TargetBytesToDelete: targetBytesToDelete, + MaxErrorRetries: 10, + }, logger.NewNopLogger()) + + err := c.Clean(ctx) + require.NoError(t, err) + require.GreaterOrEqual(t, c.DeletedBytes.Load(), targetBytesToDelete) + require.LessOrEqual(t, c.StatxInDirC.Load(), int64(NFiles)) + require.LessOrEqual(t, c.StatxC.Load(), int64(NFiles)+c.DeleteSubmittedC.Load()) + printSummary(start, c.DeletedAge, c.DeletedBytes.Load()) + }) + } + } + } + + t.Run("cleanNFSCache", func(t *testing.T) { + path := t.TempDir() + ex.CreateTestDir(path, NDirs, NFiles, testFileSize) + t.Cleanup(func() { + os.RemoveAll(path) + }) + + start := time.Now() + targetBytesToDelete := int64(NFiles*testFileSize*PercentClean/100) + 1 + + allResults, err := cleanNFSCache(ctx, []string{ + "clean-nfs-cache", + "--dry-run=false", + fmt.Sprintf("--files-per-loop=%d", NFiles/10), + fmt.Sprintf("--deletions-per-loop=%d", NFiles/100), + path, + }, targetBytesToDelete) + require.NoError(t, err) + require.GreaterOrEqual(t, allResults.deletedBytes, targetBytesToDelete) + printSummary(start, allResults.lastAccessed, uint64(allResults.deletedBytes)) + }) +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/alloc_bench_test.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/alloc_bench_test.go new file mode 100644 index 0000000000..d2ea2e8a75 --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/alloc_bench_test.go @@ -0,0 +1,52 @@ +package ex + +import ( + "testing" + "unsafe" +) + +// sizeOfDir returns an estimate of the memory used by a Dir value +// including the name bytes (the string header is counted in unsafe.Sizeof). 
+func sizeOfDir(d *Dir) uintptr { + sz := unsafe.Sizeof(*d) + sz += uintptr(len(d.Name)) + + return sz +} + +// sizeOfFile returns an estimate of the memory used by a File value +// including the name bytes. +func sizeOfFile(f *File) uintptr { + sz := unsafe.Sizeof(*f) + sz += uintptr(len(f.Name)) + + return sz +} + +func BenchmarkSizes(b *testing.B) { + // create a 64-byte name + nameBytes := make([]byte, 64) + for i := range nameBytes { + nameBytes[i] = 'x' + } + name := string(nameBytes) + + d := &Dir{Name: name} + f := &File{Name: name} + + b.Run("DirSizeBytes", func(b *testing.B) { + for range b.N { + _ = sizeOfDir(d) + } + b.ReportMetric(float64(sizeOfDir(d)), "bytes") + b.Logf("Dir: struct header %d bytes + name %d bytes = %d bytes", unsafe.Sizeof(*d), len(d.Name), sizeOfDir(d)) + }) + + b.Run("FileSizeBytes", func(b *testing.B) { + for range b.N { + _ = sizeOfFile(f) + } + b.ReportMetric(float64(sizeOfFile(f)), "bytes") + b.Logf("File: struct header %d bytes + name %d bytes = %d bytes", unsafe.Sizeof(*f), len(f.Name), sizeOfFile(f)) + }) +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/clean.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/clean.go new file mode 100644 index 0000000000..fd529c8890 --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/clean.go @@ -0,0 +1,378 @@ +package ex + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +type Cleaner struct { + Options + Counters + logger.Logger + + base string + root *Dir + statRequestCh chan *statReq +} + +type Options struct { + Experimental bool + Path string + BatchN int + DeleteN int + TargetBytesToDelete uint64 + DryRun bool + MaxConcurrentStat int + MaxConcurrentScan int + MaxConcurrentDelete int + MaxErrorRetries int + TargetDiskUsagePercent float64 + OtelCollectorEndpoint string +} + +type Counters struct { + DeleteSubmittedC 
atomic.Int64 + DeleteAttemptC atomic.Int64 + DeleteErrC atomic.Int64 + DeleteAlreadyGoneC atomic.Int64 + DeleteSkipC atomic.Int64 + DeletedBytes atomic.Uint64 + DeletedAge []time.Duration + + // Syscalls + ReadDirC atomic.Int64 + StatxInDirC atomic.Int64 + StatxC atomic.Int64 + RemoveC atomic.Int64 + RemoveDirC atomic.Int64 +} + +const ( + dirStateInitial = iota + dirStateScanning + dirStatScanned +) + +type Dir struct { + mu sync.Mutex + state int + + Name string + Dirs []*Dir + Files []File +} + +type File struct { + Name string // short name + ATimeUnix int64 // atime in unix seconds + Size uint64 +} + +type Candidate struct { + Parent *Dir + FullPath string + ATimeUnix int64 + BTimeUnix int64 + Size uint64 +} + +type statReq struct { + df *os.File + name string + response chan *statReq + f *File + err error +} + +var ( + ErrNoFiles = errors.New("no files found to clean") + ErrMaxRetries = errors.New("maximum error retries reached") + ErrBusy = errors.New("directory is in use by another scanner") + ErrUsage = errors.New("usage: clean-nfs-cache []") +) + +func NewCleaner(opts Options, log logger.Logger) *Cleaner { + c := &Cleaner{ + Options: opts, + Logger: log, + base: filepath.Dir(opts.Path), + root: NewDir(filepath.Base(opts.Path)), + } + + return c +} + +func (c *Cleaner) validateOptions() error { + if c.Path == "" { + return ErrUsage + } + var errs []error + if c.DeleteN <= 0 { + errs = append(errs, errors.New("deletions-per-loop must be > 0")) + } + if c.BatchN <= 0 { + errs = append(errs, errors.New("files-per-loop must be > 0")) + } + if c.BatchN < c.DeleteN { + errs = append(errs, errors.New("files-per-loop must be >= deletions-per-loop")) + } + if c.TargetBytesToDelete == 0 && c.TargetDiskUsagePercent == 0 { + errs = append(errs, errors.New("either target-bytes-to-delete or disk-usage-target-percent must be set")) + } + if c.MaxConcurrentStat <= 0 { + errs = append(errs, errors.New("max-concurrent-stat must be > 0")) + } + if c.MaxConcurrentScan <= 0 { 
+ errs = append(errs, errors.New("max-concurrent-scan must be > 0")) + } + if c.MaxConcurrentDelete <= 0 { + errs = append(errs, errors.New("max-concurrent-delete must be > 0")) + } + + return errors.Join(errs...) +} + +func (c *Cleaner) Clean(ctx context.Context) error { + if err := c.validateOptions(); err != nil { + return err + } + + ctx, cancel := context.WithCancel(ctx) + + errCh := make(chan error) + defer close(errCh) + candidateCh := make(chan *Candidate) + defer close(candidateCh) + deleteCh := make(chan *Candidate) + defer close(deleteCh) + c.statRequestCh = make(chan *statReq) + defer close(c.statRequestCh) + + drainedCh := make(chan struct{}) + running := &sync.WaitGroup{} + + batch := make([]*Candidate, 0, c.DeleteN) + n := 0 + + draining := false + var result error + drain := func(err error) { + if draining { + return + } + + cancel() + draining = true + result = err + + go func() { + // wait for the running goroutines to finish + running.Wait() + close(drainedCh) + }() + } + + for range c.MaxConcurrentStat { + running.Add(1) + go c.Statter(ctx, running) + } + for range c.MaxConcurrentScan { + running.Add(1) + go c.Scanner(ctx, candidateCh, errCh, running) + } + for range c.MaxConcurrentDelete { + running.Add(1) + go c.Deleter(ctx, deleteCh, running) + } + + for { + if c.DeletedBytes.Load() >= c.TargetBytesToDelete && !draining { + c.Info(ctx, "target bytes deleted reached, draining remaining candidates") + drain(nil) + } + + select { + case <-drainedCh: + return result + + case <-ctx.Done(): + drain(ctx.Err()) + + case candidate := <-candidateCh: + if draining { + continue + } + + n++ + batch = append(batch, candidate) + if n < c.BatchN { + continue + } + + // Process the batch, start by sorting candidates by age (oldest first) + sort.Slice(batch, func(i, j int) bool { + return batch[i].ATimeUnix > batch[j].ATimeUnix + }) + + c.Info(ctx, "selected batch", + zap.Int("count", len(batch)), + zap.Duration("oldest", 
time.Since(time.Unix(batch[0].ATimeUnix, 0))), + zap.Duration("newest", time.Since(time.Unix(batch[len(batch)-1].ATimeUnix, 0))), + ) + + // reinsert the "younger" candidates back into the directory tree + c.reinsertCandidates(batch[c.DeleteN:]) + + total := uint64(0) + for _, toDelete := range batch[:c.DeleteN] { + deleteCh <- toDelete + c.DeleteSubmittedC.Add(1) + total += toDelete.Size + } + c.Info(ctx, "deleting files", + zap.Int("count", c.DeleteN), + zap.Uint64("bytes", total)) + batch = batch[:0] + n = 0 + + case err := <-errCh: + if !draining && errors.Is(err, ErrMaxRetries) { + drain(err) + } + } + } +} + +func (c *Cleaner) reinsertCandidates(candidates []*Candidate) { + // sort the candidates by their parent directory so we lock and re-sort each directory only once. + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].Parent.Name < candidates[j].Parent.Name + }) + var prevParent *Dir + var files []File + for _, candidate := range candidates { + parent := candidate.Parent // not nil + newParent := parent != prevParent + if newParent { + if prevParent != nil { + prevParent.reinsertFiles(files) + } + prevParent = parent + files = files[:0] + } + + f := File{ + Name: filepath.Base(candidate.FullPath), + ATimeUnix: candidate.ATimeUnix, + Size: candidate.Size, + } + files = append(files, f) + } + if prevParent != nil { + prevParent.reinsertFiles(files) + } +} + +func (c *Cleaner) abs(path []*Dir, name string) string { + join := c.base + for _, p := range path { + join = filepath.Join(join, p.Name) + } + if name != "" { + join = filepath.Join(join, name) + } + + return join +} + +func NewDir(name string) *Dir { + return &Dir{ + Name: name, + mu: sync.Mutex{}, + } +} + +func (d *Dir) reinsertFiles(files []File) { + d.mu.Lock() + defer d.mu.Unlock() + + d.Files = append(d.Files, files...) 
+ d.sort() +} + +func (d *Dir) sort() { + // sort the dirs by name + sort.Slice(d.Dirs, func(i, j int) bool { + return d.Dirs[i].Name < d.Dirs[j].Name + }) + + sort.Slice(d.Files, func(i, j int) bool { + return d.Files[i].ATimeUnix > d.Files[j].ATimeUnix + }) +} + +func (d *Dir) IsScanned() bool { + d.mu.Lock() + defer d.mu.Unlock() + + return d.state == dirStatScanned +} + +func (d *Dir) isEmpty() bool { + return d.state == dirStatScanned && len(d.Files) == 0 && len(d.Dirs) == 0 +} + +func (d *Dir) IsEmpty() bool { + d.mu.Lock() + defer d.mu.Unlock() + + return d.isEmpty() +} + +func (c *Cleaner) timeit(ctx context.Context, message string, fn func()) { + start := time.Now() + fn() + done := time.Since(start).Round(time.Millisecond) + + c.Debug(ctx, message, zap.Duration("duration", done)) +} + +func CreateTestDir(path string, nDirs int, nFiles int, fsize int) { + os.MkdirAll(path, 0o755) + + for i := range nDirs { + dirPath := filepath.Join(path, fmt.Sprintf("dir%d", i)) + err := os.Mkdir(dirPath, 0o755) + if err != nil { + panic(err) + } + } + + for i := range nFiles { + dirPath := filepath.Join(path, fmt.Sprintf("dir%d", i%nDirs)) + filePath := filepath.Join(dirPath, fmt.Sprintf("file%d.txt", i)) + err := os.WriteFile(filePath, []byte(""), 0o644) + if err == nil { + err = os.Truncate(filePath, int64(fsize)) + } + if err != nil { + panic(err) + } + tt := time.Now().Add(-1 * time.Duration(i) * time.Minute) + err = os.Chtimes(filePath, tt, tt) + if err != nil { + panic(err) + } + } +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/clean_test.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/clean_test.go new file mode 100644 index 0000000000..236698e966 --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/clean_test.go @@ -0,0 +1,130 @@ +package ex + +import ( + "context" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +func 
TestDirSort(t *testing.T) { + d := &Dir{ + Name: "testdir", + Dirs: []*Dir{ + {Name: "subdirB"}, + {Name: "subdirA"}, + {Name: "subdirC"}, + }, + Files: []File{ + {Name: "file3.txt", ATimeUnix: 300}, + {Name: "file1.txt", ATimeUnix: 100}, + {Name: "file2.txt", ATimeUnix: 200}, + }, + } + + d.sort() + + require.Equal(t, "subdirA", d.Dirs[0].Name) + require.Equal(t, "subdirB", d.Dirs[1].Name) + require.Equal(t, "subdirC", d.Dirs[2].Name) + + require.Equal(t, "file3.txt", d.Files[0].Name) + require.Equal(t, int64(300), d.Files[0].ATimeUnix) + require.Equal(t, "file2.txt", d.Files[1].Name) + require.Equal(t, int64(200), d.Files[1].ATimeUnix) + require.Equal(t, "file1.txt", d.Files[2].Name) + require.Equal(t, int64(100), d.Files[2].ATimeUnix) +} + +func TestCleanDeletesTwoFiles(t *testing.T) { + root := t.TempDir() + defer os.RemoveAll(root) + + // create root path used by Cleaner + rootPath := filepath.Join(root, "root") + err := os.MkdirAll(rootPath, 0o755) + require.NoError(t, err) + + subdirs := []string{"subA", "subB"} + origFiles := map[string][]string{} + + now := time.Now() + + // Create 2 subdirs each with 9 files: file0 (oldest) ... 
file8 (newest) + for _, sd := range subdirs { + dirPath := filepath.Join(rootPath, sd) + err = os.MkdirAll(dirPath, 0o755) + require.NoError(t, err) + + names := []string{} + for i := range 9 { + name := filepath.Join(dirPath, "file"+strconv.Itoa(i)+".txt") + names = append(names, filepath.Base(name)) + // write 512 bytes to ensure non-zero size + err = os.WriteFile(name, make([]byte, 512), 0o644) + require.NoError(t, err) + + // file0 should be oldest, file8 newest + ageMinutes := 100*(5*i) + i // ensure clear ordering + mtime := now.Add(time.Duration(-ageMinutes) * time.Minute) + err = os.Chtimes(name, mtime, mtime) + require.NoError(t, err) + } + origFiles[sd] = names + } + + // Configure Cleaner to delete 2 files (target bytes equal to 2 files) + opts := Options{ + Path: rootPath, + BatchN: 4, + DeleteN: 2, + TargetBytesToDelete: 1024, // 2 * 512 + DryRun: false, + MaxConcurrentStat: 1, + MaxConcurrentScan: 1, + MaxConcurrentDelete: 1, + } + + c := NewCleaner(opts, logger.NewNopLogger()) + + // Run Clean with a timeout to avoid hangs. 
+ ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + err = c.Clean(ctx) + require.NoError(t, err) + + // Collect which files remain and which were deleted + deleted := []string{} + for _, sd := range subdirs { + dirPath := filepath.Join(rootPath, sd) + entries, err := os.ReadDir(dirPath) + require.NoError(t, err) + remaining := map[string]bool{} + for _, e := range entries { + if !e.IsDir() { + remaining[e.Name()] = true + } + } + for _, fn := range origFiles[sd] { + if !remaining[fn] { + deleted = append(deleted, filepath.Join(sd, fn)) + } + } + } + + // Expect at least 2 deletions + require.GreaterOrEqual(t, len(deleted), 2) + + // Expect that the newest files remain + for _, sd := range subdirs { + expectedFile := filepath.Join(sd, "file8.txt") + require.NotContains(t, deleted, expectedFile, "expected newest files to remain") + } +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/delete.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/delete.go new file mode 100644 index 0000000000..d442cce5e9 --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/delete.go @@ -0,0 +1,58 @@ +package ex + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + "go.uber.org/zap" +) + +func (c *Cleaner) Deleter(ctx context.Context, toDelete <-chan *Candidate, done *sync.WaitGroup) { + defer done.Done() + for { + select { + case <-ctx.Done(): + return + case d := <-toDelete: + c.deleteFile(ctx, d) + } + } +} + +func (c *Cleaner) deleteFile(ctx context.Context, candidate *Candidate) { + // Best-effort: get current metadata to detect atime changes or if file is gone + meta, err := c.stat(candidate.FullPath) + c.DeleteAttemptC.Add(1) + + switch { + case err != nil: + if !os.IsNotExist(err) { + c.Info(ctx, "error stating file before delete", zap.Error(err)) + c.DeleteAlreadyGoneC.Add(1) + } else { + c.DeleteErrC.Add(1) + } + + case meta.ATimeUnix == candidate.ATimeUnix: + c.RemoveC.Add(1) + if !c.DryRun { + c.timeit(ctx, 
+ fmt.Sprintf("delete file aged %v: %s", time.Since(time.Unix(candidate.ATimeUnix, 0)), candidate.FullPath), + func() { + err = os.Remove(candidate.FullPath) + }) + } + if err == nil { + c.DeletedBytes.Add(candidate.Size) + c.root.mu.Lock() + c.DeletedAge = append(c.DeletedAge, time.Since(time.Unix(candidate.ATimeUnix, 0))) + c.root.mu.Unlock() + } + + default: + c.DeleteSkipC.Add(1) + } +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/scan.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/scan.go new file mode 100644 index 0000000000..55acd065ae --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/scan.go @@ -0,0 +1,267 @@ +package ex + +import ( + "context" + "errors" + "fmt" + "io" + "math/rand" + "os" + "sync" + "time" + + "go.uber.org/zap" +) + +func (c *Cleaner) Scanner(ctx context.Context, candidateCh chan<- *Candidate, errCh chan<- error, done *sync.WaitGroup) { + defer done.Done() + continuousErrors := 0 + for { + select { + case <-ctx.Done(): + return + default: + candidate, err := c.FindCandidate(ctx) + + switch { + case err == nil: + continuousErrors = 0 + candidateCh <- candidate + + case errors.Is(err, ErrBusy): + // We tried a busy directory, just retry + time.Sleep(1 * time.Millisecond) + + continue + + default: + if !errors.Is(err, ErrNoFiles) { + c.Info(ctx, "error during scanning", zap.Error(err)) + } + continuousErrors++ + if continuousErrors >= c.MaxErrorRetries { + errCh <- ErrMaxRetries + + return + } + errCh <- err + } + } + } +} + +func (c *Cleaner) Statter(ctx context.Context, done *sync.WaitGroup) { + defer done.Done() + for { + select { + case <-ctx.Done(): + return + case req := <-c.statRequestCh: + f, err := c.statInDir(req.df, req.name) + req.f = f + req.err = err + req.response <- req + } + } +} + +func (c *Cleaner) FindCandidate(ctx context.Context) (*Candidate, error) { + return c.findCandidate(ctx, []*Dir{c.root}) +} + +func (c *Cleaner) findCandidate(ctx context.Context, path []*Dir) (*Candidate, error) { 
+ d, err := c.scanDir(ctx, path) + if err != nil { + return nil, err + } + + f, subDir, err := d.randomSubdirOrOldestFile() + switch { + case err != nil: + return nil, err + + case f == nil: + return c.findCandidate(ctx, append(path, subDir)) + + default: + return &Candidate{ + Parent: d, + FullPath: c.abs(path, f.Name), + ATimeUnix: f.ATimeUnix, + Size: f.Size, + }, nil + } +} + +func (d *Dir) randomSubdirOrOldestFile() (cadidate *File, randomSubdir *Dir, err error) { + d.mu.Lock() + defer d.mu.Unlock() + + if len(d.Files) == 0 && len(d.Dirs) == 0 { + return nil, nil, ErrNoFiles + } + itemCount := len(d.Dirs) + len(d.Files) + i := rand.Intn(itemCount) + + if i < len(d.Dirs) { + return nil, d.Dirs[i], nil + } + + // file needs to be unlinked before it's returned + f := d.Files[len(d.Files)-1] + d.Files = d.Files[:len(d.Files)-1] + + return &f, nil, nil +} + +func (c *Cleaner) scanDir(ctx context.Context, path []*Dir) (out *Dir, err error) { + d := path[len(path)-1] + + d.mu.Lock() + + switch d.state { + case dirStatScanned: + d.mu.Unlock() + + return d, nil + + case dirStateScanning: + d.mu.Unlock() + + return nil, ErrBusy + + default: + // continue + } + d.state = dirStateScanning + d.mu.Unlock() + + defer func() { + if err != nil { + // on error, mark dir as not scanned + d.mu.Lock() + d.state = dirStateInitial + d.mu.Unlock() + } + }() + + absPath := c.abs(path, "") + df, err := os.Open(absPath) + if err != nil { + return nil, fmt.Errorf("failed to open directory %s: %w", absPath, err) + } + defer df.Close() + + entries := []os.DirEntry{} + for { + c.ReadDirC.Add(1) + e, err := df.ReadDir(2048) + if len(e) != 0 { + entries = append(entries, e...) 
+ } + switch { + case err == io.EOF: + // explicit EOF - we're done + case err != nil: + return nil, fmt.Errorf("failed to read directory %s: %w", df.Name(), err) + case len(e) < 2048: + // got fewer than requested with no error - we're done + default: + // got exactly 2048, keep reading + continue + } + + break + } + + // If the directory is empty, remove it from its parent and delete it + if len(entries) == 0 && len(path) > 1 { + c.removeEmptyDir(ctx, path) + + return nil, fmt.Errorf("%w: empty directory %s", ErrNoFiles, absPath) + } + + dirs := make([]*Dir, 0) + nFiles := 0 + var filenames []string + for _, e := range entries { + name := e.Name() + t := e.Type() + + if t&os.ModeDir != 0 { + dirs = append(dirs, NewDir(name)) + } else { + // file + nFiles++ + filenames = append(filenames, name) + } + } + + // submit all stat requests + responseCh := make(chan *statReq, len(filenames)) + for _, name := range filenames { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case c.statRequestCh <- &statReq{df: df, name: name, response: responseCh}: + // submitted + } + } + + // get all stat responses + err = nil + files := make([]File, nFiles) + for i := range nFiles { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case resp := <-responseCh: + if resp.err != nil { + err = resp.err + + continue + } + files[i] = *resp.f + } + } + if err != nil { + return nil, err + } + + d.mu.Lock() + d.Dirs = dirs + d.Files = files + d.sort() + d.state = dirStatScanned + d.mu.Unlock() + + return d, nil +} + +func (c *Cleaner) removeEmptyDir(ctx context.Context, path []*Dir) { + d := path[len(path)-1] + parent := path[len(path)-2] + absPath := c.abs(path, "") + + parent.mu.Lock() + // remove this dir from its parent + for i, subdir := range parent.Dirs { + if subdir.Name != d.Name { + continue + } + parent.Dirs = append(parent.Dirs[:i], parent.Dirs[i+1:]...) 
+ + break + } + parent.mu.Unlock() + + if !c.DryRun { + if err := os.Remove(absPath); err == nil { + c.RemoveDirC.Add(1) + } else { + c.Info(ctx, "failed to delete empty dir", + zap.String("dir", absPath), + zap.Error(err)) + } + } +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/scan_test.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/scan_test.go new file mode 100644 index 0000000000..204ddd63d0 --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/scan_test.go @@ -0,0 +1,108 @@ +package ex + +import ( + "context" + "os" + "path/filepath" + "strconv" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +func TestScanDir(t *testing.T) { + path := t.TempDir() + CreateTestDir(path, 157, 10000, 1000) + t.Cleanup(func() { + os.RemoveAll(path) + }) + + c := NewCleaner(Options{ + Path: path, + }, logger.NewNopLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + c.statRequestCh = make(chan *statReq, 1) + go c.Statter(ctx, wg) + defer func() { + cancel() + wg.Wait() + }() + + df, err := os.Open(path) + require.NoError(t, err) + defer df.Close() + + dir, err := c.scanDir(ctx, []*Dir{c.root}) + require.NoError(t, err) + require.True(t, dir.IsScanned()) + require.False(t, dir.IsEmpty()) + require.NotEmpty(t, dir.Dirs) + + sub := dir.Dirs[0] + dfsub, err := os.Open(filepath.Join(path, sub.Name)) + require.NoError(t, err) + defer dfsub.Close() + + sub, err = c.scanDir(ctx, []*Dir{c.root, sub}) + require.NoError(t, err) + require.True(t, sub.IsScanned()) + require.False(t, sub.IsEmpty()) + require.NotEmpty(t, sub.Files) +} + +func TestRandomSubdirOrOldestFile(t *testing.T) { + // build a Dir with files sorted so that the oldest file is at the end + d := &Dir{} + count := 10 + d.Files = make([]File, count) + for i := range count { + // timestamps decrease so the last entry is the oldest + ts := int64(1000 - i) + name := "f" + 
strconv.Itoa(i) + d.Files[i] = File{ + Name: name, + ATimeUnix: ts, + Size: uint64(100000 + i), + } + } + d.sort() + + f, sub, err := d.randomSubdirOrOldestFile() + require.NoError(t, err) + require.Nil(t, sub) + require.NotNil(t, f) + require.Equal(t, "f9", f.Name) + require.Equal(t, int64(991), f.ATimeUnix) + + // build a dir with no files, so we get a subdir for sure + d2 := &Dir{} + count = 5 + d2.Dirs = make([]*Dir, count) + for i := range count { + name := "d" + strconv.Itoa(i) + d2.Dirs[i] = NewDir(name) + } + d2.sort() + f, sub, err = d2.randomSubdirOrOldestFile() + require.NoError(t, err) + require.Nil(t, f) + require.NotNil(t, sub) + require.Contains(t, map[string]bool{ + "d0": true, + "d1": true, + "d2": true, + "d3": true, + "d4": true, + }, sub.Name) + + // build an empty dir + d3 := &Dir{} + _, _, err = d3.randomSubdirOrOldestFile() + require.ErrorIs(t, err, ErrNoFiles) +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/stat_linux.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/stat_linux.go new file mode 100644 index 0000000000..192d652296 --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/stat_linux.go @@ -0,0 +1,51 @@ +//go:build linux + +package ex + +import ( + "fmt" + "os" + + "golang.org/x/sys/unix" +) + +func (c *Cleaner) stat(fullPath string) (*Candidate, error) { + c.StatxC.Add(1) + var statx unix.Statx_t + err := unix.Statx(unix.AT_FDCWD, fullPath, + unix.AT_STATX_DONT_SYNC|unix.AT_SYMLINK_NOFOLLOW|unix.AT_NO_AUTOMOUNT, + unix.STATX_ATIME|unix.STATX_BTIME|unix.STATX_SIZE, + &statx, + ) + if err != nil { + return nil, fmt.Errorf("failed to statx %q: %w", fullPath, err) + } + + return &Candidate{ + Parent: nil, // not relevant here + FullPath: fullPath, + Size: statx.Size, + ATimeUnix: statx.Atime.Sec, + BTimeUnix: statx.Btime.Sec, + }, nil +} + +func (c *Cleaner) statInDir(df *os.File, filename string) (*File, error) { + c.StatxC.Add(1) + c.StatxInDirC.Add(1) + var statx unix.Statx_t + err := 
unix.Statx(int(df.Fd()), filename, + unix.AT_STATX_DONT_SYNC|unix.AT_SYMLINK_NOFOLLOW|unix.AT_NO_AUTOMOUNT, + unix.STATX_ATIME|unix.STATX_SIZE, + &statx, + ) + if err != nil { + return nil, fmt.Errorf("failed to statx %q: %w", filename, err) + } + + return &File{ + Name: filename, + Size: statx.Size, + ATimeUnix: statx.Atime.Sec, + }, nil +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/ex/stat_osx.go b/packages/orchestrator/cmd/clean-nfs-cache/ex/stat_osx.go new file mode 100644 index 0000000000..e0adb8e425 --- /dev/null +++ b/packages/orchestrator/cmd/clean-nfs-cache/ex/stat_osx.go @@ -0,0 +1,47 @@ +//go:build darwin + +package ex + +import ( + "fmt" + "os" + "path/filepath" + "syscall" +) + +func (c *Cleaner) stat(path string) (*Candidate, error) { + c.StatxC.Add(1) + stat, err := os.Stat(path) + if err != nil { + return nil, fmt.Errorf("could not stat info: %w", err) + } + + actualStruct, ok := stat.Sys().(*syscall.Stat_t) + if !ok { + return nil, fmt.Errorf("stat did not return a syscall.Stat_t for %q: %T", + stat.Name(), stat.Sys()) + } + + return &Candidate{ + Parent: nil, // not relevant here + FullPath: path, + Size: uint64(stat.Size()), + ATimeUnix: actualStruct.Atimespec.Sec, + BTimeUnix: actualStruct.Birthtimespec.Sec, + }, nil +} + +func (c *Cleaner) statInDir(df *os.File, filename string) (*File, error) { + c.StatxInDirC.Add(1) + // performance on OS X doeas not matter, so just use the full stat + cand, err := c.stat(filepath.Join(df.Name(), filename)) + if err != nil { + return nil, err + } + + return &File{ + Name: filename, + ATimeUnix: cand.ATimeUnix, + Size: cand.Size, + }, nil +} diff --git a/packages/orchestrator/cmd/clean-nfs-cache/main.go b/packages/orchestrator/cmd/clean-nfs-cache/main.go index b409a2eaab..60f06e402c 100644 --- a/packages/orchestrator/cmd/clean-nfs-cache/main.go +++ b/packages/orchestrator/cmd/clean-nfs-cache/main.go @@ -12,12 +12,15 @@ import ( "time" "github.com/google/uuid" + 
"github.com/launchdarkly/go-sdk-common/v3/ldvalue" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "github.com/e2b-dev/infra/packages/orchestrator/cmd/clean-nfs-cache/ex" "github.com/e2b-dev/infra/packages/orchestrator/cmd/clean-nfs-cache/pkg" "github.com/e2b-dev/infra/packages/shared/pkg/env" + featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" "github.com/e2b-dev/infra/packages/shared/pkg/utils" @@ -31,23 +34,193 @@ const ( func main() { ctx := context.Background() - if err := cleanNFSCache(ctx); err != nil { - logger.L().Error(ctx, "clean NFS cache failed", zap.Error(err)) - os.Exit(1) + var log logger.Logger + var err error + var opts ex.Options + + defer func() { + if err != nil { + if log != nil { + log.Error(ctx, "NFS cache cleaner failed", zap.Error(err)) + } else { + fmt.Println("NFS cache cleaner failed:", err) + } + os.Exit(1) + } + }() + + opts, log, err = preRun(ctx) + if err != nil { + return + } + defer log.Sync() + if !opts.Experimental { + _, err = cleanNFSCache(ctx, os.Args, int64(opts.TargetBytesToDelete)) + + return + } + + start := time.Now() + log.Info(ctx, "starting", + zap.Bool("dry_run", opts.DryRun), + zap.Bool("experimental", opts.Experimental), + zap.Uint64("target_bytes_to_delete", opts.TargetBytesToDelete), + zap.Float64("target_disk_usage_percent", opts.TargetDiskUsagePercent), + zap.Int("batch_n", opts.BatchN), + zap.Int("delete_n", opts.DeleteN), + zap.Int("max_error_retries", opts.MaxErrorRetries), + zap.String("path", opts.Path), + zap.String("otel_collector_endpoint", opts.OtelCollectorEndpoint), + zap.Int("max_concurrent_stat", opts.MaxConcurrentStat), + zap.Int("max_concurrent_scan", opts.MaxConcurrentScan), + zap.Int("max_concurrent_delete", opts.MaxConcurrentDelete), + ) + + c := ex.NewCleaner(opts, log) + if err = c.Clean(ctx); err != 
nil { + return + } + + if c.RemoveC.Load() == 0 { + log.Info(ctx, "no files deleted") + + return } + + mean, sd := standardDeviation(c.DeletedAge) + dur := time.Since(start) + filesPerSec := float64(c.RemoveC.Load()) / dur.Seconds() + bytesPerSec := float64(c.DeletedBytes.Load()) / dur.Seconds() + log.Info(ctx, "summary", + zap.Bool("dry_run", opts.DryRun), + zap.Int64("del_submitted", c.DeleteSubmittedC.Load()), + zap.Int64("del_attempted", c.DeleteAttemptC.Load()), + zap.Int64("del_already_gone", c.DeleteAlreadyGoneC.Load()), + zap.Int64("del_err", c.DeleteErrC.Load()), + zap.Int64("del_skip_changed", c.DeleteSkipC.Load()), + zap.Int64("del_files", c.RemoveC.Load()), + zap.Int64("empty_dirs", c.RemoveDirC.Load()), + zap.Uint64("bytes", c.DeletedBytes.Load()), + zap.Duration("most_recently_used", minDuration(c.DeletedAge).Round(time.Second)), + zap.Duration("least_recently_used", maxDuration(c.DeletedAge).Round(time.Second)), + zap.Duration("mean_age", mean.Round(time.Second)), + zap.Float64("files_per_second", filesPerSec), + zap.Float64("bytes_per_second", bytesPerSec), + zap.Duration("duration", dur.Round(time.Second)), + zap.Duration("std_deviation", sd.Round(time.Second))) } -func cleanNFSCache(ctx context.Context) error { - path, opts, err := parseArgs() +func preRun(ctx context.Context) (ex.Options, logger.Logger, error) { + var opts ex.Options + + flags := flag.NewFlagSet("clean-nfs-cache", flag.ExitOnError) + flags.Float64Var(&opts.TargetDiskUsagePercent, "disk-usage-target-percent", 90, "disk usage target as a % (0-100)") + flags.BoolVar(&opts.DryRun, "dry-run", true, "dry run") + flags.IntVar(&opts.BatchN, "files-per-loop", 10000, "number of files to gather metadata for per loop") + flags.IntVar(&opts.DeleteN, "deletions-per-loop", 100, "maximum number of files to delete per loop") + flags.StringVar(&opts.OtelCollectorEndpoint, "otel-collector-endpoint", "", "endpoint of the otel collector") + flags.IntVar(&opts.MaxConcurrentStat, "max-concurrent-stat", 
1, "number of concurrent stat goroutines") + flags.IntVar(&opts.MaxConcurrentScan, "max-concurrent-scan", 1, "number of concurrent scanner goroutines") + flags.IntVar(&opts.MaxConcurrentDelete, "max-concurrent-delete", 1, "number of concurrent deleter goroutines") + flags.IntVar(&opts.MaxErrorRetries, "max-error-retries", 10, "maximum number of continuous error retries before giving up") + flags.Uint64Var(&opts.TargetBytesToDelete, "target-bytes-to-delete", 0, "target number of bytes to delete (overrides disk-usage-target-percent if set)") + flags.BoolVar(&opts.Experimental, "experimental", false, "enable experimental features") + + args := os.Args[1:] // skip the command name + if err := flags.Parse(args); err != nil { + return opts, nil, fmt.Errorf("could not parse flags: %w", err) + } + + args = flags.Args() + if len(args) != 1 { + return opts, nil, ErrUsage + } + opts.Path = args[0] + + ffc, err := featureflags.NewClient() + if err != nil { + return opts, nil, err + } + defer ffc.Close(ctx) + + v, err := ffc.JSONFlag(ctx, featureflags.CleanNFSCacheExperimental) if err != nil { - return fmt.Errorf("invalid arguments: %w", err) + return opts, nil, err + } + + if v.Type() == ldvalue.ObjectType { + m := v.AsValueMap() + if m.Get("experimental").IsBool() { + opts.Experimental = m.Get("experimental").BoolValue() + } + + if opts.Experimental { + if m.Get("maxConcurrentDelete").IsNumber() { + opts.MaxConcurrentDelete = m.Get("maxConcurrentDelete").IntValue() + } + if m.Get("maxConcurrentScan").IsNumber() { + opts.MaxConcurrentScan = m.Get("maxConcurrentScan").IntValue() + } + if m.Get("maxConcurrentStat").IsNumber() { + opts.MaxConcurrentStat = m.Get("maxConcurrentStat").IntValue() + } + if m.Get("maxErrorRetries").IsNumber() { + opts.MaxErrorRetries = m.Get("maxErrorRetries").IntValue() + } + } + + if m.Get("targetBytesToDelete").IsNumber() { + opts.TargetBytesToDelete = uint64(m.Get("targetBytesToDelete").Float64Value()) + } + } + + var cores []zapcore.Core + if 
opts.OtelCollectorEndpoint != "" { + otelCore, err := newOtelCore(ctx, opts.OtelCollectorEndpoint) + if err != nil { + return opts, nil, fmt.Errorf("failed to create otel logger: %w", err) + } + cores = append(cores, otelCore) + } + + l := utils.Must(logger.NewLogger(ctx, logger.LoggerConfig{ + ServiceName: serviceName, + IsInternal: true, + IsDebug: env.IsDebug(), + Cores: cores, + EnableConsole: true, + })) + + if opts.TargetBytesToDelete == 0 { + var diskInfo pkg.DiskInfo + var err error + timeit(ctx, fmt.Sprintf("getting disk info for %q", opts.Path), func() { + diskInfo, err = pkg.GetDiskInfo(ctx, opts.Path) + }) + if err != nil { + return opts, l, fmt.Errorf("could not get disk info: %w", err) + } + targetDiskUsage := uint64(opts.TargetDiskUsagePercent / 100 * float64(diskInfo.Total)) + if uint64(diskInfo.Used) > targetDiskUsage { + opts.TargetBytesToDelete = uint64(diskInfo.Used) - targetDiskUsage + } + } + + return opts, l, nil +} + +func cleanNFSCache(ctx context.Context, args []string, targetBytesToDelete int64) (results, error) { + var allResults results + path, opts, err := parseArgs(args) + if err != nil { + return allResults, fmt.Errorf("invalid arguments: %w", err) } var cores []zapcore.Core if opts.otelCollectorEndpoint != "" { - otelCore, err := newOtelCore(ctx, opts) + otelCore, err := newOtelCore(ctx, opts.otelCollectorEndpoint) if err != nil { - return fmt.Errorf("failed to create otel logger: %w", err) + return allResults, fmt.Errorf("failed to create otel logger: %w", err) } cores = append(cores, otelCore) } @@ -71,6 +244,7 @@ func cleanNFSCache(ctx context.Context) error { logger.L().Info(ctx, "starting", zap.Bool("dry_run", opts.dryRun), zap.Float64("target_percent", opts.targetDiskUsagePercent), + zap.Int64("target_bytes_to_delete", targetBytesToDelete), zap.String("path", path)) var diskInfo pkg.DiskInfo @@ -78,18 +252,21 @@ func cleanNFSCache(ctx context.Context) error { diskInfo, err = pkg.GetDiskInfo(ctx, path) }) if err != nil { -
return fmt.Errorf("could not get disk info: %w", err) + return allResults, fmt.Errorf("could not get disk info: %w", err) } targetDiskUsage := int64(float64(opts.targetDiskUsagePercent) / 100 * float64(diskInfo.Total)) + // for testing + if targetBytesToDelete > 0 { + targetDiskUsage = diskInfo.Used - targetBytesToDelete + } areWeDone := func() bool { return diskInfo.Used < targetDiskUsage } cache := pkg.NewListingCache(path) - - var allResults results + start := time.Now() defer func() { - printSummary(ctx, allResults, opts) + printSummary(ctx, allResults, opts, start) }() // if conditions are met, we're done @@ -101,7 +278,7 @@ func cleanNFSCache(ctx context.Context) error { logger.L().Info(ctx, "got files", zap.Int("count", len(files))) }) if err != nil { - return fmt.Errorf("could not get File metadata: %w", err) + return allResults, fmt.Errorf("could not get File metadata: %w", err) } // sort files by access timestamp @@ -118,14 +295,14 @@ func cleanNFSCache(ctx context.Context) error { }) allResults = allResults.sum(results) if err != nil { - return fmt.Errorf("failed to delete files: %w", err) + return allResults, fmt.Errorf("failed to delete files: %w", err) } } - return nil + return allResults, nil } -func newOtelCore(ctx context.Context, opts opts) (zapcore.Core, error) { +func newOtelCore(ctx context.Context, endpoint string) (zapcore.Core, error) { nodeID := env.GetNodeID() serviceInstanceID := uuid.NewString() @@ -135,7 +312,7 @@ func newOtelCore(ctx context.Context, opts opts) (zapcore.Core, error) { } logsExporter, err := telemetry.NewLogExporter(ctx, - otlploggrpc.WithEndpoint(opts.otelCollectorEndpoint), + otlploggrpc.WithEndpoint(endpoint), ) if err != nil { return nil, fmt.Errorf("failed to create logs exporter: %w", err) @@ -147,32 +324,41 @@ func newOtelCore(ctx context.Context, opts opts) (zapcore.Core, error) { return otelCore, nil } -func printSummary(ctx context.Context, r results, opts opts) { +func printSummary(ctx context.Context, r 
results, opts opts, start time.Time) { if r.deletedFiles == 0 { logger.L().Info(ctx, "no files deleted") return } + avg, sd := standardDeviation(r.lastAccessed) + dur := time.Since(start) + filesPerSec := float64(r.deletedFiles) / dur.Seconds() + bytesPerSec := float64(r.deletedBytes) / dur.Seconds() + logger.L().Info(ctx, "summary", zap.Bool("dry_run", opts.dryRun), zap.Int64("files", r.deletedFiles), zap.Int64("bytes", r.deletedBytes), zap.Duration("most_recently_used", minDuration(r.lastAccessed).Round(time.Second)), zap.Duration("least_recently_used", maxDuration(r.lastAccessed).Round(time.Second)), - zap.Duration("std_deviation", standardDeviation(r.lastAccessed).Round(time.Second))) + zap.Duration("mean_age", avg.Round(time.Second)), + zap.Float64("files_per_second", filesPerSec), + zap.Float64("bytes_per_second", bytesPerSec), + zap.Duration("duration", dur.Round(time.Second)), + zap.Duration("std_deviation", sd.Round(time.Second))) } -func standardDeviation(accessed []time.Duration) time.Duration { +func standardDeviation(accessed []time.Duration) (mean, stddev time.Duration) { if len(accessed) == 0 { - return 0 + return 0, 0 } - var sum time.Duration + var sum float64 for i := range accessed { - sum += accessed[i] + sum += float64(accessed[i]) } - mean := sum / time.Duration(len(accessed)) + mean = time.Duration(sum / float64(len(accessed))) var sd float64 for i := range accessed { @@ -181,7 +367,7 @@ func standardDeviation(accessed []time.Duration) time.Duration { sd = math.Sqrt(sd / float64(len(accessed))) - return time.Duration(sd) + return mean, time.Duration(sd) } func maxDuration(durations []time.Duration) time.Duration { @@ -331,7 +517,7 @@ var ( ErrFail = errors.New("clean-nfs-cache failed to find enough space") ) -func parseArgs() (string, opts, error) { +func parseArgs(args []string) (string, opts, error) { flags := flag.NewFlagSet("clean-nfs-cache", flag.ExitOnError) var opts opts @@ -341,7 +527,7 @@ func parseArgs() (string, opts, error) { 
flags.Int64Var(&opts.filesToDeletePerLoop, "deletions-per-loop", 100, "maximum number of files to delete per loop") flags.StringVar(&opts.otelCollectorEndpoint, "otel-collector-endpoint", "", "endpoint of the otel collector") - args := os.Args[1:] // skip the command name + args = args[1:] // skip the command name if err := flags.Parse(args); err != nil { return "", opts, fmt.Errorf("could not parse flags: %w", err) } diff --git a/packages/shared/pkg/feature-flags/client.go b/packages/shared/pkg/feature-flags/client.go index 354f6be230..bd31ee6068 100644 --- a/packages/shared/pkg/feature-flags/client.go +++ b/packages/shared/pkg/feature-flags/client.go @@ -7,6 +7,7 @@ import ( "time" "github.com/launchdarkly/go-sdk-common/v3/ldcontext" + "github.com/launchdarkly/go-sdk-common/v3/ldvalue" ldclient "github.com/launchdarkly/go-server-sdk/v7" "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata" "go.uber.org/zap" @@ -65,6 +66,19 @@ func (c *Client) BoolFlag(ctx context.Context, flag BoolFlag, contexts ...ldcont return enabled, nil } +func (c *Client) JSONFlag(ctx context.Context, flag JSONFlag, contexts ...ldcontext.Context) (ldvalue.Value, error) { + if c.ld == nil { + return flag.fallback, fmt.Errorf("LaunchDarkly client is not initialized") + } + + v, err := c.ld.JSONVariationCtx(ctx, flag.name, mergeContexts(ctx, contexts), flag.fallback) + if err != nil { + return v, fmt.Errorf("error evaluating %s: %w", flag, err) + } + + return v, nil +} + func (c *Client) IntFlag(ctx context.Context, flag IntFlag, contexts ...ldcontext.Context) (int, error) { if c.ld == nil { return flag.fallback, fmt.Errorf("LaunchDarkly client is not initialized") diff --git a/packages/shared/pkg/feature-flags/flags.go b/packages/shared/pkg/feature-flags/flags.go index 1e228e7739..b6a19459cb 100644 --- a/packages/shared/pkg/feature-flags/flags.go +++ b/packages/shared/pkg/feature-flags/flags.go @@ -29,6 +29,29 @@ const ( // All flags must be defined here: 
https://app.launchdarkly.com/projects/default/flags/ +type JSONFlag struct { + name string + fallback ldvalue.Value +} + +func (f JSONFlag) String() string { + return f.name +} + +func (f JSONFlag) Fallback() *ldvalue.Value { + return &f.fallback +} + +func newJSONFlag(name string, fallback ldvalue.Value) JSONFlag { + flag := JSONFlag{name: name, fallback: fallback} + builder := LaunchDarklyOfflineStore.Flag(flag.name).ValueForAll(fallback) + LaunchDarklyOfflineStore.Update(builder) + + return flag +} + +var CleanNFSCacheExperimental = newJSONFlag("clean-nfs-cache-experimental", ldvalue.Null()) + type BoolFlag struct { name string fallback bool