Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions table/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"fmt"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
return t.apply(updates, reqs)
}

// DynamicPartitionOverwrite performs a dynamic partition overwrite operation.
// It detects partition values in the provided arrow table using the current
// partition spec, deletes existing partitions matching these values, and then
// appends the new data.
func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
if t.meta.CurrentSpec().IsUnpartitioned() {
return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation)
}

// Check that all partition fields use identity transforms
currentSpec := t.meta.CurrentSpec()
for field := range currentSpec.Fields() {
if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s",
ErrInvalidOperation, field.Name)
}
}
Comment on lines +357 to +364
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this defined in the spec? Or is this just a NotYetImplemented thing?


if tbl.NumRows() == 0 {
return nil
}
Comment on lines +366 to +368
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this overwrite the partition with an empty partition?


fs, err := t.tbl.fsF(ctx)
if err != nil {
return err
}

commitUUID := uuid.New()
rdr := array.NewTableReader(tbl, batchSize)
defer rdr.Release()
dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
sc: tbl.Schema(),
itr: array.IterFromReader(rdr),
fs: fs.(io.WriteFileIO),
writeUUID: &commitUUID,
})

var allDataFiles []iceberg.DataFile
for df, err := range dataFiles {
if err != nil {
return err
}
allDataFiles = append(allDataFiles, df)
}

partitionsToOverwrite := make(map[string]struct{})
for _, df := range allDataFiles {
partitionKey := fmt.Sprintf("%v", df.Partition())
partitionsToOverwrite[partitionKey] = struct{}{}
}
Comment on lines +385 to +397
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can probably merge these loops


deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)

if err := t.deleteFileByFilter(ctx, deleteFilter, snapshotProps); err != nil {
return err
}

appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
for _, df := range allDataFiles {
appendFiles.appendDataFile(df)
}

updates, reqs, err := appendFiles.commit()
if err != nil {
return err
}

return t.apply(updates, reqs)
}

// deleteFileByFilter performs a delete operation with the given filter and snapshot properties.
func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {
Copy link
Contributor

@lliangyu-lin lliangyu-lin Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also working on a complete delete API (CoW) that can delete row level and file level based on predicate in #518.
Hopefully we don't need this method once the full delete API is supported.

fs, err := t.tbl.fsF(ctx)
if err != nil {
return err
}

deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this use the commitUUID?


currentSnapshot := t.meta.currentSnapshot()
if currentSnapshot == nil {
return fmt.Errorf("%w: cannot delete from table without existing snapshot", ErrInvalidOperation)
}

scan, err := t.Scan(WithRowFilter(filter))
if err != nil {
return err
}
fileScan, err := scan.PlanFiles(ctx)
if err != nil {
return err
}

// Mark files for deletion
for _, task := range fileScan {
deleteProducer.deleteDataFile(task.File)
}

updates, reqs, err := deleteProducer.commit()
if err != nil {
return err
}

return t.apply(updates, reqs)
}

// buildPartitionPredicate builds a filter predicate matching any of the input partition records.
func (t *Transaction) buildPartitionPredicate(partitionRecords map[string]struct{}) iceberg.BooleanExpression {
partitionSpec := t.meta.CurrentSpec()
schema := t.meta.CurrentSchema()

var partitionFields []string
for field := range partitionSpec.Fields() {
if field, ok := schema.FindFieldByID(field.SourceID); ok {
partitionFields = append(partitionFields, field.Name)
}
}

// Build OR expression for all partitions
var expressions []iceberg.BooleanExpression

for partitionKey := range partitionRecords {
partitionValues := parsePartitionKey(partitionKey, partitionFields)

// Build AND expression for this partition
var partitionExprs []iceberg.BooleanExpression
for i, fieldName := range partitionFields {
if i < len(partitionValues) {
value := partitionValues[i]
if value == nil {
partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(fieldName)))
} else {
// Create an expression based on a field type
if field, ok := schema.FindFieldByName(fieldName); ok {
partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(fieldName), value, field.Type))
}
}
}
}

if len(partitionExprs) > 0 {
partitionExpr := partitionExprs[0]
for _, expr := range partitionExprs[1:] {
partitionExpr = iceberg.NewAnd(partitionExpr, expr)
}
Comment on lines +489 to +492
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already handled via NewAnd. You can do: partitionExpr := iceberg.NewAnd(partitionExprs[0], partitionExprs[1], partitionExprs[2:]...)

expressions = append(expressions, partitionExpr)
}
}

if len(expressions) == 0 {
return iceberg.AlwaysFalse{}
}

result := expressions[0]
for _, expr := range expressions[1:] {
result = iceberg.NewOr(result, expr)
}
Comment on lines +501 to +504
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above, iceberg.NewOr already handles an arbitrary number of arguments so you don't have to do this loop manually


return result
}

// parsePartitionKey parses a partition key string into individual values.
func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} {
// Simple parsing for demonstration - assumes a format like "field1=value1/field2=value2"
parts := strings.Split(partitionKey, "/")
values := make([]interface{}, len(fieldNames))
Comment on lines +510 to +513
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have the schema, we can use the field names to determine the types so we know what type to parse into from the strings


for i, part := range parts {
if i >= len(fieldNames) {
break
}

if strings.Contains(part, "=") {
kv := strings.SplitN(part, "=", 2)
if len(kv) == 2 {
values[i] = parsePartitionValue(kv[1])
}
}
}

return values
}

// parsePartitionValue converts a partition value string into a typed Go
// value: nil for "null" or the empty string, otherwise int64, float64, or
// bool when the string parses as one of those (tried in that order), falling
// back to the original string.
func parsePartitionValue(valueStr string) interface{} {
	switch valueStr {
	case "", "null":
		return nil
	}

	// Try the narrower interpretations first: integer, then float, then
	// boolean; anything unparseable stays a string.
	if n, err := strconv.ParseInt(valueStr, 10, 64); err == nil {
		return n
	}
	if fl, err := strconv.ParseFloat(valueStr, 64); err == nil {
		return fl
	}
	if flag, err := strconv.ParseBool(valueStr); err == nil {
		return flag
	}

	return valueStr
}

// createEqualToExpression creates an EqualTo expression with the correct type
func createEqualToExpression(term iceberg.UnboundTerm, value interface{}, typ iceberg.Type) iceberg.BooleanExpression {
switch t := typ.(type) {
case iceberg.PrimitiveType:
switch t {
case iceberg.PrimitiveTypes.Int32:
if v, ok := value.(int32); ok {
return iceberg.EqualTo(term, v)
}
case iceberg.PrimitiveTypes.Int64:
if v, ok := value.(int64); ok {
return iceberg.EqualTo(term, v)
}
case iceberg.PrimitiveTypes.Float32:
if v, ok := value.(float32); ok {
return iceberg.EqualTo(term, v)
}
case iceberg.PrimitiveTypes.Float64:
if v, ok := value.(float64); ok {
return iceberg.EqualTo(term, v)
}
case iceberg.PrimitiveTypes.String:
if v, ok := value.(string); ok {
return iceberg.EqualTo(term, v)
}
case iceberg.PrimitiveTypes.Bool:
if v, ok := value.(bool); ok {
return iceberg.EqualTo(term, v)
}
}
}
Comment on lines +554 to +582
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the types and casting should be handled for you once the expression is bound. So you shouldn't need the iceberg.Type, just do a switch on value.(type) and calling iceberg.EqualTo(term, v)


// Fallback to string
if v, ok := value.(string); ok {
return iceberg.EqualTo(term, v)
}

return iceberg.EqualTo(term, fmt.Sprintf("%v", value))
}

func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
updatedMeta, err := t.meta.Build()
if err != nil {
Expand Down
Loading
Loading