Skip to content

Commit e6d5a8b

Browse files
committed
Implement BaseTaskWriter
1 parent c0efced commit e6d5a8b

File tree

3 files changed

+111
-1
lines changed

3 files changed

+111
-1
lines changed

crates/iceberg/src/arrow/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,6 @@ mod value;
3434

3535
pub use reader::*;
3636
pub use value::*;
37-
pub(crate) mod record_batch_partition_splitter;
37+
/// Record batch partition splitter for partitioned tables
38+
pub mod record_batch_partition_splitter;
39+
pub use record_batch_partition_splitter::*;

crates/iceberg/src/arrow/record_batch_partition_splitter.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,17 @@ pub struct RecordBatchPartitionSplitter {
5151
// Remove this after partition writer supported.
5252
#[allow(dead_code)]
5353
impl RecordBatchPartitionSplitter {
54+
/// Create a new RecordBatchPartitionSplitter.
55+
///
56+
/// # Arguments
57+
///
58+
/// * `input_schema` - The Arrow schema of the input record batches
59+
/// * `iceberg_schema` - The Iceberg schema reference
60+
/// * `partition_spec` - The partition specification reference
61+
///
62+
/// # Returns
63+
///
64+
/// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
5465
pub fn new(
5566
input_schema: ArrowSchemaRef,
5667
iceberg_schema: SchemaRef,

crates/iceberg/src/writer/task/mod.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
//! for writing data to Iceberg tables with automatic partition handling.
2222
2323
use crate::Result;
24+
use crate::arrow::RecordBatchPartitionSplitter;
25+
use crate::spec::{PartitionKey, PartitionSpecRef, SchemaRef, Struct};
26+
use crate::writer::partitioning::PartitioningWriter;
2427
use crate::writer::{DefaultInput, DefaultOutput};
2528

2629
/// High-level async trait for writing tasks to Iceberg tables.
@@ -53,3 +56,97 @@ pub trait TaskWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
5356
/// or an error if the close operation fails.
5457
async fn close(self) -> Result<O>;
5558
}
59+
60+
/// A high-level writer implementation for writing data to Iceberg tables.
61+
///
62+
/// `BaseTaskWriter` handles both partitioned and non-partitioned tables by composing
63+
/// a [`PartitioningWriter`] with an optional [`RecordBatchPartitionSplitter`].
64+
///
65+
/// # Type Parameters
66+
///
67+
/// * `W` - The underlying [`PartitioningWriter`] implementation
68+
pub struct BaseTaskWriter<W: PartitioningWriter> {
69+
writer: W,
70+
partition_splitter: Option<RecordBatchPartitionSplitter>,
71+
schema: SchemaRef,
72+
partition_spec: PartitionSpecRef,
73+
}
74+
75+
impl<W: PartitioningWriter> BaseTaskWriter<W> {
76+
/// Create a new BaseTaskWriter.
77+
///
78+
/// # Parameters
79+
///
80+
/// * `writer` - The underlying [`PartitioningWriter`] implementation
81+
/// * `partition_splitter` - Optional partition splitter for partitioned tables.
82+
/// Should be `None` for unpartitioned tables and `Some` for partitioned tables.
83+
/// * `schema` - The Iceberg schema reference
84+
/// * `partition_spec` - The partition specification reference
85+
///
86+
/// # Returns
87+
///
88+
/// Returns a new `BaseTaskWriter` instance, or an error if the partition splitter
89+
/// configuration is invalid (e.g., missing splitter for a partitioned table).
90+
///
91+
/// # Errors
92+
///
93+
/// Returns an error if:
94+
/// - A partitioned table is provided without a partition splitter
95+
pub fn new(
96+
writer: W,
97+
partition_splitter: Option<RecordBatchPartitionSplitter>,
98+
schema: SchemaRef,
99+
partition_spec: PartitionSpecRef,
100+
) -> Result<Self> {
101+
// Validate that partitioned tables have a splitter
102+
if !partition_spec.is_unpartitioned() && partition_splitter.is_none() {
103+
return Err(crate::Error::new(
104+
crate::ErrorKind::DataInvalid,
105+
"Partition splitter is required for partitioned tables",
106+
));
107+
}
108+
109+
Ok(Self {
110+
writer,
111+
partition_splitter,
112+
schema,
113+
partition_spec,
114+
})
115+
}
116+
}
117+
118+
#[async_trait::async_trait]
119+
impl<W: PartitioningWriter> TaskWriter for BaseTaskWriter<W> {
120+
async fn write(&mut self, input: DefaultInput) -> Result<()> {
121+
if self.partition_spec.is_unpartitioned() {
122+
// Unpartitioned table: create empty PartitionKey and write directly
123+
let partition_key = PartitionKey::new(
124+
self.partition_spec.as_ref().clone(),
125+
self.schema.clone(),
126+
Struct::empty(),
127+
);
128+
self.writer.write(partition_key, input).await?;
129+
} else {
130+
// Partitioned table: must have a splitter
131+
let splitter = self.partition_splitter.as_ref().ok_or_else(|| {
132+
crate::Error::new(
133+
crate::ErrorKind::DataInvalid,
134+
"Partition splitter is required for partitioned tables",
135+
)
136+
})?;
137+
138+
// Split batch and write each partition
139+
let partitioned_batches = splitter.split(&input)?;
140+
141+
for (partition_key, batch) in partitioned_batches {
142+
self.writer.write(partition_key, batch).await?;
143+
}
144+
}
145+
146+
Ok(())
147+
}
148+
149+
async fn close(self) -> Result<DefaultOutput> {
150+
self.writer.close().await
151+
}
152+
}

0 commit comments

Comments
 (0)