From 116aae8aba9b5fd5226cc9bca106c84d41b05141 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Wed, 29 Oct 2025 02:32:34 -0700
Subject: [PATCH 1/3] Add new task writer and unpartitioned writer

---
 crates/iceberg/src/writer/partitioning/mod.rs |   1 +
 .../partitioning/unpartitioned_writer.rs      | 198 +++++++
 crates/integrations/datafusion/src/lib.rs     |   3 +
 .../datafusion/src/task_writer.rs             | 531 ++++++++++++++++++
 4 files changed, 733 insertions(+)
 create mode 100644 crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
 create mode 100644 crates/integrations/datafusion/src/task_writer.rs

diff --git a/crates/iceberg/src/writer/partitioning/mod.rs b/crates/iceberg/src/writer/partitioning/mod.rs
index f63a9d0d26..c8106041ac 100644
--- a/crates/iceberg/src/writer/partitioning/mod.rs
+++ b/crates/iceberg/src/writer/partitioning/mod.rs
@@ -23,6 +23,7 @@
 
 pub mod clustered_writer;
 pub mod fanout_writer;
+pub mod unpartitioned_writer;
 
 use crate::Result;
 use crate::spec::PartitionKey;
diff --git a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
new file mode 100644
index 0000000000..0fb9cba3f1
--- /dev/null
+++ b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides the `UnpartitionedWriter` implementation.
+
+use std::marker::PhantomData;
+
+use crate::Result;
+use crate::writer::{DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder};
+
+/// A simple wrapper around `IcebergWriterBuilder` for unpartitioned tables.
+///
+/// This writer lazily creates the underlying writer on the first write operation
+/// and writes all data to a single file (or set of files if rolling).
+///
+/// # Type Parameters
+///
+/// * `B` - The inner writer builder type
+/// * `I` - Input type (defaults to `RecordBatch`)
+/// * `O` - Output collection type (defaults to `Vec<DataFile>`)
+pub struct UnpartitionedWriter<B, I = DefaultInput, O = DefaultOutput>
+where
+    B: IcebergWriterBuilder<I, O>,
+    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
+    <O as IntoIterator>::Item: Clone,
+{
+    inner_builder: B,
+    writer: Option<B::R>,
+    output: Vec<<O as IntoIterator>::Item>,
+    _phantom: PhantomData<I>,
+}
+
+impl<B, I, O> UnpartitionedWriter<B, I, O>
+where
+    B: IcebergWriterBuilder<I, O>,
+    I: Send + 'static,
+    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
+    <O as IntoIterator>::Item: Send + Clone,
+{
+    /// Create a new `UnpartitionedWriter`.
+    pub fn new(inner_builder: B) -> Self {
+        Self {
+            inner_builder,
+            writer: None,
+            output: Vec::new(),
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Write data to the writer.
+    ///
+    /// The underlying writer is lazily created on the first write operation.
+    ///
+    /// # Parameters
+    ///
+    /// * `input` - The input data to write
+    ///
+    /// # Returns
+    ///
+    /// `Ok(())` on success, or an error if the write operation fails.
+    pub async fn write(&mut self, input: I) -> Result<()> {
+        // Lazily create writer on first write
+        if self.writer.is_none() {
+            self.writer = Some(self.inner_builder.clone().build(None).await?);
+        }
+
+        // Write directly to inner writer
+        self.writer
+            .as_mut()
+            .expect("Writer should be initialized")
+            .write(input)
+            .await
+    }
+
+    /// Close the writer and return all written data files.
+    ///
+    /// This method consumes the writer to prevent further use.
+    ///
+    /// # Returns
+    ///
+    /// The accumulated output from all write operations, or an empty collection
+    /// if no data was written.
+    pub async fn close(mut self) -> Result<O> {
+        if let Some(mut writer) = self.writer.take() {
+            self.output.extend(writer.close().await?);
+        }
+        Ok(O::from_iter(self.output))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_array::{Int32Array, RecordBatch, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::Result;
+    use crate::io::FileIOBuilder;
+    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Struct, Type};
+    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use crate::writer::file_writer::ParquetWriterBuilder;
+    use crate::writer::file_writer::location_generator::{
+        DefaultFileNameGenerator, DefaultLocationGenerator,
+    };
+    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+
+    #[tokio::test]
+    async fn test_unpartitioned_writer() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+
+        // Build Iceberg schema
+        let schema = Arc::new(
+            crate::spec::Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()?,
+        );
+
+        // Build Arrow schema
+        let arrow_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        // Build writer
+        let file_io = FileIOBuilder::new_fs_io().build()?;
+        let location_gen = DefaultLocationGenerator::with_data_location(
+            temp_dir.path().to_str().unwrap().to_string(),
+        );
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+        let parquet_writer_builder =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
+        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            file_io,
+            location_gen,
+            file_name_gen,
+        );
+        let writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
+
+        let mut writer = UnpartitionedWriter::new(writer_builder);
+
+        // Write two batches
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![
+            Arc::new(Int32Array::from(vec![1, 2])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
+        ])?;
+        let batch2 = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![3, 4])),
+            Arc::new(StringArray::from(vec!["Charlie", "Dave"])),
+        ])?;
+
+        writer.write(batch1).await?;
+        writer.write(batch2).await?;
+
+        let data_files = writer.close().await?;
+
+        // Verify files have empty partition and correct format
+        assert!(!data_files.is_empty());
+        for file in &data_files {
+            assert_eq!(file.partition, Struct::empty());
+            assert_eq!(file.file_format, DataFileFormat::Parquet);
+            assert_eq!(file.record_count, 4);
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index 09d1cac4ce..2795805844 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -26,3 +26,6 @@ mod schema;
 pub mod table;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
+
+pub mod task_writer;
+pub use task_writer::*;
diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
new file mode 100644
index 0000000000..34edbc9d69
--- /dev/null
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -0,0 +1,531 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TaskWriter for DataFusion integration.
+//!
+//! This module provides a high-level writer that handles partitioning and routing
+//! of RecordBatch data to Iceberg tables.
+
+use datafusion::arrow::array::RecordBatch;
+use iceberg::Result;
+use iceberg::arrow::RecordBatchPartitionSplitter;
+use iceberg::spec::{DataFile, PartitionSpecRef, SchemaRef};
+use iceberg::writer::IcebergWriterBuilder;
+use iceberg::writer::partitioning::PartitioningWriter;
+use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
+use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
+use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
+
+/// High-level writer for DataFusion that handles partitioning and routing of RecordBatch data.
+///
+/// TaskWriter coordinates writing data to Iceberg tables by:
+/// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered)
+/// - Lazily initializing the partition splitter on first write
+/// - Routing data to the underlying writer
+/// - Collecting all written data files
+///
+/// # Type Parameters
+///
+/// * `B` - The IcebergWriterBuilder type used to create underlying writers
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use iceberg::spec::{PartitionSpec, Schema};
+/// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+/// use iceberg_datafusion::writer::task_writer::TaskWriter;
+///
+/// // Create a TaskWriter for an unpartitioned table
+/// let task_writer = TaskWriter::new(
+///     data_file_writer_builder,
+///     false, // fanout_enabled
+///     schema,
+///     partition_spec,
+/// );
+///
+/// // Write data
+/// task_writer.write(record_batch).await?;
+///
+/// // Close and get data files
+/// let data_files = task_writer.close().await?;
+/// ```
+pub struct TaskWriter<B: IcebergWriterBuilder> {
+    /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
+    writer: SupportedWriter<B>,
+    /// Lazily initialized partition splitter for partitioned tables
+    partition_splitter: Option<RecordBatchPartitionSplitter>,
+    /// Iceberg schema reference
+    schema: SchemaRef,
+    /// Partition specification reference
+    partition_spec: PartitionSpecRef,
+}
+
+/// Internal enum to hold the different writer types.
+///
+/// This enum allows TaskWriter to work with different partitioning strategies
+/// while maintaining a unified interface.
+enum SupportedWriter<B: IcebergWriterBuilder> {
+    /// Writer for unpartitioned tables
+    Unpartitioned(UnpartitionedWriter<B>),
+    /// Writer for partitioned tables with unsorted data (maintains multiple active writers)
+    Fanout(FanoutWriter<B>),
+    /// Writer for partitioned tables with sorted data (maintains single active writer)
+    Clustered(ClusteredWriter<B>),
+}
+
+impl<B: IcebergWriterBuilder> TaskWriter<B> {
+    /// Create a new TaskWriter.
+    ///
+    /// # Parameters
+    ///
+    /// * `writer_builder` - The IcebergWriterBuilder to use for creating underlying writers
+    /// * `fanout_enabled` - If true, use FanoutWriter for partitioned tables; otherwise use ClusteredWriter
+    /// * `schema` - The Iceberg schema reference
+    /// * `partition_spec` - The partition specification reference
+    ///
+    /// # Returns
+    ///
+    /// Returns a new TaskWriter instance.
+    ///
+    /// # Writer Selection Logic
+    ///
+    /// - If partition_spec is unpartitioned: creates UnpartitionedWriter
+    /// - If partition_spec is partitioned AND fanout_enabled is true: creates FanoutWriter
+    /// - If partition_spec is partitioned AND fanout_enabled is false: creates ClusteredWriter
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use iceberg::spec::{PartitionSpec, Schema};
+    /// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    /// use iceberg_datafusion::writer::task_writer::TaskWriter;
+    ///
+    /// // Create a TaskWriter for an unpartitioned table
+    /// let task_writer = TaskWriter::new(
+    ///     data_file_writer_builder,
+    ///     false, // fanout_enabled
+    ///     schema,
+    ///     partition_spec,
+    /// );
+    /// ```
+    pub fn new(
+        writer_builder: B,
+        fanout_enabled: bool,
+        schema: SchemaRef,
+        partition_spec: PartitionSpecRef,
+    ) -> Self {
+        let writer = if partition_spec.is_unpartitioned() {
+            SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder))
+        } else if fanout_enabled {
+            SupportedWriter::Fanout(FanoutWriter::new(writer_builder))
+        } else {
+            SupportedWriter::Clustered(ClusteredWriter::new(writer_builder))
+        };
+
+        Self {
+            writer,
+            partition_splitter: None,
+            schema,
+            partition_spec,
+        }
+    }
+
+    /// Write a RecordBatch to the TaskWriter.
+    ///
+    /// For the first write to a partitioned table, this method initializes the partition splitter.
+    /// For unpartitioned tables, data is written directly without splitting.
+    ///
+    /// # Parameters
+    ///
+    /// * `batch` - The RecordBatch to write
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` on success, or an error if the write fails.
+    ///
+    /// # Errors
+    ///
+    /// This method will return an error if:
+    /// - Partition splitter initialization fails
+    /// - Splitting the batch by partition fails
+    /// - Writing to the underlying writer fails
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use arrow_array::RecordBatch;
+    /// use iceberg_datafusion::writer::task_writer::TaskWriter;
+    ///
+    /// // Write a RecordBatch
+    /// task_writer.write(record_batch).await?;
+    /// ```
+    pub async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+        match &mut self.writer {
+            SupportedWriter::Unpartitioned(writer) => {
+                // Unpartitioned: write directly without splitting
+                writer.write(batch).await
+            }
+            SupportedWriter::Fanout(writer) => {
+                // Initialize splitter on first write if needed
+                if self.partition_splitter.is_none() {
+                    self.partition_splitter =
+                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
+                            self.schema.clone(),
+                            self.partition_spec.clone(),
+                        )?);
+                }
+
+                // Split and write partitioned data
+                Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
+            }
+            SupportedWriter::Clustered(writer) => {
+                // Initialize splitter on first write if needed
+                if self.partition_splitter.is_none() {
+                    self.partition_splitter =
+                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
+                            self.schema.clone(),
+                            self.partition_spec.clone(),
+                        )?);
+                }
+
+                // Split and write partitioned data
+                Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
+            }
+        }
+    }
+
+    /// Helper method to split and write partitioned data.
+    ///
+    /// This method handles the common logic for both FanoutWriter and ClusteredWriter:
+    /// - Splits the batch by partition key using the provided splitter
+    /// - Writes each partition to the underlying writer
+    ///
+    /// # Parameters
+    ///
+    /// * `writer` - The underlying PartitioningWriter (FanoutWriter or ClusteredWriter)
+    /// * `partition_splitter` - The partition splitter (must be initialized)
+    /// * `batch` - The RecordBatch to write
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` on success, or an error if the operation fails.
+    async fn write_partitioned_batches<W: PartitioningWriter>(
+        writer: &mut W,
+        partition_splitter: &Option<RecordBatchPartitionSplitter>,
+        batch: &RecordBatch,
+    ) -> Result<()> {
+        // Split batch by partition
+        let splitter = partition_splitter
+            .as_ref()
+            .expect("Partition splitter should be initialized");
+        let partitioned_batches = splitter.split(batch)?;
+
+        // Write each partition
+        for (partition_key, partition_batch) in partitioned_batches {
+            writer.write(partition_key, partition_batch).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Close the TaskWriter and return all written data files.
+    ///
+    /// This method consumes the TaskWriter to prevent further use.
+    ///
+    /// # Returns
+    ///
+    /// Returns a `Vec<DataFile>` containing all written files, or an error if closing fails.
+    ///
+    /// # Errors
+    ///
+    /// This method will return an error if:
+    /// - Closing the underlying writer fails
+    /// - Any I/O operation fails during the close process
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use iceberg_datafusion::writer::task_writer::TaskWriter;
+    ///
+    /// // Close the writer and get all data files
+    /// let data_files = task_writer.close().await?;
+    /// ```
+    pub async fn close(self) -> Result<Vec<DataFile>> {
+        match self.writer {
+            SupportedWriter::Unpartitioned(writer) => writer.close().await,
+            SupportedWriter::Fanout(writer) => writer.close().await,
+            SupportedWriter::Clustered(writer) => writer.close().await,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN;
+    use iceberg::io::FileIOBuilder;
+    use iceberg::spec::{DataFileFormat, NestedField, PartitionSpec, PrimitiveType, Type};
+    use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use iceberg::writer::file_writer::ParquetWriterBuilder;
+    use iceberg::writer::file_writer::location_generator::{
+        DefaultFileNameGenerator, DefaultLocationGenerator,
+    };
+    use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::TempDir;
+
+    use super::*;
+
+    fn create_test_schema() -> Result<Arc<iceberg::spec::Schema>> {
+        Ok(Arc::new(
+            iceberg::spec::Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(3, "region", Type::Primitive(PrimitiveType::String))
+                        .into(),
+                ])
+                .build()?,
+        ))
+    }
+
+    fn create_arrow_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+        ]))
+    }
+
+    fn create_arrow_schema_with_partition() -> Arc<Schema> {
+        let partition_field = Field::new("region", DataType::Utf8, false).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]),
+        );
+        let partition_struct_field = Field::new(
+            PROJECTED_PARTITION_VALUE_COLUMN,
+            DataType::Struct(vec![partition_field].into()),
+            false,
+        );
+
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+            partition_struct_field,
+        ]))
+    }
+
+    fn create_writer_builder(
+        temp_dir: &TempDir,
+        schema: Arc<iceberg::spec::Schema>,
+    ) -> Result<
+        DataFileWriterBuilder<
+            ParquetWriterBuilder,
+            DefaultLocationGenerator,
+            DefaultFileNameGenerator,
+        >,
+    > {
+        let file_io = FileIOBuilder::new_fs_io().build()?;
+        let location_gen = DefaultLocationGenerator::with_data_location(
+            temp_dir.path().to_str().unwrap().to_string(),
+        );
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+        let parquet_writer_builder =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema);
+        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            file_io,
+            location_gen,
+            file_name_gen,
+        );
+        Ok(DataFileWriterBuilder::new(rolling_writer_builder))
+    }
+
+    #[tokio::test]
+    async fn test_task_writer_unpartitioned() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let schema = create_test_schema()?;
+        let arrow_schema = create_arrow_schema();
+
+        // Create unpartitioned spec
+        let partition_spec = Arc::new(PartitionSpec::builder(schema.clone()).build()?);
+
+        let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
+        let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec);
+
+        // Write data
+        let batch = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+            Arc::new(StringArray::from(vec!["US", "EU", "US"])),
+        ])?;
+
+        task_writer.write(batch).await?;
+        let data_files = task_writer.close().await?;
+
+        // Verify results
+        assert!(!data_files.is_empty());
+        assert_eq!(data_files[0].record_count(), 3);
+
+        Ok(())
+    }
+
+    /// Helper to verify partition data files
+    fn verify_partition_files(
+        data_files: &[iceberg::spec::DataFile],
+        expected_total: u64,
+    ) -> HashMap<String, u64> {
+        let total_records: u64 = data_files.iter().map(|f| f.record_count()).sum();
+        assert_eq!(total_records, expected_total, "Total record count mismatch");
+
+        let mut partition_counts = HashMap::new();
+        for data_file in data_files {
+            let partition_value = data_file.partition();
+            let region_literal = partition_value.fields()[0]
+                .as_ref()
+                .expect("Partition value should not be null");
+            let region = match region_literal
+                .as_primitive_literal()
+                .expect("Should be primitive literal")
+            {
+                iceberg::spec::PrimitiveLiteral::String(s) => s.clone(),
+                _ => panic!("Expected string partition value"),
+            };
+
+            *partition_counts.entry(region.clone()).or_insert(0) += data_file.record_count();
+
+            // Verify file path contains partition information
+            assert!(
+                data_file.file_path().contains("region="),
+                "File path should contain partition info"
+            );
+        }
+        partition_counts
+    }
+
+    #[tokio::test]
+    async fn test_task_writer_partitioned_fanout() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let schema = create_test_schema()?;
+        let arrow_schema = create_arrow_schema_with_partition();
+
+        let partition_spec = Arc::new(
+            PartitionSpec::builder(schema.clone())
+                .with_spec_id(1)
+                .add_partition_field("region", "region", iceberg::spec::Transform::Identity)?
+                .build()?,
+        );
+
+        let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
+        let mut task_writer = TaskWriter::new(writer_builder, true, schema, partition_spec);
+
+        // Create partition column
+        let partition_field = Field::new("region", DataType::Utf8, false).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]),
+        );
+        let partition_values = StringArray::from(vec!["US", "EU", "US", "EU"]);
+        let partition_struct = StructArray::from(vec![(
+            Arc::new(partition_field),
+            Arc::new(partition_values) as ArrayRef,
+        )]);
+
+        let batch = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])),
+            Arc::new(StringArray::from(vec!["US", "EU", "US", "EU"])),
+            Arc::new(partition_struct),
+        ])?;
+
+        task_writer.write(batch).await?;
+        let data_files = task_writer.close().await?;
+
+        let partition_counts = verify_partition_files(&data_files, 4);
+        assert_eq!(partition_counts.get("US"), Some(&2));
+        assert_eq!(partition_counts.get("EU"), Some(&2));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_task_writer_partitioned_clustered() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let schema = create_test_schema()?;
+        let arrow_schema = create_arrow_schema_with_partition();
+
+        let partition_spec = Arc::new(
+            PartitionSpec::builder(schema.clone())
+                .with_spec_id(1)
+                .add_partition_field("region", "region", iceberg::spec::Transform::Identity)?
+                .build()?,
+        );
+
+        let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
+        let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec);
+
+        // Create partition column
+        let partition_field = Field::new("region", DataType::Utf8, false).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]),
+        );
+        let partition_values = StringArray::from(vec!["ASIA", "ASIA", "EU", "EU"]);
+        let partition_struct = StructArray::from(vec![(
+            Arc::new(partition_field),
+            Arc::new(partition_values) as ArrayRef,
+        )]);
+
+        // ClusteredWriter expects data to be pre-sorted by partition
+        let batch = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])),
+            Arc::new(StringArray::from(vec!["ASIA", "ASIA", "EU", "EU"])),
+            Arc::new(partition_struct),
+        ])?;
+
+        task_writer.write(batch).await?;
+        let data_files = task_writer.close().await?;
+
+        let partition_counts = verify_partition_files(&data_files, 4);
+        assert_eq!(partition_counts.get("ASIA"), Some(&2));
+        assert_eq!(partition_counts.get("EU"), Some(&2));
+
+        Ok(())
+    }
+}

From a53d872105b6a384d7fc946f863fb853a5f13d24 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Wed, 29 Oct 2025 06:50:56 -0700
Subject: [PATCH 2/3] removed allow dead code for partition splitter

---
 crates/iceberg/src/arrow/record_batch_partition_splitter.rs | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
index 66371fac16..fbc3a8fb87 100644
--- a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
+++ b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
@@ -38,9 +38,6 @@ pub const PROJECTED_PARTITION_VALUE_COLUMN: &str = "_partition";
 /// The splitter supports two modes for obtaining partition values:
 /// - **Computed mode** (`calculator` is `Some`): Computes partition values from source columns using transforms
 /// - **Pre-computed mode** (`calculator` is `None`): Expects a `_partition` column in the input batch
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 pub struct RecordBatchPartitionSplitter {
     schema: SchemaRef,
     partition_spec: PartitionSpecRef,
@@ -48,9 +45,6 @@ pub struct RecordBatchPartitionSplitter {
     partition_type: StructType,
 }
 
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 impl RecordBatchPartitionSplitter {
     /// Create a new RecordBatchPartitionSplitter.
     ///

From f1c1f7a1397c448a0e5a441b89c463020d8af49b Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Tue, 4 Nov 2025 22:14:31 -0800
Subject: [PATCH 3/3] make task writer pub(crate)

---
 crates/integrations/datafusion/src/lib.rs         | 3 +--
 crates/integrations/datafusion/src/task_writer.rs | 5 ++++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index 2795805844..4b0ea8606d 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -27,5 +27,4 @@ pub mod table;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
 
-pub mod task_writer;
-pub use task_writer::*;
+pub(crate) mod task_writer;
diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
index 34edbc9d69..d27b2e6fbf 100644
--- a/crates/integrations/datafusion/src/task_writer.rs
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -63,7 +63,8 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
 /// // Close and get data files
 /// let data_files = task_writer.close().await?;
 /// ```
-pub struct TaskWriter<B: IcebergWriterBuilder> {
+#[allow(dead_code)]
+pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
     /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
     writer: SupportedWriter<B>,
     /// Lazily initialized partition splitter for partitioned tables
@@ -78,6 +79,7 @@ pub struct TaskWriter<B: IcebergWriterBuilder> {
 ///
 /// This enum allows TaskWriter to work with different partitioning strategies
 /// while maintaining a unified interface.
+#[allow(dead_code)]
 enum SupportedWriter<B: IcebergWriterBuilder> {
     /// Writer for unpartitioned tables
     Unpartitioned(UnpartitionedWriter<B>),
@@ -87,6 +89,7 @@ enum SupportedWriter<B: IcebergWriterBuilder> {
     /// Writer for partitioned tables with sorted data (maintains single active writer)
     Clustered(ClusteredWriter<B>),
 }
 
+#[allow(dead_code)]
 impl<B: IcebergWriterBuilder> TaskWriter<B> {
     /// Create a new TaskWriter.
     ///
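
A rough usage sketch (not part of the patch series above): how the TaskWriter introduced here might be driven over a sequence of record batches from inside the iceberg-datafusion crate. The `write_all` helper and its placement are hypothetical; it only relies on the TaskWriter::new/write/close API added in PATCH 1, and it imports via `crate::task_writer` because PATCH 3 makes the module pub(crate).

    use datafusion::arrow::array::RecordBatch;
    use iceberg::Result;
    use iceberg::spec::{DataFile, PartitionSpecRef, SchemaRef};
    use iceberg::writer::IcebergWriterBuilder;

    use crate::task_writer::TaskWriter;

    /// Hypothetical helper: drain a sequence of batches through a TaskWriter
    /// and collect every data file it produces.
    async fn write_all<B: IcebergWriterBuilder>(
        writer_builder: B,
        fanout_enabled: bool,
        schema: SchemaRef,
        partition_spec: PartitionSpecRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<DataFile>> {
        // Writer selection (unpartitioned / fanout / clustered) happens inside TaskWriter::new.
        let mut task_writer = TaskWriter::new(writer_builder, fanout_enabled, schema, partition_spec);
        for batch in batches {
            // For partitioned specs each batch must carry the pre-computed `_partition`
            // column, since the splitter is created with new_with_precomputed_values.
            task_writer.write(batch).await?;
        }
        // Consumes the writer and returns all written DataFiles.
        task_writer.close().await
    }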