-
Notifications
You must be signed in to change notification settings - Fork 120
feat(table): Support Dynamic Partition Overwrite #482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(table): Support Dynamic Partition Overwrite #482
Conversation
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
table/transaction.go
Outdated
} | ||
|
||
// Delete performs a delete operation with the given filter and snapshot properties. | ||
func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm a little bit doubting here, DPO suppose to delete the whole file inside a partition, this Delete method semantic is odd, per description i would guess that this should delete rows based on predicate, but in a fact it deletes whole files.
maybe it worth to rename method and make it private?
is there any need to keep it public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you are right. I think it should be a private method. I changed the name to make it more meaningful
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
// Check that all partition fields use identity transforms | ||
currentSpec := t.meta.CurrentSpec() | ||
for field := range currentSpec.Fields() { | ||
if _, ok := field.Transform.(iceberg.IdentityTransform); !ok { | ||
return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s", | ||
ErrInvalidOperation, field.Name) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this defined in the spec? Or is this just a NotYetImplemented thing?
if tbl.NumRows() == 0 { | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this overwrite the partition with an empty partition?
var allDataFiles []iceberg.DataFile | ||
for df, err := range dataFiles { | ||
if err != nil { | ||
return err | ||
} | ||
allDataFiles = append(allDataFiles, df) | ||
} | ||
|
||
partitionsToOverwrite := make(map[string]struct{}) | ||
for _, df := range allDataFiles { | ||
partitionKey := fmt.Sprintf("%v", df.Partition()) | ||
partitionsToOverwrite[partitionKey] = struct{}{} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can probably merge these loops
return err | ||
} | ||
|
||
deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this use the commitUUID?
partitionExpr := partitionExprs[0] | ||
for _, expr := range partitionExprs[1:] { | ||
partitionExpr = iceberg.NewAnd(partitionExpr, expr) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is already handled via NewAnd
. You can do: partitionExpr := iceberg.NewAnd(partitionExprs[0], partitionExprs[1], partitionExprs[2:]...)
result := expressions[0] | ||
for _, expr := range expressions[1:] { | ||
result = iceberg.NewOr(result, expr) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above, iceberg.NewOr
already handles an arbitrary number of arguments so you don't have to do this loop manually
func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} { | ||
// Simple parsing for demonstration - assumes a format like "field1=value1/field2=value2" | ||
parts := strings.Split(partitionKey, "/") | ||
values := make([]interface{}, len(fieldNames)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have the schema, we can use the field names to determine the types so we know what type to parse into from the strings
switch t := typ.(type) { | ||
case iceberg.PrimitiveType: | ||
switch t { | ||
case iceberg.PrimitiveTypes.Int32: | ||
if v, ok := value.(int32); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Int64: | ||
if v, ok := value.(int64); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Float32: | ||
if v, ok := value.(float32); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Float64: | ||
if v, ok := value.(float64); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.String: | ||
if v, ok := value.(string); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Bool: | ||
if v, ok := value.(bool); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the types and casting should be handled for you once the expression is bound. So you shouldn't need the iceberg.Type
, just do a switch on value.(type)
and calling iceberg.EqualTo(term, v)
} | ||
|
||
// deleteFileByFilter performs a delete operation with the given filter and snapshot properties. | ||
func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also working on a complete delete API (CoW) that can delete row level and file level based on predicate in #518.
Hopefully we don't need this method once the full delete API is supported.
No description provided.