-
Notifications
You must be signed in to change notification settings - Fork 121
feat(table): Support Dynamic Partition Overwrite #482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,8 @@ import ( | |
"fmt" | ||
"runtime" | ||
"slices" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp | |
return t.apply(updates, reqs) | ||
} | ||
|
||
// DynamicPartitionOverwrite performs a dynamic partition overwrite operation. | ||
// It detects partition values in the provided arrow table using the current | ||
// partition spec, deletes existing partitions matching these values, and then | ||
// appends the new data. | ||
func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error { | ||
if t.meta.CurrentSpec().IsUnpartitioned() { | ||
return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation) | ||
} | ||
|
||
// Check that all partition fields use identity transforms | ||
currentSpec := t.meta.CurrentSpec() | ||
for field := range currentSpec.Fields() { | ||
if _, ok := field.Transform.(iceberg.IdentityTransform); !ok { | ||
return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s", | ||
ErrInvalidOperation, field.Name) | ||
} | ||
} | ||
|
||
if tbl.NumRows() == 0 { | ||
return nil | ||
} | ||
Comment on lines
+366
to
+368
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't this overwrite the partition with an empty partition? |
||
|
||
fs, err := t.tbl.fsF(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
commitUUID := uuid.New() | ||
rdr := array.NewTableReader(tbl, batchSize) | ||
defer rdr.Release() | ||
dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{ | ||
sc: tbl.Schema(), | ||
itr: array.IterFromReader(rdr), | ||
fs: fs.(io.WriteFileIO), | ||
writeUUID: &commitUUID, | ||
}) | ||
|
||
var allDataFiles []iceberg.DataFile | ||
for df, err := range dataFiles { | ||
if err != nil { | ||
return err | ||
} | ||
allDataFiles = append(allDataFiles, df) | ||
} | ||
|
||
partitionsToOverwrite := make(map[string]struct{}) | ||
for _, df := range allDataFiles { | ||
partitionKey := fmt.Sprintf("%v", df.Partition()) | ||
partitionsToOverwrite[partitionKey] = struct{}{} | ||
} | ||
Comment on lines
+385
to
+397
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can probably merge these loops |
||
|
||
deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite) | ||
|
||
if err := t.deleteFileByFilter(ctx, deleteFilter, snapshotProps); err != nil { | ||
return err | ||
} | ||
|
||
appendFiles := t.appendSnapshotProducer(fs, snapshotProps) | ||
for _, df := range allDataFiles { | ||
appendFiles.appendDataFile(df) | ||
} | ||
|
||
updates, reqs, err := appendFiles.commit() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return t.apply(updates, reqs) | ||
} | ||
|
||
// deleteFileByFilter performs a delete operation with the given filter and snapshot properties. | ||
func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm also working on a complete delete API (CoW) that can delete row level and file level based on predicate in #518. |
||
fs, err := t.tbl.fsF(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't this use the commitUUID? |
||
|
||
currentSnapshot := t.meta.currentSnapshot() | ||
if currentSnapshot == nil { | ||
return fmt.Errorf("%w: cannot delete from table without existing snapshot", ErrInvalidOperation) | ||
} | ||
|
||
scan, err := t.Scan(WithRowFilter(filter)) | ||
if err != nil { | ||
return err | ||
} | ||
fileScan, err := scan.PlanFiles(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Mark files for deletion | ||
for _, task := range fileScan { | ||
deleteProducer.deleteDataFile(task.File) | ||
} | ||
|
||
updates, reqs, err := deleteProducer.commit() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return t.apply(updates, reqs) | ||
} | ||
|
||
// buildPartitionPredicate builds a filter predicate matching any of the input partition records. | ||
func (t *Transaction) buildPartitionPredicate(partitionRecords map[string]struct{}) iceberg.BooleanExpression { | ||
partitionSpec := t.meta.CurrentSpec() | ||
schema := t.meta.CurrentSchema() | ||
|
||
var partitionFields []string | ||
for field := range partitionSpec.Fields() { | ||
if field, ok := schema.FindFieldByID(field.SourceID); ok { | ||
partitionFields = append(partitionFields, field.Name) | ||
} | ||
} | ||
|
||
// Build OR expression for all partitions | ||
var expressions []iceberg.BooleanExpression | ||
|
||
for partitionKey := range partitionRecords { | ||
partitionValues := parsePartitionKey(partitionKey, partitionFields) | ||
|
||
// Build AND expression for this partition | ||
var partitionExprs []iceberg.BooleanExpression | ||
for i, fieldName := range partitionFields { | ||
if i < len(partitionValues) { | ||
value := partitionValues[i] | ||
if value == nil { | ||
partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(fieldName))) | ||
} else { | ||
// Create an expression based on a field type | ||
if field, ok := schema.FindFieldByName(fieldName); ok { | ||
partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(fieldName), value, field.Type)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
if len(partitionExprs) > 0 { | ||
partitionExpr := partitionExprs[0] | ||
for _, expr := range partitionExprs[1:] { | ||
partitionExpr = iceberg.NewAnd(partitionExpr, expr) | ||
} | ||
Comment on lines
+489
to
+492
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is already handled via |
||
expressions = append(expressions, partitionExpr) | ||
} | ||
} | ||
|
||
if len(expressions) == 0 { | ||
return iceberg.AlwaysFalse{} | ||
} | ||
|
||
result := expressions[0] | ||
for _, expr := range expressions[1:] { | ||
result = iceberg.NewOr(result, expr) | ||
} | ||
Comment on lines
+501
to
+504
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as above, |
||
|
||
return result | ||
} | ||
|
||
// parsePartitionKey parses a partition key string into individual values. | ||
func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} { | ||
// Simple parsing for demonstration - assumes a format like "field1=value1/field2=value2" | ||
parts := strings.Split(partitionKey, "/") | ||
values := make([]interface{}, len(fieldNames)) | ||
Comment on lines
+510
to
+513
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have the schema, we can use the field names to determine the types so we know what type to parse into from the strings |
||
|
||
for i, part := range parts { | ||
if i >= len(fieldNames) { | ||
break | ||
} | ||
|
||
if strings.Contains(part, "=") { | ||
kv := strings.SplitN(part, "=", 2) | ||
if len(kv) == 2 { | ||
values[i] = parsePartitionValue(kv[1]) | ||
} | ||
} | ||
} | ||
|
||
return values | ||
} | ||
|
||
// parsePartitionValue guesses a Go value for the textual partition value.
//
// "null" and the empty string yield nil. Otherwise the first successful
// conversion wins, tried in this order: base-10 int64, float64, bool. Text
// that matches none of them is returned unchanged as a string.
func parsePartitionValue(valueStr string) interface{} {
	switch valueStr {
	case "null", "":
		return nil
	}

	asInt, intErr := strconv.ParseInt(valueStr, 10, 64)
	if intErr == nil {
		return asInt
	}

	asFloat, floatErr := strconv.ParseFloat(valueStr, 64)
	if floatErr == nil {
		return asFloat
	}

	asBool, boolErr := strconv.ParseBool(valueStr)
	if boolErr == nil {
		return asBool
	}

	return valueStr
}
|
||
// createEqualToExpression creates an EqualTo expression with the correct type | ||
func createEqualToExpression(term iceberg.UnboundTerm, value interface{}, typ iceberg.Type) iceberg.BooleanExpression { | ||
switch t := typ.(type) { | ||
case iceberg.PrimitiveType: | ||
switch t { | ||
case iceberg.PrimitiveTypes.Int32: | ||
if v, ok := value.(int32); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Int64: | ||
if v, ok := value.(int64); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Float32: | ||
if v, ok := value.(float32); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Float64: | ||
if v, ok := value.(float64); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.String: | ||
if v, ok := value.(string); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
case iceberg.PrimitiveTypes.Bool: | ||
if v, ok := value.(bool); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
} | ||
} | ||
Comment on lines
+554
to
+582
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the types and casting should be handled for you once the expression is bound. So you shouldn't need the |
||
|
||
// Fallback to string | ||
if v, ok := value.(string); ok { | ||
return iceberg.EqualTo(term, v) | ||
} | ||
|
||
return iceberg.EqualTo(term, fmt.Sprintf("%v", value)) | ||
} | ||
|
||
func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) { | ||
updatedMeta, err := t.meta.Build() | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this defined in the spec? Or is this just a NotYetImplemented thing?