-
Notifications
You must be signed in to change notification settings - Fork 131
feat(table): Implement delete files snapshot producer API #530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -410,6 +410,232 @@ | |
| return append(result, unmergedDeleteManifests...), nil | ||
| } | ||
|
|
||
| func newDeleteFilesProducer( | ||
| op Operation, | ||
| txn *Transaction, | ||
| fs iceio.WriteFileIO, | ||
| commitUUID *uuid.UUID, | ||
| snapshotProps iceberg.Properties, | ||
| ) *snapshotProducer { | ||
| prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) | ||
| prod.producerImpl = &deleteFiles{ | ||
| base: prod, | ||
| computed: false, | ||
| keepManifests: make([]iceberg.ManifestFile, 0), | ||
| removedEntries: make([]iceberg.ManifestEntry, 0), | ||
| partialRewrites: false, | ||
| } | ||
|
|
||
| return prod | ||
| } | ||
|
|
||
| type deleteFiles struct { | ||
| base *snapshotProducer | ||
|
|
||
| predicate iceberg.BooleanExpression | ||
| caseSensitive bool | ||
|
|
||
| computed bool | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does |
||
| keepManifests []iceberg.ManifestFile | ||
| removedEntries []iceberg.ManifestEntry | ||
| partialRewrites bool | ||
| } | ||
|
|
||
| func (df *deleteFiles) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) { | ||
| schema := df.base.txn.meta.CurrentSchema() | ||
| spec := df.base.spec(specID) | ||
| project := newInclusiveProjection(schema, spec, df.caseSensitive) | ||
| partitionFilter, err := project(df.predicate) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return partitionFilter, nil | ||
| } | ||
|
|
||
| func (df *deleteFiles) partitionFilters() *keyDefaultMap[int, iceberg.BooleanExpression] { | ||
| return newKeyDefaultMapWrapErr(df.buildPartitionProjection) | ||
| } | ||
|
|
||
| func (df *deleteFiles) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) { | ||
| spec := df.base.spec(specID) | ||
| schema := df.base.txn.meta.CurrentSchema() | ||
|
|
||
| return newManifestEvaluator( | ||
| spec, | ||
| schema, | ||
| df.partitionFilters().Get(specID), | ||
| df.caseSensitive) | ||
| } | ||
|
|
||
| func (df *deleteFiles) copyWithNewStatus(entry iceberg.ManifestEntry, status iceberg.ManifestEntryStatus) iceberg.ManifestEntry { | ||
| snapId := entry.SnapshotID() | ||
| if status == iceberg.EntryStatusDELETED { | ||
| snapId = df.base.snapshotID | ||
| } | ||
|
|
||
| seqNum := entry.SequenceNum() | ||
|
|
||
| return iceberg.NewManifestEntry( | ||
| status, | ||
| &snapId, | ||
| &seqNum, | ||
| entry.FileSequenceNum(), | ||
| entry.DataFile(), | ||
| ) | ||
| } | ||
|
|
||
| func (df *deleteFiles) computeDeletes(predicate iceberg.BooleanExpression, caseSensitive bool) error { | ||
| df.predicate = predicate | ||
| df.caseSensitive = caseSensitive | ||
|
|
||
| schema := df.base.txn.meta.CurrentSchema() | ||
| manifestEvaluators := newKeyDefaultMapWrapErr(df.buildManifestEvaluator) | ||
| strictMetricsEvaluator, err := newStrictMetricsEvaluator(schema, predicate, caseSensitive, false) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| inclusiveMetricsEvaluator, err := newInclusiveMetricsEvaluator(schema, predicate, caseSensitive, false) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // table has no snapshot, nothing to compute | ||
| if df.base.parentSnapshotID <= 0 { | ||
| return nil | ||
| } | ||
|
|
||
| snapshot, err := df.base.txn.meta.SnapshotByID(df.base.parentSnapshotID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| manifestFiles, err := snapshot.Manifests(df.base.io) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for _, manifestFile := range manifestFiles { | ||
| if manifestFile.ManifestContent() == iceberg.ManifestContentData { | ||
| containMatch, err := manifestEvaluators.Get(int(manifestFile.PartitionSpecID()))(manifestFile) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if !containMatch { | ||
| // if manifest file doesn't contain relevant rows matched the predicate, | ||
| // keep it in the manifest list | ||
| df.keepManifests = append(df.keepManifests, manifestFile) | ||
| } else { | ||
| // else evaluate each manifest entry and the corresponding data file in the manifest file | ||
| deleteEntries := make([]iceberg.ManifestEntry, 0) | ||
| keepEntries := make([]iceberg.ManifestEntry, 0) | ||
| entries, err := manifestFile.FetchEntries(df.base.io, true) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for _, entry := range entries { | ||
| ok, err := strictMetricsEvaluator(entry.DataFile()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if ok == rowsMustMatch { | ||
|
Check failure on line 542 in table/snapshot_producers.go
|
||
| // based on entry metadata, all rows in the data file matched the predicate | ||
| deleteEntries = append(deleteEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusDELETED)) | ||
| } else { | ||
| // can't determine based on entry metadata | ||
| keepEntries = append(keepEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusEXISTING)) | ||
| ok, err = inclusiveMetricsEvaluator(entry.DataFile()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if ok != rowsMightNotMatch { | ||
|
Check failure on line 552 in table/snapshot_producers.go
|
||
| df.partialRewrites = true | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if len(deleteEntries) > 0 { | ||
| df.removedEntries = append(df.removedEntries, deleteEntries...) | ||
|
|
||
| // rewrite the manifests | ||
| if len(keepEntries) > 0 { | ||
| spec := df.base.spec(int(manifestFile.PartitionSpecID())) | ||
| wr, path, counter, err := df.base.newManifestWriter(spec) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| for _, entry := range keepEntries { | ||
| err = wr.Existing(entry) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| if err = wr.Close(); err != nil { | ||
| return err | ||
| } | ||
| mf, err := wr.ToManifestFile(path, counter.Count) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| df.keepManifests = append(df.keepManifests, mf) | ||
| } | ||
| } else { | ||
| df.keepManifests = append(df.keepManifests, manifestFile) | ||
| } | ||
| } | ||
| } else { | ||
| df.keepManifests = append(df.keepManifests, manifestFile) | ||
| } | ||
| } | ||
| df.computed = true | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (df *deleteFiles) ensureComputed() error { | ||
| if !df.computed { | ||
| err := df.computeDeletes(iceberg.AlwaysFalse{}, true) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
Comment on lines
+596
to
+605
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You've got a race condition here. You need to lock around this or use a `sync.Once`, or something equivalent. |
||
|
|
||
| func (df *deleteFiles) rewriteNeeded() (bool, error) { | ||
| err := df.ensureComputed() | ||
| if err != nil { | ||
| return false, err | ||
| } | ||
|
|
||
| return df.partialRewrites, nil | ||
| } | ||
|
|
||
| func (df *deleteFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { | ||
| // no post processing | ||
| return manifests, nil | ||
| } | ||
|
|
||
| func (df *deleteFiles) existingManifests() ([]iceberg.ManifestFile, error) { | ||
| err := df.ensureComputed() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return df.keepManifests, nil | ||
| } | ||
|
|
||
| func (df *deleteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { | ||
| err := df.ensureComputed() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return df.removedEntries, nil | ||
| } | ||
|
|
||
| type snapshotProducer struct { | ||
| producerImpl | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we default `predicate` to `AlwaysFalse{}`?