-
Notifications
You must be signed in to change notification settings - Fork 858
Seidb restructure #2653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Seidb restructure #2653
Changes from all commits
915a3dc
206931d
a876284
ad238ab
98c663c
c8f6aa2
9069021
b7268ff
0a4174c
1cabf1f
bbb7a84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| package parquet | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| package pebbledb | ||
|
|
||
| import ( | ||
| "encoding/binary" | ||
| goerrors "errors" | ||
| "fmt" | ||
|
|
||
| "github.com/cockroachdb/pebble" | ||
| ) | ||
|
|
||
const (
	// currentVersionKey is the key name under which the version is stored.
	currentVersionKey = "s/_meta/version"

	// VersionSize is the number of bytes needed to store the version.
	VersionSize = 8
)
|
|
||
| // Batch is a set of modifications to apply to the store. | ||
| type Batch struct { | ||
| db *pebble.DB | ||
| batch *pebble.Batch | ||
| writeOps *pebble.WriteOptions | ||
| } | ||
|
|
||
| // NewBatch creates new batch. | ||
| func NewBatch(db *pebble.DB) *Batch { | ||
| batch := db.NewBatch() | ||
| return &Batch{ | ||
| db: db, | ||
| batch: batch, | ||
| writeOps: pebble.NoSync, | ||
| } | ||
| } | ||
|
|
||
| // Size returns number of operations in the batch. | ||
| func (b *Batch) Size() int { | ||
| return b.batch.Len() | ||
| } | ||
|
|
||
| // Reset resets the batch. | ||
| func (b *Batch) Reset() { | ||
| b.batch.Reset() | ||
| } | ||
|
|
||
| // Set sets key. | ||
| func (b *Batch) Set(key, value []byte) error { | ||
| if err := b.batch.Set(key, value, nil); err != nil { | ||
| return fmt.Errorf("failed to write PebbleDB batch: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // SetCurrentVersion sets the version metadata. | ||
| func (b *Batch) SetCurrentVersion(version uint64) error { | ||
| var versionBz [VersionSize]byte | ||
| binary.LittleEndian.PutUint64(versionBz[:], version) | ||
| if err := b.batch.Set([]byte(currentVersionKey), versionBz[:], nil); err != nil { | ||
| return fmt.Errorf("failed to write current version to PebbleDB batch: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Delete deletes key from the store. | ||
| func (b *Batch) Delete(key []byte) error { | ||
| return b.batch.Delete(key, nil) | ||
| } | ||
|
|
||
| // Commit commits changes. | ||
| func (b *Batch) Commit() (err error) { | ||
| defer func() { | ||
| err = goerrors.Join(err, b.batch.Close()) | ||
| }() | ||
|
|
||
| return b.batch.Commit(b.writeOps) | ||
| } | ||
|
|
||
| // SetByStore sets key in the store. | ||
| func (b *Batch) SetByStore(storeKey string, key, value []byte) error { | ||
| prefixedKey := prependStoreKey(storeKey, key) | ||
| if err := b.batch.Set(prefixedKey, value, nil); err != nil { | ||
| return fmt.Errorf("failed to write PebbleDB batch: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // DeleteByStore deletes key from the store. | ||
| func (b *Batch) DeleteByStore(storeKey string, key []byte) error { | ||
| prefixedKey := prependStoreKey(storeKey, key) | ||
| return b.batch.Delete(prefixedKey, nil) | ||
| } | ||
|
|
||
// newStorePrefix builds the store prefix "s/k:" + storeKey + "/" in a freshly
// allocated buffer whose capacity leaves room for extra additional bytes, so
// callers can append to the result without triggering a reallocation.
func newStorePrefix(storeKey string, extra int) []byte {
	b := make([]byte, 0, len("s/k:/")+len(storeKey)+extra)
	b = append(b, "s/k:"...)
	b = append(b, storeKey...)
	return append(b, '/')
}

// getStorePrefix returns the key prefix for storeKey:
// "s/k:" + storeKey + "/".
func getStorePrefix(storeKey string) []byte {
	return newStorePrefix(storeKey, 0)
}

// prependStoreKey returns key prefixed with the store prefix of storeKey.
// When storeKey is empty, key is returned as-is (the same slice, not a copy).
// Otherwise a new slice is returned and key is left untouched.
func prependStoreKey(storeKey string, key []byte) []byte {
	if storeKey == "" {
		return key
	}
	// Size the buffer for prefix and key up front so the append below does
	// not have to grow (and re-copy) the prefix.
	return append(newStorePrefix(storeKey, len(key)), key...)
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| package pebbledb | ||
|
|
||
| import ( | ||
| "github.com/cockroachdb/pebble" | ||
| "github.com/cockroachdb/pebble/bloom" | ||
| "github.com/pkg/errors" | ||
| "golang.org/x/exp/slices" | ||
| ) | ||
|
|
||
| // Database represents database. | ||
| type Database struct { | ||
| storage *pebble.DB | ||
| writeOps *pebble.WriteOptions | ||
| } | ||
|
|
||
| // OpenDB opens an existing or create a new database. | ||
| func OpenDB(dbPath string) *Database { | ||
| cache := pebble.NewCache(1024 * 1024 * 512) | ||
| defer cache.Unref() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We have to keep the `defer cache.Unref()` — it's the pattern recommended by PebbleDB. The idea is to let the cache's reference count drop to 0 when the DB is closed.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i see!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i double checked this issue. the |
||
| opts := &pebble.Options{ | ||
| Cache: cache, | ||
| FormatMajorVersion: pebble.FormatNewest, | ||
| L0CompactionThreshold: 4, | ||
| L0StopWritesThreshold: 1000, | ||
| LBaseMaxBytes: 64 << 20, // 64 MB | ||
| Levels: make([]pebble.LevelOptions, 7), | ||
| MaxConcurrentCompactions: func() int { return 3 }, | ||
| MemTableSize: 64 << 20, | ||
| MemTableStopWritesThreshold: 4, | ||
| DisableWAL: false, | ||
| } | ||
|
|
||
| for i := range opts.Levels { | ||
| l := &opts.Levels[i] | ||
| l.BlockSize = 32 << 10 // 32 KB | ||
| l.IndexBlockSize = 256 << 10 // 256 KB | ||
| l.FilterPolicy = bloom.FilterPolicy(10) | ||
| l.FilterType = pebble.TableFilter | ||
| if i > 1 { | ||
| l.Compression = pebble.ZstdCompression | ||
| } | ||
| if i > 0 { | ||
| l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2 | ||
| } | ||
| l.EnsureDefaults() | ||
| } | ||
| opts.Levels[6].FilterPolicy = nil | ||
| opts.FlushSplitBytes = opts.Levels[0].TargetFileSize | ||
| opts = opts.EnsureDefaults() | ||
|
|
||
| db, err := pebble.Open(dbPath, opts) | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
|
|
||
| database := &Database{ | ||
| storage: db, | ||
| writeOps: pebble.NoSync, | ||
| } | ||
|
|
||
| return database | ||
| } | ||
|
|
||
| // Has checks if key is available. | ||
| func (db *Database) Has(key []byte) (bool, error) { | ||
| val, err := db.Get(key) | ||
| if err != nil { | ||
| return false, errors.WithStack(err) | ||
| } | ||
| return val != nil, nil | ||
| } | ||
|
|
||
| // Get returns value by key. | ||
| // The returned value is a copy and safe to use after this call returns. | ||
| func (db *Database) Get(key []byte) ([]byte, error) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. From AI: Pebble's Get() returns a value that is only valid until the returned closer is closed.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's interesting. I think this was copied from v3, and we never really saw this issue there. Will still fix it though. |
||
| value, closer, err := db.storage.Get(key) | ||
| if err != nil { | ||
| if errors.Is(err, pebble.ErrNotFound) { | ||
| return nil, nil | ||
| } | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| defer func() { | ||
| _ = closer.Close() | ||
| }() | ||
| // Must clone the value before closer.Close() is called, | ||
| // as PebbleDB's zero-copy semantics mean the underlying | ||
| // memory is only valid until the closer is closed. | ||
| return slices.Clone(value), nil | ||
| } | ||
|
|
||
| // Set override and persist key,value pair. | ||
| func (db *Database) Set(key []byte, value []byte) error { | ||
| return db.storage.Set(key, value, db.writeOps) | ||
| } | ||
|
|
||
| // Close closes the database. | ||
| func (db *Database) Close() error { | ||
| _ = db.storage.Flush() | ||
| err := db.storage.Close() | ||
| return errors.WithStack(err) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| package pebbledb | ||
| package mvcc | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍