From 6a18e5411b25d8f6111aff45f574b00e9f4798a7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 14 Aug 2025 23:36:19 -0700 Subject: [PATCH 01/20] I want to rewrite in rust --- crates/iceberg/src/spec/manifest/entry.rs | 6 + crates/iceberg/src/spec/manifest/writer.rs | 6 +- crates/iceberg/src/transaction/append.rs | 3 +- crates/iceberg/src/transaction/mod.rs | 7 + .../iceberg/src/transaction/rewrite_files.rs | 234 ++++++++++++++++++ crates/iceberg/src/transaction/snapshot.rs | 109 ++++++-- 6 files changed, 339 insertions(+), 26 deletions(-) create mode 100644 crates/iceberg/src/transaction/rewrite_files.rs diff --git a/crates/iceberg/src/spec/manifest/entry.rs b/crates/iceberg/src/spec/manifest/entry.rs index e8fe0f223a..ff6d9ae083 100644 --- a/crates/iceberg/src/spec/manifest/entry.rs +++ b/crates/iceberg/src/spec/manifest/entry.rs @@ -130,6 +130,12 @@ impl ManifestEntry { self.snapshot_id } + /// File sequence number + #[inline] + pub fn file_sequence_number(&self) -> Option { + self.file_sequence_number + } + /// Data sequence number. #[inline] pub fn sequence_number(&self) -> Option { diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index ebb0590bcf..b6937fc597 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -295,10 +295,6 @@ impl ManifestWriter { /// Add a delete manifest entry. This method will update following status of the entry: /// - Update the entry status to `Deleted` /// - Set the snapshot id to the current snapshot id - /// - /// # TODO - /// Remove this allow later - #[allow(dead_code)] pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> { self.check_data_file(&entry.data_file)?; entry.status = ManifestStatus::Deleted; @@ -341,7 +337,7 @@ impl ManifestWriter { Ok(()) } - /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID, + /// Add a file as existing manifest entry. The original data and file sequence numbers, snapshot ID, /// which were assigned at commit, must be preserved when adding an existing entry. pub fn add_existing_file( &mut self, diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index f248543df2..cbaf714d31 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -90,6 +90,7 @@ impl TransactionAction for FastAppendAction { self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + vec![], ); // validate added files @@ -124,7 +125,7 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> Result> { let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { return Ok(vec![]); diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 4116264a14..4adc46dbbf 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,7 @@ mod action; pub use action::*; mod append; +mod rewrite_files; mod snapshot; mod sort_order; mod update_location; @@ -71,6 +72,7 @@ use crate::spec::TableProperties; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::rewrite_files::RewriteFilesAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -146,6 +148,11 @@ impl Transaction { ReplaceSortOrderAction::new() } + /// Rewrite a set of data files of table + pub fn rewrite_files(&self) -> RewriteFilesAction { + RewriteFilesAction::new() + } + /// Set the location of table pub fn update_location(&self) -> UpdateLocationAction { UpdateLocationAction::new() diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs new file mode 100644 index 0000000000..1ba56d3cc9 --- /dev/null +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use super::snapshot::{DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer}; +use super::{ActionCommit, TransactionAction}; +use crate::error::Result; +use crate::spec::{ + DataFile, ManifestContentType, ManifestEntry, ManifestFile, ManifestStatus, Operation, +}; +use crate::table::Table; + +/// Transaction action for rewriting files. +pub struct RewriteFilesAction { + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + data_files_to_add: Vec, + // Data files and delete files to delete + data_files_to_delete: Vec, +} + +pub struct RewriteFilesOperation; + +impl RewriteFilesAction { + pub fn new() -> Self { + Self { + commit_uuid: None, + key_metadata: None, + snapshot_properties: Default::default(), + data_files_to_add: vec![], + data_files_to_delete: vec![], + } + } + + /// Add data files to the snapshot. + pub fn add_data_files( + mut self, + data_files: impl IntoIterator, + ) -> Result { + self.data_files_to_add.extend(data_files); + Ok(self) + } + + /// Add data files to delete to the snapshot. + pub fn delete_data_files( + mut self, + data_files: impl IntoIterator, + ) -> Result { + self.data_files_to_delete.extend(data_files); + Ok(self) + } + + /// Set commit UUID for the snapshot. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } + + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { + self.snapshot_properties = snapshot_properties; + self + } +} + +#[async_trait] +impl TransactionAction for RewriteFilesAction { + async fn commit(self: Arc, table: &Table) -> Result { + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.data_files_to_add.clone(), + self.data_files_to_delete.clone(), + ); + + // todo should be able to configure merge manifest process + snapshot_producer + .commit(RewriteFilesOperation, DefaultManifestProcess) + .await + } +} + +impl SnapshotProduceOperation for RewriteFilesOperation { + fn operation(&self) -> Operation { + Operation::Replace + } + + async fn delete_entries( + &self, + snapshot_producer: &SnapshotProducer<'_>, + ) -> Result> { + // Find entries that are associated with files to delete + let snapshot = snapshot_producer.table.metadata().current_snapshot(); + + if let Some(snapshot) = snapshot { + let gen_manifest_entry = |old_entry: &Arc| { + // todo should not unwrap + let builder = ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(old_entry.snapshot_id().unwrap()) + .sequence_number(old_entry.sequence_number().unwrap()) + .file_sequence_number(old_entry.file_sequence_number().unwrap()) + .data_file(old_entry.data_file().clone()); + + builder.build() + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_producer.table.file_io(), + snapshot_producer.table.metadata(), + ) + .await?; + + let mut deleted_entries = Vec::new(); + + for manifest_file in manifest_list.entries() { + let manifest = manifest_file + .load_manifest(snapshot_producer.table.file_io()) + .await?; + + for entry in manifest.entries() { + if snapshot_producer + .data_files_to_delete + .iter() + .any(|f| f.file_path == entry.data_file().file_path) + { + deleted_entries.push(gen_manifest_entry(entry)); + } + } + } + + Ok(deleted_entries) + } else { + Ok(vec![]) + } + } + + async fn existing_manifest( + &self, + snapshot_producer: &mut SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_producer.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_producer.table.file_io(), + snapshot_producer.table.metadata(), + ) + .await?; + + let mut existing_files = Vec::new(); + + for manifest_file in manifest_list.entries() { + let manifest = manifest_file + .load_manifest(snapshot_producer.table.file_io()) + .await?; + + // Find files to delete from the current manifest entries + let found_files_to_delete: HashSet<_> = manifest + .entries() + .iter() + .filter_map(|entry| { + if snapshot_producer + .data_files_to_delete + .iter() + .any(|f| f.file_path == entry.data_file().file_path) + { + Some(entry.data_file().file_path().to_string()) + } else { + None + } + }) + .collect(); + + if found_files_to_delete.is_empty() + && (manifest_file.has_added_files() || manifest_file.has_existing_files()) + { + // All files from the existing manifest entries are still valid + existing_files.push(manifest_file.clone()); + } else { + // Some files are about to be deleted + // Rewrite the manifest file and exclude the data files to delete + let mut manifest_writer = snapshot_producer.new_manifest_writer( + ManifestContentType::Data, + manifest_file.partition_spec_id, + )?; + + manifest + .entries() + .iter() + .filter(|entry| { + entry.status() != ManifestStatus::Deleted + && !found_files_to_delete.contains(entry.data_file().file_path()) + }) + .try_for_each(|entry| manifest_writer.add_entry((**entry).clone()))?; + + existing_files.push(manifest_writer.write_manifest_file().await?); + } + } + + Ok(existing_files) + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 4f85962ff1..3316e41b23 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -17,14 +17,15 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; -use std::ops::RangeFrom; +use std::ops::{Deref, RangeFrom}; use uuid::Uuid; use crate::error::Result; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, + DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, + ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULTSnapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, TableProperties, update_snapshot_summaries, }; @@ -36,14 +37,13 @@ const META_ROOT_PATH: &str = "metadata"; pub(crate) trait SnapshotProduceOperation: Send + Sync { fn operation(&self) -> Operation; - #[allow(unused)] fn delete_entries( &self, snapshot_produce: &SnapshotProducer, ) -> impl Future>> + Send; fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> impl Future>> + Send; } @@ -73,7 +73,8 @@ pub(crate) struct SnapshotProducer<'a> { commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, - added_data_files: Vec, + data_files_to_add: Vec, + pub data_files_to_delete: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -86,7 +87,8 @@ impl<'a> SnapshotProducer<'a> { commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, - added_data_files: Vec, + data_files_to_add: Vec, + data_files_to_delete: Vec, ) -> Self { Self { table, @@ -94,7 +96,8 @@ impl<'a> SnapshotProducer<'a> { commit_uuid, key_metadata, snapshot_properties, - added_data_files, + data_files_to_add, + data_files_to_delete, manifest_counter: (0..), } } @@ -185,7 +188,11 @@ impl<'a> SnapshotProducer<'a> { snapshot_id } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { + pub fn new_manifest_writer( + &mut self, + content: ManifestContentType, + spec_id: i32, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -202,8 +209,12 @@ impl<'a> SnapshotProducer<'a> { self.table.metadata().current_schema().clone(), self.table .metadata() - .default_partition_spec() - .as_ref() + .partition_spec_by_id(spec_id) + .ok_or(Error::new( + ErrorKind::DataInvalid, + format!("Partition spec with id: {spec_id} is not found!"), + ))? + .deref() .clone(), ); match self.table.metadata().format_version() { @@ -252,8 +263,8 @@ impl<'a> SnapshotProducer<'a> { // Write manifest file for added data files and return the ManifestFile for ManifestList. async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); - if added_data_files.is_empty() { + let data_files_to_add = std::mem::take(&mut self.data_files_to_add); + if data_files_to_add.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, "No added data files found when write an added manifest file", @@ -262,7 +273,7 @@ impl<'a> SnapshotProducer<'a> { let snapshot_id = self.snapshot_id; let format_version = self.table.metadata().format_version(); - let manifest_entries = added_data_files.into_iter().map(|data_file| { + let manifest_entries = data_files_to_add.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); @@ -274,13 +285,69 @@ impl<'a> SnapshotProducer<'a> { builder.build() } }); - let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; + let mut writer = self.new_manifest_writer( + ManifestContentType::Data, + self.table.metadata().default_partition_spec_id(), + )?; for entry in manifest_entries { writer.add_entry(entry)?; } writer.write_manifest_file().await } + async fn write_deleted_manifest( + &mut self, + deleted_entries: Vec, + ) -> Result> { + if deleted_entries.is_empty() { + Ok(Vec::new()) + } else { + // Initialize partition groups + let mut partition_groups = HashMap::new(); + for entry in deleted_entries { + partition_groups + .entry(entry.data_file().partition_spec_id) + .or_insert_with(Vec::new) + .push(entry); + } + + // Write manifest files for each spec-entries pair + let mut deleted_manifests = Vec::new(); + for (spec_id, entries) in partition_groups { + let mut data_manifest_writer: Option = None; + let mut delete_manifest_writer: Option = None; + for entry in entries { + match entry.data_file().content_type() { + DataContentType::Data => data_manifest_writer + .get_or_insert( + self.new_manifest_writer(ManifestContentType::Data, spec_id)?, + ) + .add_entry(entry)?, + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + delete_manifest_writer + .get_or_insert( + self.new_manifest_writer( + ManifestContentType::Deletes, + spec_id, + )?, + ) + .add_delete_entry(entry)? + } + } + } + + if let Some(writer) = data_manifest_writer { + deleted_manifests.push(writer.write_manifest_file().await?); + }; + if let Some(writer) = delete_manifest_writer { + deleted_manifests.push(writer.write_manifest_file().await?); + }; + } + + Ok(deleted_manifests) + } + } + async fn manifest_file( &mut self, snapshot_produce_operation: &OP, @@ -291,7 +358,7 @@ impl<'a> SnapshotProducer<'a> { // TODO: Allowing snapshot property setup with no added data files is a workaround. // We should clean it up after all necessary actions are supported. // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 - if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() { + if self.data_files_to_add.is_empty() && self.snapshot_properties.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, "No added data files or added snapshot properties found when write a manifest file", @@ -302,13 +369,15 @@ impl<'a> SnapshotProducer<'a> { let mut manifest_files = existing_manifests; // Process added entries. - if !self.added_data_files.is_empty() { + if !self.data_files_to_add.is_empty() { let added_manifest = self.write_added_manifest().await?; manifest_files.push(added_manifest); } - // # TODO - // Support process delete entries. + let delete_manifests = self + .write_deleted_manifest(snapshot_produce_operation.delete_entries(self).await?) + .await?; + manifest_files.extend(delete_manifests); let manifest_files = manifest_process.process_manifests(self, manifest_files); Ok(manifest_files) @@ -337,7 +406,7 @@ impl<'a> SnapshotProducer<'a> { summary_collector.set_partition_summary_limit(partition_summary_limit); - for data_file in &self.added_data_files { + for data_file in &self.data_files_to_add { summary_collector.add_file( data_file, table_metadata.current_schema().clone(), From 01f4b82427e813e301c907562d7f41741b8074b1 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 14 Aug 2025 23:37:01 -0700 Subject: [PATCH 02/20] minor --- crates/iceberg/src/transaction/snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 3316e41b23..69bdce187b 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -104,7 +104,7 @@ impl<'a> SnapshotProducer<'a> { pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> { for data_file in added_data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { + if data_file.content_type() != DataContentType::Data { return Err(Error::new( ErrorKind::DataInvalid, "Only data content type is allowed for fast append", From 98b1c274d10052838ba1e57f4e7af5be04e15537 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 14 Aug 2025 23:55:55 -0700 Subject: [PATCH 03/20] some improvements --- .../iceberg/src/transaction/rewrite_files.rs | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 1ba56d3cc9..a1147f8f6b 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -23,10 +23,8 @@ use uuid::Uuid; use super::snapshot::{DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer}; use super::{ActionCommit, TransactionAction}; -use crate::error::Result; -use crate::spec::{ - DataFile, ManifestContentType, ManifestEntry, ManifestFile, ManifestStatus, Operation, -}; +use crate::error::{Error, ErrorKind, Result}; +use crate::spec::{DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile, ManifestStatus, Operation}; use crate::table::Table; /// Transaction action for rewriting files. @@ -108,6 +106,32 @@ impl TransactionAction for RewriteFilesAction { } } +fn set_deleted_status(entry: &ManifestEntryRef) -> Result { + let builder = ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(entry.snapshot_id().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Missing snapshot_id for entry with file path: {}", entry.file_path()), + ) + })?) + .sequence_number(entry.sequence_number().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Missing sequence_number for entry with file path: {}", entry.file_path()), + ) + })?) + .file_sequence_number(entry.file_sequence_number().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Missing file_sequence_number for entry with file path: {}", entry.file_path()), + ) + })?) + .data_file(entry.data_file().clone()); + + Ok(builder.build()) +} + impl SnapshotProduceOperation for RewriteFilesOperation { fn operation(&self) -> Operation { Operation::Replace @@ -121,18 +145,6 @@ impl SnapshotProduceOperation for RewriteFilesOperation { let snapshot = snapshot_producer.table.metadata().current_snapshot(); if let Some(snapshot) = snapshot { - let gen_manifest_entry = |old_entry: &Arc| { - // todo should not unwrap - let builder = ManifestEntry::builder() - .status(ManifestStatus::Deleted) - .snapshot_id(old_entry.snapshot_id().unwrap()) - .sequence_number(old_entry.sequence_number().unwrap()) - .file_sequence_number(old_entry.file_sequence_number().unwrap()) - .data_file(old_entry.data_file().clone()); - - builder.build() - }; - let manifest_list = snapshot .load_manifest_list( snapshot_producer.table.file_io(), @@ -153,7 +165,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation { .iter() .any(|f| f.file_path == entry.data_file().file_path) { - deleted_entries.push(gen_manifest_entry(entry)); + deleted_entries.push(set_deleted_status(entry)?); } } } From 35c3c3178fae45a3db9c95103d2c262e7387e8ba Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 15 Aug 2025 10:43:54 -0700 Subject: [PATCH 04/20] ready to start testing --- crates/iceberg/src/transaction/rewrite_files.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index a1147f8f6b..f95f6055c9 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -33,7 +33,6 @@ pub struct RewriteFilesAction { key_metadata: Option>, snapshot_properties: HashMap, data_files_to_add: Vec, - // Data files and delete files to delete data_files_to_delete: Vec, } @@ -106,7 +105,7 @@ impl TransactionAction for RewriteFilesAction { } } -fn set_deleted_status(entry: &ManifestEntryRef) -> Result { +fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { let builder = ManifestEntry::builder() .status(ManifestStatus::Deleted) .snapshot_id(entry.snapshot_id().ok_or_else(|| { @@ -165,7 +164,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation { .iter() .any(|f| f.file_path == entry.data_file().file_path) { - deleted_entries.push(set_deleted_status(entry)?); + deleted_entries.push(copy_with_deleted_status(entry)?); } } } From e4269fe7a07fcf717ad0187149565bcf045947d2 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 15 Aug 2025 10:58:09 -0700 Subject: [PATCH 05/20] old var name is better --- .../iceberg/src/transaction/rewrite_files.rs | 36 +++++++++---------- crates/iceberg/src/transaction/snapshot.rs | 24 ++++++------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index f95f6055c9..26516819a4 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -32,8 +32,8 @@ pub struct RewriteFilesAction { commit_uuid: Option, key_metadata: Option>, snapshot_properties: HashMap, - data_files_to_add: Vec, - data_files_to_delete: Vec, + added_data_files: Vec, + deleted_data_files: Vec, } pub struct RewriteFilesOperation; @@ -44,26 +44,26 @@ impl RewriteFilesAction { commit_uuid: None, key_metadata: None, snapshot_properties: Default::default(), - data_files_to_add: vec![], - data_files_to_delete: vec![], + added_data_files: vec![], + deleted_data_files: vec![], } } - /// Add data files to the snapshot. + /// Add added data files to the snapshot. pub fn add_data_files( mut self, data_files: impl IntoIterator, ) -> Result { - self.data_files_to_add.extend(data_files); + self.added_data_files.extend(data_files); Ok(self) } - /// Add data files to delete to the snapshot. + /// Add deleted data files to the snapshot. pub fn delete_data_files( mut self, data_files: impl IntoIterator, ) -> Result { - self.data_files_to_delete.extend(data_files); + self.deleted_data_files.extend(data_files); Ok(self) } @@ -94,8 +94,8 @@ impl TransactionAction for RewriteFilesAction { self.commit_uuid.unwrap_or_else(Uuid::now_v7), self.key_metadata.clone(), self.snapshot_properties.clone(), - self.data_files_to_add.clone(), - self.data_files_to_delete.clone(), + self.added_data_files.clone(), + self.deleted_data_files.clone(), ); // todo should be able to configure merge manifest process @@ -140,7 +140,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation { &self, snapshot_producer: &SnapshotProducer<'_>, ) -> Result> { - // Find entries that are associated with files to delete + // Find entries that are associated with deleted files let snapshot = snapshot_producer.table.metadata().current_snapshot(); if let Some(snapshot) = snapshot { @@ -151,7 +151,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation { ) .await?; - let mut deleted_entries = Vec::new(); + let mut delete_entries = Vec::new(); for manifest_file in manifest_list.entries() { let manifest = manifest_file @@ -160,16 +160,16 @@ impl SnapshotProduceOperation for RewriteFilesOperation { for entry in manifest.entries() { if snapshot_producer - .data_files_to_delete + .deleted_data_files .iter() .any(|f| f.file_path == entry.data_file().file_path) { - deleted_entries.push(copy_with_deleted_status(entry)?); + delete_entries.push(copy_with_deleted_status(entry)?); } } } - Ok(deleted_entries) + Ok(delete_entries) } else { Ok(vec![]) } @@ -203,7 +203,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation { .iter() .filter_map(|entry| { if snapshot_producer - .data_files_to_delete + .deleted_data_files .iter() .any(|f| f.file_path == entry.data_file().file_path) { @@ -220,8 +220,8 @@ impl SnapshotProduceOperation for RewriteFilesOperation { // All files from the existing manifest entries are still valid existing_files.push(manifest_file.clone()); } else { - // Some files are about to be deleted - // Rewrite the manifest file and exclude the data files to delete + // Some files are deleted already + // Rewrite the manifest file and exclude the deleted data files let mut manifest_writer = snapshot_producer.new_manifest_writer( ManifestContentType::Data, manifest_file.partition_spec_id, diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 69bdce187b..f22012ee38 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -73,8 +73,8 @@ pub(crate) struct SnapshotProducer<'a> { commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, - data_files_to_add: Vec, - pub data_files_to_delete: Vec, + added_data_files: Vec, + pub deleted_data_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -87,8 +87,8 @@ impl<'a> SnapshotProducer<'a> { commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, - data_files_to_add: Vec, - data_files_to_delete: Vec, + added_data_files: Vec, + deleted_data_files: Vec, ) -> Self { Self { table, @@ -96,8 +96,8 @@ impl<'a> SnapshotProducer<'a> { commit_uuid, key_metadata, snapshot_properties, - data_files_to_add, - data_files_to_delete, + added_data_files, + deleted_data_files, manifest_counter: (0..), } } @@ -263,8 +263,8 @@ impl<'a> SnapshotProducer<'a> { // Write manifest file for added data files and return the ManifestFile for ManifestList. async fn write_added_manifest(&mut self) -> Result { - let data_files_to_add = std::mem::take(&mut self.data_files_to_add); - if data_files_to_add.is_empty() { + let added_data_files = std::mem::take(&mut self.added_data_files); + if added_data_files.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, "No added data files found when write an added manifest file", @@ -273,7 +273,7 @@ impl<'a> SnapshotProducer<'a> { let snapshot_id = self.snapshot_id; let format_version = self.table.metadata().format_version(); - let manifest_entries = data_files_to_add.into_iter().map(|data_file| { + let manifest_entries = added_data_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); @@ -358,7 +358,7 @@ impl<'a> SnapshotProducer<'a> { // TODO: Allowing snapshot property setup with no added data files is a workaround. // We should clean it up after all necessary actions are supported. // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 - if self.data_files_to_add.is_empty() && self.snapshot_properties.is_empty() { + if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, "No added data files or added snapshot properties found when write a manifest file", @@ -369,7 +369,7 @@ impl<'a> SnapshotProducer<'a> { let mut manifest_files = existing_manifests; // Process added entries. - if !self.data_files_to_add.is_empty() { + if !self.added_data_files.is_empty() { let added_manifest = self.write_added_manifest().await?; manifest_files.push(added_manifest); } @@ -406,7 +406,7 @@ impl<'a> SnapshotProducer<'a> { summary_collector.set_partition_summary_limit(partition_summary_limit); - for data_file in &self.data_files_to_add { + for data_file in &self.added_data_files { summary_collector.add_file( data_file, table_metadata.current_schema().clone(), From f0bd611b548d358257929265af1863ec5b47215e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 15 Aug 2025 14:35:36 -0700 Subject: [PATCH 06/20] use separate vec to store data and delete files in snapshot producer --- crates/iceberg/src/transaction/append.rs | 2 + .../iceberg/src/transaction/rewrite_files.rs | 102 ++++++++++++++---- crates/iceberg/src/transaction/snapshot.rs | 6 ++ 3 files changed, 90 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index cbaf714d31..04ff960c1c 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -91,6 +91,8 @@ impl TransactionAction for FastAppendAction { self.snapshot_properties.clone(), self.added_data_files.clone(), vec![], + vec![], + vec![], ); // validate added files diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 26516819a4..6d5173ef0b 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -24,7 +24,10 @@ use uuid::Uuid; use super::snapshot::{DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer}; use super::{ActionCommit, TransactionAction}; use crate::error::{Error, ErrorKind, Result}; -use crate::spec::{DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile, ManifestStatus, Operation}; +use crate::spec::{ + DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile, + ManifestStatus, Operation, +}; use crate::table::Table; /// Transaction action for rewriting files. @@ -33,7 +36,9 @@ pub struct RewriteFilesAction { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, deleted_data_files: Vec, + deleted_delete_files: Vec, } pub struct RewriteFilesOperation; @@ -45,7 +50,9 @@ impl RewriteFilesAction { key_metadata: None, snapshot_properties: Default::default(), added_data_files: vec![], + added_delete_files: vec![], deleted_data_files: vec![], + deleted_delete_files: vec![], } } @@ -54,7 +61,14 @@ impl RewriteFilesAction { mut self, data_files: impl IntoIterator, ) -> Result { - self.added_data_files.extend(data_files); + for data_file in data_files { + match data_file.content { + DataContentType::Data => self.added_data_files.push(data_file), + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + self.added_delete_files.push(data_file) + } + } + } Ok(self) } @@ -63,7 +77,15 @@ impl RewriteFilesAction { mut self, data_files: impl IntoIterator, ) -> Result { - self.deleted_data_files.extend(data_files); + for data_file in data_files { + match data_file.content { + DataContentType::Data => self.deleted_data_files.push(data_file), + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + self.deleted_delete_files.push(data_file) + } + } + } + Ok(self) } @@ -95,9 +117,15 @@ impl TransactionAction for RewriteFilesAction { self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + self.added_delete_files.clone(), self.deleted_data_files.clone(), + self.deleted_delete_files.clone(), ); + // todo need to figure out validation + // 1. validate replace and added files + // 2. validate no new deletes using the starting snapshot id + // todo should be able to configure merge manifest process snapshot_producer .commit(RewriteFilesOperation, DefaultManifestProcess) @@ -111,19 +139,28 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { .snapshot_id(entry.snapshot_id().ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - format!("Missing snapshot_id for entry with file path: {}", entry.file_path()), + format!( + "Missing snapshot_id for entry with file path: {}", + entry.file_path() + ), ) })?) .sequence_number(entry.sequence_number().ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - format!("Missing sequence_number for entry with file path: {}", entry.file_path()), + format!( + "Missing sequence_number for entry with file path: {}", + entry.file_path() + ), ) })?) .file_sequence_number(entry.file_sequence_number().ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - format!("Missing file_sequence_number for entry with file path: {}", entry.file_path()), + format!( + "Missing file_sequence_number for entry with file path: {}", + entry.file_path() + ), ) })?) .data_file(entry.data_file().clone()); @@ -159,12 +196,25 @@ impl SnapshotProduceOperation for RewriteFilesOperation { .await?; for entry in manifest.entries() { - if snapshot_producer - .deleted_data_files - .iter() - .any(|f| f.file_path == entry.data_file().file_path) - { - delete_entries.push(copy_with_deleted_status(entry)?); + match entry.content_type() { + DataContentType::Data => { + if snapshot_producer + .deleted_data_files + .iter() + .any(|f| f.file_path == entry.data_file().file_path) + { + delete_entries.push(copy_with_deleted_status(entry)?) + } + } + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + if snapshot_producer + .deleted_delete_files + .iter() + .any(|f| f.file_path == entry.data_file().file_path) + { + delete_entries.push(copy_with_deleted_status(entry)?) + } + } } } } @@ -202,15 +252,27 @@ impl SnapshotProduceOperation for RewriteFilesOperation { .entries() .iter() .filter_map(|entry| { - if snapshot_producer - .deleted_data_files - .iter() - .any(|f| f.file_path == entry.data_file().file_path) - { - Some(entry.data_file().file_path().to_string()) - } else { - None + match entry.content_type() { + DataContentType::Data => { + if snapshot_producer + .deleted_data_files + .iter() + .any(|f| f.file_path == entry.data_file().file_path) + { + return Some(entry.data_file().file_path().to_string()); + } + } + DataContentType::EqualityDeletes | DataContentType::PositionDeletes => { + if snapshot_producer + .deleted_delete_files + .iter() + .any(|f| f.file_path == entry.data_file().file_path) + { + return Some(entry.data_file().file_path().to_string()); + } + } } + None }) .collect(); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index f22012ee38..a34f5b224b 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -74,7 +74,9 @@ pub(crate) struct SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, pub deleted_data_files: Vec, + pub deleted_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -88,7 +90,9 @@ impl<'a> SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, deleted_data_files: Vec, + deleted_delete_files: Vec, ) -> Self { Self { table, @@ -97,7 +101,9 @@ impl<'a> SnapshotProducer<'a> { key_metadata, snapshot_properties, added_data_files, + added_delete_files, deleted_data_files, + deleted_delete_files, manifest_counter: (0..), } } From d5ac764fd775771480e1352a2559734e3dfb6407 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 15 Aug 2025 15:25:42 -0700 Subject: [PATCH 07/20] add validator, write added delete files to manifest --- crates/iceberg/src/transaction/append.rs | 3 ++ crates/iceberg/src/transaction/mod.rs | 1 + .../iceberg/src/transaction/rewrite_files.rs | 51 ++++++++++++++++--- crates/iceberg/src/transaction/snapshot.rs | 23 +++++++-- crates/iceberg/src/transaction/validate.rs | 42 +++++++++++++++ 5 files changed, 109 insertions(+), 11 deletions(-) create mode 100644 crates/iceberg/src/transaction/validate.rs diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 04ff960c1c..3a5faafe4a 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -27,6 +27,7 @@ use crate::table::Table; use crate::transaction::snapshot::{ DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, }; +use crate::transaction::validate::SnapshotValidator; use crate::transaction::{ActionCommit, TransactionAction}; /// FastAppendAction is a transaction action for fast append data files to the table. @@ -113,6 +114,8 @@ impl TransactionAction for FastAppendAction { struct FastAppendOperation; +impl SnapshotValidator for FastAppendOperation {} + impl SnapshotProduceOperation for FastAppendOperation { fn operation(&self) -> Operation { Operation::Append diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 4adc46dbbf..f51e579a88 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -61,6 +61,7 @@ mod update_location; mod update_properties; mod update_statistics; mod upgrade_format_version; +mod validate; use std::sync::Arc; use std::time::Duration; diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 6d5173ef0b..9fc23fca04 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -26,9 +26,10 @@ use super::{ActionCommit, TransactionAction}; use crate::error::{Error, ErrorKind, Result}; use crate::spec::{ DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile, - ManifestStatus, Operation, + ManifestStatus, Operation, SnapshotRef, }; use crate::table::Table; +use crate::transaction::validate::SnapshotValidator; /// Transaction action for rewriting files. pub struct RewriteFilesAction { @@ -41,7 +42,12 @@ pub struct RewriteFilesAction { deleted_delete_files: Vec, } -pub struct RewriteFilesOperation; +pub struct RewriteFilesOperation { + added_data_files: Vec, + added_delete_files: Vec, + deleted_data_files: Vec, + deleted_delete_files: Vec, +} impl RewriteFilesAction { pub fn new() -> Self { @@ -122,13 +128,16 @@ impl TransactionAction for RewriteFilesAction { self.deleted_delete_files.clone(), ); - // todo need to figure out validation - // 1. validate replace and added files - // 2. validate no new deletes using the starting snapshot id + let rewrite_operation = RewriteFilesOperation { + added_data_files: self.added_data_files.clone(), + added_delete_files: self.added_delete_files.clone(), + deleted_data_files: self.deleted_data_files.clone(), + deleted_delete_files: self.deleted_delete_files.clone(), + }; // todo should be able to configure merge manifest process snapshot_producer - .commit(RewriteFilesOperation, DefaultManifestProcess) + .commit(rewrite_operation, DefaultManifestProcess) .await } } @@ -168,6 +177,36 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { Ok(builder.build()) } +impl SnapshotValidator for RewriteFilesOperation { + fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> { + // Validate replaced and added files + if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Files to delete cannot be empty", + )); + } + if self.deleted_data_files.is_empty() && !self.added_data_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Data files to add must be empty because there's no data file to be rewritten", + )); + } + if self.deleted_delete_files.is_empty() && !self.added_delete_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Delete files to add must be empty because there's no delete file to be rewritten", + )); + } + + // todo add use_starting_seq_number to help the validation + // todo validate no new deletes since the current base + // if there are replaced data files, there cannot be any new row-level deletes for those data files + + Ok(()) + } +} + impl SnapshotProduceOperation for RewriteFilesOperation { fn operation(&self) -> Operation { Operation::Replace diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index a34f5b224b..0ce4568f9d 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -31,11 +31,12 @@ use crate::spec::{ }; use crate::table::Table; use crate::transaction::ActionCommit; +use crate::transaction::validate::SnapshotValidator; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; -pub(crate) trait SnapshotProduceOperation: Send + Sync { +pub(crate) trait SnapshotProduceOperation: Send + Sync + SnapshotValidator { fn operation(&self) -> Operation; fn delete_entries( &self, @@ -268,8 +269,12 @@ impl<'a> SnapshotProducer<'a> { } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); + async fn write_added_manifest(&mut self, content_type: ManifestContentType) -> Result { + let added_data_files = match content_type { + ManifestContentType::Data => std::mem::take(&mut self.added_data_files), + ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files), + }; + if added_data_files.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, @@ -292,7 +297,7 @@ impl<'a> SnapshotProducer<'a> { } }); let mut writer = self.new_manifest_writer( - ManifestContentType::Data, + content_type, self.table.metadata().default_partition_spec_id(), )?; for entry in manifest_entries { @@ -376,7 +381,11 @@ impl<'a> SnapshotProducer<'a> { // Process added entries. if !self.added_data_files.is_empty() { - let added_manifest = self.write_added_manifest().await?; + let added_manifest = self.write_added_manifest(ManifestContentType::Data).await?; + manifest_files.push(added_manifest); + } + if !self.added_delete_files.is_empty() { + let added_manifest = self.write_added_manifest(ManifestContentType::Deletes).await?; manifest_files.push(added_manifest); } @@ -458,6 +467,10 @@ impl<'a> SnapshotProducer<'a> { snapshot_produce_operation: OP, process: MP, ) -> Result { + // Validate to avoid conflicts + snapshot_produce_operation + .validate(self.table, self.table.metadata().current_snapshot())?; + let manifest_list_path = self.generate_manifest_list_file_path(0); let next_seq_num = self.table.metadata().next_sequence_number(); let first_row_id = self.table.metadata().next_row_id(); diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs new file mode 100644 index 0000000000..4766fd833a --- /dev/null +++ b/crates/iceberg/src/transaction/validate.rs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; + +use crate::error::Result; +use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef}; +use crate::table::Table; + +pub(crate) trait SnapshotValidator { + fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> { + // todo: add default implementation + Ok(()) + } + + #[allow(dead_code)] + async fn validation_history( + &self, + _base: &Table, + _from_snapshot: Option<&SnapshotRef>, + _to_snapshot: &SnapshotRef, + _matching_operations: HashSet, + _manifest_content_type: ManifestContentType, + ) -> Result<(Vec, HashSet)> { + // todo: add default implementation + Ok((vec![], HashSet::new())) + } +} From ea3fa457694a58d89997ef2d2a5479f533453fe6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 15 Aug 2025 15:33:59 -0700 Subject: [PATCH 08/20] confused --- crates/iceberg/src/spec/manifest/entry.rs | 6 ------ crates/iceberg/src/transaction/rewrite_files.rs | 12 +++--------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/spec/manifest/entry.rs b/crates/iceberg/src/spec/manifest/entry.rs index ff6d9ae083..e8fe0f223a 100644 --- a/crates/iceberg/src/spec/manifest/entry.rs +++ b/crates/iceberg/src/spec/manifest/entry.rs @@ -130,12 +130,6 @@ impl ManifestEntry { self.snapshot_id } - /// File sequence number - #[inline] - pub fn file_sequence_number(&self) -> Option { - self.file_sequence_number - } - /// Data sequence number. #[inline] pub fn sequence_number(&self) -> Option { diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 9fc23fca04..e0dfd7057f 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -143,6 +143,7 @@ impl TransactionAction for RewriteFilesAction { } fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { + // todo should we fail on missing properties? let builder = ManifestEntry::builder() .status(ManifestStatus::Deleted) .snapshot_id(entry.snapshot_id().ok_or_else(|| { @@ -163,15 +164,8 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { ), ) })?) - .file_sequence_number(entry.file_sequence_number().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Missing file_sequence_number for entry with file path: {}", - entry.file_path() - ), - ) - })?) + // todo copy file seq no as well + .data_file(entry.data_file().clone()); Ok(builder.build()) From 9362326c4f28b3e2f115887a8f6e7d03e0e14229 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 15 Aug 2025 15:36:34 -0700 Subject: [PATCH 09/20] daily fmt fixing --- crates/iceberg/src/transaction/rewrite_files.rs | 1 - crates/iceberg/src/transaction/snapshot.rs | 11 ++++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index e0dfd7057f..5aa13fa12b 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -165,7 +165,6 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { ) })?) // todo copy file seq no as well - .data_file(entry.data_file().clone()); Ok(builder.build()) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 0ce4568f9d..8a40f6eac3 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -269,8 +269,11 @@ impl<'a> SnapshotProducer<'a> { } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self, content_type: ManifestContentType) -> Result { - let added_data_files = match content_type { + async fn write_added_manifest( + &mut self, + content_type: ManifestContentType, + ) -> Result { + let added_data_files = match content_type { ManifestContentType::Data => std::mem::take(&mut self.added_data_files), ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files), }; @@ -385,7 +388,9 @@ impl<'a> SnapshotProducer<'a> { manifest_files.push(added_manifest); } if !self.added_delete_files.is_empty() { - let added_manifest = self.write_added_manifest(ManifestContentType::Deletes).await?; + let added_manifest = self + .write_added_manifest(ManifestContentType::Deletes) + .await?; manifest_files.push(added_manifest); } From cd844ab39f2155c6e7cd605f77f7f388d455373d Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 15 Aug 2025 16:01:12 -0700 Subject: [PATCH 10/20] i fix clippy every other day --- crates/iceberg/src/transaction/rewrite_files.rs | 6 ++++++ crates/iceberg/src/transaction/snapshot.rs | 1 + 2 files changed, 7 insertions(+) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 5aa13fa12b..1f0b47384a 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -114,6 +114,12 @@ impl RewriteFilesAction { } } +impl Default for RewriteFilesAction { + fn default() -> Self { + Self::new() + } +} + #[async_trait] impl TransactionAction for RewriteFilesAction { async fn commit(self: Arc, table: &Table) -> Result { diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8a40f6eac3..ec52318f12 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -85,6 +85,7 @@ pub(crate) struct SnapshotProducer<'a> { } impl<'a> SnapshotProducer<'a> { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( table: &'a Table, commit_uuid: Uuid, From 67c36e609f422408f9e22104c8680ca6409da669 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 19 Aug 2025 23:23:22 -0700 Subject: [PATCH 11/20] borrow snapshot utils from #1470 --- crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/util/mod.rs | 19 ++++++++ crates/iceberg/src/util/snapshot.rs | 72 +++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+) create mode 100644 crates/iceberg/src/util/mod.rs create mode 100644 crates/iceberg/src/util/snapshot.rs diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index aae8efed74..c65e37b068 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -97,3 +97,5 @@ pub mod writer; mod delete_vector; pub mod puffin; +/// Utility functions and modules. +pub mod util; diff --git a/crates/iceberg/src/util/mod.rs b/crates/iceberg/src/util/mod.rs new file mode 100644 index 0000000000..bb5bdc6085 --- /dev/null +++ b/crates/iceberg/src/util/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Utilities for working with snapshots. +pub mod snapshot; \ No newline at end of file diff --git a/crates/iceberg/src/util/snapshot.rs b/crates/iceberg/src/util/snapshot.rs new file mode 100644 index 0000000000..7de1deedde --- /dev/null +++ b/crates/iceberg/src/util/snapshot.rs @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::{SnapshotRef, TableMetadataRef}; + +struct Ancestors { + next: Option, + get_snapshot: Box Option + Send>, +} + +impl Iterator for Ancestors { + type Item = SnapshotRef; + + fn next(&mut self) -> Option { + let snapshot = self.next.take()?; + let result = snapshot.clone(); + self.next = snapshot + .parent_snapshot_id() + .and_then(|id| (self.get_snapshot)(id)); + Some(result) + } +} + +/// Iterate starting from `snapshot` (inclusive) to the root snapshot. +pub fn ancestors_of( + table_metadata: &TableMetadataRef, + snapshot: i64, +) -> Box + Send> { + if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) { + let table_metadata = table_metadata.clone(); + Box::new(Ancestors { + next: Some(snapshot.clone()), + get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()), + }) + } else { + Box::new(std::iter::empty()) + } +} + +/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive). +pub fn ancestors_between( + table_metadata: &TableMetadataRef, + latest_snapshot_id: i64, + oldest_snapshot_id: Option, +) -> Box + Send> { + let Some(oldest_snapshot_id) = oldest_snapshot_id else { + return Box::new(ancestors_of(table_metadata, latest_snapshot_id)); + }; + + if latest_snapshot_id == oldest_snapshot_id { + return Box::new(std::iter::empty()); + } + + Box::new( + ancestors_of(table_metadata, latest_snapshot_id) + .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id), + ) +} \ No newline at end of file From e85bcfb54894a67d8837ab6da4e480aa2774e653 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 19 Aug 2025 23:26:08 -0700 Subject: [PATCH 12/20] continue on validation, added validate_no_new_delete_files_for_data_files --- crates/iceberg/src/spec/snapshot.rs | 2 +- crates/iceberg/src/transaction/append.rs | 1 + .../iceberg/src/transaction/rewrite_files.rs | 20 +++ crates/iceberg/src/transaction/snapshot.rs | 4 + crates/iceberg/src/transaction/validate.rs | 127 ++++++++++++++++-- 5 files changed, 144 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 5371cf68f2..131694cb29 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -38,7 +38,7 @@ pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1; /// Reference to [`Snapshot`]. pub type SnapshotRef = Arc; -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)] #[serde(rename_all = "lowercase")] /// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots. pub enum Operation { diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 3a5faafe4a..b39b7dcfac 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -94,6 +94,7 @@ impl TransactionAction for FastAppendAction { vec![], vec![], vec![], + None, ); // validate added files diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 1f0b47384a..8e39c4951c 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -40,6 +40,8 @@ pub struct RewriteFilesAction { added_delete_files: Vec, deleted_data_files: Vec, deleted_delete_files: Vec, + starting_sequence_number: Option, + starting_snapshot_id: Option, } pub struct RewriteFilesOperation { @@ -47,6 +49,7 @@ pub struct RewriteFilesOperation { added_delete_files: Vec, deleted_data_files: Vec, deleted_delete_files: Vec, + starting_snapshot_id: Option, } impl RewriteFilesAction { @@ -59,6 +62,8 @@ impl RewriteFilesAction { added_delete_files: vec![], deleted_data_files: vec![], deleted_delete_files: vec![], + starting_sequence_number: None, + starting_snapshot_id: None, } } @@ -112,6 +117,19 @@ impl RewriteFilesAction { self.snapshot_properties = snapshot_properties; self } + + /// Set the data sequence number for this rewrite operation. + /// The number will be used for all new data files that are added in this rewrite. + pub fn set_starting_sequence_number(mut self, sequence_number: i64) -> Self { + self.starting_sequence_number = Some(sequence_number); + self + } + + /// Set the snapshot ID used in any reads for this operation. + pub fn set_starting_snapshot_id(mut self, snapshot_id: i64) -> Self { + self.starting_snapshot_id = Some(snapshot_id); + self + } } impl Default for RewriteFilesAction { @@ -132,6 +150,7 @@ impl TransactionAction for RewriteFilesAction { self.added_delete_files.clone(), self.deleted_data_files.clone(), self.deleted_delete_files.clone(), + self.starting_sequence_number.clone(), ); let rewrite_operation = RewriteFilesOperation { @@ -139,6 +158,7 @@ impl TransactionAction for RewriteFilesAction { added_delete_files: self.added_delete_files.clone(), deleted_data_files: self.deleted_data_files.clone(), deleted_delete_files: self.deleted_delete_files.clone(), + starting_snapshot_id: self.starting_snapshot_id.clone(), }; // todo should be able to configure merge manifest process diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index ec52318f12..84b233f369 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -82,9 +82,11 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, + starting_sequence_number: Option, } impl<'a> SnapshotProducer<'a> { + // todo add a builder for this to fix the clippy #[allow(clippy::too_many_arguments)] pub(crate) fn new( table: &'a Table, @@ -95,6 +97,7 @@ impl<'a> SnapshotProducer<'a> { added_delete_files: Vec, deleted_data_files: Vec, deleted_delete_files: Vec, + starting_sequence_number: Option, ) -> Self { Self { table, @@ -107,6 +110,7 @@ impl<'a> SnapshotProducer<'a> { deleted_data_files, deleted_delete_files, manifest_counter: (0..), + starting_sequence_number, } } diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index 4766fd833a..aaec8a3e39 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -16,27 +16,136 @@ // under the License. use std::collections::HashSet; +use std::sync::Arc; +use futures::{Sink, SinkExt}; +use futures::future::try_join_all; +use once_cell::sync::Lazy; + +use crate::delete_file_index::DeleteFileIndex; use crate::error::Result; -use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef}; +use crate::scan::DeleteFileContext; +use crate::spec::{ + DataFile, ManifestContentType, ManifestFile, Operation, SnapshotRef, +}; use crate::table::Table; +use crate::util::snapshot::ancestors_between; +use crate::{Error, ErrorKind}; + +static VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Lazy> = + Lazy::new(|| HashSet::from([Operation::Overwrite, Operation::Delete])); pub(crate) trait SnapshotValidator { + // todo doc + // table: base table + // snapshot: parent snapshot + // usually snapshot is the latest snapshot of base table, unless it's non-main branch + // but we don't support writing to branches as of now fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> { // todo: add default implementation Ok(()) } - #[allow(dead_code)] + // todo doc async fn validation_history( &self, - _base: &Table, - _from_snapshot: Option<&SnapshotRef>, - _to_snapshot: &SnapshotRef, - _matching_operations: HashSet, - _manifest_content_type: ManifestContentType, + base: &Table, + to_snapshot: SnapshotRef, // todo maybe the naming/variable order can be better, or just snapshot id is better? this is parent + from_snapshot_id: Option, + matching_operations: &HashSet, + manifest_content_type: ManifestContentType, ) -> Result<(Vec, HashSet)> { - // todo: add default implementation - Ok((vec![], HashSet::new())) + let mut manifests: Vec = vec![]; + let mut new_snapshots = HashSet::new(); + let mut last_snapshot: Option = None; + + let snapshots = ancestors_between( + &Arc::new(base.metadata().clone()), + to_snapshot.snapshot_id(), + from_snapshot_id.clone(), + ); + + for current_snapshot in snapshots { + last_snapshot = Some(current_snapshot.clone()); + + // Find all snapshots with the matching operations + // and their manifest files with the matching content type + if matching_operations.contains(¤t_snapshot.summary().operation) { + new_snapshots.insert(current_snapshot.snapshot_id()); + current_snapshot + .load_manifest_list(base.file_io(), base.metadata()) + .await? + .entries() + .iter() + .for_each(|manifest| { + if manifest.content == manifest_content_type + && manifest.added_snapshot_id == current_snapshot.snapshot_id() + { + manifests.push(manifest.clone()); + } + }); + } + } + + if last_snapshot.is_some() + && last_snapshot.clone().unwrap().parent_snapshot_id() != from_snapshot_id + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot determine history between starting snapshot {} and the last known ancestor {}", + from_snapshot_id.unwrap_or(-1), + last_snapshot.unwrap().snapshot_id() + ), + )); + } + + Ok((manifests, new_snapshots)) + } + + #[allow(dead_code)] + async fn validate_no_new_delete_files_for_data_files( + &self, + base: &Table, + from_snapshot_id: Option, + _data_files: &[DataFile], + to_snapshot: SnapshotRef, + ) -> Result<()> { + // Get matching delete files have been added since the from_snapshot_id + let (delete_manifests, snapshot_ids) = self + .validation_history( + base, + to_snapshot, + from_snapshot_id, + &VALIDATE_ADDED_DELETE_FILES_OPERATIONS, + ManifestContentType::Deletes, + ) + .await?; + + // Building delete file index + let (_delete_file_index, mut delete_file_tx) = DeleteFileIndex::new(); + let manifests = try_join_all( + delete_manifests + .iter() + .map(|f| f.load_manifest(base.file_io())) + .collect::>(), + ) + .await?; + + let delete_files_ctx = manifests + .iter() + .flat_map(|manifest| manifest.entries()) + .map(|entry| DeleteFileContext { + manifest_entry: entry.clone(), + partition_spec_id: entry.data_file().partition_spec_id + }).collect::>(); + + for ctx in delete_files_ctx { + delete_file_tx.send(ctx).await? + } + + // todo validate if there are deletes + + Ok(()) } } From 752dc0eae48918d54a5fdb82e62aeb90ad45bb37 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 19 Aug 2025 23:27:22 -0700 Subject: [PATCH 13/20] fix fmt --- crates/iceberg/src/transaction/validate.rs | 25 +++++++++++----------- crates/iceberg/src/util/mod.rs | 2 +- crates/iceberg/src/util/snapshot.rs | 2 +- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index aaec8a3e39..e7aeef0a8a 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -18,16 +18,14 @@ use std::collections::HashSet; use std::sync::Arc; -use futures::{Sink, SinkExt}; use futures::future::try_join_all; +use futures::{Sink, SinkExt}; use once_cell::sync::Lazy; use crate::delete_file_index::DeleteFileIndex; use crate::error::Result; use crate::scan::DeleteFileContext; -use crate::spec::{ - DataFile, ManifestContentType, ManifestFile, Operation, SnapshotRef, -}; +use crate::spec::{DataFile, ManifestContentType, ManifestFile, Operation, SnapshotRef}; use crate::table::Table; use crate::util::snapshot::ancestors_between; use crate::{Error, ErrorKind}; @@ -131,19 +129,20 @@ pub(crate) trait SnapshotValidator { .collect::>(), ) .await?; - + let delete_files_ctx = manifests - .iter() - .flat_map(|manifest| manifest.entries()) - .map(|entry| DeleteFileContext { - manifest_entry: entry.clone(), - partition_spec_id: entry.data_file().partition_spec_id - }).collect::>(); - + .iter() + .flat_map(|manifest| manifest.entries()) + .map(|entry| DeleteFileContext { + manifest_entry: entry.clone(), + partition_spec_id: entry.data_file().partition_spec_id, + }) + .collect::>(); + for ctx in delete_files_ctx { delete_file_tx.send(ctx).await? } - + // todo validate if there are deletes Ok(()) diff --git a/crates/iceberg/src/util/mod.rs b/crates/iceberg/src/util/mod.rs index bb5bdc6085..b614c981ec 100644 --- a/crates/iceberg/src/util/mod.rs +++ b/crates/iceberg/src/util/mod.rs @@ -16,4 +16,4 @@ // under the License. /// Utilities for working with snapshots. -pub mod snapshot; \ No newline at end of file +pub mod snapshot; diff --git a/crates/iceberg/src/util/snapshot.rs b/crates/iceberg/src/util/snapshot.rs index 7de1deedde..62aa6769ec 100644 --- a/crates/iceberg/src/util/snapshot.rs +++ b/crates/iceberg/src/util/snapshot.rs @@ -69,4 +69,4 @@ pub fn ancestors_between( ancestors_of(table_metadata, latest_snapshot_id) .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id), ) -} \ No newline at end of file +} From d62dbdf7e90a7de25ff1c49329e384f6a47edb04 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 20 Aug 2025 15:12:53 -0700 Subject: [PATCH 14/20] finished validate_no_new_delete_files_for_data_files --- .../iceberg/src/transaction/rewrite_files.rs | 35 ++++--- crates/iceberg/src/transaction/snapshot.rs | 3 +- crates/iceberg/src/transaction/validate.rs | 95 ++++++++++++++----- 3 files changed, 97 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 8e39c4951c..70ae187790 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -26,7 +26,7 @@ use super::{ActionCommit, TransactionAction}; use crate::error::{Error, ErrorKind, Result}; use crate::spec::{ DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile, - ManifestStatus, Operation, SnapshotRef, + ManifestStatus, Operation, }; use crate::table::Table; use crate::transaction::validate::SnapshotValidator; @@ -40,7 +40,7 @@ pub struct RewriteFilesAction { added_delete_files: Vec, deleted_data_files: Vec, deleted_delete_files: Vec, - starting_sequence_number: Option, + data_sequence_number: Option, starting_snapshot_id: Option, } @@ -50,6 +50,7 @@ pub struct RewriteFilesOperation { deleted_data_files: Vec, deleted_delete_files: Vec, starting_snapshot_id: Option, + data_sequence_number: Option, } impl RewriteFilesAction { @@ -62,7 +63,7 @@ impl RewriteFilesAction { added_delete_files: vec![], deleted_data_files: vec![], deleted_delete_files: vec![], - starting_sequence_number: None, + data_sequence_number: None, starting_snapshot_id: None, } } @@ -120,8 +121,8 @@ impl RewriteFilesAction { /// Set the data sequence number for this rewrite operation. /// The number will be used for all new data files that are added in this rewrite. - pub fn set_starting_sequence_number(mut self, sequence_number: i64) -> Self { - self.starting_sequence_number = Some(sequence_number); + pub fn set_data_sequence_number(mut self, sequence_number: i64) -> Self { + self.data_sequence_number = Some(sequence_number); self } @@ -150,7 +151,7 @@ impl TransactionAction for RewriteFilesAction { self.added_delete_files.clone(), self.deleted_data_files.clone(), self.deleted_delete_files.clone(), - self.starting_sequence_number.clone(), + self.data_sequence_number.clone(), ); let rewrite_operation = RewriteFilesOperation { @@ -159,9 +160,10 @@ impl TransactionAction for RewriteFilesAction { deleted_data_files: self.deleted_data_files.clone(), deleted_delete_files: self.deleted_delete_files.clone(), starting_snapshot_id: self.starting_snapshot_id.clone(), + data_sequence_number: self.data_sequence_number.clone(), }; - // todo should be able to configure merge manifest process + // todo should be able to configure to use the merge manifest process snapshot_producer .commit(rewrite_operation, DefaultManifestProcess) .await @@ -169,7 +171,7 @@ impl TransactionAction for RewriteFilesAction { } fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { - // todo should we fail on missing properties? + // todo should we fail on missing properties or should we ignore them when they don't exist? let builder = ManifestEntry::builder() .status(ManifestStatus::Deleted) .snapshot_id(entry.snapshot_id().ok_or_else(|| { @@ -197,7 +199,7 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { } impl SnapshotValidator for RewriteFilesOperation { - fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> { + async fn validate(&self, base: &Table, parent_snapshot_id: Option) -> Result<()> { // Validate replaced and added files if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() { return Err(Error::new( @@ -218,9 +220,18 @@ impl SnapshotValidator for RewriteFilesOperation { )); } - // todo add use_starting_seq_number to help the validation - // todo validate no new deletes since the current base - // if there are replaced data files, there cannot be any new row-level deletes for those data files + // todo add use_starting_seq_number to determine if we want to use data_sequence_number + // If there are replaced data files, there cannot be any new row-level deletes for those data files + if !self.deleted_data_files.is_empty() { + self.validate_no_new_delete_files_for_data_files( + base, + self.starting_snapshot_id, + parent_snapshot_id, + &self.deleted_data_files, + self.data_sequence_number.is_some(), + ) + .await?; + } Ok(()) } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 84b233f369..a8c8f331c5 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -479,7 +479,8 @@ impl<'a> SnapshotProducer<'a> { ) -> Result { // Validate to avoid conflicts snapshot_produce_operation - .validate(self.table, self.table.metadata().current_snapshot())?; + .validate(self.table, self.table.metadata().current_snapshot_id) + .await?; let manifest_list_path = self.generate_manifest_list_file_path(0); let next_seq_num = self.table.metadata().next_sequence_number(); diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index e7aeef0a8a..20c254cb50 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -18,14 +18,17 @@ use std::collections::HashSet; use std::sync::Arc; +use futures::SinkExt; use futures::future::try_join_all; -use futures::{Sink, SinkExt}; use once_cell::sync::Lazy; use crate::delete_file_index::DeleteFileIndex; use crate::error::Result; use crate::scan::DeleteFileContext; -use crate::spec::{DataFile, ManifestContentType, ManifestFile, Operation, SnapshotRef}; +use crate::spec::{ + DataContentType, DataFile, FormatVersion, INITIAL_SEQUENCE_NUMBER, ManifestContentType, + ManifestFile, Operation, SnapshotRef, +}; use crate::table::Table; use crate::util::snapshot::ancestors_between; use crate::{Error, ErrorKind}; @@ -39,7 +42,7 @@ pub(crate) trait SnapshotValidator { // snapshot: parent snapshot // usually snapshot is the latest snapshot of base table, unless it's non-main branch // but we don't support writing to branches as of now - fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> { + async fn validate(&self, _base: &Table, _parent_snapshot_id: Option) -> Result<()> { // todo: add default implementation Ok(()) } @@ -48,8 +51,8 @@ pub(crate) trait SnapshotValidator { async fn validation_history( &self, base: &Table, - to_snapshot: SnapshotRef, // todo maybe the naming/variable order can be better, or just snapshot id is better? this is parent from_snapshot_id: Option, + to_snapshot_id: i64, matching_operations: &HashSet, manifest_content_type: ManifestContentType, ) -> Result<(Vec, HashSet)> { @@ -59,7 +62,7 @@ pub(crate) trait SnapshotValidator { let snapshots = ancestors_between( &Arc::new(base.metadata().clone()), - to_snapshot.snapshot_id(), + to_snapshot_id, from_snapshot_id.clone(), ); @@ -101,27 +104,33 @@ pub(crate) trait SnapshotValidator { Ok((manifests, new_snapshots)) } - #[allow(dead_code)] async fn validate_no_new_delete_files_for_data_files( &self, base: &Table, from_snapshot_id: Option, - _data_files: &[DataFile], - to_snapshot: SnapshotRef, + to_snapshot_id: Option, + data_files: &[DataFile], + ignore_equality_deletes: bool, ) -> Result<()> { + // If there is no current table state, no files have been added + if to_snapshot_id.is_none() || base.metadata().format_version() != FormatVersion::V1 { + return Ok(()); + } + let to_snapshot_id = to_snapshot_id.unwrap(); + // Get matching delete files have been added since the from_snapshot_id - let (delete_manifests, snapshot_ids) = self + let (delete_manifests, _) = self .validation_history( base, - to_snapshot, from_snapshot_id, + to_snapshot_id, &VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContentType::Deletes, ) .await?; - // Building delete file index - let (_delete_file_index, mut delete_file_tx) = DeleteFileIndex::new(); + // Build delete file index + let (delete_file_index, mut delete_file_tx) = DeleteFileIndex::new(); let manifests = try_join_all( delete_manifests .iter() @@ -129,21 +138,61 @@ pub(crate) trait SnapshotValidator { .collect::>(), ) .await?; - - let delete_files_ctx = manifests - .iter() - .flat_map(|manifest| manifest.entries()) - .map(|entry| DeleteFileContext { + let manifest_entries = manifests.iter().flat_map(|manifest| manifest.entries()); + for entry in manifest_entries { + let delete_file_ctx = DeleteFileContext { manifest_entry: entry.clone(), partition_spec_id: entry.data_file().partition_spec_id, - }) - .collect::>(); - - for ctx in delete_files_ctx { - delete_file_tx.send(ctx).await? + }; + delete_file_tx.send(delete_file_ctx).await?; } - // todo validate if there are deletes + // Get starting seq num from starting snapshot if available + let starting_sequence_number = if from_snapshot_id.is_some() + && base + .metadata() + .snapshots + .get(&from_snapshot_id.unwrap()) + .is_some() + { + base.metadata() + .snapshots + .get(&from_snapshot_id.unwrap()) + .unwrap() + .sequence_number() + } else { + INITIAL_SEQUENCE_NUMBER + }; + + // Validate if there are deletes using delete file index + for data_file in data_files { + let delete_files = delete_file_index + .get_deletes_for_data_file(data_file, Some(starting_sequence_number)) + .await; + + if ignore_equality_deletes { + if delete_files + .iter() + .any(|delete_file| delete_file.file_type == DataContentType::PositionDeletes) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot commit, found new positional delete for added data file: {}", + data_file.file_path + ), + )); + } + } else if !delete_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot commit, found new delete for added data file: {}", + data_file.file_path + ), + )); + } + } Ok(()) } From 998b8e647dbea331f23eb82fb6224beebc431877 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 20 Aug 2025 15:29:58 -0700 Subject: [PATCH 15/20] i like writing doc --- crates/iceberg/src/transaction/validate.rs | 59 +++++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index 20c254cb50..1beee77a53 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -36,18 +36,46 @@ use crate::{Error, ErrorKind}; static VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Lazy> = Lazy::new(|| HashSet::from([Operation::Overwrite, Operation::Delete])); +/// A trait for validating snapshots in an Iceberg table. +/// +/// This trait provides methods to validate snapshots and their history, +/// ensuring data integrity and consistency across table operations. pub(crate) trait SnapshotValidator { - // todo doc - // table: base table - // snapshot: parent snapshot - // usually snapshot is the latest snapshot of base table, unless it's non-main branch - // but we don't support writing to branches as of now + /// Validates a snapshot against a table. + /// + /// # Arguments + /// + /// * `base` - The base table to validate against + /// * `parent_snapshot_id` - The ID of the parent snapshot, if any. This is usually + /// the latest snapshot of the base table, unless it's a non-main branch + /// (note: writing to branches is not currently supported) + /// + /// # Returns + /// + /// A `Result` indicating success or an error if validation fails async fn validate(&self, _base: &Table, _parent_snapshot_id: Option) -> Result<()> { - // todo: add default implementation Ok(()) } - // todo doc + /// Retrieves the history of snapshots between two points with matching operations and content type. + /// + /// # Arguments + /// + /// * `base` - The base table to retrieve history from + /// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the beginning + /// * `to_snapshot_id` - The ending snapshot ID (inclusive) + /// * `matching_operations` - Set of operations to match when collecting snapshots + /// * `manifest_content_type` - The content type of manifests to collect + /// + /// # Returns + /// + /// A tuple containing: + /// * A vector of manifest files matching the criteria + /// * A set of snapshot IDs that were collected + /// + /// # Errors + /// + /// Returns an error if the history between the snapshots cannot be determined async fn validation_history( &self, base: &Table, @@ -104,6 +132,23 @@ pub(crate) trait SnapshotValidator { Ok((manifests, new_snapshots)) } + /// Validates that there are no new delete files for the given data files. + /// + /// # Arguments + /// + /// * `base` - The base table to validate against + /// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the beginning + /// * `to_snapshot_id` - The ending snapshot ID (inclusive), or None if there is no current table state + /// * `data_files` - The data files to check for conflicting delete files + /// * `ignore_equality_deletes` - Whether to ignore equality deletes and only check for positional deletes + /// + /// # Returns + /// + /// A `Result` indicating success or an error if validation fails + /// + /// # Errors + /// + /// Returns an error if new delete files are found for any of the data files async fn validate_no_new_delete_files_for_data_files( &self, base: &Table, From 5a422fca415ab774889786885ee9284ef266aafd Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 20 Aug 2025 17:12:04 -0700 Subject: [PATCH 16/20] fix clippy --- crates/iceberg/src/transaction/append.rs | 1 - .../iceberg/src/transaction/rewrite_files.rs | 5 ++--- crates/iceberg/src/transaction/snapshot.rs | 3 --- crates/iceberg/src/transaction/validate.rs | 20 +++++++------------ 4 files changed, 9 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index b39b7dcfac..3a5faafe4a 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -94,7 +94,6 @@ impl TransactionAction for FastAppendAction { vec![], vec![], vec![], - None, ); // validate added files diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 70ae187790..cc046ed4c1 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -151,7 +151,6 @@ impl TransactionAction for RewriteFilesAction { self.added_delete_files.clone(), self.deleted_data_files.clone(), self.deleted_delete_files.clone(), - self.data_sequence_number.clone(), ); let rewrite_operation = RewriteFilesOperation { @@ -159,8 +158,8 @@ impl TransactionAction for RewriteFilesAction { added_delete_files: self.added_delete_files.clone(), deleted_data_files: self.deleted_data_files.clone(), deleted_delete_files: self.deleted_delete_files.clone(), - starting_snapshot_id: self.starting_snapshot_id.clone(), - data_sequence_number: self.data_sequence_number.clone(), + starting_snapshot_id: self.starting_snapshot_id, + data_sequence_number: self.data_sequence_number, }; // todo should be able to configure to use the merge manifest process diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index a8c8f331c5..c8461b33b5 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -82,7 +82,6 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, - starting_sequence_number: Option, } impl<'a> SnapshotProducer<'a> { @@ -97,7 +96,6 @@ impl<'a> SnapshotProducer<'a> { added_delete_files: Vec, deleted_data_files: Vec, deleted_delete_files: Vec, - starting_sequence_number: Option, ) -> Self { Self { table, @@ -110,7 +108,6 @@ impl<'a> SnapshotProducer<'a> { deleted_data_files, deleted_delete_files, manifest_counter: (0..), - starting_sequence_number, } } diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index 1beee77a53..d0291c666f 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -91,7 +91,7 @@ pub(crate) trait SnapshotValidator { let snapshots = ancestors_between( &Arc::new(base.metadata().clone()), to_snapshot_id, - from_snapshot_id.clone(), + from_snapshot_id, ); for current_snapshot in snapshots { @@ -193,18 +193,12 @@ pub(crate) trait SnapshotValidator { } // Get starting seq num from starting snapshot if available - let starting_sequence_number = if from_snapshot_id.is_some() - && base - .metadata() - .snapshots - .get(&from_snapshot_id.unwrap()) - .is_some() - { - base.metadata() - .snapshots - .get(&from_snapshot_id.unwrap()) - .unwrap() - .sequence_number() + + let starting_sequence_number = if let Some(from_snapshot_id) = from_snapshot_id { + match base.metadata().snapshots.get(&from_snapshot_id) { + Some(snapshot) => snapshot.sequence_number(), + None => INITIAL_SEQUENCE_NUMBER, + } } else { INITIAL_SEQUENCE_NUMBER }; From 6659e95d005e02d81b680105b4b30f549807efb7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 20 Aug 2025 17:12:24 -0700 Subject: [PATCH 17/20] minor --- crates/iceberg/src/transaction/validate.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index d0291c666f..52147b7b0e 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -193,7 +193,6 @@ pub(crate) trait SnapshotValidator { } // Get starting seq num from starting snapshot if available - let starting_sequence_number = if let Some(from_snapshot_id) = from_snapshot_id { match base.metadata().snapshots.get(&from_snapshot_id) { Some(snapshot) => snapshot.sequence_number(), From 12c8fcc818a2c2fd0911067d321647d56f684fd2 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 20 Aug 2025 17:17:54 -0700 Subject: [PATCH 18/20] minor --- crates/iceberg/src/transaction/rewrite_files.rs | 2 +- crates/iceberg/src/transaction/validate.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index cc046ed4c1..b9c69d04e9 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -222,7 +222,7 @@ impl SnapshotValidator for RewriteFilesOperation { // todo add use_starting_seq_number to determine if we want to use data_sequence_number // If there are replaced data files, there cannot be any new row-level deletes for those data files if !self.deleted_data_files.is_empty() { - self.validate_no_new_delete_files_for_data_files( + self.validate_no_new_deletes_for_data_files( base, self.starting_snapshot_id, parent_snapshot_id, diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index 52147b7b0e..28e9083c23 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -149,7 +149,7 @@ pub(crate) trait SnapshotValidator { /// # Errors /// /// Returns an error if new delete files are found for any of the data files - async fn validate_no_new_delete_files_for_data_files( + async fn validate_no_new_deletes_for_data_files( &self, base: &Table, from_snapshot_id: Option, From 02ca330b90698df00babe96c0d2d660af55d2b55 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 4 Nov 2025 23:03:58 -0800 Subject: [PATCH 19/20] ignore fields in manifest that don't exist --- .../iceberg/src/transaction/rewrite_files.rs | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index b9c69d04e9..76feac3e48 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -169,31 +169,21 @@ impl TransactionAction for RewriteFilesAction { } } -fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result { - // todo should we fail on missing properties or should we ignore them when they don't exist? - let builder = ManifestEntry::builder() +fn copy_with_deleted_status(entry: &ManifestEntry) -> Result { + let mut builder = ManifestEntry::builder() .status(ManifestStatus::Deleted) - .snapshot_id(entry.snapshot_id().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Missing snapshot_id for entry with file path: {}", - entry.file_path() - ), - ) - })?) - .sequence_number(entry.sequence_number().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Missing sequence_number for entry with file path: {}", - entry.file_path() - ), - ) - })?) - // todo copy file seq no as well .data_file(entry.data_file().clone()); + if let Some(snapshot_id) = entry.snapshot_id() { + builder = builder.snapshot_id(snapshot_id); + } + + if let Some(sequence_number) = entry.sequence_number() { + builder = builder.sequence_number(sequence_number); + } + + // todo copy file seq no as well + Ok(builder.build()) } @@ -271,7 +261,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation { .iter() .any(|f| f.file_path == entry.data_file().file_path) { - delete_entries.push(copy_with_deleted_status(entry)?) + delete_entries.push(copy_with_deleted_status(entry.as_ref())?) } } DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { @@ -280,7 +270,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation { .iter() .any(|f| f.file_path == entry.data_file().file_path) { - delete_entries.push(copy_with_deleted_status(entry)?) + delete_entries.push(copy_with_deleted_status(entry.as_ref())?) } } } From 336c974842236b47ddccf2a9705652fe835f731f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 4 Nov 2025 23:30:47 -0800 Subject: [PATCH 20/20] rebase and ignore optional fields when copying deleted status --- crates/iceberg/src/transaction/rewrite_files.rs | 16 +++++----------- crates/iceberg/src/transaction/snapshot.rs | 9 ++++----- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs index 76feac3e48..eb285692c7 100644 --- a/crates/iceberg/src/transaction/rewrite_files.rs +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -25,8 +25,8 @@ use super::snapshot::{DefaultManifestProcess, SnapshotProduceOperation, Snapshot use super::{ActionCommit, TransactionAction}; use crate::error::{Error, ErrorKind, Result}; use crate::spec::{ - DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile, - ManifestStatus, Operation, + DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestFile, ManifestStatus, + Operation, }; use crate::table::Table; use crate::transaction::validate::SnapshotValidator; @@ -170,18 +170,12 @@ impl TransactionAction for RewriteFilesAction { } fn copy_with_deleted_status(entry: &ManifestEntry) -> Result { - let mut builder = ManifestEntry::builder() + let builder = ManifestEntry::builder() .status(ManifestStatus::Deleted) + .snapshot_id_opt(entry.snapshot_id()) + .sequence_number_opt(entry.sequence_number()) .data_file(entry.data_file().clone()); - if let Some(snapshot_id) = entry.snapshot_id() { - builder = builder.snapshot_id(snapshot_id); - } - - if let Some(sequence_number) = entry.sequence_number() { - builder = builder.sequence_number(sequence_number); - } - // todo copy file seq no as well Ok(builder.build()) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8461b33b5..c13d388c66 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -23,11 +23,10 @@ use uuid::Uuid; use crate::error::Result; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, - PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULTSnapshot, - SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, - TableProperties, update_snapshot_summaries, + DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, + StructType, Summary, TableProperties, update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit;