Skip to content

Commit e87000c

Browse files
corylanouclaude
andauthored
feat: add directory replication support for multi-tenant databases (#738)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 22b5ce1 commit e87000c

File tree

3 files changed

+849
-9
lines changed

3 files changed

+849
-9
lines changed

cmd/litestream/main.go

Lines changed: 223 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,20 +294,39 @@ func (c *Config) Validate() error {
294294
}
295295

296296
// Validate database configs
297-
for _, db := range c.DBs {
297+
for idx, db := range c.DBs {
298+
// Validate that either path or dir is specified, but not both
299+
if db.Path != "" && db.Dir != "" {
300+
return fmt.Errorf("database config #%d: cannot specify both 'path' and 'dir'", idx+1)
301+
}
302+
if db.Path == "" && db.Dir == "" {
303+
return fmt.Errorf("database config #%d: must specify either 'path' or 'dir'", idx+1)
304+
}
305+
306+
// When using dir, pattern must be specified
307+
if db.Dir != "" && db.Pattern == "" {
308+
return fmt.Errorf("database config #%d: 'pattern' is required when using 'dir'", idx+1)
309+
}
310+
311+
// Use path or dir for identifying the config in error messages
312+
dbIdentifier := db.Path
313+
if dbIdentifier == "" {
314+
dbIdentifier = db.Dir
315+
}
316+
298317
// Validate sync intervals for replicas
299318
if db.Replica != nil && db.Replica.SyncInterval != nil && *db.Replica.SyncInterval <= 0 {
300319
return &ConfigValidationError{
301320
Err: ErrInvalidSyncInterval,
302-
Field: fmt.Sprintf("dbs[%s].replica.sync-interval", db.Path),
321+
Field: fmt.Sprintf("dbs[%s].replica.sync-interval", dbIdentifier),
303322
Value: *db.Replica.SyncInterval,
304323
}
305324
}
306325
for i, replica := range db.Replicas {
307326
if replica.SyncInterval != nil && *replica.SyncInterval <= 0 {
308327
return &ConfigValidationError{
309328
Err: ErrInvalidSyncInterval,
310-
Field: fmt.Sprintf("dbs[%s].replicas[%d].sync-interval", db.Path, i),
329+
Field: fmt.Sprintf("dbs[%s].replicas[%d].sync-interval", dbIdentifier, i),
311330
Value: *replica.SyncInterval,
312331
}
313332
}
@@ -409,6 +428,9 @@ func ParseConfig(r io.Reader, expandEnv bool) (_ Config, err error) {
409428

410429
// Normalize paths.
411430
for _, dbConfig := range config.DBs {
431+
if dbConfig.Path == "" {
432+
continue
433+
}
412434
if dbConfig.Path, err = expand(dbConfig.Path); err != nil {
413435
return config, err
414436
}
@@ -440,9 +462,12 @@ type CompactionLevelConfig struct {
440462
Interval time.Duration `yaml:"interval"`
441463
}
442464

443-
// DBConfig represents the configuration for a single database.
465+
// DBConfig represents the configuration for a single database or directory of databases.
444466
type DBConfig struct {
445467
Path string `yaml:"path"`
468+
Dir string `yaml:"dir"` // Directory to scan for databases
469+
Pattern string `yaml:"pattern"` // File pattern to match (e.g., "*.db", "*.sqlite")
470+
Recursive bool `yaml:"recursive"` // Scan subdirectories recursively
446471
MetaPath *string `yaml:"meta-path"`
447472
MonitorInterval *time.Duration `yaml:"monitor-interval"`
448473
CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
@@ -512,6 +537,200 @@ func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
512537
return db, nil
513538
}
514539

540+
// NewDBsFromDirectoryConfig scans a directory and creates DB instances for all SQLite databases found.
541+
func NewDBsFromDirectoryConfig(dbc *DBConfig) ([]*litestream.DB, error) {
542+
if dbc.Dir == "" {
543+
return nil, fmt.Errorf("directory path is required for directory replication")
544+
}
545+
546+
if dbc.Pattern == "" {
547+
return nil, fmt.Errorf("pattern is required for directory replication")
548+
}
549+
550+
dirPath, err := expand(dbc.Dir)
551+
if err != nil {
552+
return nil, err
553+
}
554+
555+
// Find all SQLite databases in the directory
556+
dbPaths, err := FindSQLiteDatabases(dirPath, dbc.Pattern, dbc.Recursive)
557+
if err != nil {
558+
return nil, fmt.Errorf("failed to scan directory %s: %w", dirPath, err)
559+
}
560+
561+
if len(dbPaths) == 0 {
562+
return nil, fmt.Errorf("no SQLite databases found in directory %s with pattern %s", dirPath, dbc.Pattern)
563+
}
564+
565+
// Create DB instances for each found database
566+
var dbs []*litestream.DB
567+
for _, dbPath := range dbPaths {
568+
// Calculate relative path from directory root
569+
relPath, err := filepath.Rel(dirPath, dbPath)
570+
if err != nil {
571+
return nil, fmt.Errorf("failed to calculate relative path for %s: %w", dbPath, err)
572+
}
573+
574+
// Create a copy of the config for each database
575+
dbConfigCopy := *dbc
576+
dbConfigCopy.Path = dbPath
577+
dbConfigCopy.Dir = "" // Clear dir field for individual DB
578+
dbConfigCopy.Pattern = "" // Clear pattern field
579+
dbConfigCopy.Recursive = false // Clear recursive flag
580+
581+
// Deep copy replica config and make path unique per database.
582+
// This prevents all databases from writing to the same replica path.
583+
if dbc.Replica != nil {
584+
replicaCopy, err := cloneReplicaConfigWithRelativePath(dbc.Replica, relPath)
585+
if err != nil {
586+
return nil, fmt.Errorf("failed to configure replica for %s: %w", dbPath, err)
587+
}
588+
dbConfigCopy.Replica = replicaCopy
589+
}
590+
591+
// Also handle deprecated 'replicas' array field.
592+
if len(dbc.Replicas) > 0 {
593+
dbConfigCopy.Replicas = make([]*ReplicaConfig, len(dbc.Replicas))
594+
for i, replica := range dbc.Replicas {
595+
replicaCopy, err := cloneReplicaConfigWithRelativePath(replica, relPath)
596+
if err != nil {
597+
return nil, fmt.Errorf("failed to configure replica %d for %s: %w", i, dbPath, err)
598+
}
599+
dbConfigCopy.Replicas[i] = replicaCopy
600+
}
601+
}
602+
603+
db, err := NewDBFromConfig(&dbConfigCopy)
604+
if err != nil {
605+
return nil, fmt.Errorf("failed to create DB for %s: %w", dbPath, err)
606+
}
607+
dbs = append(dbs, db)
608+
}
609+
610+
return dbs, nil
611+
}
612+
613+
// cloneReplicaConfigWithRelativePath returns a copy of the replica configuration with the
614+
// database-relative path appended to either the replica path or URL, depending on how the
615+
// replica was configured.
616+
func cloneReplicaConfigWithRelativePath(base *ReplicaConfig, relPath string) (*ReplicaConfig, error) {
617+
if base == nil {
618+
return nil, nil
619+
}
620+
621+
replicaCopy := *base
622+
relPath = filepath.ToSlash(relPath)
623+
if relPath == "" || relPath == "." {
624+
return &replicaCopy, nil
625+
}
626+
627+
if replicaCopy.URL != "" {
628+
u, err := url.Parse(replicaCopy.URL)
629+
if err != nil {
630+
return nil, fmt.Errorf("parse replica url: %w", err)
631+
}
632+
appendRelativePathToURL(u, relPath)
633+
replicaCopy.URL = u.String()
634+
return &replicaCopy, nil
635+
}
636+
637+
switch base.ReplicaType() {
638+
case "file":
639+
relOSPath := filepath.FromSlash(relPath)
640+
if replicaCopy.Path != "" {
641+
replicaCopy.Path = filepath.Join(replicaCopy.Path, relOSPath)
642+
} else {
643+
replicaCopy.Path = relOSPath
644+
}
645+
default:
646+
// Normalize to forward slashes for cloud/object storage backends.
647+
basePath := filepath.ToSlash(replicaCopy.Path)
648+
if basePath != "" {
649+
replicaCopy.Path = path.Join(basePath, relPath)
650+
} else {
651+
replicaCopy.Path = relPath
652+
}
653+
}
654+
655+
return &replicaCopy, nil
656+
}
657+
658+
// appendRelativePathToURL appends relPath to the URL's path component, ensuring
659+
// the result remains rooted and uses forward slashes.
660+
func appendRelativePathToURL(u *url.URL, relPath string) {
661+
cleanRel := strings.TrimPrefix(relPath, "/")
662+
if cleanRel == "" || cleanRel == "." {
663+
return
664+
}
665+
666+
basePath := u.Path
667+
var joined string
668+
if basePath == "" {
669+
joined = cleanRel
670+
} else {
671+
joined = path.Join(basePath, cleanRel)
672+
}
673+
674+
joined = "/" + strings.TrimPrefix(joined, "/")
675+
u.Path = joined
676+
}
677+
678+
// FindSQLiteDatabases recursively finds all SQLite database files in a directory.
679+
// Exported for testing.
680+
func FindSQLiteDatabases(dir string, pattern string, recursive bool) ([]string, error) {
681+
var dbPaths []string
682+
683+
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
684+
if err != nil {
685+
return err
686+
}
687+
688+
// Skip directories unless recursive
689+
if info.IsDir() {
690+
if !recursive && path != dir {
691+
return filepath.SkipDir
692+
}
693+
return nil
694+
}
695+
696+
// Check if file matches pattern
697+
matched, err := filepath.Match(pattern, filepath.Base(path))
698+
if err != nil {
699+
return err
700+
}
701+
if !matched {
702+
return nil
703+
}
704+
705+
// Check if it's a SQLite database
706+
if IsSQLiteDatabase(path) {
707+
dbPaths = append(dbPaths, path)
708+
}
709+
710+
return nil
711+
})
712+
713+
return dbPaths, err
714+
}
715+
716+
// IsSQLiteDatabase checks if a file is a SQLite database by reading its header.
717+
// Exported for testing.
718+
func IsSQLiteDatabase(path string) bool {
719+
file, err := os.Open(path)
720+
if err != nil {
721+
return false
722+
}
723+
defer file.Close()
724+
725+
// SQLite files start with "SQLite format 3\x00"
726+
header := make([]byte, 16)
727+
if _, err := file.Read(header); err != nil {
728+
return false
729+
}
730+
731+
return string(header) == "SQLite format 3\x00"
732+
}
733+
515734
// ByteSize is a custom type for parsing byte sizes from YAML.
516735
// It supports both SI units (KB, MB, GB using base 1000) and IEC units
517736
// (KiB, MiB, GiB using base 1024) as well as short forms (K, M, G).

0 commit comments

Comments
 (0)