From f9f957ac97dbecc428c1d55a85e9c5e35efc13d1 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Thu, 22 Aug 2024 14:37:33 +0800 Subject: [PATCH 1/2] feat: add catalog API --- crates/paimon/Cargo.toml | 2 +- crates/paimon/src/catalog/mod.rs | 254 +++++++++++++++++++++++++++++++ crates/paimon/src/error.rs | 32 ++++ crates/paimon/src/lib.rs | 1 + 4 files changed, 288 insertions(+), 1 deletion(-) create mode 100644 crates/paimon/src/catalog/mod.rs diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 9e22e5b..eacd021 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -34,8 +34,8 @@ storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] [dependencies] +async-trait = "0.1" url = "2.5.2" -async-trait = "0.1.81" bytes = "1.7.1" bitflags = "2.6.0" tokio = { version = "1.39.2", features = ["macros"] } diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs new file mode 100644 index 0000000..68de13e --- /dev/null +++ b/crates/paimon/src/catalog/mod.rs @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt; +use std::hash::Hash; + +use async_trait::async_trait; +use chrono::Duration; + +use crate::error::Result; +use crate::io::FileIO; +use crate::spec::{RowType, SchemaChange, TableSchema}; + +/// This interface is responsible for reading and writing metadata such as database/table from a paimon catalog. +/// +/// Impl References: +#[async_trait] +pub trait Catalog: Send + Sync { + const DEFAULT_DATABASE: &'static str = "default"; + const SYSTEM_TABLE_SPLITTER: &'static str = "$"; + const SYSTEM_DATABASE_NAME: &'static str = "sys"; + + /// Returns the warehouse root path containing all database directories in this catalog. + fn warehouse(&self) -> &str; + + /// Returns the catalog options. + fn options(&self) -> &HashMap; + + /// Returns the FileIO instance. + fn file_io(&self) -> &FileIO; + + /// Lists all databases in this catalog. + async fn list_databases(&self) -> Result>; + + /// Checks if a database exists in this catalog. + async fn database_exists(&self, database_name: &str) -> Result; + + /// Creates a new database. + async fn create_database( + &self, + name: &str, + ignore_if_exists: bool, + properties: Option>, + ) -> Result<()>; + + /// Loads database properties. + async fn load_database_properties(&self, name: &str) -> Result>; + + /// Drops a database. + async fn drop_database( + &self, + name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> Result<()>; + + /// Returns a Table instance for the specified identifier. + async fn get_table(&self, identifier: &Identifier) -> Result; + + /// Lists all tables in the specified database. + async fn list_tables(&self, database_name: &str) -> Result>; + + /// Checks if a table exists. + async fn table_exists(&self, identifier: &Identifier) -> Result { + match self.get_table(identifier).await { + Ok(_) => Ok(true), + Err(e) => match e { + crate::error::Error::TableNotExist { .. } => Ok(false), + _ => Err(e), + }, + } + } + + /// Drops a table. + async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()>; + + /// Creates a new table. + async fn create_table( + &self, + identifier: &Identifier, + schema: TableSchema, + ignore_if_exists: bool, + ) -> Result<()>; + + /// Renames a table. + async fn rename_table( + &self, + from_table: &Identifier, + to_table: &Identifier, + ignore_if_not_exists: bool, + ) -> Result<()>; + + /// Alters an existing table. + async fn alter_table( + &self, + identifier: &Identifier, + changes: Vec, + ignore_if_not_exists: bool, + ) -> Result<()>; + + /// Drops a partition from the specified table. + async fn drop_partition( + &self, + identifier: &Identifier, + partitions: &HashMap, + ) -> Result<()>; + + /// Returns whether this catalog is case-sensitive. + fn case_sensitive(&self) -> bool { + true + } +} + +/// Identifies an object in a catalog. +/// +/// Impl References: +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Identifier { + database: String, + table: String, +} + +impl Identifier { + pub const UNKNOWN_DATABASE: &'static str = "unknown"; + + /// Create a new identifier. + pub fn new(database: String, table: String) -> Self { + Self { database, table } + } + + /// Get the table name. + pub fn database_name(&self) -> &str { + &self.database + } + + /// Get the table name. + pub fn object_name(&self) -> &str { + &self.table + } + + /// Get the full name of the identifier. + pub fn full_name(&self) -> String { + if self.database == Self::UNKNOWN_DATABASE { + self.table.clone() + } else { + format!("{}.{}", self.database, self.table) + } + } + + /// Get the full name of the identifier with a specified character. + pub fn escaped_full_name(&self) -> String { + self.escaped_full_name_with_char('`') + } + + /// Get the full name of the identifier with a specified character. + pub fn escaped_full_name_with_char(&self, escape_char: char) -> String { + format!( + "{0}{1}{0}.{0}{2}{0}", + escape_char, self.database, self.table + ) + } + + /// Create a new identifier. + pub fn create(db: &str, table: &str) -> Self { + Self::new(db.to_string(), table.to_string()) + } +} + +impl fmt::Display for Identifier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.full_name()) + } +} + +/// A table provides basic abstraction for a table type and table scan, and table read. +/// +/// Impl Reference: +pub trait Table { + // ================== Table Metadata ===================== + + /// A name to identify this table. + fn name(&self) -> &str; + + /// Returns the row type of this table. + fn row_type(&self) -> &RowType; + + /// Partition keys of this table. + fn partition_keys(&self) -> Vec; + + /// Primary keys of this table. + fn primary_keys(&self) -> Vec; + + /// Options of this table. + fn options(&self) -> HashMap; + + /// Optional comment of this table. + fn comment(&self) -> Option<&String>; + + // ================= Table Operations ==================== + + /// Copy this table with adding dynamic options. + fn copy(&self, dynamic_options: HashMap) -> Box; + + /// Rollback table's state to a specific snapshot. + fn rollback_to(&mut self, snapshot_id: u64); + + /// Create a tag from given snapshot. + fn create_tag(&mut self, tag_name: &str, from_snapshot_id: u64); + + fn create_tag_with_retention( + &mut self, + tag_name: &str, + from_snapshot_id: u64, + time_retained: Duration, + ); + + /// Create a tag from the latest snapshot. + fn create_tag_from_latest(&mut self, tag_name: &str); + + fn create_tag_from_latest_with_retention(&mut self, tag_name: &str, time_retained: Duration); + + /// Delete a tag by name. + fn delete_tag(&mut self, tag_name: &str); + + /// Rollback table's state to a specific tag. + fn rollback_to_tag(&mut self, tag_name: &str); + + /// Create an empty branch. + fn create_branch(&mut self, branch_name: &str); + + /// Create a branch from given snapshot. + fn create_branch_from_snapshot(&mut self, branch_name: &str, snapshot_id: u64); + + /// Create a branch from given tag. + fn create_branch_from_tag(&mut self, branch_name: &str, tag_name: &str); + + /// Delete a branch by branchName. + fn delete_branch(&mut self, branch_name: &str); +} diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index f42b465..7f2e2c3 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -17,6 +17,8 @@ use snafu::prelude::*; +use crate::catalog::Identifier; + /// Result type used in paimon. pub type Result = std::result::Result; @@ -52,6 +54,36 @@ pub enum Error { display("Paimon hitting invalid config: {}", message) )] ConfigInvalid { message: String }, + + #[snafu(display("Database {} is not empty.", database))] + DatabaseNotEmpty { database: String }, + + #[snafu(display("Database {} already exists.", database))] + DatabaseAlreadyExist { database: String }, + + #[snafu(display("Database {} does not exist.", database))] + DatabaseNotExist { database: String }, + + #[snafu(display("Can't do operation on system database."))] + ProcessSystemDatabase, + + #[snafu(display("Table {} already exists.", identifier.full_name()))] + TableAlreadyExist { identifier: Identifier }, + + #[snafu(display("Table {} does not exist.", identifier.full_name()))] + TableNotExist { identifier: Identifier }, + + #[snafu(display("Partition {} do not exist in the table {}.", identifier.full_name(), partitions))] + PartitionNotExist { + identifier: Identifier, + partitions: String, + }, + + #[snafu(display("Column {} already exists.", column_name))] + ColumnAlreadyExist { column_name: String }, + + #[snafu(display("Column {} does not exist.", column_name))] + ColumnNotExist { column_name: String }, } impl From for Error { diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 6e15e0b..50e1645 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -19,5 +19,6 @@ mod error; pub use error::Error; pub use error::Result; +pub mod catalog; pub mod io; pub mod spec; From bd0968bbf90a9993d7ce393501f936b4df90f213 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Thu, 22 May 2025 15:06:16 +0800 Subject: [PATCH 2/2] refactor: convert Table to struct and extract TableOperations --- crates/paimon/src/catalog/mod.rs | 133 ++++++++++++++++++++++++------- 1 file changed, 106 insertions(+), 27 deletions(-) diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index 68de13e..e95a0eb 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -26,14 +26,33 @@ use crate::error::Result; use crate::io::FileIO; use crate::spec::{RowType, SchemaChange, TableSchema}; +/// Information about a catalog's default values and system settings +#[derive(Debug, Clone)] +pub struct CatalogInfo { + pub default_database: String, + pub system_table_splitter: String, + pub system_database_name: String, +} + +impl Default for CatalogInfo { + fn default() -> Self { + Self { + default_database: "default".to_string(), + system_table_splitter: "$".to_string(), + system_database_name: "sys".to_string(), + } + } +} + /// This interface is responsible for reading and writing metadata such as database/table from a paimon catalog. /// /// Impl References: #[async_trait] pub trait Catalog: Send + Sync { - const DEFAULT_DATABASE: &'static str = "default"; - const SYSTEM_TABLE_SPLITTER: &'static str = "$"; - const SYSTEM_DATABASE_NAME: &'static str = "sys"; + /// Returns information about the catalog's default values and system settings + fn info(&self) -> CatalogInfo { + CatalogInfo::default() + } /// Returns the warehouse root path containing all database directories in this catalog. fn warehouse(&self) -> &str; @@ -70,7 +89,7 @@ pub trait Catalog: Send + Sync { ) -> Result<()>; /// Returns a Table instance for the specified identifier. - async fn get_table(&self, identifier: &Identifier) -> Result; + async fn get_table(&self, identifier: &Identifier) -> Result; /// Lists all tables in the specified database. async fn list_tables(&self, database_name: &str) -> Result>; @@ -190,65 +209,125 @@ impl fmt::Display for Identifier { /// A table provides basic abstraction for a table type and table scan, and table read. /// /// Impl Reference: -pub trait Table { - // ================== Table Metadata ===================== +pub struct Table { + name: String, + row_type: RowType, + partition_keys: Vec, + primary_keys: Vec, + options: HashMap, + comment: Option, +} + +impl Table { + /// Create a new table instance + pub fn new( + name: String, + row_type: RowType, + partition_keys: Vec, + primary_keys: Vec, + options: HashMap, + comment: Option, + ) -> Self { + Self { + name, + row_type, + partition_keys, + primary_keys, + options, + comment, + } + } /// A name to identify this table. - fn name(&self) -> &str; + pub fn name(&self) -> &str { + &self.name + } /// Returns the row type of this table. - fn row_type(&self) -> &RowType; + pub fn row_type(&self) -> &RowType { + &self.row_type + } /// Partition keys of this table. - fn partition_keys(&self) -> Vec; + pub fn partition_keys(&self) -> &[String] { + &self.partition_keys + } /// Primary keys of this table. - fn primary_keys(&self) -> Vec; + pub fn primary_keys(&self) -> &[String] { + &self.primary_keys + } /// Options of this table. - fn options(&self) -> HashMap; + pub fn options(&self) -> &HashMap { + &self.options + } /// Optional comment of this table. - fn comment(&self) -> Option<&String>; - - // ================= Table Operations ==================== + pub fn comment(&self) -> Option<&String> { + self.comment.as_ref() + } /// Copy this table with adding dynamic options. - fn copy(&self, dynamic_options: HashMap) -> Box; + pub fn copy(&self, dynamic_options: HashMap) -> Self { + let mut options = self.options.clone(); + options.extend(dynamic_options); + Self { + name: self.name.clone(), + row_type: self.row_type.clone(), + partition_keys: self.partition_keys.clone(), + primary_keys: self.primary_keys.clone(), + options, + comment: self.comment.clone(), + } + } +} +/// Experimental operations for tables that support snapshots, tags, and branches. +/// +/// These operations are marked as experimental and may change in future releases. +/// Not all table implementations may support these operations. +#[async_trait] +pub trait TableOperations: Send + Sync { /// Rollback table's state to a specific snapshot. - fn rollback_to(&mut self, snapshot_id: u64); + async fn rollback_to(&mut self, snapshot_id: u64) -> Result<()>; /// Create a tag from given snapshot. - fn create_tag(&mut self, tag_name: &str, from_snapshot_id: u64); + async fn create_tag(&mut self, tag_name: &str, from_snapshot_id: u64) -> Result<()>; - fn create_tag_with_retention( + /// Create a tag from given snapshot with retention period. + async fn create_tag_with_retention( &mut self, tag_name: &str, from_snapshot_id: u64, time_retained: Duration, - ); + ) -> Result<()>; /// Create a tag from the latest snapshot. - fn create_tag_from_latest(&mut self, tag_name: &str); + async fn create_tag_from_latest(&mut self, tag_name: &str) -> Result<()>; - fn create_tag_from_latest_with_retention(&mut self, tag_name: &str, time_retained: Duration); + /// Create a tag from the latest snapshot with retention period. + async fn create_tag_from_latest_with_retention( + &mut self, + tag_name: &str, + time_retained: Duration, + ) -> Result<()>; /// Delete a tag by name. - fn delete_tag(&mut self, tag_name: &str); + async fn delete_tag(&mut self, tag_name: &str) -> Result<()>; /// Rollback table's state to a specific tag. - fn rollback_to_tag(&mut self, tag_name: &str); + async fn rollback_to_tag(&mut self, tag_name: &str) -> Result<()>; /// Create an empty branch. - fn create_branch(&mut self, branch_name: &str); + async fn create_branch(&mut self, branch_name: &str) -> Result<()>; /// Create a branch from given snapshot. - fn create_branch_from_snapshot(&mut self, branch_name: &str, snapshot_id: u64); + async fn create_branch_from_snapshot(&mut self, branch_name: &str, snapshot_id: u64) -> Result<()>; /// Create a branch from given tag. - fn create_branch_from_tag(&mut self, branch_name: &str, tag_name: &str); + async fn create_branch_from_tag(&mut self, branch_name: &str, tag_name: &str) -> Result<()>; /// Delete a branch by branchName. - fn delete_branch(&mut self, branch_name: &str); + async fn delete_branch(&mut self, branch_name: &str) -> Result<()>; }