From f0ef60f9fb47992635af4a755f9f36a50af3d7cb Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 28 Aug 2023 20:50:09 +0800 Subject: [PATCH 01/10] support transform function and add related test: 1. identity 2. void 3. temporal --- crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/error.rs | 5 + crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/transform/identity.rs | 13 + crates/iceberg/src/transform/mod.rs | 51 ++ crates/iceberg/src/transform/temporal.rs | 669 +++++++++++++++++++++++ crates/iceberg/src/transform/void.rs | 12 + 7 files changed, 753 insertions(+) create mode 100644 crates/iceberg/src/transform/identity.rs create mode 100644 crates/iceberg/src/transform/mod.rs create mode 100644 crates/iceberg/src/transform/temporal.rs create mode 100644 crates/iceberg/src/transform/void.rs diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 4a4839d090..a6180f1b85 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -27,6 +27,7 @@ license = "Apache-2.0" keywords = ["iceberg"] [dependencies] +arrow = { version = ">=46" } anyhow = "1.0.72" apache-avro = "0.15" async-trait = "0.1" diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index e4ae576d82..d701faa2f4 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -44,6 +44,10 @@ pub enum ErrorKind { /// /// This error is returned when given iceberg feature is not supported. FeatureUnsupported, + /// Arrow compute error. + /// + /// Error returned by compute function in arrow. + ArrowCompute, } impl ErrorKind { @@ -59,6 +63,7 @@ impl From for &'static str { ErrorKind::Unexpected => "Unexpected", ErrorKind::DataInvalid => "DataInvalid", ErrorKind::FeatureUnsupported => "FeatureUnsupported", + ErrorKind::ArrowCompute => "ArrowCompute", } } } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 573b58ef2a..0f2a134138 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -37,3 +37,5 @@ pub mod table; mod avro; pub mod io; pub mod spec; + +pub mod transform; diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs new file mode 100644 index 0000000000..3acd01615f --- /dev/null +++ b/crates/iceberg/src/transform/identity.rs @@ -0,0 +1,13 @@ +use crate::Result; +use arrow::array::ArrayRef; + +use super::TransformFunction; + +/// Return identity array. +pub struct Identity {} + +impl TransformFunction for Identity { + fn transform(&self, input: ArrayRef) -> Result { + Ok(input) + } +} diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs new file mode 100644 index 0000000000..4dd0235fb7 --- /dev/null +++ b/crates/iceberg/src/transform/mod.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Transform function used to compute partition values. +use crate::{spec::Transform, Result}; +use arrow::array::ArrayRef; + +mod identity; +mod temporal; +mod void; + +/// TransformFunction is a trait that defines the interface for all transform functions. +pub trait TransformFunction: Send { + /// transform will take an input array and transform it into a new array. + /// The implementation of this function will need to check and downcast the input to specific + /// type. + fn transform(&self, input: ArrayRef) -> Result; +} + +/// BoxedTransformFunction is a boxed trait object of TransformFunction. +pub type BoxedTransformFunction = Box; + +/// create_transform_function creates a boxed trait object of TransformFunction from a Transform. +pub fn create_transform_function(transform: &Transform) -> Result { + match transform { + Transform::Identity => Ok(Box::new(identity::Identity {})), + Transform::Void => Ok(Box::new(void::Void {})), + Transform::Year => Ok(Box::new(temporal::Year {})), + Transform::Month => Ok(Box::new(temporal::Month {})), + Transform::Day => Ok(Box::new(temporal::Day {})), + Transform::Hour => Ok(Box::new(temporal::Hour {})), + _ => Err(crate::error::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!("Transform {:?} is not implemented", transform), + )), + } +} diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs new file mode 100644 index 0000000000..ce685f0b31 --- /dev/null +++ b/crates/iceberg/src/transform/temporal.rs @@ -0,0 +1,669 @@ +use super::TransformFunction; +use crate::{Error, Result}; +use arrow::array::{ + Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow::compute::binary; +use arrow::datatypes; +use arrow::datatypes::DataType; +use arrow::{ + array::{ArrayRef, Date32Array, Int32Array}, + compute::{month_dyn, year_dyn}, +}; +use chrono::Datelike; +use std::sync::Arc; + +/// 719163 is the number of days from 0000-01-01 to 1970-01-01 +const EPOCH_DAY_FROM_CE: i32 = 719163; +const DAY_PER_SECOND: f64 = 0.0000115741; +const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0; + +/// Extract a date or timestamp year, as years from 1970 +pub struct Year; + +impl TransformFunction for Year { + fn transform(&self, input: ArrayRef) -> Result { + let array = year_dyn(&input).map_err(|err| { + Error::new( + crate::ErrorKind::ArrowCompute, + format!("error in transformfunction: {}", err), + ) + })?; + Ok(Arc::::new( + array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - 1970), + )) + } +} + +/// Extract a date or timestamp month, as months from 1970-01-01 +pub struct Month; + +impl TransformFunction for Month { + fn transform(&self, input: ArrayRef) -> Result { + let year_array = year_dyn(&input) + .map_err(|err| Error::new(crate::ErrorKind::ArrowCompute, format!("{err}")))?; + let year_array: Int32Array = year_array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| 12 * (v - 1970)); + let month_array = month_dyn(&input) + .map_err(|err| Error::new(crate::ErrorKind::ArrowCompute, format!("{err}")))?; + Ok(Arc::::new( + binary( + month_array.as_any().downcast_ref::().unwrap(), + year_array.as_any().downcast_ref::().unwrap(), + // Compute month from 1970-01-01, so minus 1 here. + |a, b| a + b - 1, + ) + .unwrap(), + )) + } +} + +/// Extract a date or timestamp day, as days from 1970-01-01 +pub struct Day; + +impl TransformFunction for Day { + fn transform(&self, input: ArrayRef) -> Result { + let res: Int32Array = match input.data_type() { + DataType::Timestamp(unit, _) => match unit { + datatypes::TimeUnit::Second => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { (v as f64 * DAY_PER_SECOND) as i32 }), + datatypes::TimeUnit::Millisecond => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { (v as f64 / 1000.0 * DAY_PER_SECOND) as i32 }), + datatypes::TimeUnit::Microsecond => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 }), + datatypes::TimeUnit::Nanosecond => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + (v as f64 / 1000.0 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 + }), + }, + DataType::Date32 => { + input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + datatypes::Date32Type::to_naive_date(v).num_days_from_ce() + - EPOCH_DAY_FROM_CE + }) + } + DataType::Date64 => { + input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + datatypes::Date64Type::to_naive_date(v).num_days_from_ce() + - EPOCH_DAY_FROM_CE + }) + } + _ => unreachable!( + "Should not call transform in Day with type {:?}", + input.data_type() + ), + }; + Ok(Arc::new(res)) + } +} + +/// Extract a timestamp hour, as hours from 1970-01-01 00:00:00 +pub struct Hour; + +impl TransformFunction for Hour { + fn transform(&self, input: ArrayRef) -> Result { + let res: Int32Array = match input.data_type() { + DataType::Timestamp(unit, _) => match unit { + datatypes::TimeUnit::Second => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + println!("second: {}", v); + (v as f64 * HOUR_PER_SECOND) as i32 + }), + datatypes::TimeUnit::Millisecond => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + println!("mill: {}", v); + (v as f64 * HOUR_PER_SECOND / 1000.0) as i32 + }), + datatypes::TimeUnit::Microsecond => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + println!("micro: {}", v); + (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32 + }), + datatypes::TimeUnit::Nanosecond => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { + println!("nano: {}", v); + (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0 / 1000.0) as i32 + }), + }, + _ => unreachable!( + "Should not call transform in Day with type {:?}", + input.data_type() + ), + }; + Ok(Arc::new(res)) + } +} + +#[cfg(test)] +mod test { + use arrow::array::{ + ArrayRef, Date32Array, Date64Array, Int32Array, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + }; + use chrono::NaiveDate; + use std::sync::Arc; + + use crate::transform::TransformFunction; + + #[test] + fn test_transform_years() { + let year = super::Year; + let ori_date = vec![ + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2000, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2030, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2060, 1, 1).unwrap(), + ]; + + // Test Date32 + let date_array: ArrayRef = Arc::new(Date32Array::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + + // Test Date64 + let date_array: ArrayRef = Arc::new(Date64Array::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_milliseconds() + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + + // Test TimestampSecond + let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_seconds() + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + + // Test TimestampMillisecond + let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_milliseconds() + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + + // Test TimestampMicrosecond + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + + // Test TimestampNanosecond + let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( + ori_date + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_nanoseconds() + .unwrap() + }) + .collect::>(), + )); + let res = year.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30); + assert_eq!(res.value(2), 60); + assert_eq!(res.value(3), 90); + } + + #[test] + fn test_transform_months() { + let month = super::Month; + let ori_date = vec![ + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), + NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), + NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + ]; + + // Test Date32 + let date_array: ArrayRef = Arc::new(Date32Array::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + }) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + + // Test Date64 + let date_array: ArrayRef = Arc::new(Date64Array::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_milliseconds() + }) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + + // Test TimestampSecond + let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_seconds() + }) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + + // Test TimestampMillisecond + let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_milliseconds() + }) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + + // Test TimestampMicrosecond + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + + // Test TimestampNanosecond + let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( + ori_date + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_nanoseconds() + .unwrap() + }) + .collect::>(), + )); + let res = month.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), 0); + assert_eq!(res.value(1), 30 * 12 + 3); + assert_eq!(res.value(2), 60 * 12 + 6); + assert_eq!(res.value(3), 90 * 12 + 9); + } + + #[test] + fn test_transform_days() { + let day = super::Day; + let ori_date = vec![ + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), + NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), + NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + ]; + let expect_day = ori_date + .clone() + .into_iter() + .map(|data| { + data.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + }) + .collect::>(); + + // Test Date32 + let date_array: ArrayRef = Arc::new(Date32Array::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + + // Test Date64 + let date_array: ArrayRef = Arc::new(Date64Array::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_milliseconds() + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + + // Test TimestampSecond + let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_seconds() + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + + // Test TimestampMillisecond + let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_milliseconds() + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + + // Test TimestampMicrosecond + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + + // Test TimestampNanosecond + let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( + ori_date + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_nanoseconds() + .unwrap() + }) + .collect::>(), + )); + let res = day.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_day[0]); + assert_eq!(res.value(1), expect_day[1]); + assert_eq!(res.value(2), expect_day[2]); + assert_eq!(res.value(3), expect_day[3]); + } + + #[test] + fn test_transform_hours() { + let hour = super::Hour; + let ori_date = vec![ + NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), + NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), + NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + ]; + let expect_hour = ori_date + .clone() + .into_iter() + .map(|data| { + data.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_hours() as i32 + }) + .collect::>(); + + // Test TimestampSecond + let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_seconds() + }) + .collect::>(), + )); + let res = hour.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_hour[0]); + assert_eq!(res.value(1), expect_hour[1]); + assert_eq!(res.value(2), expect_hour[2]); + assert_eq!(res.value(3), expect_hour[3]); + + // Test TimestampMillisecond + let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_milliseconds() + }) + .collect::>(), + )); + let res = hour.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_hour[0]); + assert_eq!(res.value(1), expect_hour[1]); + assert_eq!(res.value(2), expect_hour[2]); + assert_eq!(res.value(3), expect_hour[3]); + + // Test TimestampMicrosecond + let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( + ori_date + .clone() + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_microseconds() + .unwrap() + }) + .collect::>(), + )); + let res = hour.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_hour[0]); + assert_eq!(res.value(1), expect_hour[1]); + assert_eq!(res.value(2), expect_hour[2]); + assert_eq!(res.value(3), expect_hour[3]); + + // Test TimestampNanosecond + let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( + ori_date + .into_iter() + .map(|date| { + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_nanoseconds() + .unwrap() + }) + .collect::>(), + )); + let res = hour.transform(date_array).unwrap(); + let res = res.as_any().downcast_ref::().unwrap(); + assert_eq!(res.len(), 4); + assert_eq!(res.value(0), expect_hour[0]); + assert_eq!(res.value(1), expect_hour[1]); + assert_eq!(res.value(2), expect_hour[2]); + assert_eq!(res.value(3), expect_hour[3]); + } +} diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs new file mode 100644 index 0000000000..984f1c4141 --- /dev/null +++ b/crates/iceberg/src/transform/void.rs @@ -0,0 +1,12 @@ +use crate::Result; +use arrow::array::{new_null_array, ArrayRef}; + +use super::TransformFunction; + +pub struct Void {} + +impl TransformFunction for Void { + fn transform(&self, input: ArrayRef) -> Result { + Ok(new_null_array(input.data_type(), input.len())) + } +} From f271a651007482fa609928ab9cbcc784a9650df8 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 29 Aug 2023 13:30:31 +0800 Subject: [PATCH 02/10] 1. remove unsupport type in iceberg 2. refactor error handle --- crates/iceberg/src/error.rs | 5 - crates/iceberg/src/transform/temporal.rs | 399 ++--------------------- 2 files changed, 21 insertions(+), 383 deletions(-) diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index d701faa2f4..e4ae576d82 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -44,10 +44,6 @@ pub enum ErrorKind { /// /// This error is returned when given iceberg feature is not supported. FeatureUnsupported, - /// Arrow compute error. - /// - /// Error returned by compute function in arrow. - ArrowCompute, } impl ErrorKind { @@ -63,7 +59,6 @@ impl From for &'static str { ErrorKind::Unexpected => "Unexpected", ErrorKind::DataInvalid => "DataInvalid", ErrorKind::FeatureUnsupported => "FeatureUnsupported", - ErrorKind::ArrowCompute => "ArrowCompute", } } } diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index ce685f0b31..d3104127b9 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -1,9 +1,6 @@ use super::TransformFunction; -use crate::{Error, Result}; -use arrow::array::{ - Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, -}; +use crate::Result; +use arrow::array::{Array, TimestampMicrosecondArray}; use arrow::compute::binary; use arrow::datatypes; use arrow::datatypes::DataType; @@ -18,24 +15,20 @@ use std::sync::Arc; const EPOCH_DAY_FROM_CE: i32 = 719163; const DAY_PER_SECOND: f64 = 0.0000115741; const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0; +const BASE_YEAR: i32 = 1970; /// Extract a date or timestamp year, as years from 1970 pub struct Year; impl TransformFunction for Year { fn transform(&self, input: ArrayRef) -> Result { - let array = year_dyn(&input).map_err(|err| { - Error::new( - crate::ErrorKind::ArrowCompute, - format!("error in transformfunction: {}", err), - ) - })?; + let array = year_dyn(&input).expect("Should not call transform in Year with non-date type"); Ok(Arc::::new( array .as_any() .downcast_ref::() .unwrap() - .unary(|v| v - 1970), + .unary(|v| v - BASE_YEAR), )) } } @@ -45,15 +38,15 @@ pub struct Month; impl TransformFunction for Month { fn transform(&self, input: ArrayRef) -> Result { - let year_array = year_dyn(&input) - .map_err(|err| Error::new(crate::ErrorKind::ArrowCompute, format!("{err}")))?; + let year_array = + year_dyn(&input).expect("Should not call transform in Month with non-date type"); let year_array: Int32Array = year_array .as_any() .downcast_ref::() .unwrap() - .unary(|v| 12 * (v - 1970)); - let month_array = month_dyn(&input) - .map_err(|err| Error::new(crate::ErrorKind::ArrowCompute, format!("{err}")))?; + .unary(|v| 12 * (v - BASE_YEAR)); + let month_array = + month_dyn(&input).expect("Should not call transform in Month with non-date type"); Ok(Arc::::new( binary( month_array.as_any().downcast_ref::().unwrap(), @@ -72,30 +65,11 @@ pub struct Day; impl TransformFunction for Day { fn transform(&self, input: ArrayRef) -> Result { let res: Int32Array = match input.data_type() { - DataType::Timestamp(unit, _) => match unit { - datatypes::TimeUnit::Second => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { (v as f64 * DAY_PER_SECOND) as i32 }), - datatypes::TimeUnit::Millisecond => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { (v as f64 / 1000.0 * DAY_PER_SECOND) as i32 }), - datatypes::TimeUnit::Microsecond => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 }), - datatypes::TimeUnit::Nanosecond => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { - (v as f64 / 1000.0 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 - }), - }, + DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 }), DataType::Date32 => { input .as_any() @@ -106,16 +80,6 @@ impl TransformFunction for Day { - EPOCH_DAY_FROM_CE }) } - DataType::Date64 => { - input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { - datatypes::Date64Type::to_naive_date(v).num_days_from_ce() - - EPOCH_DAY_FROM_CE - }) - } _ => unreachable!( "Should not call transform in Day with type {:?}", input.data_type() @@ -131,40 +95,11 @@ pub struct Hour; impl TransformFunction for Hour { fn transform(&self, input: ArrayRef) -> Result { let res: Int32Array = match input.data_type() { - DataType::Timestamp(unit, _) => match unit { - datatypes::TimeUnit::Second => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { - println!("second: {}", v); - (v as f64 * HOUR_PER_SECOND) as i32 - }), - datatypes::TimeUnit::Millisecond => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { - println!("mill: {}", v); - (v as f64 * HOUR_PER_SECOND / 1000.0) as i32 - }), - datatypes::TimeUnit::Microsecond => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { - println!("micro: {}", v); - (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32 - }), - datatypes::TimeUnit::Nanosecond => input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { - println!("nano: {}", v); - (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0 / 1000.0) as i32 - }), - }, + DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32 }), _ => unreachable!( "Should not call transform in Day with type {:?}", input.data_type() @@ -176,10 +111,7 @@ impl TransformFunction for Hour { #[cfg(test)] mod test { - use arrow::array::{ - ArrayRef, Date32Array, Date64Array, Int32Array, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - }; + use arrow::array::{ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray}; use chrono::NaiveDate; use std::sync::Arc; @@ -214,67 +146,9 @@ mod test { assert_eq!(res.value(2), 60); assert_eq!(res.value(3), 90); - // Test Date64 - let date_array: ArrayRef = Arc::new(Date64Array::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_milliseconds() - }) - .collect::>(), - )); - let res = year.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30); - assert_eq!(res.value(2), 60); - assert_eq!(res.value(3), 90); - - // Test TimestampSecond - let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_seconds() - }) - .collect::>(), - )); - let res = year.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30); - assert_eq!(res.value(2), 60); - assert_eq!(res.value(3), 90); - - // Test TimestampMillisecond - let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_milliseconds() - }) - .collect::>(), - )); - let res = year.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30); - assert_eq!(res.value(2), 60); - assert_eq!(res.value(3), 90); - // Test TimestampMicrosecond let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( ori_date - .clone() .into_iter() .map(|date| { date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) @@ -290,25 +164,6 @@ mod test { assert_eq!(res.value(1), 30); assert_eq!(res.value(2), 60); assert_eq!(res.value(3), 90); - - // Test TimestampNanosecond - let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( - ori_date - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_nanoseconds() - .unwrap() - }) - .collect::>(), - )); - let res = year.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30); - assert_eq!(res.value(2), 60); - assert_eq!(res.value(3), 90); } #[test] @@ -340,67 +195,9 @@ mod test { assert_eq!(res.value(2), 60 * 12 + 6); assert_eq!(res.value(3), 90 * 12 + 9); - // Test Date64 - let date_array: ArrayRef = Arc::new(Date64Array::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_milliseconds() - }) - .collect::>(), - )); - let res = month.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30 * 12 + 3); - assert_eq!(res.value(2), 60 * 12 + 6); - assert_eq!(res.value(3), 90 * 12 + 9); - - // Test TimestampSecond - let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_seconds() - }) - .collect::>(), - )); - let res = month.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30 * 12 + 3); - assert_eq!(res.value(2), 60 * 12 + 6); - assert_eq!(res.value(3), 90 * 12 + 9); - - // Test TimestampMillisecond - let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_milliseconds() - }) - .collect::>(), - )); - let res = month.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30 * 12 + 3); - assert_eq!(res.value(2), 60 * 12 + 6); - assert_eq!(res.value(3), 90 * 12 + 9); - // Test TimestampMicrosecond let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( ori_date - .clone() .into_iter() .map(|date| { date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) @@ -416,25 +213,6 @@ mod test { assert_eq!(res.value(1), 30 * 12 + 3); assert_eq!(res.value(2), 60 * 12 + 6); assert_eq!(res.value(3), 90 * 12 + 9); - - // Test TimestampNanosecond - let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( - ori_date - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_nanoseconds() - .unwrap() - }) - .collect::>(), - )); - let res = month.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), 0); - assert_eq!(res.value(1), 30 * 12 + 3); - assert_eq!(res.value(2), 60 * 12 + 6); - assert_eq!(res.value(3), 90 * 12 + 9); } #[test] @@ -474,67 +252,9 @@ mod test { assert_eq!(res.value(2), expect_day[2]); assert_eq!(res.value(3), expect_day[3]); - // Test Date64 - let date_array: ArrayRef = Arc::new(Date64Array::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_milliseconds() - }) - .collect::>(), - )); - let res = day.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), expect_day[0]); - assert_eq!(res.value(1), expect_day[1]); - assert_eq!(res.value(2), expect_day[2]); - assert_eq!(res.value(3), expect_day[3]); - - // Test TimestampSecond - let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_seconds() - }) - .collect::>(), - )); - let res = day.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), expect_day[0]); - assert_eq!(res.value(1), expect_day[1]); - assert_eq!(res.value(2), expect_day[2]); - assert_eq!(res.value(3), expect_day[3]); - - // Test TimestampMillisecond - let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_milliseconds() - }) - .collect::>(), - )); - let res = day.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), expect_day[0]); - assert_eq!(res.value(1), expect_day[1]); - assert_eq!(res.value(2), expect_day[2]); - assert_eq!(res.value(3), expect_day[3]); - // Test TimestampMicrosecond let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( ori_date - .clone() .into_iter() .map(|date| { date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) @@ -550,25 +270,6 @@ mod test { assert_eq!(res.value(1), expect_day[1]); assert_eq!(res.value(2), expect_day[2]); assert_eq!(res.value(3), expect_day[3]); - - // Test TimestampNanosecond - let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( - ori_date - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_nanoseconds() - .unwrap() - }) - .collect::>(), - )); - let res = day.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), expect_day[0]); - assert_eq!(res.value(1), expect_day[1]); - assert_eq!(res.value(2), expect_day[2]); - assert_eq!(res.value(3), expect_day[3]); } #[test] @@ -589,48 +290,9 @@ mod test { }) .collect::>(); - // Test TimestampSecond - let date_array: ArrayRef = Arc::new(TimestampSecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_seconds() - }) - .collect::>(), - )); - let res = hour.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), expect_hour[0]); - assert_eq!(res.value(1), expect_hour[1]); - assert_eq!(res.value(2), expect_hour[2]); - assert_eq!(res.value(3), expect_hour[3]); - - // Test TimestampMillisecond - let date_array: ArrayRef = Arc::new(TimestampMillisecondArray::from( - ori_date - .clone() - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_milliseconds() - }) - .collect::>(), - )); - let res = hour.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), expect_hour[0]); - assert_eq!(res.value(1), expect_hour[1]); - assert_eq!(res.value(2), expect_hour[2]); - assert_eq!(res.value(3), expect_hour[3]); - // Test TimestampMicrosecond let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( ori_date - .clone() .into_iter() .map(|date| { date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) @@ -646,24 +308,5 @@ mod test { assert_eq!(res.value(1), expect_hour[1]); assert_eq!(res.value(2), expect_hour[2]); assert_eq!(res.value(3), expect_hour[3]); - - // Test TimestampNanosecond - let date_array: ArrayRef = Arc::new(TimestampNanosecondArray::from( - ori_date - .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) - .num_nanoseconds() - .unwrap() - }) - .collect::>(), - )); - let res = hour.transform(date_array).unwrap(); - let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); - assert_eq!(res.value(0), expect_hour[0]); - assert_eq!(res.value(1), expect_hour[1]); - assert_eq!(res.value(2), expect_hour[2]); - assert_eq!(res.value(3), expect_hour[3]); } } From ea9ae0b1e9e3c42db910830412cf5cdd312859c0 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 29 Aug 2023 13:39:39 +0800 Subject: [PATCH 03/10] add license --- crates/iceberg/src/transform/identity.rs | 17 +++++++++++++++++ crates/iceberg/src/transform/temporal.rs | 17 +++++++++++++++++ crates/iceberg/src/transform/void.rs | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs index 3acd01615f..ec0f09af37 100644 --- a/crates/iceberg/src/transform/identity.rs +++ b/crates/iceberg/src/transform/identity.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::Result; use arrow::array::ArrayRef; diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index d3104127b9..f9add8ed86 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use super::TransformFunction; use crate::Result; use arrow::array::{Array, TimestampMicrosecondArray}; diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs index 984f1c4141..086e257ee3 100644 --- a/crates/iceberg/src/transform/void.rs +++ b/crates/iceberg/src/transform/void.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::Result; use arrow::array::{new_null_array, ArrayRef}; From 6c18a007fffbc223f60969ab12336dcd4c477a7d Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 29 Aug 2023 18:22:50 +0800 Subject: [PATCH 04/10] clean code --- crates/iceberg/src/transform/temporal.rs | 54 +++++++++++++++--------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index f9add8ed86..ad9481793b 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -16,7 +16,7 @@ // under the License. use super::TransformFunction; -use crate::Result; +use crate::{Error, ErrorKind, Result}; use arrow::array::{Array, TimestampMicrosecondArray}; use arrow::compute::binary; use arrow::datatypes; @@ -28,24 +28,28 @@ use arrow::{ use chrono::Datelike; use std::sync::Arc; -/// 719163 is the number of days from 0000-01-01 to 1970-01-01 -const EPOCH_DAY_FROM_CE: i32 = 719163; -const DAY_PER_SECOND: f64 = 0.0000115741; -const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0; -const BASE_YEAR: i32 = 1970; +/// The number of days since unix epoch. +const DAY_SINCE_UNIX_EPOCH: i32 = 719163; +/// Hour in one second. +const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64; +/// Day in one second. +const DAY_PER_SECOND: f64 = 1.0_f64 / 24.0_f64 / 3600.0_f64; +/// Year of unix epoch. +const UNIX_EPOCH_YEAR: i32 = 1970; /// Extract a date or timestamp year, as years from 1970 pub struct Year; impl TransformFunction for Year { fn transform(&self, input: ArrayRef) -> Result { - let array = year_dyn(&input).expect("Should not call transform in Year with non-date type"); + let array = + year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; Ok(Arc::::new( array .as_any() .downcast_ref::() .unwrap() - .unary(|v| v - BASE_YEAR), + .unary(|v| v - UNIX_EPOCH_YEAR), )) } } @@ -56,14 +60,14 @@ pub struct Month; impl TransformFunction for Month { fn transform(&self, input: ArrayRef) -> Result { let year_array = - year_dyn(&input).expect("Should not call transform in Month with non-date type"); + year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; let year_array: Int32Array = year_array .as_any() .downcast_ref::() .unwrap() - .unary(|v| 12 * (v - BASE_YEAR)); + .unary(|v| 12 * (v - UNIX_EPOCH_YEAR)); let month_array = - month_dyn(&input).expect("Should not call transform in Month with non-date type"); + month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; Ok(Arc::::new( binary( month_array.as_any().downcast_ref::().unwrap(), @@ -94,13 +98,18 @@ impl TransformFunction for Day { .unwrap() .unary(|v| -> i32 { datatypes::Date32Type::to_naive_date(v).num_days_from_ce() - - EPOCH_DAY_FROM_CE + - DAY_SINCE_UNIX_EPOCH }) } - _ => unreachable!( - "Should not call transform in Day with type {:?}", - input.data_type() - ), + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Should not call internally for unsupport data type {:?}", + input.data_type() + ), + )) + } }; Ok(Arc::new(res)) } @@ -117,10 +126,15 @@ impl TransformFunction for Hour { .downcast_ref::() .unwrap() .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32 }), - _ => unreachable!( - "Should not call transform in Day with type {:?}", - input.data_type() - ), + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Should not call internally for unsupport data type {:?}", + input.data_type() + ), + )) + } }; Ok(Arc::new(res)) } From e6f1b071f13c4f90c4db5b7c4f3ad90ec6f4e03b Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 31 Aug 2023 14:20:08 +0800 Subject: [PATCH 05/10] 1. use arrow-* instead of arrow 2. refine test --- crates/iceberg/Cargo.toml | 4 +- crates/iceberg/src/transform/identity.rs | 2 +- crates/iceberg/src/transform/mod.rs | 2 +- crates/iceberg/src/transform/temporal.rs | 145 ++++++++++++++++------- crates/iceberg/src/transform/void.rs | 2 +- 5 files changed, 111 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index a6180f1b85..6197844d81 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -27,7 +27,9 @@ license = "Apache-2.0" keywords = ["iceberg"] [dependencies] -arrow = { version = ">=46" } +arrow-array = { version = ">=46" } +arrow-schema = { version = ">=46" } +arrow-arith = { version = ">=46" } anyhow = "1.0.72" apache-avro = "0.15" async-trait = "0.1" diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs index ec0f09af37..2ea6a203b9 100644 --- a/crates/iceberg/src/transform/identity.rs +++ b/crates/iceberg/src/transform/identity.rs @@ -16,7 +16,7 @@ // under the License. use crate::Result; -use arrow::array::ArrayRef; +use arrow_array::ArrayRef; use super::TransformFunction; diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs index 4dd0235fb7..bbb86056d5 100644 --- a/crates/iceberg/src/transform/mod.rs +++ b/crates/iceberg/src/transform/mod.rs @@ -17,7 +17,7 @@ //! Transform function used to compute partition values. use crate::{spec::Transform, Result}; -use arrow::array::ArrayRef; +use arrow_array::ArrayRef; mod identity; mod temporal; diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index ad9481793b..3ba6ec0991 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -17,14 +17,14 @@ use super::TransformFunction; use crate::{Error, ErrorKind, Result}; -use arrow::array::{Array, TimestampMicrosecondArray}; -use arrow::compute::binary; -use arrow::datatypes; -use arrow::datatypes::DataType; -use arrow::{ - array::{ArrayRef, Date32Array, Int32Array}, - compute::{month_dyn, year_dyn}, +use arrow_arith::{ + arity::binary, + temporal::{month_dyn, year_dyn}, }; +use arrow_array::{ + types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray, +}; +use arrow_schema::{DataType, TimeUnit}; use chrono::Datelike; use std::sync::Arc; @@ -86,7 +86,7 @@ pub struct Day; impl TransformFunction for Day { fn transform(&self, input: ArrayRef) -> Result { let res: Int32Array = match input.data_type() { - DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input + DataType::Timestamp(TimeUnit::Microsecond, _) => input .as_any() .downcast_ref::() .unwrap() @@ -97,8 +97,7 @@ impl TransformFunction for Day { .downcast_ref::() .unwrap() .unary(|v| -> i32 { - datatypes::Date32Type::to_naive_date(v).num_days_from_ce() - - DAY_SINCE_UNIX_EPOCH + Date32Type::to_naive_date(v).num_days_from_ce() - DAY_SINCE_UNIX_EPOCH }) } _ => { @@ -121,7 +120,7 @@ pub struct Hour; impl TransformFunction for Hour { fn transform(&self, input: ArrayRef) -> Result { let res: Int32Array = match input.data_type() { - DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input + DataType::Timestamp(TimeUnit::Microsecond, _) => input .as_any() .downcast_ref::() .unwrap() @@ -142,8 +141,8 @@ impl TransformFunction for Hour { #[cfg(test)] mod test { - use arrow::array::{ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray}; - use chrono::NaiveDate; + use arrow_array::{ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray}; + use chrono::{NaiveDate, NaiveDateTime}; use std::sync::Arc; use crate::transform::TransformFunction; @@ -151,17 +150,16 @@ mod test { #[test] fn test_transform_years() { let year = super::Year; + + // Test Date32 let ori_date = vec![ NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), NaiveDate::from_ymd_opt(2000, 1, 1).unwrap(), NaiveDate::from_ymd_opt(2030, 1, 1).unwrap(), NaiveDate::from_ymd_opt(2060, 1, 1).unwrap(), ]; - - // Test Date32 let date_array: ArrayRef = Arc::new(Date32Array::from( ori_date - .clone() .into_iter() .map(|date| { date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) @@ -178,11 +176,28 @@ mod test { assert_eq!(res.value(3), 90); // Test TimestampMicrosecond + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-01-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-01-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-01-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + ]; let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( - ori_date + ori_timestamp .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:00.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) .num_microseconds() .unwrap() }) @@ -200,17 +215,16 @@ mod test { #[test] fn test_transform_months() { let month = super::Month; + + // Test Date32 let ori_date = vec![ NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), ]; - - // Test Date32 let date_array: ArrayRef = Arc::new(Date32Array::from( ori_date - .clone() .into_iter() .map(|date| { date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) @@ -227,11 +241,28 @@ mod test { assert_eq!(res.value(3), 90 * 12 + 9); // Test TimestampMicrosecond + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + ]; let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( - ori_date + ori_timestamp .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:00.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) .num_microseconds() .unwrap() }) @@ -267,7 +298,6 @@ mod test { // Test Date32 let date_array: ArrayRef = Arc::new(Date32Array::from( ori_date - .clone() .into_iter() .map(|date| { date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) @@ -284,11 +314,28 @@ mod test { assert_eq!(res.value(3), expect_day[3]); // Test TimestampMicrosecond + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + ]; let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( - ori_date + ori_timestamp .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:00.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) .num_microseconds() .unwrap() }) @@ -306,27 +353,45 @@ mod test { #[test] fn test_transform_hours() { let hour = super::Hour; - let ori_date = vec![ - NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(), - NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), - NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), - NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + let ori_timestamp = vec![ + NaiveDateTime::parse_from_str("1970-01-01 19:01:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2000-03-01 12:01:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2030-10-02 10:01:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), + NaiveDateTime::parse_from_str("2060-09-01 05:03:23.123", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), ]; - let expect_hour = ori_date + let expect_hour = ori_timestamp .clone() .into_iter() - .map(|data| { - data.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:0.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) .num_hours() as i32 }) .collect::>(); // Test TimestampMicrosecond let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( - ori_date + ori_timestamp .into_iter() - .map(|date| { - date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .map(|timestamp| { + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str( + "1970-01-01 00:00:0.0", + "%Y-%m-%d %H:%M:%S.%f", + ) + .unwrap(), + ) .num_microseconds() .unwrap() }) diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs index 086e257ee3..56fc3c5295 100644 --- a/crates/iceberg/src/transform/void.rs +++ b/crates/iceberg/src/transform/void.rs @@ -16,7 +16,7 @@ // under the License. use crate::Result; -use arrow::array::{new_null_array, ArrayRef}; +use arrow_array::{new_null_array, ArrayRef}; use super::TransformFunction; From fba5c7ce7c52f71b86e7d01112d6cf8944ea8917 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 7 Sep 2023 14:57:17 +0800 Subject: [PATCH 06/10] support bucket and truncate transform function --- crates/iceberg/Cargo.toml | 2 + crates/iceberg/src/transform/bucket.rs | 227 +++++++++++++++++++++++ crates/iceberg/src/transform/mod.rs | 8 +- crates/iceberg/src/transform/truncate.rs | 177 ++++++++++++++++++ 4 files changed, 412 insertions(+), 2 deletions(-) create mode 100644 crates/iceberg/src/transform/bucket.rs create mode 100644 crates/iceberg/src/transform/truncate.rs diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6197844d81..708dddc15f 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,6 +52,8 @@ serde_json = "^1.0" serde_repr = "0.1.16" url = "2" uuid = "1.4.1" +murmur3 = "0.5.2" + [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs new file mode 100644 index 0000000000..c58f3de6f4 --- /dev/null +++ b/crates/iceberg/src/transform/bucket.rs @@ -0,0 +1,227 @@ +use std::sync::Arc; + +use arrow_array::ArrayRef; +use arrow_schema::{DataType, TimeUnit}; + +use super::TransformFunction; + +pub struct Bucket { + mod_n: u32, +} + +impl Bucket { + pub fn new(mod_n: u32) -> Self { + Self { mod_n } + } +} + +impl Bucket { + /// When switch the hash function, we only need to change this function. + fn hash_bytes(mut v: &[u8]) -> i32 { + murmur3::murmur3_32(&mut v, 0).unwrap() as i32 + } + + fn hash_int(v: i32) -> i32 { + Self::hash_long(v as i64) + } + + fn hash_long(v: i64) -> i32 { + Self::hash_bytes(v.to_le_bytes().as_slice()) + } + + /// v is days from unix epoch + fn hash_date(v: i32) -> i32 { + Self::hash_int(v) + } + + /// v is microseconds from midnight + fn hash_time(v: i64) -> i32 { + Self::hash_long(v) + } + + /// v is microseconds from unix epoch + fn hash_timestamp(v: i64) -> i32 { + Self::hash_long(v) + } + + fn hash_str(s: &str) -> i32 { + Self::hash_bytes(s.as_bytes()) + } + + /// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian + /// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements + fn hash_decimal(v: i128) -> i32 { + let bytes = v.to_be_bytes(); + if let Some(start) = bytes.iter().position(|&x| x != 0) { + Self::hash_bytes(&bytes[start..]) + } else { + Self::hash_bytes(&[0]) + } + } + + /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N + /// ref: https://iceberg.apache.org/spec/#partitioning + fn bucket_n(&self, v: i32) -> i32 { + (v & i32::MAX) % (self.mod_n as i32) + } +} + +impl TransformFunction for Bucket { + fn transform(&self, input: ArrayRef) -> crate::Result { + let res: arrow_array::Int32Array = match input.data_type() { + DataType::Int32 => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_int(v))), + DataType::Int64 => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_long(v))), + DataType::Decimal128(_, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_decimal(v))), + DataType::Date32 => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_date(v))), + DataType::Time64(TimeUnit::Microsecond) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_time(v))), + DataType::Timestamp(TimeUnit::Microsecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_n(Self::hash_timestamp(v))), + DataType::Utf8 => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))), + ), + DataType::LargeUtf8 => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))), + ), + DataType::Binary => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + ), + DataType::LargeBinary => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + ), + DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + ), + _ => unreachable!("Unsupported data type: {:?}", input.data_type()), + }; + Ok(Arc::new(res)) + } +} + +#[cfg(test)] +mod test { + use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; + + use super::Bucket; + #[test] + fn test_hash() { + // test int + assert_eq!(Bucket::hash_int(34), 2017239379); + // test long + assert_eq!(Bucket::hash_long(34), 2017239379); + // test decimal + assert_eq!(Bucket::hash_decimal(1420), -500754589); + // test date + let date = NaiveDate::from_ymd_opt(2017, 11, 16).unwrap(); + assert_eq!( + Bucket::hash_date( + date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()) + .num_days() as i32 + ), + -653330422 + ); + // test time + let time = NaiveTime::from_hms_opt(22, 31, 8).unwrap(); + assert_eq!( + Bucket::hash_time( + time.signed_duration_since(NaiveTime::from_hms_opt(0, 0, 0).unwrap()) + .num_microseconds() + .unwrap() + ), + -662762989 + ); + // test timestamp + let timestamp = + NaiveDateTime::parse_from_str("2017-11-16 22:31:08", "%Y-%m-%d %H:%M:%S").unwrap(); + assert_eq!( + Bucket::hash_timestamp( + timestamp + .signed_duration_since( + NaiveDateTime::parse_from_str("1970-01-01 00:00:00", "%Y-%m-%d %H:%M:%S") + .unwrap() + ) + .num_microseconds() + .unwrap() + ), + -2047944441 + ); + // test timestamp with tz + let timestamp = DateTime::parse_from_rfc3339("2017-11-16T14:31:08-08:00").unwrap(); + assert_eq!( + Bucket::hash_timestamp( + timestamp + .signed_duration_since( + DateTime::parse_from_rfc3339("1970-01-01T00:00:00-00:00").unwrap() + ) + .num_microseconds() + .unwrap() + ), + -2047944441 + ); + // test str + assert_eq!(Bucket::hash_str("iceberg"), 1210000089); + // test uuid + assert_eq!( + Bucket::hash_bytes( + [ + 0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, 0xA4, 0x79, 0x3F, 0x34, 0x9C, + 0xB7, 0x85, 0xE7 + ] + .as_ref() + ), + 1488055340 + ); + // test fixed and binary + assert_eq!( + Bucket::hash_bytes([0x00, 0x01, 0x02, 0x03].as_ref()), + -188683207 + ); + } +} diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs index bbb86056d5..dead9db89b 100644 --- a/crates/iceberg/src/transform/mod.rs +++ b/crates/iceberg/src/transform/mod.rs @@ -19,8 +19,10 @@ use crate::{spec::Transform, Result}; use arrow_array::ArrayRef; +mod bucket; mod identity; mod temporal; +mod truncate; mod void; /// TransformFunction is a trait that defines the interface for all transform functions. @@ -43,9 +45,11 @@ pub fn create_transform_function(transform: &Transform) -> Result Ok(Box::new(temporal::Month {})), Transform::Day => Ok(Box::new(temporal::Day {})), Transform::Hour => Ok(Box::new(temporal::Hour {})), - _ => Err(crate::error::Error::new( + Transform::Bucket(mod_n) => Ok(Box::new(bucket::Bucket::new(*mod_n))), + Transform::Truncate(width) => Ok(Box::new(truncate::Truncate::new(*width))), + Transform::Unknown => Err(crate::error::Error::new( crate::ErrorKind::FeatureUnsupported, - format!("Transform {:?} is not implemented", transform), + "Transform Unknown is not implemented", )), } } diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs new file mode 100644 index 0000000000..bc496fb857 --- /dev/null +++ b/crates/iceberg/src/transform/truncate.rs @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::ArrayRef; +use arrow_schema::DataType; + +use crate::Error; + +use super::TransformFunction; + +pub struct Truncate { + width: u32, +} + +impl Truncate { + pub fn new(width: u32) -> Self { + Self { width } + } +} + +impl TransformFunction for Truncate { + fn transform(&self, input: ArrayRef) -> crate::Result { + match input.data_type() { + DataType::Int32 => { + let width: i32 = self.width.try_into().map_err(|_| { + Error::new( + crate::ErrorKind::DataInvalid, + "width is failed to convert to i32 when truncate Int32Array", + ) + })?; + let res: arrow_array::Int32Array = input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - (((v % width) + width) % width)); + Ok(Arc::new(res)) + } + DataType::Int64 => { + let width = self.width as i64; + let res: arrow_array::Int64Array = input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - (((v % width) + width) % width)); + Ok(Arc::new(res)) + } + DataType::Decimal128(precision, scale) => { + let width = self.width as i128; + let res: arrow_array::Decimal128Array = input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| v - (((v % width) + width) % width)) + .with_precision_and_scale(*precision, *scale) + .map_err(|err| Error::new(crate::ErrorKind::Unexpected, format!("{err}")))?; + Ok(Arc::new(res)) + } + DataType::Utf8 => { + let len = self.width as usize; + let res: arrow_array::StringArray = arrow_array::StringArray::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.map(|v| &v[..len])), + ); + Ok(Arc::new(res)) + } + DataType::LargeUtf8 => { + let len = self.width as usize; + let res: arrow_array::LargeStringArray = arrow_array::LargeStringArray::from_iter( + input + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.map(|v| &v[..len])), + ); + Ok(Arc::new(res)) + } + _ => unreachable!("Truncate transform only supports (int,long,decimal,string) types"), + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ + builder::PrimitiveBuilder, types::Decimal128Type, Decimal128Array, Int32Array, Int64Array, + }; + + use crate::transform::TransformFunction; + + // Test case ref from: https://iceberg.apache.org/spec/#truncate-transform-details + #[test] + fn test_truncate() { + // test truncate int + let input = Arc::new(Int32Array::from(vec![1, -1])); + let res = super::Truncate::new(10).transform(input).unwrap(); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(0), + 0 + ); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(1), + -10 + ); + + // test truncate long + let input = Arc::new(Int64Array::from(vec![1, -1])); + let res = super::Truncate::new(10).transform(input).unwrap(); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(0), + 0 + ); + assert_eq!( + res.as_any().downcast_ref::().unwrap().value(1), + -10 + ); + + // test decimal + let mut buidler = PrimitiveBuilder::::new() + .with_precision_and_scale(20, 2) + .unwrap(); + buidler.append_value(1065); + let input = Arc::new(buidler.finish()); + let res = super::Truncate::new(50).transform(input).unwrap(); + assert_eq!( + res.as_any() + .downcast_ref::() + .unwrap() + .value(0), + 1050 + ); + + // test string + let input = Arc::new(arrow_array::StringArray::from(vec!["iceberg"])); + let res = super::Truncate::new(3).transform(input).unwrap(); + assert_eq!( + res.as_any() + .downcast_ref::() + .unwrap() + .value(0), + "ice" + ); + + // test large string + let input = Arc::new(arrow_array::LargeStringArray::from(vec!["iceberg"])); + let res = super::Truncate::new(3).transform(input).unwrap(); + assert_eq!( + res.as_any() + .downcast_ref::() + .unwrap() + .value(0), + "ice" + ); + } +} From 53044e3408849753f4495fdcfb71f4a4266c248d Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 11 Sep 2023 10:10:33 +0800 Subject: [PATCH 07/10] make code more clear --- crates/iceberg/src/transform/truncate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs index bc496fb857..436ec14108 100644 --- a/crates/iceberg/src/transform/truncate.rs +++ b/crates/iceberg/src/transform/truncate.rs @@ -48,7 +48,7 @@ impl TransformFunction for Truncate { .as_any() .downcast_ref::() .unwrap() - .unary(|v| v - (((v % width) + width) % width)); + .unary(|v| v - v.rem_euclid(width)); Ok(Arc::new(res)) } DataType::Int64 => { From 8da239fb30800b1a5ee04fbd707523e4725901d8 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 13 Sep 2023 16:03:44 +0800 Subject: [PATCH 08/10] fix check --- crates/iceberg/src/transform/bucket.rs | 17 +++++++++++++++++ crates/iceberg/src/transform/temporal.rs | 4 ++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index c58f3de6f4..c9fe9df37b 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use arrow_array::ArrayRef; diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index 3ba6ec0991..f914d50f9d 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -104,7 +104,7 @@ impl TransformFunction for Day { return Err(Error::new( ErrorKind::Unexpected, format!( - "Should not call internally for unsupport data type {:?}", + "Should not call internally for unsupported data type {:?}", input.data_type() ), )) @@ -129,7 +129,7 @@ impl TransformFunction for Hour { return Err(Error::new( ErrorKind::Unexpected, format!( - "Should not call internally for unsupport data type {:?}", + "Should not call internally for unsupported data type {:?}", input.data_type() ), )) From bd9279cb4d4349a5b5e0fe50aeb4592f97b41c49 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 19 Sep 2023 23:03:46 +0800 Subject: [PATCH 09/10] fix to truncate Unicode correctly and add related test --- crates/iceberg/Cargo.toml | 9 +++-- crates/iceberg/src/transform/truncate.rs | 46 ++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 708dddc15f..aff622e627 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -27,12 +27,12 @@ license = "Apache-2.0" keywords = ["iceberg"] [dependencies] -arrow-array = { version = ">=46" } -arrow-schema = { version = ">=46" } -arrow-arith = { version = ">=46" } anyhow = "1.0.72" apache-avro = "0.15" async-trait = "0.1" +arrow-arith = { version = ">=46" } +arrow-array = { version = ">=46" } +arrow-schema = { version = ">=46" } bimap = "0.6" bitvec = "1.0.1" chrono = "0.4" @@ -41,6 +41,7 @@ either = "1" futures = "0.3" itertools = "0.11" lazy_static = "1" +murmur3 = "0.5.2" once_cell = "1" opendal = "0.39" ordered-float = "3.7.0" @@ -52,8 +53,6 @@ serde_json = "^1.0" serde_repr = "0.1.16" url = "2" uuid = "1.4.1" -murmur3 = "0.5.2" - [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs index 436ec14108..43e79e4b94 100644 --- a/crates/iceberg/src/transform/truncate.rs +++ b/crates/iceberg/src/transform/truncate.rs @@ -32,6 +32,13 @@ impl Truncate { pub fn new(width: u32) -> Self { Self { width } } + + fn truncate_str_by_char(s: &str, max_chars: usize) -> &str { + match s.char_indices().nth(max_chars) { + None => s, + Some((idx, _)) => &s[..idx], + } + } } impl TransformFunction for Truncate { @@ -79,7 +86,7 @@ impl TransformFunction for Truncate { .downcast_ref::() .unwrap() .iter() - .map(|v| v.map(|v| &v[..len])), + .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))), ); Ok(Arc::new(res)) } @@ -91,7 +98,7 @@ impl TransformFunction for Truncate { .downcast_ref::() .unwrap() .iter() - .map(|v| v.map(|v| &v[..len])), + .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))), ); Ok(Arc::new(res)) } @@ -112,7 +119,7 @@ mod test { // Test case ref from: https://iceberg.apache.org/spec/#truncate-transform-details #[test] - fn test_truncate() { + fn test_truncate_simple() { // test truncate int let input = Arc::new(Int32Array::from(vec![1, -1])); let res = super::Truncate::new(10).transform(input).unwrap(); @@ -174,4 +181,37 @@ mod test { "ice" ); } + + #[test] + fn test_string_truncate() { + let test1 = "イロハニホヘト"; + let test1_2_expected = "イロ"; + assert_eq!( + super::Truncate::truncate_str_by_char(test1, 2), + test1_2_expected + ); + + let test1_3_expected = "イロハ"; + assert_eq!( + super::Truncate::truncate_str_by_char(test1, 3), + test1_3_expected + ); + + let test2 = "щщаεはчωいにπάほхεろへσκζ"; + let test2_7_expected = "щщаεはчω"; + assert_eq!( + super::Truncate::truncate_str_by_char(test2, 7), + test2_7_expected + ); + + let test3 = "\u{FFFF}\u{FFFF}"; + assert_eq!(super::Truncate::truncate_str_by_char(test3, 2), test3); + + let test4 = "\u{10000}\u{10000}"; + let test4_1_expected = "\u{10000}"; + assert_eq!( + super::Truncate::truncate_str_by_char(test4, 1), + test4_1_expected + ); + } } From d48625bcb72df0f4fafe947c066c888fe5064da9 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 22 Sep 2023 18:00:48 +0800 Subject: [PATCH 10/10] fix cargo-sort --- crates/iceberg/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index aff622e627..5535ba794b 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,10 +29,10 @@ keywords = ["iceberg"] [dependencies] anyhow = "1.0.72" apache-avro = "0.15" -async-trait = "0.1" arrow-arith = { version = ">=46" } arrow-array = { version = ">=46" } arrow-schema = { version = ">=46" } +async-trait = "0.1" bimap = "0.6" bitvec = "1.0.1" chrono = "0.4"