diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 4a4839d090..5535ba794b 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,6 +29,9 @@ keywords = ["iceberg"] [dependencies] anyhow = "1.0.72" apache-avro = "0.15" +arrow-arith = { version = ">=46" } +arrow-array = { version = ">=46" } +arrow-schema = { version = ">=46" } async-trait = "0.1" bimap = "0.6" bitvec = "1.0.1" @@ -38,6 +41,7 @@ either = "1" futures = "0.3" itertools = "0.11" lazy_static = "1" +murmur3 = "0.5.2" once_cell = "1" opendal = "0.39" ordered-float = "3.7.0" diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 573b58ef2a..0f2a134138 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -37,3 +37,5 @@ pub mod table; mod avro; pub mod io; pub mod spec; + +pub mod transform; diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs new file mode 100644 index 0000000000..c9fe9df37b --- /dev/null +++ b/crates/iceberg/src/transform/bucket.rs @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow_array::ArrayRef; +use arrow_schema::{DataType, TimeUnit}; + +use super::TransformFunction; + +pub struct Bucket { + mod_n: u32, +} + +impl Bucket { + pub fn new(mod_n: u32) -> Self { + Self { mod_n } + } +} + +impl Bucket { + /// When switch the hash function, we only need to change this function. + fn hash_bytes(mut v: &[u8]) -> i32 { + murmur3::murmur3_32(&mut v, 0).unwrap() as i32 + } + + fn hash_int(v: i32) -> i32 { + Self::hash_long(v as i64) + } + + fn hash_long(v: i64) -> i32 { + Self::hash_bytes(v.to_le_bytes().as_slice()) + } + + /// v is days from unix epoch + fn hash_date(v: i32) -> i32 { + Self::hash_int(v) + } + + /// v is microseconds from midnight + fn hash_time(v: i64) -> i32 { + Self::hash_long(v) + } + + /// v is microseconds from unix epoch + fn hash_timestamp(v: i64) -> i32 { + Self::hash_long(v) + } + + fn hash_str(s: &str) -> i32 { + Self::hash_bytes(s.as_bytes()) + } + + /// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian + /// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements + fn hash_decimal(v: i128) -> i32 { + let bytes = v.to_be_bytes(); + if let Some(start) = bytes.iter().position(|&x| x != 0) { + Self::hash_bytes(&bytes[start..]) + } else { + Self::hash_bytes(&[0]) + } + } + + /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N + /// ref: https://iceberg.apache.org/spec/#partitioning + fn bucket_n(&self, v: i32) -> i32 { + (v & i32::MAX) % (self.mod_n as i32) + } +} + +impl TransformFunction for Bucket { + fn transform(&self, input: ArrayRef) -> crate::Result { + let res: arrow_array::Int32Array = match input.data_type() { + DataType::Int32 => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_int(v))), + DataType::Int64 => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_long(v))), + 
DataType::Decimal128(_, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_decimal(v))), + DataType::Date32 => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_date(v))), + DataType::Time64(TimeUnit::Microsecond) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_time(v))), + DataType::Timestamp(TimeUnit::Microsecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_timestamp(v))), + DataType::Utf8 => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))), + ), + DataType::LargeUtf8 => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))), + ), + DataType::Binary => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + ), + DataType::LargeBinary => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + ), + DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + ), + _ => unreachable!("Unsupported data type: {:?}", input.data_type()), + }; + Ok(Arc::new(res)) + } +} + +#[cfg(test)] +mod test { + use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; + + use super::Bucket; + #[test] + fn test_hash() { + // test int + assert_eq!(Bucket::hash_int(34), 2017239379); + // test long + assert_eq!(Bucket::hash_long(34), 2017239379); + // test decimal + assert_eq!(Bucket::hash_decimal(1420), -500754589); + // test date + let date = NaiveDate::from_ymd_opt(2017, 11, 16).unwrap(); + 
assert_eq!( + Bucket::hash_date( + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + ), + -653330422 + ); + // test time + let time = NaiveTime::from_hms_opt(22, 31, 8).unwrap(); + assert_eq!( + Bucket::hash_time( + time.signed_duration_since(NaiveTime::from_hms_opt(0, 0, 0).unwrap()) + .num_microseconds() + .unwrap() + ), + -662762989 + ); + // test timestamp + let timestamp = + NaiveDateTime::parse_from_str("2017-11-16 22:31:08", "%Y-%m-%d %H:%M:%S").unwrap(); + assert_eq!( + Bucket::hash_timestamp( + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str("1970-01-01 00:00:00", "%Y-%m-%d %H:%M:%S") + .unwrap() + ) + .num_microseconds() + .unwrap() + ), + -2047944441 + ); + // test timestamp with tz + let timestamp = DateTime::parse_from_rfc3339("2017-11-16T14:31:08-08:00").unwrap(); + assert_eq!( + Bucket::hash_timestamp( + timestamp + .signed_duration_since( + DateTime::parse_from_rfc3339("1970-01-01T00:00:00-00:00").unwrap() + ) + .num_microseconds() + .unwrap() + ), + -2047944441 + ); + // test str + assert_eq!(Bucket::hash_str("iceberg"), 1210000089); + // test uuid + assert_eq!( + Bucket::hash_bytes( + [ + 0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, 0xA4, 0x79, 0x3F, 0x34, 0x9C, + 0xB7, 0x85, 0xE7 + ] + .as_ref() + ), + 1488055340 + ); + // test fixed and binary + assert_eq!( + Bucket::hash_bytes([0x00, 0x01, 0x02, 0x03].as_ref()), + -188683207 + ); + } +} diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs new file mode 100644 index 0000000000..2ea6a203b9 --- /dev/null +++ b/crates/iceberg/src/transform/identity.rs @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::Result;
use arrow_array::ArrayRef;

use super::TransformFunction;

/// Identity transform: partition values are the source values, unchanged.
pub struct Identity {}

impl TransformFunction for Identity {
    /// Hand the input array back as-is; no computation or copy is needed.
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
        Ok(input)
    }
}
diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs
new file mode 100644
index 0000000000..dead9db89b
--- /dev/null
+++ b/crates/iceberg/src/transform/mod.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.

//! Transform function used to compute partition values.
+use crate::{spec::Transform, Result}; +use arrow_array::ArrayRef; + +mod bucket; +mod identity; +mod temporal; +mod truncate; +mod void; + +/// TransformFunction is a trait that defines the interface for all transform functions. +pub trait TransformFunction: Send { + /// transform will take an input array and transform it into a new array. + /// The implementation of this function will need to check and downcast the input to specific + /// type. + fn transform(&self, input: ArrayRef) -> Result; +} + +/// BoxedTransformFunction is a boxed trait object of TransformFunction. +pub type BoxedTransformFunction = Box; + +/// create_transform_function creates a boxed trait object of TransformFunction from a Transform. +pub fn create_transform_function(transform: &Transform) -> Result { + match transform { + Transform::Identity => Ok(Box::new(identity::Identity {})), + Transform::Void => Ok(Box::new(void::Void {})), + Transform::Year => Ok(Box::new(temporal::Year {})), + Transform::Month => Ok(Box::new(temporal::Month {})), + Transform::Day => Ok(Box::new(temporal::Day {})), + Transform::Hour => Ok(Box::new(temporal::Hour {})), + Transform::Bucket(mod_n) => Ok(Box::new(bucket::Bucket::new(*mod_n))), + Transform::Truncate(width) => Ok(Box::new(truncate::Truncate::new(*width))), + Transform::Unknown => Err(crate::error::Error::new( + crate::ErrorKind::FeatureUnsupported, + "Transform Unknown is not implemented", + )), + } +} diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs new file mode 100644 index 0000000000..f914d50f9d --- /dev/null +++ b/crates/iceberg/src/transform/temporal.rs @@ -0,0 +1,408 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::TransformFunction; +use crate::{Error, ErrorKind, Result}; +use arrow_arith::{ + arity::binary, + temporal::{month_dyn, year_dyn}, +}; +use arrow_array::{ + types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray, +}; +use arrow_schema::{DataType, TimeUnit}; +use chrono::Datelike; +use std::sync::Arc; + +/// The number of days since unix epoch. +const DAY_SINCE_UNIX_EPOCH: i32 = 719163; +/// Hour in one second. +const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64; +/// Day in one second. +const DAY_PER_SECOND: f64 = 1.0_f64 / 24.0_f64 / 3600.0_f64; +/// Year of unix epoch. 
+const UNIX_EPOCH_YEAR: i32 = 1970; + +/// Extract a date or timestamp year, as years from 1970 +pub struct Year; + +impl TransformFunction for Year { + fn transform(&self, input: ArrayRef) -> Result { + let array = + year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; + Ok(Arc::::new( + array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - UNIX_EPOCH_YEAR), + )) + } +} + +/// Extract a date or timestamp month, as months from 1970-01-01 +pub struct Month; + +impl TransformFunction for Month { + fn transform(&self, input: ArrayRef) -> Result { + let year_array = + year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; + let year_array: Int32Array = year_array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| 12 * (v - UNIX_EPOCH_YEAR)); + let month_array = + month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; + Ok(Arc::::new( + binary( + month_array.as_any().downcast_ref::().unwrap(), + year_array.as_any().downcast_ref::().unwrap(), + // Compute month from 1970-01-01, so minus 1 here. 
+ |a, b| a + b - 1, + ) + .unwrap(), + )) + } +} + +/// Extract a date or timestamp day, as days from 1970-01-01 +pub struct Day; + +impl TransformFunction for Day { + fn transform(&self, input: ArrayRef) -> Result { + let res: Int32Array = match input.data_type() { + DataType::Timestamp(TimeUnit::Microsecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 }), + DataType::Date32 => { + input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + Date32Type::to_naive_date(v).num_days_from_ce() - DAY_SINCE_UNIX_EPOCH + }) + } + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Should not call internally for unsupported data type {:?}", + input.data_type() + ), + )) + } + }; + Ok(Arc::new(res)) + } +} + +/// Extract a timestamp hour, as hours from 1970-01-01 00:00:00 +pub struct Hour; + +impl TransformFunction for Hour { + fn transform(&self, input: ArrayRef) -> Result { + let res: Int32Array = match input.data_type() { + DataType::Timestamp(TimeUnit::Microsecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32 }), + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Should not call internally for unsupported data type {:?}", + input.data_type() + ), + )) + } + }; + Ok(Arc::new(res)) + } +} + +#[cfg(test)] +mod test { + use arrow_array::{ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray}; + use chrono::{NaiveDate, NaiveDateTime}; + use std::sync::Arc; + + use crate::transform::TransformFunction; + + #[test] + fn test_transform_years() { + let year = super::Year; + + // Test Date32 + let ori_date = vec![ + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2000, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2030, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2060, 1, 1).unwrap(), + ]; + let date_array: ArrayRef = 
Arc::new(Date32Array::from( + ori_date + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + + // Test TimestampMicrosecond + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-01-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-01-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-01-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + ]; + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_timestamp + .into_iter() + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:00.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + } + + #[test] + fn test_transform_months() { + let month = super::Month; + + // Test Date32 + let ori_date = vec![ + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), + NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), + NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + ]; + let date_array: ArrayRef = Arc::new(Date32Array::from( + ori_date + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + 
}) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + + // Test TimestampMicrosecond + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + ]; + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_timestamp + .into_iter() + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:00.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + } + + #[test] + fn test_transform_days() { + let day = super::Day; + let ori_date = vec![ + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), + NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), + NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + ]; + let expect_day = ori_date + .clone() + .into_iter() + .map(|data| { + data.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + }) + .collect::>(); + + // Test Date32 + let date_array: ArrayRef = Arc::new(Date32Array::from( + ori_date + .into_iter() + .map(|date| { + 
date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + + // Test TimestampMicrosecond + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + ]; + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_timestamp + .into_iter() + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:00.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + } + + #[test] + fn test_transform_hours() { + let hour = super::Hour; + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 19:01:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-03-01 12:01:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-10-02 10:01:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-09-01 05:03:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + ]; + let 
expect_hour = ori_timestamp + .clone() + .into_iter() + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:0.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) + .num_hours() as i32 + }) + .collect::>(); + + // Test TimestampMicrosecond + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_timestamp + .into_iter() + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:0.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = hour.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_hour[0]); + assert_eq!(res.value(1), expect_hour[1]); + assert_eq!(res.value(2), expect_hour[2]); + assert_eq!(res.value(3), expect_hour[3]); + } +} diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs new file mode 100644 index 0000000000..43e79e4b94 --- /dev/null +++ b/crates/iceberg/src/transform/truncate.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow_array::ArrayRef; +use arrow_schema::DataType; + +use crate::Error; + +use super::TransformFunction; + +pub struct Truncate { + width: u32, +} + +impl Truncate { + pub fn new(width: u32) -> Self { + Self { width } + } + + fn truncate_str_by_char(s: &str, max_chars: usize) -> &str { + match s.char_indices().nth(max_chars) { + None => s, + Some((idx, _)) => &s[..idx], + } + } +} + +impl TransformFunction for Truncate { + fn transform(&self, input: ArrayRef) -> crate::Result { + match input.data_type() { + DataType::Int32 => { + let width: i32 = self.width.try_into().map_err(|_| { + Error::new( + crate::ErrorKind::DataInvalid, + "width is failed to convert to i32 when truncate Int32Array", + ) + })?; + let res: arrow_array::Int32Array = input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - v.rem_euclid(width)); + Ok(Arc::new(res)) + } + DataType::Int64 => { + let width = self.width as i64; + let res: arrow_array::Int64Array = input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - (((v % width) + width) % width)); + Ok(Arc::new(res)) + } + DataType::Decimal128(precision, scale) => { + let width = self.width as i128; + let res: arrow_array::Decimal128Array = input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - (((v % width) + width) % width)) + .with_precision_and_scale(*precision, *scale) + .map_err(|err| Error::new(crate::ErrorKind::Unexpected, format!("{err}")))?; + Ok(Arc::new(res)) + } + DataType::Utf8 => { + let len = self.width as usize; + let res: arrow_array::StringArray = arrow_array::StringArray::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))), + ); + Ok(Arc::new(res)) + } + DataType::LargeUtf8 => { + let len = self.width as usize; + let res: arrow_array::LargeStringArray = arrow_array::LargeStringArray::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.map(|v| 
Self::truncate_str_by_char(v, len))), + ); + Ok(Arc::new(res)) + } + _ => unreachable!("Truncate transform only supports (int,long,decimal,string) types"), + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ + builder::PrimitiveBuilder, types::Decimal128Type, Decimal128Array, Int32Array, Int64Array, + }; + + use crate::transform::TransformFunction; + + // Test case ref from: https://iceberg.apache.org/spec/#truncate-transform-details + #[test] + fn test_truncate_simple() { + // test truncate int + let input = Arc::new(Int32Array::from(vec![1, -1])); + let res = super::Truncate::new(10).transform(input).unwrap(); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(0), + 0 + ); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(1), + -10 + ); + + // test truncate long + let input = Arc::new(Int64Array::from(vec![1, -1])); + let res = super::Truncate::new(10).transform(input).unwrap(); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(0), + 0 + ); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(1), + -10 + ); + + // test decimal + let mut buidler = PrimitiveBuilder::::new() + .with_precision_and_scale(20, 2) + .unwrap(); + buidler.append_value(1065); + let input = Arc::new(buidler.finish()); + let res = super::Truncate::new(50).transform(input).unwrap(); + assert_eq!( + res.as_any() + .downcast_ref::() + .unwrap() + .value(0), + 1050 + ); + + // test string + let input = Arc::new(arrow_array::StringArray::from(vec!["iceberg"])); + let res = super::Truncate::new(3).transform(input).unwrap(); + assert_eq!( + res.as_any() + .downcast_ref::() + .unwrap() + .value(0), + "ice" + ); + + // test large string + let input = Arc::new(arrow_array::LargeStringArray::from(vec!["iceberg"])); + let res = super::Truncate::new(3).transform(input).unwrap(); + assert_eq!( + res.as_any() + .downcast_ref::() + .unwrap() + .value(0), + "ice" + ); + } + + #[test] + fn test_string_truncate() { + let test1 = "イロハニホヘト"; 
+ let test1_2_expected = "イロ"; + assert_eq!( + super::Truncate::truncate_str_by_char(test1, 2), + test1_2_expected + ); + + let test1_3_expected = "イロハ"; + assert_eq!( + super::Truncate::truncate_str_by_char(test1, 3), + test1_3_expected + ); + + let test2 = "щщаεはчωいにπάほхεろへσκζ"; + let test2_7_expected = "щщаεはчω"; + assert_eq!( + super::Truncate::truncate_str_by_char(test2, 7), + test2_7_expected + ); + + let test3 = "\u{FFFF}\u{FFFF}"; + assert_eq!(super::Truncate::truncate_str_by_char(test3, 2), test3); + + let test4 = "\u{10000}\u{10000}"; + let test4_1_expected = "\u{10000}"; + assert_eq!( + super::Truncate::truncate_str_by_char(test4, 1), + test4_1_expected + ); + } +} diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs new file mode 100644 index 0000000000..56fc3c5295 --- /dev/null +++ b/crates/iceberg/src/transform/void.rs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Result; +use arrow_array::{new_null_array, ArrayRef}; + +use super::TransformFunction; + +pub struct Void {} + +impl TransformFunction for Void { + fn transform(&self, input: ArrayRef) -> Result { + Ok(new_null_array(input.data_type(), input.len())) + } +}