Skip to content
This repository was archived by the owner on Sep 9, 2020. It is now read-only.

Commit 580ccfd

Browse files
committed
gps: source cache: share single bolt db file between multiple singleSourceCaches
1 parent 1d632a4 commit 580ccfd

File tree

5 files changed

+126
-101
lines changed

5 files changed

+126
-101
lines changed

internal/gps/source.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,7 @@ func newSourceCoordinator(superv *supervisor, deducer deducer, cachedir string,
6565
}
6666
}
6767

68-
func (sc *sourceCoordinator) close() {
69-
for k, v := range sc.srcs {
70-
if err := v.close(); err != nil {
71-
sc.logger.Println(errors.Wrapf(err, "error closing source gateway for %q", k))
72-
}
73-
}
74-
}
68+
func (sc *sourceCoordinator) close() {}
7569

7670
func (sc *sourceCoordinator) getSourceGatewayFor(ctx context.Context, id ProjectIdentifier) (*sourceGateway, error) {
7771
if sc.supervisor.getLifetimeContext().Err() != nil {
@@ -211,10 +205,6 @@ func newSourceGateway(maybe maybeSource, superv *supervisor, cachedir string) *s
211205
return sg
212206
}
213207

214-
func (sg *sourceGateway) close() error {
215-
return errors.Wrap(sg.cache.close(), "error closing cache")
216-
}
217-
218208
func (sg *sourceGateway) syncLocal(ctx context.Context) error {
219209
sg.mu.Lock()
220210
defer sg.mu.Unlock()

internal/gps/source_cache.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@ type singleSourceCache interface {
5858
// If the input is a revision and multiple UnpairedVersions are associated
5959
// with it, whatever happens to be the first is returned.
6060
toUnpaired(v Version) (UnpairedVersion, bool)
61-
62-
// close closes the cache and any backing resources.
63-
close() error
6461
}
6562

6663
type singleSourceCacheMemory struct {
@@ -80,8 +77,6 @@ func newMemoryCache() singleSourceCache {
8077
}
8178
}
8279

83-
func (*singleSourceCacheMemory) close() error { return nil }
84-
8580
type projectInfo struct {
8681
Manifest
8782
Lock

internal/gps/source_cache_bolt.go

Lines changed: 89 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,58 @@ import (
1818
"github.com/pkg/errors"
1919
)
2020

21+
// boltCache manages a bolt.DB cache and provides singleSourceCaches.
22+
type boltCache struct {
23+
db *bolt.DB
24+
epoch int64 // getters will not return values older than this unix timestamp
25+
logger *log.Logger // info logging
26+
}
27+
28+
// newBoltCache returns a new boltCache backed by a BoltDB file under the cache directory.
29+
func newBoltCache(cd string, epoch int64, logger *log.Logger) (*boltCache, error) {
30+
path := sourceCachePath(cd, "bolt") + ".db"
31+
dir := filepath.Dir(path)
32+
if fi, err := os.Stat(dir); os.IsNotExist(err) {
33+
if err := os.MkdirAll(dir, os.ModeDir|os.ModePerm); err != nil {
34+
return nil, errors.Wrapf(err, "failed to create source cache directory: %s", dir)
35+
}
36+
} else if err != nil {
37+
return nil, errors.Wrapf(err, "failed to check source cache directory: ", dir)
38+
} else if !fi.IsDir() {
39+
return nil, errors.Wrapf(err, "source cache path is not directory: %s", dir)
40+
}
41+
db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 1 * time.Second})
42+
if err != nil {
43+
return nil, errors.Wrapf(err, "failed to open BoltDB cache file %q", path)
44+
}
45+
return &boltCache{
46+
db: db,
47+
epoch: epoch,
48+
logger: logger,
49+
}, nil
50+
}
51+
52+
// newSingleSourceCache returns a new singleSourceCache for pi.
53+
func (c *boltCache) newSingleSourceCache(pi ProjectIdentifier) singleSourceCache {
54+
return &singleSourceCacheBolt{
55+
boltCache: c,
56+
pi: pi,
57+
sourceName: []byte(pi.normalizedSource()),
58+
}
59+
}
60+
61+
// close releases all cache resources.
62+
func (c *boltCache) close() error {
63+
return errors.Wrapf(c.db.Close(), "error closing Bolt database %q", c.db.String())
64+
}
65+
2166
// singleSourceCacheBolt implements a singleSourceCache backed by a persistent BoltDB file.
2267
// Version mappings are timestamped, and the `epoch` field limits the age of returned values.
23-
// Database access methods are safe for concurrent use with each other (excluding close).
68+
// Database access methods are safe for concurrent use.
2469
//
2570
// Implementation:
2671
//
27-
// At the top level there are buckets for (1) versions and (2) revisions.
72+
// Each source has a top-level bucket containing sub-buckets for (1) versions and (2) revisions.
2873
//
2974
// 1) Versions buckets hold version keys with revision values:
3075
//
@@ -54,41 +99,9 @@ import (
5499
// Keys: "<sequence_number>"
55100
// Values: "branch:<branch>", "defaultBranch:<branch>", "ver:<version>"
56101
type singleSourceCacheBolt struct {
57-
ProjectRoot
58-
db *bolt.DB
59-
epoch int64 // getters will not return values older than this unix timestamp
60-
logger *log.Logger // info logging
61-
}
62-
63-
// newBoltCache returns a new singleSourceCacheBolt backed by a project's BoltDB file under the cache directory.
64-
func newBoltCache(cd string, pi ProjectIdentifier, epoch int64, logger *log.Logger) (*singleSourceCacheBolt, error) {
65-
path := sourceCachePath(cd, pi.normalizedSource()) + ".db"
66-
dir := filepath.Dir(path)
67-
if fi, err := os.Stat(dir); os.IsNotExist(err) {
68-
if err := os.MkdirAll(dir, os.ModeDir|os.ModePerm); err != nil {
69-
return nil, errors.Wrapf(err, "failed to create source cache directory: %s", dir)
70-
}
71-
} else if err != nil {
72-
return nil, errors.Wrapf(err, "failed to check source cache directory: ", dir)
73-
} else if !fi.IsDir() {
74-
return nil, errors.Wrapf(err, "source cache path is not directory: %s", dir)
75-
}
76-
db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 1 * time.Second})
77-
if err != nil {
78-
return nil, errors.Wrapf(err, "failed to open BoltDB cache file %q", path)
79-
}
80-
return &singleSourceCacheBolt{
81-
ProjectRoot: pi.ProjectRoot,
82-
db: db,
83-
epoch: epoch,
84-
logger: logger,
85-
}, nil
86-
}
87-
88-
// close releases all database resources.
89-
// Must not be called concurrently with any other methods.
90-
func (s *singleSourceCacheBolt) close() error {
91-
return errors.Wrapf(s.db.Close(), "error closing Bolt database %q", s.db.String())
102+
*boltCache
103+
pi ProjectIdentifier
104+
sourceName []byte
92105
}
93106

94107
func (s *singleSourceCacheBolt) setManifestAndLock(rev Revision, ai ProjectAnalyzerInfo, m Manifest, l Lock) {
@@ -239,7 +252,7 @@ func (s *singleSourceCacheBolt) getPackageTree(rev Revision) (ptree pkgtree.Pack
239252
if err != nil {
240253
return err
241254
}
242-
ptree.ImportRoot = string(s.ProjectRoot)
255+
ptree.ImportRoot = string(s.pi.ProjectRoot)
243256
ptree.Packages = pkgs
244257
ok = true
245258
return nil
@@ -260,19 +273,19 @@ func (s *singleSourceCacheBolt) markRevisionExists(rev Revision) {
260273
}
261274

262275
func (s *singleSourceCacheBolt) setVersionMap(pvs []PairedVersion) {
263-
err := s.db.Update(func(tx *bolt.Tx) error {
264-
if err := cachePrefixDelete(tx, "versions:"); err != nil {
276+
err := s.updateSourceBucket(func(src *bolt.Bucket) error {
277+
if err := cachePrefixDelete(src, "versions:"); err != nil {
265278
return err
266279
}
267280
vk := cacheTimestampedKey("versions:", time.Now())
268-
versions, err := tx.CreateBucket(vk)
281+
versions, err := src.CreateBucket(vk)
269282
if err != nil {
270283
return err
271284
}
272285

273-
c := tx.Cursor()
286+
c := src.Cursor()
274287
for k, _ := c.Seek(cacheRev); bytes.HasPrefix(k, cacheRev); k, _ = c.Next() {
275-
rb := tx.Bucket(k)
288+
rb := src.Bucket(k)
276289
if err := cachePrefixDelete(rb, "versions:"); err != nil {
277290
return err
278291
}
@@ -289,7 +302,7 @@ func (s *singleSourceCacheBolt) setVersionMap(pvs []PairedVersion) {
289302
return errors.Wrap(err, "failed to put version->revision")
290303
}
291304

292-
b, err := tx.CreateBucketIfNotExists(cacheRevisionName(rev))
305+
b, err := src.CreateBucketIfNotExists(cacheRevisionName(rev))
293306
if err != nil {
294307
return errors.Wrapf(err, "failed to create bucket for revision: %s", rev)
295308
}
@@ -344,8 +357,8 @@ func (s *singleSourceCacheBolt) getVersionsFor(rev Revision) (uvs []UnpairedVers
344357

345358
func (s *singleSourceCacheBolt) getAllVersions() []PairedVersion {
346359
var pvs []PairedVersion
347-
err := s.db.View(func(tx *bolt.Tx) error {
348-
versions := cacheFindLatestValid(tx, "versions:", s.epoch)
360+
err := s.viewSourceBucket(func(src *bolt.Bucket) error {
361+
versions := cacheFindLatestValid(src, "versions:", s.epoch)
349362
if versions == nil {
350363
return nil
351364
}
@@ -367,8 +380,8 @@ func (s *singleSourceCacheBolt) getAllVersions() []PairedVersion {
367380
}
368381

369382
func (s *singleSourceCacheBolt) getRevisionFor(uv UnpairedVersion) (rev Revision, ok bool) {
370-
err := s.db.View(func(tx *bolt.Tx) error {
371-
versions := cacheFindLatestValid(tx, "versions:", s.epoch)
383+
err := s.viewSourceBucket(func(src *bolt.Bucket) error {
384+
versions := cacheFindLatestValid(src, "versions:", s.epoch)
372385
if versions == nil {
373386
return nil
374387
}
@@ -452,22 +465,44 @@ func cacheRevisionName(rev Revision) []byte {
452465
return name
453466
}
454467

455-
// viewRevBucket executes view with rev's bucket, if it exists.
456-
func (s *singleSourceCacheBolt) viewRevBucket(rev Revision, view func(b *bolt.Bucket) error) error {
468+
// viewSourceBucket executes view with the source bucket, if it exists.
469+
func (s *singleSourceCacheBolt) viewSourceBucket(view func(b *bolt.Bucket) error) error {
457470
return s.db.View(func(tx *bolt.Tx) error {
458-
b := tx.Bucket(cacheRevisionName(rev))
471+
b := tx.Bucket(s.sourceName)
459472
if b == nil {
460473
return nil
461474
}
462475
return view(b)
463476
})
464477
}
465478

466-
// updateRevBucket executes update with rev's bucket, creating it first if necessary.
467-
func (s *singleSourceCacheBolt) updateRevBucket(rev Revision, update func(b *bolt.Bucket) error) error {
479+
// updateSourceBucket executes update with the source bucket, creating it first if necessary.
480+
func (s *singleSourceCacheBolt) updateSourceBucket(update func(b *bolt.Bucket) error) error {
468481
return s.db.Update(func(tx *bolt.Tx) error {
482+
b, err := tx.CreateBucketIfNotExists(s.sourceName)
483+
if err != nil {
484+
return errors.Wrapf(err, "failed to create bucket: %s", s.sourceName)
485+
}
486+
return update(b)
487+
})
488+
}
489+
490+
// viewRevBucket executes view with rev's bucket for this source, if it exists.
491+
func (s *singleSourceCacheBolt) viewRevBucket(rev Revision, view func(b *bolt.Bucket) error) error {
492+
return s.viewSourceBucket(func(src *bolt.Bucket) error {
493+
b := src.Bucket(cacheRevisionName(rev))
494+
if b == nil {
495+
return nil
496+
}
497+
return view(b)
498+
})
499+
}
500+
501+
// updateRevBucket executes update with rev's bucket for this source, creating it first if necessary.
502+
func (s *singleSourceCacheBolt) updateRevBucket(rev Revision, update func(b *bolt.Bucket) error) error {
503+
return s.updateSourceBucket(func(src *bolt.Bucket) error {
469504
name := cacheRevisionName(rev)
470-
b, err := tx.CreateBucketIfNotExists(name)
505+
b, err := src.CreateBucketIfNotExists(name)
471506
if err != nil {
472507
return errors.Wrapf(err, "failed to create bucket: %s", name)
473508
}

internal/gps/source_cache_bolt_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ func TestBoltCacheTimeout(t *testing.T) {
2424
logger := log.New(test.Writer{t}, "", 0)
2525

2626
start := time.Now()
27-
c, err := newBoltCache(cpath, pi, start.Unix(), logger)
27+
bc, err := newBoltCache(cpath, start.Unix(), logger)
2828
if err != nil {
2929
t.Fatal(err)
3030
}
31-
defer c.close()
31+
defer bc.close()
32+
c := bc.newSingleSourceCache(pi)
3233

3334
rev := Revision("test")
3435
ai := ProjectAnalyzerInfo{Name: "name", Version: 42}
@@ -132,7 +133,7 @@ func TestBoltCacheTimeout(t *testing.T) {
132133
}
133134
}
134135

135-
if err := c.close(); err != nil {
136+
if err := bc.close(); err != nil {
136137
t.Fatal("failed to close cache:", err)
137138
}
138139

@@ -143,10 +144,11 @@ func TestBoltCacheTimeout(t *testing.T) {
143144
// Ensure a future timestamp.
144145
after = start.Add(10 * time.Second)
145146
}
146-
c, err = newBoltCache(cpath, pi, after.Unix(), logger)
147+
bc, err = newBoltCache(cpath, after.Unix(), logger)
147148
if err != nil {
148149
t.Fatal(err)
149150
}
151+
c = bc.newSingleSourceCache(pi)
150152

151153
gotM, gotL, ok := c.getManifestAndLock(rev, ai)
152154
if !ok {
@@ -169,15 +171,16 @@ func TestBoltCacheTimeout(t *testing.T) {
169171
}
170172
}
171173

172-
if err := c.close(); err != nil {
174+
if err := bc.close(); err != nil {
173175
t.Fatal("failed to close cache:", err)
174176
}
175177

176178
// Re-connect with the original epoch.
177-
c, err = newBoltCache(cpath, pi, start.Unix(), logger)
179+
bc, err = newBoltCache(cpath, start.Unix(), logger)
178180
if err != nil {
179181
t.Fatal(err)
180182
}
183+
c = bc.newSingleSourceCache(pi)
181184
// Read values timestamped > `start`.
182185
{
183186
gotM, gotL, ok := c.getManifestAndLock(rev, ai)

0 commit comments

Comments
 (0)