diff --git a/README.md b/README.md
index b33d1bad..03f3cb05 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,13 @@ It provides an Iceberg integration for the [Datafusion](https://arrow.apache.org
 | Equality deletes | :white_check_mark: |
 | Positional deletes | |
 
+### Table Maintenance
+
+| Feature | Status |
+| --- | --- |
+| Expire snapshots | :white_check_mark: |
+| Orphan file cleanup | |
+
 ### Iceberg Views
 
 | Feature | Status |
@@ -62,6 +69,8 @@ It provides an Iceberg integration for the [Datafusion](https://arrow.apache.org
 
 Check out the [datafusion examples](datafusion_iceberg/examples).
 
+### Basic Table Operations
+
 ```rust
 use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
 use datafusion_iceberg::DataFusionTable;
diff --git a/iceberg-rust/src/lib.rs b/iceberg-rust/src/lib.rs
index 75bc6276..70852d39 100644
--- a/iceberg-rust/src/lib.rs
+++ b/iceberg-rust/src/lib.rs
@@ -13,6 +13,7 @@
 //! * Time travel and snapshot isolation
 //! * View and materialized view support
 //! * Multiple catalog implementations (REST, AWS Glue, File-based)
+//! * Table maintenance operations (snapshot expiration)
 //!
 //! # Components
 //!
@@ -43,6 +44,19 @@
 //!     .update_schema(new_schema)
 //!     .commit()
 //!     .await?;
+//!
+//! // Expire old snapshots for maintenance
+//! table
+//!     .new_transaction(None)
+//!     .expire_snapshots(
+//!         Some(chrono::Utc::now().timestamp_millis() - 30 * 24 * 60 * 60 * 1000),
+//!         Some(10),
+//!         false, // clean_orphan_files
+//!         true,  // retain_ref_snapshots
+//!         false, // dry_run
+//!     )
+//!     .commit()
+//!     .await?;
 //! # Ok(())
 //! # }
 //! ```
diff --git a/iceberg-rust/src/table/transaction/mod.rs b/iceberg-rust/src/table/transaction/mod.rs
index 7a9ad004..7b057336 100644
--- a/iceberg-rust/src/table/transaction/mod.rs
+++ b/iceberg-rust/src/table/transaction/mod.rs
@@ -38,8 +38,9 @@ pub(crate) static REPLACE_INDEX: usize = 4;
 pub(crate) static OVERWRITE_INDEX: usize = 5;
 pub(crate) static UPDATE_PROPERTIES_INDEX: usize = 6;
 pub(crate) static SET_SNAPSHOT_REF_INDEX: usize = 7;
+pub(crate) static EXPIRE_SNAPSHOTS_INDEX: usize = 8;
 
-pub(crate) static NUM_OPERATIONS: usize = 8;
+pub(crate) static NUM_OPERATIONS: usize = 9;
 
 /// A transaction that can perform multiple operations on a table atomically
 ///
@@ -395,6 +396,60 @@ impl<'table> TableTransaction<'table> {
         self.operations[SET_SNAPSHOT_REF_INDEX] = Some(Operation::SetSnapshotRef(entry));
         self
     }
+
+    /// Expire snapshots according to the given retention policy
+    ///
+    /// A snapshot is expired only if no retention rule protects it: the table's current
+    /// snapshot is always kept, snapshots referenced by branches/tags are kept when
+    /// `retain_ref_snapshots` is set, the `retain_last` most recent snapshots are kept, and
+    /// snapshots whose timestamp is not older than `older_than` are kept.
+    ///
+    /// # Arguments
+    /// * `older_than` - Optional timestamp (ms since Unix epoch) to expire snapshots older than this time
+    /// * `retain_last` - Optional number of most recent snapshots to keep, regardless of timestamp
+    /// * `clean_orphan_files` - Whether to clean up data files that are no longer referenced.
+    ///   The flag is stored on the operation but is currently ignored when it executes.
+    /// * `retain_ref_snapshots` - Whether to preserve snapshots that are referenced by branches/tags
+    /// * `dry_run` - Whether to perform a dry run: candidates are only logged and the operation
+    ///   yields no table updates
+    ///
+    /// # Returns
+    /// * `Self` - The transaction builder for method chaining
+    ///
+    /// # Errors
+    /// Executing the operation fails if both `older_than` and `retain_last` are `None`.
+    ///
+    /// # Examples
+    /// ```ignore
+    /// let result = table.new_transaction(None)
+    ///     .expire_snapshots(
+    ///         Some(chrono::Utc::now().timestamp_millis() - 7 * 24 * 60 * 60 * 1000),
+    ///         Some(5),
+    ///         false, // clean_orphan_files
+    ///         true,  // retain_ref_snapshots
+    ///         false, // dry_run
+    ///     )
+    ///     .commit()
+    ///     .await?;
+    /// ```
+    pub fn expire_snapshots(
+        mut self,
+        older_than: Option<i64>,
+        retain_last: Option<usize>,
+        clean_orphan_files: bool,
+        retain_ref_snapshots: bool,
+        dry_run: bool,
+    ) -> Self {
+        self.operations[EXPIRE_SNAPSHOTS_INDEX] = Some(Operation::ExpireSnapshots {
+            older_than,
+            retain_last,
+            clean_orphan_files,
+            retain_ref_snapshots,
+            dry_run,
+        });
+        self
+    }
+
     /// Commits all operations in this transaction atomically
     ///
     /// This method executes all operations in the transaction and updates the table
diff --git a/iceberg-rust/src/table/transaction/operation.rs b/iceberg-rust/src/table/transaction/operation.rs
index 96b85b2c..eaf37beb 100644
--- a/iceberg-rust/src/table/transaction/operation.rs
+++ b/iceberg-rust/src/table/transaction/operation.rs
@@ -103,8 +103,19 @@ pub enum Operation {
     // NewRowDelta,
     // /// Delete files in the table and commit
     // NewDelete,
-    // /// Expire snapshots in the table
-    // ExpireSnapshots,
+    /// Expire snapshots in the table
+    ExpireSnapshots {
+        /// Snapshots with a timestamp (ms since Unix epoch) at or after this value are retained
+        older_than: Option<i64>,
+        /// Number of most recent snapshots to retain regardless of their timestamp
+        retain_last: Option<usize>,
+        /// Requested orphan file cleanup; ignored when the operation is executed
+        clean_orphan_files: bool,
+        /// Whether snapshots referenced by branches/tags are retained
+        retain_ref_snapshots: bool,
+        /// Whether to only log the snapshots that would be expired
+        dry_run: bool,
+    },
     // /// Manage snapshots in the table
     // ManageSnapshots,
     // /// Read and write table data and metadata files
@@ -843,10 +854,261 @@ impl Operation {
                 debug!("Executing SetDefaultSpec operation: spec_id={}", spec_id);
                 Ok((None, vec![TableUpdate::SetDefaultSpec { spec_id }]))
             }
+            Operation::ExpireSnapshots {
+                older_than,
+                retain_last,
+                // Accepted for API compatibility; this operation only emits snapshot
+                // removals and never deletes files.
+                clean_orphan_files: _,
+                retain_ref_snapshots,
+                dry_run,
+            } => {
+                debug!("Executing ExpireSnapshots operation");
+
+                // At least one retention criterion is required; without one, every
+                // snapshot that is neither current nor referenced would be expired.
+                if older_than.is_none() && retain_last.is_none() {
+                    return Err(Error::InvalidFormat(
+                        "Must specify either older_than or retain_last for snapshot expiration"
+                            .into(),
+                    ));
+                }
+
+                // Order snapshots newest first. Timestamp ties are broken by snapshot id
+                // so the `retain_last` window does not depend on the iteration order of
+                // the snapshot map.
+                let mut all_snapshots: Vec<_> = table_metadata.snapshots.values().collect();
+                all_snapshots.sort_by(|a, b| {
+                    b.timestamp_ms()
+                        .cmp(a.timestamp_ms())
+                        .then_with(|| b.snapshot_id().cmp(a.snapshot_id()))
+                });
+
+                // The current snapshot is never expired.
+                let current_snapshot_id = table_metadata.current_snapshot_id;
+
+                // Snapshot ids referenced by branches/tags; left empty when references
+                // should not protect their snapshots.
+                let ref_snapshot_ids: std::collections::HashSet<_> = if retain_ref_snapshots {
+                    table_metadata
+                        .refs
+                        .values()
+                        .map(|snapshot_ref| snapshot_ref.snapshot_id)
+                        .collect()
+                } else {
+                    std::collections::HashSet::new()
+                };
+
+                // A snapshot is expired only when none of the retention rules applies.
+                let snapshots_to_expire: Vec<_> = all_snapshots
+                    .iter()
+                    .enumerate()
+                    .filter_map(|(index, snapshot)| {
+                        let snapshot_id = *snapshot.snapshot_id();
+                        let within_count = matches!(retain_last, Some(count) if index < count);
+                        let within_age = matches!(
+                            older_than,
+                            Some(threshold) if *snapshot.timestamp_ms() >= threshold
+                        );
+                        let retain = Some(snapshot_id) == current_snapshot_id
+                            || ref_snapshot_ids.contains(&snapshot_id)
+                            || within_count
+                            || within_age;
+                        if retain {
+                            None
+                        } else {
+                            Some(snapshot_id)
+                        }
+                    })
+                    .collect();
+
+                // A dry run only reports the candidates; no table update is produced.
+                if dry_run {
+                    debug!(
+                        "Dry run: would expire {} snapshots: {:?}",
+                        snapshots_to_expire.len(),
+                        snapshots_to_expire
+                    );
+                    return Ok((None, vec![]));
+                }
+
+                if snapshots_to_expire.is_empty() {
+                    debug!("No snapshots to expire");
+                    return Ok((None, vec![]));
+                }
+
+                debug!(
+                    "Expiring {} snapshots: {:?}",
+                    snapshots_to_expire.len(),
+                    snapshots_to_expire
+                );
+
+                Ok((
+                    None,
+                    vec![TableUpdate::RemoveSnapshots {
+                        snapshot_ids: snapshots_to_expire,
+                    }],
+                ))
+            }
         }
     }
 }
 
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::executor::block_on;
+    use iceberg_rust_spec::spec::schema::SchemaBuilder;
+    use iceberg_rust_spec::spec::table_metadata::TableMetadataBuilder;
+    use iceberg_rust_spec::spec::types::{PrimitiveType, StructField, Type};
+    use object_store::memory::InMemory;
+
+    /// Builds metadata from `(snapshot_id, timestamp_ms)` pairs and `(name, snapshot_id)` refs.
+    fn sample_metadata(
+        snapshot_defs: &[(i64, i64)],
+        current_snapshot: Option<i64>,
+        refs: &[(&str, i64)],
+    ) -> TableMetadata {
+        let schema = SchemaBuilder::default()
+            .with_schema_id(0)
+            .with_struct_field(StructField {
+                id: 1,
+                name: "id".to_string(),
+                required: true,
+                field_type: Type::Primitive(PrimitiveType::Long),
+                doc: None,
+            })
+            .build()
+            .unwrap();
+
+        let snapshots = snapshot_defs
+            .iter()
+            .enumerate()
+            .map(|(idx, (snapshot_id, timestamp))| {
+                let snapshot = SnapshotBuilder::default()
+                    .with_snapshot_id(*snapshot_id)
+                    .with_sequence_number((idx + 1) as i64)
+                    .with_timestamp_ms(*timestamp)
+                    .with_manifest_list(format!("manifest-{snapshot_id}.avro"))
+                    .with_summary(Summary {
+                        operation: SnapshotOperation::Append,
+                        other: HashMap::new(),
+                    })
+                    .with_schema_id(0)
+                    .build()
+                    .unwrap();
+                (*snapshot_id, snapshot)
+            })
+            .collect::<HashMap<_, _>>();
+
+        let refs = refs
+            .iter()
+            .map(|(name, snapshot_id)| {
+                (
+                    (*name).to_string(),
+                    SnapshotReference {
+                        snapshot_id: *snapshot_id,
+                        retention: SnapshotRetention::default(),
+                    },
+                )
+            })
+            .collect::<HashMap<_, _>>();
+
+        TableMetadataBuilder::default()
+            .location("s3://tests/table".to_owned())
+            .current_schema_id(0)
+            .schemas(HashMap::from_iter(vec![(0, schema)]))
+            .snapshots(snapshots)
+            .current_snapshot_id(current_snapshot)
+            .last_sequence_number(snapshot_defs.len() as i64)
+            .refs(refs)
+            .build()
+            .unwrap()
+    }
+
+    /// Executes an `ExpireSnapshots` operation against `metadata` and returns its updates.
+    fn execute_operation(
+        metadata: &TableMetadata,
+        older_than: Option<i64>,
+        retain_last: Option<usize>,
+        retain_refs: bool,
+        dry_run: bool,
+    ) -> Result<Vec<TableUpdate>, Error> {
+        let op = Operation::ExpireSnapshots {
+            older_than,
+            retain_last,
+            clean_orphan_files: false,
+            retain_ref_snapshots: retain_refs,
+            dry_run,
+        };
+        let store = Arc::new(InMemory::new());
+        block_on(op.execute(metadata, store)).map(|(_, updates)| updates)
+    }
+
+    /// Collects the snapshot ids of all `RemoveSnapshots` updates.
+    fn collect_snapshot_ids(updates: &[TableUpdate]) -> Vec<i64> {
+        updates
+            .iter()
+            .flat_map(|update| match update {
+                TableUpdate::RemoveSnapshots { snapshot_ids } => snapshot_ids.clone(),
+                _ => Vec::new(),
+            })
+            .collect()
+    }
+
+    #[test]
+    fn snapshot_expiration_requires_policy() {
+        let metadata = sample_metadata(&[(1, 1_000)], Some(1), &[]);
+        let result = execute_operation(&metadata, None, None, true, false);
+        assert!(matches!(result, Err(Error::InvalidFormat(_))));
+    }
+
+    #[test]
+    fn snapshot_expiration_applies_time_and_count_filters() {
+        let metadata = sample_metadata(
+            &[(1, 1_000), (2, 2_000), (3, 3_000), (4, 4_000)],
+            Some(4),
+            &[],
+        );
+        let updates = execute_operation(&metadata, Some(2_500), Some(2), true, false).unwrap();
+        let mut expired = collect_snapshot_ids(&updates);
+        expired.sort();
+        assert_eq!(expired, vec![1, 2]);
+    }
+
+    #[test]
+    fn snapshot_expiration_preserves_current_and_refs() {
+        let metadata = sample_metadata(
+            &[(10, 1_000), (20, 2_000), (30, 3_000)],
+            Some(30),
+            &[("branch", 20)],
+        );
+        // The threshold is newer than snapshot 20, so only its branch reference protects it.
+        let updates = execute_operation(&metadata, Some(2_500), None, true, false).unwrap();
+        // Snapshot 10 is the only candidate because 20 is referenced and 30 is current.
+        assert_eq!(collect_snapshot_ids(&updates), vec![10]);
+    }
+
+    #[test]
+    fn snapshot_expiration_can_expire_referenced_snapshots() {
+        let metadata = sample_metadata(
+            &[(10, 1_000), (20, 2_000), (30, 3_000)],
+            Some(30),
+            &[("branch", 20)],
+        );
+        let updates = execute_operation(&metadata, Some(2_500), None, false, false).unwrap();
+        let mut expired = collect_snapshot_ids(&updates);
+        expired.sort();
+        assert_eq!(expired, vec![10, 20]);
+    }
+
+    #[test]
+    fn snapshot_expiration_supports_dry_run() {
+        let metadata = sample_metadata(&[(1, 1_000), (2, 900)], Some(1), &[]);
+        let updates = execute_operation(&metadata, Some(950), None, true, true).unwrap();
+        assert!(updates.is_empty());
+    }
+}
+
 pub fn bounding_partition_values<'a>(
     mut iter: impl Iterator,
     partition_column_names: &SmallVec<[&str; 4]>,