From 4b8d8de76d1b6320addbab4d4605998946f0c994 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Thu, 24 Jul 2025 11:02:24 -0700 Subject: [PATCH 1/2] Support for table delete with copy on write --- table/properties.go | 5 ++ table/scanner.go | 14 +++ table/snapshot_producers.go | 175 ++++++++++++++++++++++++++++++++++++ table/table.go | 10 +++ table/table_test.go | 65 ++++++++++++++ table/transaction.go | 148 ++++++++++++++++++++++++++++++ 6 files changed, 417 insertions(+) diff --git a/table/properties.go b/table/properties.go index bb3c2b089..a27b27674 100644 --- a/table/properties.go +++ b/table/properties.go @@ -31,6 +31,11 @@ const ( ObjectStoreEnabledKey = "write.object-storage.enabled" ObjectStoreEnabledDefault = false + DeleteMode = "write.delete.mode" + DeleteModeCopyOnWrite = "copy-on-write" + DeleteModeMergeOnRead = "merge-on-read" + DeleteModeDefault = DeleteModeCopyOnWrite + DefaultNameMappingKey = "schema.name-mapping.default" MetricsModeColumnConfPrefix = "write.metadata.metrics.column" diff --git a/table/scanner.go b/table/scanner.go index 93dd052cc..674e2adb6 100644 --- a/table/scanner.go +++ b/table/scanner.go @@ -501,3 +501,17 @@ func (scan *Scan) ToArrowTable(ctx context.Context) (arrow.Table, error) { return array.NewTableFromRecords(schema, records), nil } + +func toArrowTableFromRecords(schema *arrow.Schema, recordItr iter.Seq2[arrow.Record, error]) (arrow.Table, error) { + records := make([]arrow.Record, 0) + for rec, err := range recordItr { + if err != nil { + return nil, err + } + + defer rec.Release() + records = append(records, rec) + } + + return array.NewTableFromRecords(schema, records), nil +} diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 58721ed49..0e11dfd56 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -410,6 +410,181 @@ func (m *mergeAppendFiles) processManifests(manifests []iceberg.ManifestFile) ([ return append(result, unmergedDeleteManifests...), nil } +type deleteFiles struct { + base *snapshotProducer + + predicate iceberg.BooleanExpression + caseSensitive bool + + computed bool + keepManifests []iceberg.ManifestFile + removedEntries []iceberg.ManifestEntry + partialRewrites bool +} + +func newDeleteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) + prod.producerImpl = &deleteFiles{ + base: prod, + predicate: iceberg.AlwaysFalse{}, + caseSensitive: true, + computed: false, + keepManifests: make([]iceberg.ManifestFile, 0), + removedEntries: make([]iceberg.ManifestEntry, 0), + partialRewrites: false, + } + + return prod +} + +func (df deleteFiles) deleteByPredicate(predicate iceberg.BooleanExpression, caseSensitive bool) { + df.predicate = predicate + df.caseSensitive = caseSensitive +} + +func (df deleteFiles) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) { + schema := df.base.txn.tbl.Schema() + spec := df.base.txn.tbl.Metadata().PartitionSpecs()[specID] + project := newInclusiveProjection(schema, spec, df.caseSensitive) + partitionFilter, err := project(df.predicate) + if err != nil { + return nil, err + } + return partitionFilter, nil +} + +func (df deleteFiles) partitionFilters() *keyDefaultMap[int, iceberg.BooleanExpression] { + return newKeyDefaultMapWrapErr(df.buildPartitionProjection) +} + +func (df deleteFiles) buildManifestEvaluator(specID int) (ManifestEvaluator, error) { + spec := df.base.txn.tbl.metadata.PartitionSpecs()[specID] + schema := df.base.txn.tbl.metadata.CurrentSchema() + return newManifestEvaluator( + spec, + schema, + df.partitionFilters().Get(specID), + df.caseSensitive) +} + +func (df deleteFiles) computeDeletes() error { + df.computed = true + schema := df.base.txn.tbl.Schema() + + manifestEvaluators := make(map[int32]ManifestEvaluator) + strictMetricsEvaluator, err := newStrictMetricsEvaluator(schema, df.predicate, df.caseSensitive, false) + if err != nil { + return nil + } + inclusiveMetricsEvaluator, err := newInclusiveMetricsEvaluator(schema, df.predicate, df.caseSensitive, false) + if err != nil { + return nil + } + + snapshot := df.base.txn.meta.currentSnapshot() + manifestFiles, err := snapshot.Manifests(df.base.io) + if err != nil { + return err + } + for _, manifestFile := range manifestFiles { + if manifestFile.ManifestContent() == iceberg.ManifestContentData { + containMatch, err := manifestEvaluators[manifestFile.PartitionSpecID()](manifestFile) + if err != nil { + return err + } + if !containMatch { + df.keepManifests = append(df.keepManifests, manifestFile) + } else { + deletedEntries := make([]iceberg.ManifestEntry, 0) + keepEntries := make([]iceberg.ManifestEntry, 0) + entries, err := manifestFile.FetchEntries(df.base.io, true) + if err != nil { + return err + } + for _, entry := range entries { + ok, err := strictMetricsEvaluator(entry.DataFile()) + if err != nil { + return err + } + if ok == rowsMustMatch { + deletedEntries = append(deletedEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusDELETED)) + } else { + keepEntries = append(keepEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusEXISTING)) + ok, err := inclusiveMetricsEvaluator(entry.DataFile()) + if err != nil { + return err + } + if ok == rowsMightNotMatch { + df.partialRewrites = true + } + } + } + + if len(deletedEntries) > 0 { + df.removedEntries = append(df.removedEntries, deletedEntries...) + if len(keepEntries) > 0 { + spec := df.base.txn.tbl.Metadata().PartitionSpecs()[manifestFile.PartitionSpecID()] + wr, path, counter, err := df.base.newManifestWriter(spec) + if err != nil { + return err + } + for _, entry := range keepEntries { + err = wr.Add(entry) + if err != nil { + return err + } + } + if err = wr.Close(); err != nil { + return err + } + mf, err := wr.ToManifestFile(path, counter.Count) + if err != nil { + return err + } + df.keepManifests = append(df.keepManifests, mf) + } + } else { + df.keepManifests = append(df.keepManifests, manifestFile) + } + } + } else { + df.keepManifests = append(df.keepManifests, manifestFile) + } + } + + return nil +} + +func (df deleteFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + // no post processing + return manifests, nil +} + +func (df deleteFiles) existingManifests() ([]iceberg.ManifestFile, error) { + return df.keepManifests, nil +} + +func (df deleteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { + return df.removedEntries, nil +} + +func (df deleteFiles) rewriteNeeded() bool { + return df.partialRewrites +} + +func (df deleteFiles) copyWithNewStatus(entry iceberg.ManifestEntry, status iceberg.ManifestEntryStatus) iceberg.ManifestEntry { + snapId := entry.SnapshotID() + seqNum := entry.SequenceNum() + + return iceberg.NewManifestEntry( + status, + &snapId, + &seqNum, + entry.FileSequenceNum(), + entry.DataFile(), + ) +} + type snapshotProducer struct { producerImpl diff --git a/table/table.go b/table/table.go index 0bd3ba3fe..42743d95c 100644 --- a/table/table.go +++ b/table/table.go @@ -125,6 +125,16 @@ func (t Table) Append(ctx context.Context, rdr array.RecordReader, snapshotProps return txn.Commit(ctx) } +// Delete is a shortcut for NewTransaction().Delete() and then committing the transaction +func (t Table) Delete(ctx context.Context, deleteFilter iceberg.BooleanExpression, snapshotProps iceberg.Properties, caseSensitive bool) (*Table, error) { + txn := t.NewTransaction() + if err := txn.Delete(ctx, deleteFilter, snapshotProps, caseSensitive); err != nil { + return nil, err + } + + return txn.Commit(ctx) +} + func (t Table) AllManifests(ctx context.Context) iter.Seq2[iceberg.ManifestFile, error] { fs, err := t.fsF(ctx) if err != nil { diff --git a/table/table_test.go b/table/table_test.go index 11322c27c..8dddc9db7 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -778,6 +778,71 @@ func (t *TableWritingTestSuite) TestAddFilesReferencedCurrentSnapshotIgnoreDupli t.Equal([]int32{0, 0, 0}, deleted) } +func (t *TableWritingTestSuite) TestDelete() { + ident := table.Identifier{"default", "unpartitioned_table_v" + strconv.Itoa(t.formatVersion)} + tbl := t.createTable(ident, t.formatVersion, + *iceberg.UnpartitionedSpec, t.tableSchema) + + t.NotNil(tbl) + + files := make([]string, 0) + for i := range 5 { + filePath := fmt.Sprintf("%s/unpartitioned/test-%d.parquet", t.location, i) + t.writeParquet(mustFS(t.T(), tbl).(iceio.WriteFileIO), filePath, t.arrTbl) + files = append(files, filePath) + } + + tx := tbl.NewTransaction() + t.Require().NoError(tx.AddFiles(t.ctx, files, nil, false)) + + stagedTbl, err := tx.StagedTable() + t.Require().NoError(err) + t.NotNil(stagedTbl.NameMapping()) + + t.Equal(stagedTbl.CurrentSnapshot().Summary, + &table.Summary{ + Operation: table.OpAppend, + Properties: iceberg.Properties{ + "added-data-files": "5", + "added-files-size": "3600", + "added-records": "5", + "total-data-files": "5", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": "3600", + "total-position-deletes": "0", + "total-records": "5", + }, + }) + + scan, err := tx.Scan() + t.Require().NoError(err) + + contents, err := scan.ToArrowTable(context.Background()) + t.Require().NoError(err) + defer contents.Release() + + t.EqualValues(5, contents.NumRows()) + fmt.Printf("%s", contents.String()) + + deleteFilter := iceberg.EqualTo(iceberg.Reference("baz"), int32(123)) + err = tx.Delete(t.ctx, deleteFilter, nil, true) + t.Require().NoError(err) + + stagedTbl, err = tx.StagedTable() + t.Require().NoError(err) + + scan, err = stagedTbl.NewTransaction().Scan() + t.Require().NoError(err) + + contents, err = scan.ToArrowTable(context.Background()) + t.Require().NoError(err) + defer contents.Release() + + t.EqualValues(0, contents.NumRows()) + fmt.Printf("%s", contents.String()) +} + type mockedCatalog struct{} func (m *mockedCatalog) LoadTable(ctx context.Context, ident table.Identifier, props iceberg.Properties) (*table.Table, error) { diff --git a/table/transaction.go b/table/transaction.go index 09f0fa916..9d5a79c2f 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -23,6 +23,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/apache/arrow-go/v18/arrow/compute/exprs" + "github.com/apache/iceberg-go/table/substrait" "runtime" "slices" "sync" @@ -58,6 +60,10 @@ func (s snapshotUpdate) mergeAppend() *snapshotProducer { return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) } +func (s snapshotUpdate) delete() *snapshotProducer { + return newDeleteFilesProducer(OpDelete, s.txn, s.io, nil, s.snapshotProps) +} + type Transaction struct { tbl *Table meta *MetadataBuilder @@ -291,6 +297,148 @@ func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapsh return t.apply(updates, reqs) } +type replacedFiles struct { + originalFile iceberg.DataFile + replaceFiles []iceberg.DataFile +} + +func (t *Transaction) Delete(ctx context.Context, deleteFilter iceberg.BooleanExpression, snapshotProps iceberg.Properties, caseSensitive bool) error { + + if deleteMode := t.tbl.metadata.Properties().Get(DeleteMode, DeleteModeDefault); deleteMode == DeleteModeMergeOnRead { + fmt.Println("Merge on read is not yet supported, falling back to copy-on-write") + } + + fs, err := t.tbl.fsF(ctx) + if err != nil { + return err + } + + deleteSnapshot := t.updateSnapshot(fs, snapshotProps).delete() + df := deleteSnapshot.producerImpl.(*deleteFiles) + df.deleteByPredicate(deleteFilter, caseSensitive) + err = df.computeDeletes() + if err != nil { + return err + } + + // check if any data file require a rewrite + if df.rewriteNeeded() { + schema := t.tbl.Schema() + _, err := iceberg.BindExpr(schema, deleteFilter, caseSensitive) + if err != nil { + return err + } + + preserveRowFilter := iceberg.NewNot(deleteFilter) + boundRowFilter, err := iceberg.BindExpr(schema, preserveRowFilter, caseSensitive) + if err != nil { + return err + } + + extSet, recordFilter, err := substrait.ConvertExpr(schema, boundRowFilter, caseSensitive) + if err != nil { + return err + } + + scan, err := t.Scan(WithRowFilter(deleteFilter), WithCaseSensitive(caseSensitive)) + if err != nil { + return err + } + + files, err := scan.PlanFiles(ctx) + if err != nil { + return err + } + + replacement := make([]replacedFiles, 0) + commitUuid := uuid.New() + + for _, originalFile := range files { + arrSchema, allRecords, err := (&arrowScan{ + metadata: scan.metadata, + fs: fs, + projectedSchema: schema, + boundRowFilter: iceberg.AlwaysTrue{}, + caseSensitive: scan.caseSensitive, + rowLimit: scan.limit, + options: scan.options, + concurrency: scan.concurrency, + }).GetRecords(ctx, []FileScanTask{originalFile}) + if err != nil { + return err + } + + filterFn := filterRecords( + exprs.WithExtensionIDSet(ctx, exprs.NewExtensionSetDefault(*extSet)), + recordFilter, + ) + allRecordCnt := 0 + records := make([]arrow.Record, 0) + for rec, err := range allRecords { + allRecordCnt += 1 + if err != nil { + return err + } + defer rec.Release() + + filtered, err := filterFn(rec) + if err != nil { + return err + } + if filtered != nil { + records = append(records, rec) + } + } + + if allRecordCnt == 0 || allRecordCnt == len(records) { + replacement = append(replacement, replacedFiles{ + originalFile.File, + []iceberg.DataFile{}, + }) + } else if allRecordCnt != len(records) { + replaceFiles := make([]iceberg.DataFile, 0) + arrTbl := array.NewTableFromRecords(arrSchema, records) + defer arrTbl.Release() + rdr := array.NewTableReader(arrTbl, -1) + defer rdr.Release() + itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{ + sc: arrSchema, + itr: array.IterFromReader(rdr), + fs: fs.(io.WriteFileIO), + writeUUID: &commitUuid, + }) + for df, err := range itr { + if err != nil { + return err + } + replaceFiles = append(replaceFiles, df) + } + replacement = append(replacement, replacedFiles{ + originalFile.File, + replaceFiles, + }) + } + } + + if len(replacement) > 0 { + overwriteSnapshot := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(&commitUuid) + for _, rf := range replacement { + overwriteSnapshot.deleteDataFile(rf.originalFile) + for _, replaceFile := range rf.replaceFiles { + overwriteSnapshot.appendDataFile(replaceFile) + } + } + updates, reqs, err := overwriteSnapshot.commit() + if err != nil { + return err + } + + return t.apply(updates, reqs) + } + } + return nil +} + // ReplaceFiles is actually just an overwrite operation with multiple // files deleted and added. // From 4b9a7ef86273779a1090d5722649da539a17f0fd Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Thu, 31 Jul 2025 16:59:49 -0700 Subject: [PATCH 2/2] Refine delete files snapshot producer --- table/snapshot_producers.go | 50 ++++++++++++++++++++----------------- table/transaction.go | 15 +++++++---- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 0e11dfd56..fa2389267 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -416,19 +416,25 @@ type deleteFiles struct { predicate iceberg.BooleanExpression caseSensitive bool - computed bool keepManifests []iceberg.ManifestFile removedEntries []iceberg.ManifestEntry partialRewrites bool } -func newDeleteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { +func newDeleteFilesProducer( + op Operation, + txn *Transaction, + predicate iceberg.BooleanExpression, + caseSensitive bool, + fs iceio.WriteFileIO, + commitUUID *uuid.UUID, + snapshotProps iceberg.Properties, +) *snapshotProducer { prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) prod.producerImpl = &deleteFiles{ base: prod, - predicate: iceberg.AlwaysFalse{}, - caseSensitive: true, - computed: false, + predicate: predicate, + caseSensitive: caseSensitive, keepManifests: make([]iceberg.ManifestFile, 0), removedEntries: make([]iceberg.ManifestEntry, 0), partialRewrites: false, @@ -437,12 +443,7 @@ func newDeleteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO return prod } -func (df deleteFiles) deleteByPredicate(predicate iceberg.BooleanExpression, caseSensitive bool) { - df.predicate = predicate - df.caseSensitive = caseSensitive -} - -func (df deleteFiles) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) { +func (df *deleteFiles) buildPartitionProjection(specID int32) (iceberg.BooleanExpression, error) { schema := df.base.txn.tbl.Schema() spec := df.base.txn.tbl.Metadata().PartitionSpecs()[specID] project := newInclusiveProjection(schema, spec, df.caseSensitive) @@ -453,11 +454,11 @@ func (df deleteFiles) buildPartitionProjection(specID int) (iceberg.BooleanExpre return partitionFilter, nil } -func (df deleteFiles) partitionFilters() *keyDefaultMap[int, iceberg.BooleanExpression] { +func (df *deleteFiles) partitionFilters() *keyDefaultMap[int32, iceberg.BooleanExpression] { return newKeyDefaultMapWrapErr(df.buildPartitionProjection) } -func (df deleteFiles) buildManifestEvaluator(specID int) (ManifestEvaluator, error) { +func (df *deleteFiles) buildManifestEvaluator(specID int32) (func(iceberg.ManifestFile) (bool, error), error) { spec := df.base.txn.tbl.metadata.PartitionSpecs()[specID] schema := df.base.txn.tbl.metadata.CurrentSchema() return newManifestEvaluator( @@ -467,11 +468,10 @@ func (df deleteFiles) buildManifestEvaluator(specID int) (ManifestEvaluator, err df.caseSensitive) } -func (df deleteFiles) computeDeletes() error { - df.computed = true +func (df *deleteFiles) computeDeletes() error { schema := df.base.txn.tbl.Schema() - manifestEvaluators := make(map[int32]ManifestEvaluator) + manifestEvaluators := newKeyDefaultMapWrapErr(df.buildManifestEvaluator) strictMetricsEvaluator, err := newStrictMetricsEvaluator(schema, df.predicate, df.caseSensitive, false) if err != nil { return nil @@ -488,7 +488,7 @@ func (df deleteFiles) computeDeletes() error { } for _, manifestFile := range manifestFiles { if manifestFile.ManifestContent() == iceberg.ManifestContentData { - containMatch, err := manifestEvaluators[manifestFile.PartitionSpecID()](manifestFile) + containMatch, err := manifestEvaluators.Get(manifestFile.PartitionSpecID())(manifestFile) if err != nil { return err } @@ -555,25 +555,29 @@ func (df deleteFiles) computeDeletes() error { return nil } -func (df deleteFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { +func (df *deleteFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { // no post processing return manifests, nil } -func (df deleteFiles) existingManifests() ([]iceberg.ManifestFile, error) { +func (df *deleteFiles) existingManifests() ([]iceberg.ManifestFile, error) { return df.keepManifests, nil } -func (df deleteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { +func (df *deleteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { return df.removedEntries, nil } -func (df deleteFiles) rewriteNeeded() bool { +func (df *deleteFiles) rewriteNeeded() bool { return df.partialRewrites } -func (df deleteFiles) copyWithNewStatus(entry iceberg.ManifestEntry, status iceberg.ManifestEntryStatus) iceberg.ManifestEntry { - snapId := entry.SnapshotID() +func (df *deleteFiles) copyWithNewStatus(entry iceberg.ManifestEntry, status iceberg.ManifestEntryStatus) iceberg.ManifestEntry { + snapId := df.base.snapshotID + if status == iceberg.EntryStatusEXISTING { + snapId = entry.SnapshotID() + } + seqNum := entry.SequenceNum() return iceberg.NewManifestEntry( diff --git a/table/transaction.go b/table/transaction.go index 9d5a79c2f..4154ca89d 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -60,8 +60,8 @@ func (s snapshotUpdate) mergeAppend() *snapshotProducer { return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) } -func (s snapshotUpdate) delete() *snapshotProducer { - return newDeleteFilesProducer(OpDelete, s.txn, s.io, nil, s.snapshotProps) +func (s snapshotUpdate) delete(predicate iceberg.BooleanExpression, caseSensitive bool) *snapshotProducer { + return newDeleteFilesProducer(OpDelete, s.txn, predicate, caseSensitive, s.io, nil, s.snapshotProps) } type Transaction struct { @@ -313,9 +313,8 @@ func (t *Transaction) Delete(ctx context.Context, deleteFilter iceberg.BooleanEx return err } - deleteSnapshot := t.updateSnapshot(fs, snapshotProps).delete() + deleteSnapshot := t.updateSnapshot(fs, snapshotProps).delete(deleteFilter, caseSensitive) df := deleteSnapshot.producerImpl.(*deleteFiles) - df.deleteByPredicate(deleteFilter, caseSensitive) err = df.computeDeletes() if err != nil { return err @@ -436,7 +435,13 @@ func (t *Transaction) Delete(ctx context.Context, deleteFilter iceberg.BooleanEx return t.apply(updates, reqs) } } - return nil + + updates, reqs, err := deleteSnapshot.commit() + if err != nil { + return err + } + + return t.apply(updates, reqs) } // ReplaceFiles is actually just an overwrite operation with multiple