From de835157d3e518c84fc594b330e99b539adda430 Mon Sep 17 00:00:00 2001
From: dttung2905
Date: Tue, 8 Jul 2025 22:58:14 +0100
Subject: [PATCH 1/2] Support Dynamic Partition Overwrite

Signed-off-by: dttung2905
---
 table/transaction.go      | 246 ++++++++++++++++++++++++++++++++++++++
 table/transaction_test.go | 202 +++++++++++++++++++++++++++++++
 2 files changed, 448 insertions(+)

diff --git a/table/transaction.go b/table/transaction.go
index 63790893e..a749d31bb 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -24,6 +24,8 @@ import (
 	"fmt"
 	"runtime"
 	"slices"
+	"strconv"
+	"strings"
 	"sync"
 	"time"

@@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
 	return t.apply(updates, reqs)
 }

+// DynamicPartitionOverwrite performs a dynamic partition overwrite operation.
+// It detects the partition values present in the provided Arrow table using
+// the current partition spec, deletes the existing partitions matching those
+// values, and then appends the new data. Only identity-transform partition
+// specs are supported.
+func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
+	if t.meta.CurrentSpec().IsUnpartitioned() {
+		return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation)
+	}
+
+	// Check that all partition fields use identity transforms.
+	currentSpec := t.meta.CurrentSpec()
+	for field := range currentSpec.Fields() {
+		if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
+			return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s",
+				ErrInvalidOperation, field.Name)
+		}
+	}
+
+	if tbl.NumRows() == 0 {
+		return nil
+	}
+
+	fs, err := t.tbl.fsF(ctx)
+	if err != nil {
+		return err
+	}
+
+	commitUUID := uuid.New()
+	rdr := array.NewTableReader(tbl, batchSize)
+	defer rdr.Release()
+	dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+		sc:        tbl.Schema(),
+		itr:       array.IterFromReader(rdr),
+		fs:        fs.(io.WriteFileIO),
+		writeUUID: &commitUUID,
+	})
+
+	var allDataFiles []iceberg.DataFile
+	for df, err := range dataFiles {
+		if err != nil {
+			return err
+		}
+		allDataFiles = append(allDataFiles, df)
+	}
+
+	// Deduplicate the partition tuples touched by the new data. Keys are
+	// rendered as "field1=value1/field2=value2" in spec order so that
+	// parsePartitionKey can recover the values positionally. This assumes
+	// DataFile.Partition() is keyed by partition field name.
+	partitionsToOverwrite := make(map[string]struct{})
+	for _, df := range allDataFiles {
+		var parts []string
+		for field := range currentSpec.Fields() {
+			if v := df.Partition()[field.Name]; v == nil {
+				parts = append(parts, field.Name+"=null")
+			} else {
+				parts = append(parts, fmt.Sprintf("%s=%v", field.Name, v))
+			}
+		}
+		partitionsToOverwrite[strings.Join(parts, "/")] = struct{}{}
+	}
+
+	deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)
+
+	if err := t.Delete(ctx, deleteFilter, snapshotProps); err != nil {
+		return err
+	}
+
+	appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
+	for _, df := range allDataFiles {
+		appendFiles.appendDataFile(df)
+	}
+
+	updates, reqs, err := appendFiles.commit()
+	if err != nil {
+		return err
+	}
+
+	return t.apply(updates, reqs)
+}
+
+// Delete performs a delete operation with the given filter and snapshot properties.
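+// It plans a scan with the given row filter and stages every data file the
+// plan selects for removal in a single overwrite snapshot. Matching is at
+// file granularity, so the filter should align with partition boundaries, as
+// the predicate built by DynamicPartitionOverwrite does.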
+func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {
+	fs, err := t.tbl.fsF(ctx)
+	if err != nil {
+		return err
+	}
+
+	deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil)
+
+	currentSnapshot := t.meta.currentSnapshot()
+	if currentSnapshot == nil {
+		return fmt.Errorf("%w: cannot delete from table without an existing snapshot", ErrInvalidOperation)
+	}
+
+	scan, err := t.Scan(WithRowFilter(filter))
+	if err != nil {
+		return err
+	}
+	fileScan, err := scan.PlanFiles(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Mark every data file selected by the scan plan for deletion.
+	for _, task := range fileScan {
+		deleteProducer.deleteDataFile(task.File)
+	}
+
+	updates, reqs, err := deleteProducer.commit()
+	if err != nil {
+		return err
+	}
+
+	return t.apply(updates, reqs)
+}
+
+// buildPartitionPredicate builds a filter predicate matching any of the input partition records.
+func (t *Transaction) buildPartitionPredicate(partitionRecords map[string]struct{}) iceberg.BooleanExpression {
+	partitionSpec := t.meta.CurrentSpec()
+	schema := t.meta.CurrentSchema()
+
+	var partitionFields []string
+	for field := range partitionSpec.Fields() {
+		if srcField, ok := schema.FindFieldByID(field.SourceID); ok {
+			partitionFields = append(partitionFields, srcField.Name)
+		}
+	}
+
+	// OR together one expression per distinct partition.
+	var expressions []iceberg.BooleanExpression
+
+	for partitionKey := range partitionRecords {
+		partitionValues := parsePartitionKey(partitionKey, partitionFields)
+
+		// AND together the per-field predicates for this partition.
+		var partitionExprs []iceberg.BooleanExpression
+		for i, fieldName := range partitionFields {
+			if i < len(partitionValues) {
+				value := partitionValues[i]
+				if value == nil {
+					partitionExprs = append(partitionExprs, iceberg.IsNull(iceberg.Reference(fieldName)))
+				} else {
+					// Create an expression based on the field's type.
+					if field, ok := schema.FindFieldByName(fieldName); ok {
+						partitionExprs = append(partitionExprs, createEqualToExpression(iceberg.Reference(fieldName), value, field.Type))
+					}
+				}
+			}
+		}
+
+		if len(partitionExprs) > 0 {
+			partitionExpr := partitionExprs[0]
+			for _, expr := range partitionExprs[1:] {
+				partitionExpr = iceberg.NewAnd(partitionExpr, expr)
+			}
+			expressions = append(expressions, partitionExpr)
+		}
+	}
+
+	if len(expressions) == 0 {
+		return iceberg.AlwaysFalse{}
+	}
+
+	result := expressions[0]
+	for _, expr := range expressions[1:] {
+		result = iceberg.NewOr(result, expr)
+	}
+
+	return result
+}
+
+// parsePartitionKey parses a partition key of the form
+// "field1=value1/field2=value2" (in partition-spec order, as produced by
+// DynamicPartitionOverwrite) back into one value per field name.
+func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} {
+	parts := strings.Split(partitionKey, "/")
+	values := make([]interface{}, len(fieldNames))
+
+	for i, part := range parts {
+		if i >= len(fieldNames) {
+			break
+		}
+
+		if strings.Contains(part, "=") {
+			kv := strings.SplitN(part, "=", 2)
+			if len(kv) == 2 {
+				values[i] = parsePartitionValue(kv[1])
+			}
+		}
+	}
+
+	return values
+}
+
+// parsePartitionValue converts a string partition value to the appropriate Go type.
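+// Values are tried as int64, then float64, then bool, with the raw string as
+// the fallback; "null" and the empty string map to nil. For example, "42"
+// parses to int64(42), "4.2" to float64(4.2), and "true" to bool(true).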
+func parsePartitionValue(valueStr string) interface{} {
+	if valueStr == "null" || valueStr == "" {
+		return nil
+	}
+
+	if i, err := strconv.ParseInt(valueStr, 10, 64); err == nil {
+		return i
+	}
+
+	if f, err := strconv.ParseFloat(valueStr, 64); err == nil {
+		return f
+	}
+
+	if b, err := strconv.ParseBool(valueStr); err == nil {
+		return b
+	}
+
+	return valueStr
+}
+
+// createEqualToExpression creates an EqualTo expression with a literal of the
+// correct type for the given field type. Numeric values arrive from
+// parsePartitionValue as int64/float64, so they are narrowed where needed.
+func createEqualToExpression(term iceberg.UnboundTerm, value interface{}, typ iceberg.Type) iceberg.BooleanExpression {
+	switch t := typ.(type) {
+	case iceberg.PrimitiveType:
+		switch t {
+		case iceberg.PrimitiveTypes.Int32:
+			if v, ok := value.(int64); ok {
+				return iceberg.EqualTo(term, int32(v))
+			}
+		case iceberg.PrimitiveTypes.Int64:
+			if v, ok := value.(int64); ok {
+				return iceberg.EqualTo(term, v)
+			}
+		case iceberg.PrimitiveTypes.Float32:
+			if v, ok := value.(float64); ok {
+				return iceberg.EqualTo(term, float32(v))
+			}
+		case iceberg.PrimitiveTypes.Float64:
+			if v, ok := value.(float64); ok {
+				return iceberg.EqualTo(term, v)
+			}
+		case iceberg.PrimitiveTypes.String:
+			if v, ok := value.(string); ok {
+				return iceberg.EqualTo(term, v)
+			}
+		case iceberg.PrimitiveTypes.Bool:
+			if v, ok := value.(bool); ok {
+				return iceberg.EqualTo(term, v)
+			}
+		}
+	}
+
+	// Fall back to comparing against the string form of the value.
+	if v, ok := value.(string); ok {
+		return iceberg.EqualTo(term, v)
+	}
+
+	return iceberg.EqualTo(term, fmt.Sprintf("%v", value))
+}
+
 func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
 	updatedMeta, err := t.meta.Build()
 	if err != nil {
diff --git a/table/transaction_test.go b/table/transaction_test.go
index 40f015c65..664dade5a 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -34,9 +34,11 @@ import (
 	"github.com/apache/iceberg-go"
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/catalog/rest"
+	"github.com/apache/iceberg-go/internal"
 	"github.com/apache/iceberg-go/internal/recipe"
 	iceio "github.com/apache/iceberg-go/io"
 	"github.com/apache/iceberg-go/table"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
 	"github.com/testcontainers/testcontainers-go/modules/compose"
 )
@@ -134,3 +136,203 @@ func (s *SparkIntegrationTestSuite) TestAddFile() {
 func TestSparkIntegration(t *testing.T) {
 	suite.Run(t, new(SparkIntegrationTestSuite))
 }
+
+// Dynamic Partition Overwrite Tests
+func TestDynamicPartitionOverwrite_UnpartitionedTable(t *testing.T) {
+	// Create a table with no partition spec.
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	metadata, err := table.NewMetadataBuilder()
+	assert.NoError(t, err)
+	_, err = metadata.AddSchema(schema, 2, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetCurrentSchemaID(0)
+	assert.NoError(t, err)
+	_, err = metadata.AddPartitionSpec(iceberg.UnpartitionedSpec, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetDefaultSpecID(0)
+	assert.NoError(t, err)
+	_, err = metadata.SetFormatVersion(2)
+	assert.NoError(t, err)
+
+	meta, err := metadata.Build()
+	assert.NoError(t, err)
+
+	var mockfs internal.MockFS
+	mockfs.Test(t)
+
+	tbl := table.New(
+		[]string{"test", "table"},
+		meta,
+		"test/location",
+		func(ctx context.Context) (iceio.IO, error) {
+			return &mockfs, nil
+		},
+		nil,
+	)
+
+	txn := tbl.NewTransaction()
+
+	// Create test data.
+	mem := memory.DefaultAllocator
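+	// Build a three-row Arrow table (id int32, data string) to exercise the
+	// unpartitioned-table error path.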
+	builder := array.NewInt32Builder(mem)
+	builder.AppendValues([]int32{1, 2, 3}, nil)
+	idArray := builder.NewArray()
+
+	builder2 := array.NewStringBuilder(mem)
+	builder2.AppendValues([]string{"a", "b", "c"}, nil)
+	dataArray := builder2.NewArray()
+
+	arrowSchema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+	}, nil)
+
+	record := array.NewRecord(arrowSchema, []arrow.Array{idArray, dataArray}, 3)
+	tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record})
+
+	// Should return an error for an unpartitioned table.
+	err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "cannot apply dynamic overwrite on an unpartitioned table")
+}
+
+func TestDynamicPartitionOverwrite_NonIdentityTransform(t *testing.T) {
+	// Create a table partitioned by a non-identity (bucket) transform.
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	partitionSpec := iceberg.NewPartitionSpec(
+		iceberg.PartitionField{
+			SourceID:  1,
+			FieldID:   1000,
+			Name:      "id_bucket",
+			Transform: iceberg.BucketTransform{NumBuckets: 4},
+		},
+	)
+
+	metadata, err := table.NewMetadataBuilder()
+	assert.NoError(t, err)
+	_, err = metadata.AddSchema(schema, 2, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetCurrentSchemaID(0)
+	assert.NoError(t, err)
+	_, err = metadata.AddPartitionSpec(&partitionSpec, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetDefaultSpecID(0)
+	assert.NoError(t, err)
+	_, err = metadata.SetFormatVersion(2)
+	assert.NoError(t, err)
+
+	meta, err := metadata.Build()
+	assert.NoError(t, err)
+
+	var mockfs internal.MockFS
+	mockfs.Test(t)
+
+	tbl := table.New(
+		[]string{"test", "table"},
+		meta,
+		"test/location",
+		func(ctx context.Context) (iceio.IO, error) {
+			return &mockfs, nil
+		},
+		nil,
+	)
+
+	txn := tbl.NewTransaction()
+
+	// Create test data.
+	mem := memory.DefaultAllocator
+	builder := array.NewInt32Builder(mem)
+	builder.AppendValues([]int32{1, 2, 3}, nil)
+	idArray := builder.NewArray()
+
+	builder2 := array.NewStringBuilder(mem)
+	builder2.AppendValues([]string{"a", "b", "c"}, nil)
+	dataArray := builder2.NewArray()
+
+	arrowSchema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+	}, nil)
+
+	record := array.NewRecord(arrowSchema, []arrow.Array{idArray, dataArray}, 3)
+	tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record})
+
+	// Should return an error for a non-identity transform.
+	err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{})
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "dynamic overwrite does not support non-identity-transform fields in partition spec")
+}
+
+func TestDynamicPartitionOverwrite_EmptyTable(t *testing.T) {
+	// Create a table partitioned by an identity transform.
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+	)
+
+	partitionSpec := iceberg.NewPartitionSpec(
+		iceberg.PartitionField{
+			SourceID:  1,
+			FieldID:   1000,
+			Name:      "id",
+			Transform: iceberg.IdentityTransform{},
+		},
+	)
+
+	metadata, err := table.NewMetadataBuilder()
+	assert.NoError(t, err)
+	_, err = metadata.AddSchema(schema, 2, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetCurrentSchemaID(0)
+	assert.NoError(t, err)
+	_, err = metadata.AddPartitionSpec(&partitionSpec, true)
+	assert.NoError(t, err)
+	_, err = metadata.SetDefaultSpecID(0)
+	assert.NoError(t, err)
+	_, err = metadata.SetFormatVersion(2)
+	assert.NoError(t, err)
+
+	meta, err := metadata.Build()
+	assert.NoError(t, err)
+
+	var mockfs internal.MockFS
+	mockfs.Test(t)
+
+	tbl := table.New(
+		[]string{"test", "table"},
+		meta,
+		"test/location",
+		func(ctx context.Context) (iceio.IO, error) {
+			return &mockfs, nil
+		},
+		nil,
+	)
+
+	txn := tbl.NewTransaction()
+
+	// Create empty test data (zero-length arrays for each field).
+	arrowSchema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+	}, nil)
+	mem := memory.DefaultAllocator
+	idArr := array.NewInt32Builder(mem).NewArray()
+	dataArr := array.NewStringBuilder(mem).NewArray()
+	record := array.NewRecord(arrowSchema, []arrow.Array{idArr, dataArr}, 0)
+	tableData := array.NewTableFromRecords(arrowSchema, []arrow.Record{record})
+
+	// Should be a no-op and return no error when the input has no rows.
+	err = txn.DynamicPartitionOverwrite(context.Background(), tableData, 1000, iceberg.Properties{})
+	assert.NoError(t, err)
+}
+
+// TODO: Find a way to test the happy path of DynamicPartitionOverwrite

From 931506de1279fa64e59c95ab76c52db5f1c623f1 Mon Sep 17 00:00:00 2001
From: dttung2905
Date: Wed, 9 Jul 2025 18:49:10 +0100
Subject: [PATCH 2/2] Make deleteByFilter method private

Signed-off-by: dttung2905
---
 table/transaction.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/table/transaction.go b/table/transaction.go
index a749d31bb..35c99dcf3 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -398,7 +398,7 @@ func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.T
 	deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)

-	if err := t.Delete(ctx, deleteFilter, snapshotProps); err != nil {
+	if err := t.deleteFileByFilter(ctx, deleteFilter, snapshotProps); err != nil {
 		return err
 	}

@@ -415,8 +415,8 @@ func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.T
 	return t.apply(updates, reqs)
 }

-// Delete performs a delete operation with the given filter and snapshot properties.
-func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {
+// deleteFileByFilter performs a delete operation with the given filter and snapshot properties.
+func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {
 	fs, err := t.tbl.fsF(ctx)
 	if err != nil {
 		return err