-
Notifications
You must be signed in to change notification settings - Fork 124
feat(table): Implement delete files snapshot producer API #530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(table): Implement delete files snapshot producer API #530
Conversation
| err := df.computeDeletes(iceberg.EqualTo(iceberg.Reference("foo"), true), true) | ||
| t.Require().NoError(err) | ||
|
|
||
| updates, reqs, err := updater.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we need an idiomatic way to override the commit in snapshotUpdate. When no entries were deleted, updates and requirements should be empty instead of creating a snapshot with no changes.
https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/update/snapshot.py#L370-L373
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense. Perhaps we need another base function in the impls to allow the producer to either override the commit method or otherwise indicate there was no change and thus no need to create a new snapshot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also add a DeleteByPredicate method?
| snapshotProps iceberg.Properties, | ||
| ) *snapshotProducer { | ||
| prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) | ||
| prod.producerImpl = &deleteFiles{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we default predicate to AlwaysFalse{}?
| predicate iceberg.BooleanExpression | ||
| caseSensitive bool | ||
|
|
||
| computed bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does computed represent?
| partitionFilter, err := project(df.predicate) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return partitionFilter, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can just do return project(df.predicate)
| func (df *deleteFiles) ensureComputed() error { | ||
| if !df.computed { | ||
| err := df.computeDeletes(iceberg.AlwaysFalse{}, true) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You've got a race condition here. You need to lock around this or use a sync.Once, or something equivalent.
| err := df.computeDeletes(iceberg.EqualTo(iceberg.Reference("foo"), true), true) | ||
| t.Require().NoError(err) | ||
|
|
||
| updates, reqs, err := updater.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense. Perhaps we need another base function in the impls to allow the producer to either override the commit method or otherwise indicate there was no change and thus no need to create a new snapshot
|
@zeroshade Thank you for the feedbacks! |
Description
Testing
Pending to add unit tests