Skip to content

Commit ab330b8

Browse files
corylanouclaude
andcommitted
feat: add dynamic directory watcher for automatic database discovery
Implement real-time monitoring of directory replication paths using fsnotify. The DirectoryMonitor automatically detects when SQLite databases are created or removed from watched directories and dynamically adds/removes them from replication without requiring restarts. Key features: - Automatic database discovery with pattern matching - Support for recursive directory watching - Thread-safe database lifecycle management - New Store.AddDB() and Store.RemoveDB() methods for dynamic management - Comprehensive integration tests for lifecycle validation This enhancement builds on the existing directory replication feature (#738) by making it fully dynamic for use cases like multi-tenant SaaS where databases are created and destroyed frequently. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent ee08140 commit ab330b8

File tree

7 files changed

+548
-41
lines changed

7 files changed

+548
-41
lines changed
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log/slog"
7+
"os"
8+
"path/filepath"
9+
"strings"
10+
"sync"
11+
12+
"github.com/fsnotify/fsnotify"
13+
14+
"github.com/benbjohnson/litestream"
15+
)
16+
17+
// DirectoryMonitor watches a directory tree for SQLite databases and dynamically
18+
// manages database instances within the store as files are created or removed.
19+
type DirectoryMonitor struct {
20+
store *litestream.Store
21+
config *DBConfig
22+
dirPath string
23+
pattern string
24+
recursive bool
25+
26+
watcher *fsnotify.Watcher
27+
ctx context.Context
28+
cancel context.CancelFunc
29+
30+
logger *slog.Logger
31+
32+
mu sync.Mutex
33+
dbs map[string]*litestream.DB
34+
watchedDirs map[string]struct{}
35+
36+
wg sync.WaitGroup
37+
}
38+
39+
// NewDirectoryMonitor returns a new monitor for directory-based replication.
40+
func NewDirectoryMonitor(ctx context.Context, store *litestream.Store, dbc *DBConfig, existing []*litestream.DB) (*DirectoryMonitor, error) {
41+
if dbc == nil {
42+
return nil, errors.New("directory config required")
43+
}
44+
if store == nil {
45+
return nil, errors.New("store required")
46+
}
47+
48+
dirPath, err := expand(dbc.Dir)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
if _, err := os.Stat(dirPath); err != nil {
54+
return nil, err
55+
}
56+
57+
watcher, err := fsnotify.NewWatcher()
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
monitorCtx, cancel := context.WithCancel(ctx)
63+
dm := &DirectoryMonitor{
64+
store: store,
65+
config: dbc,
66+
dirPath: dirPath,
67+
pattern: dbc.Pattern,
68+
recursive: dbc.Recursive,
69+
watcher: watcher,
70+
ctx: monitorCtx,
71+
cancel: cancel,
72+
logger: slog.With("dir", dirPath),
73+
dbs: make(map[string]*litestream.DB),
74+
watchedDirs: make(map[string]struct{}),
75+
}
76+
77+
for _, db := range existing {
78+
dm.dbs[db.Path()] = db
79+
}
80+
81+
if err := dm.addInitialWatches(); err != nil {
82+
watcher.Close()
83+
cancel()
84+
return nil, err
85+
}
86+
87+
dm.wg.Add(1)
88+
go dm.run()
89+
90+
return dm, nil
91+
}
92+
93+
// Close stops the directory monitor and releases resources.
94+
func (dm *DirectoryMonitor) Close() {
95+
dm.cancel()
96+
_ = dm.watcher.Close()
97+
dm.wg.Wait()
98+
}
99+
100+
func (dm *DirectoryMonitor) run() {
101+
defer dm.wg.Done()
102+
103+
for {
104+
select {
105+
case <-dm.ctx.Done():
106+
return
107+
case event, ok := <-dm.watcher.Events:
108+
if !ok {
109+
return
110+
}
111+
dm.handleEvent(event)
112+
case err, ok := <-dm.watcher.Errors:
113+
if !ok {
114+
return
115+
}
116+
dm.logger.Error("directory watcher error", "error", err)
117+
}
118+
}
119+
}
120+
121+
func (dm *DirectoryMonitor) addInitialWatches() error {
122+
if dm.recursive {
123+
return filepath.WalkDir(dm.dirPath, func(path string, d os.DirEntry, err error) error {
124+
if err != nil {
125+
return err
126+
}
127+
if !d.IsDir() {
128+
return nil
129+
}
130+
return dm.addDirectoryWatch(path)
131+
})
132+
}
133+
134+
return dm.addDirectoryWatch(dm.dirPath)
135+
}
136+
137+
func (dm *DirectoryMonitor) addDirectoryWatch(path string) error {
138+
abspath := filepath.Clean(path)
139+
140+
dm.mu.Lock()
141+
if _, ok := dm.watchedDirs[abspath]; ok {
142+
dm.mu.Unlock()
143+
return nil
144+
}
145+
dm.watchedDirs[abspath] = struct{}{}
146+
dm.mu.Unlock()
147+
148+
if err := dm.watcher.Add(abspath); err != nil {
149+
dm.mu.Lock()
150+
delete(dm.watchedDirs, abspath)
151+
dm.mu.Unlock()
152+
return err
153+
}
154+
155+
dm.logger.Debug("watching directory", "path", abspath)
156+
return nil
157+
}
158+
159+
func (dm *DirectoryMonitor) removeDirectoryWatch(path string) {
160+
abspath := filepath.Clean(path)
161+
162+
dm.mu.Lock()
163+
if _, ok := dm.watchedDirs[abspath]; !ok {
164+
dm.mu.Unlock()
165+
return
166+
}
167+
delete(dm.watchedDirs, abspath)
168+
dm.mu.Unlock()
169+
170+
if err := dm.watcher.Remove(abspath); err != nil {
171+
dm.logger.Debug("remove directory watch", "path", abspath, "error", err)
172+
}
173+
}
174+
175+
func (dm *DirectoryMonitor) handleEvent(event fsnotify.Event) {
176+
path := filepath.Clean(event.Name)
177+
if path == "" {
178+
return
179+
}
180+
181+
info, statErr := os.Stat(path)
182+
isDir := statErr == nil && info.IsDir()
183+
184+
if dm.recursive && isDir && event.Op&(fsnotify.Create|fsnotify.Rename) != 0 {
185+
if err := dm.addDirectoryWatch(path); err != nil {
186+
dm.logger.Error("add directory watch", "path", path, "error", err)
187+
}
188+
}
189+
190+
if isDir {
191+
if event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 {
192+
dm.removeDirectoryWatch(path)
193+
dm.removeDatabasesUnder(path)
194+
}
195+
return
196+
}
197+
198+
if statErr != nil && !os.IsNotExist(statErr) {
199+
dm.logger.Debug("stat event path", "path", path, "error", statErr)
200+
return
201+
}
202+
203+
if event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 {
204+
dm.removeDatabase(path)
205+
return
206+
}
207+
208+
if event.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Rename) != 0 {
209+
dm.handlePotentialDatabase(path)
210+
}
211+
}
212+
213+
func (dm *DirectoryMonitor) handlePotentialDatabase(path string) {
214+
if !dm.matchesPattern(path) {
215+
return
216+
}
217+
218+
dm.mu.Lock()
219+
if _, exists := dm.dbs[path]; exists {
220+
dm.mu.Unlock()
221+
return
222+
}
223+
dm.dbs[path] = nil
224+
dm.mu.Unlock()
225+
226+
if !IsSQLiteDatabase(path) {
227+
dm.mu.Lock()
228+
delete(dm.dbs, path)
229+
dm.mu.Unlock()
230+
return
231+
}
232+
233+
db, err := newDBFromDirectoryEntry(dm.config, dm.dirPath, path)
234+
if err != nil {
235+
dm.mu.Lock()
236+
delete(dm.dbs, path)
237+
dm.mu.Unlock()
238+
dm.logger.Error("configure database", "path", path, "error", err)
239+
return
240+
}
241+
242+
if err := dm.store.AddDB(db); err != nil {
243+
dm.mu.Lock()
244+
delete(dm.dbs, path)
245+
dm.mu.Unlock()
246+
dm.logger.Error("add database to store", "path", path, "error", err)
247+
return
248+
}
249+
250+
dm.mu.Lock()
251+
dm.dbs[path] = db
252+
dm.mu.Unlock()
253+
254+
dm.logger.Info("added database to replication", "path", path)
255+
}
256+
257+
func (dm *DirectoryMonitor) removeDatabase(path string) {
258+
dm.mu.Lock()
259+
db := dm.dbs[path]
260+
delete(dm.dbs, path)
261+
dm.mu.Unlock()
262+
263+
if db == nil {
264+
return
265+
}
266+
267+
if err := dm.store.RemoveDB(context.Background(), db.Path()); err != nil {
268+
dm.logger.Error("remove database from store", "path", path, "error", err)
269+
return
270+
}
271+
272+
dm.logger.Info("removed database from replication", "path", path)
273+
}
274+
275+
func (dm *DirectoryMonitor) removeDatabasesUnder(dir string) {
276+
prefix := dir + string(os.PathSeparator)
277+
278+
dm.mu.Lock()
279+
var toClose []*litestream.DB
280+
for path, db := range dm.dbs {
281+
if path == dir || strings.HasPrefix(path, prefix) {
282+
toClose = append(toClose, db)
283+
delete(dm.dbs, path)
284+
}
285+
}
286+
dm.mu.Unlock()
287+
288+
for _, db := range toClose {
289+
if db == nil {
290+
continue
291+
}
292+
if err := dm.store.RemoveDB(context.Background(), db.Path()); err != nil {
293+
dm.logger.Error("remove database from store", "path", db.Path(), "error", err)
294+
}
295+
}
296+
}
297+
298+
func (dm *DirectoryMonitor) matchesPattern(path string) bool {
299+
matched, err := filepath.Match(dm.pattern, filepath.Base(path))
300+
if err != nil {
301+
dm.logger.Error("pattern match failed", "pattern", dm.pattern, "path", path, "error", err)
302+
return false
303+
}
304+
return matched
305+
}

0 commit comments

Comments
 (0)