Skip to content

Commit b3691fc

Browse files
committed
feat: add compression level configuration for JSON/CSV writers
Adds a `compression_level` option to `JsonOptions` and `CsvOptions`, allowing users to specify the compression level for the ZSTD, GZIP, BZIP2, and XZ algorithms. - Add a `compression_level` field to `JsonOptions` and `CsvOptions` in config.rs - Add a `convert_async_writer_with_level` method (non-breaking; extends the API) - Keep the original `convert_async_writer` for backward compatibility - Update `JsonWriterOptions` and `CsvWriterOptions` with `compression_level` - Update `ObjectWriterBuilder` to support the compression level - Update the JSON and CSV sinks to pass the compression level through - Update the proto definitions and conversions for serialization Closes #18947
1 parent 769f367 commit b3691fc

File tree

16 files changed

+194
-12
lines changed

16 files changed

+194
-12
lines changed

datafusion/common/src/config.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,6 +1661,7 @@ config_field!(bool, value => default_config_transform(value.to_lowercase().as_st
16611661
config_field!(usize);
16621662
config_field!(f64);
16631663
config_field!(u64);
1664+
config_field!(u32);
16641665

16651666
impl ConfigField for u8 {
16661667
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
@@ -2786,6 +2787,14 @@ config_namespace! {
27862787
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
27872788
pub newlines_in_values: Option<bool>, default = None
27882789
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2790+
/// Compression level for the output file. The valid range depends on the
2791+
/// compression algorithm:
2792+
/// - ZSTD: 1 to 22 (default: 3)
2793+
/// - GZIP: 0 to 10 (default: varies by implementation)
2794+
/// - BZIP2: 0 to 9 (default: 6)
2795+
/// - XZ: 0 to 9 (default: 6)
2796+
/// If not specified, the default level for the compression algorithm is used.
2797+
pub compression_level: Option<u32>, default = None
27892798
pub schema_infer_max_rec: Option<usize>, default = None
27902799
pub date_format: Option<String>, default = None
27912800
pub datetime_format: Option<String>, default = None
@@ -2908,6 +2917,14 @@ impl CsvOptions {
29082917
self
29092918
}
29102919

2920+
/// Set the compression level for the output file.
2921+
/// The valid range depends on the compression algorithm.
2922+
/// If not specified, the default level for the algorithm is used.
2923+
pub fn with_compression_level(mut self, level: u32) -> Self {
2924+
self.compression_level = Some(level);
2925+
self
2926+
}
2927+
29112928
/// The delimiter character.
29122929
pub fn delimiter(&self) -> u8 {
29132930
self.delimiter
@@ -2933,6 +2950,14 @@ config_namespace! {
29332950
/// Options controlling JSON format
29342951
pub struct JsonOptions {
29352952
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2953+
/// Compression level for the output file. The valid range depends on the
2954+
/// compression algorithm:
2955+
/// - ZSTD: 1 to 22 (default: 3)
2956+
/// - GZIP: 0 to 10 (default: varies by implementation)
2957+
/// - BZIP2: 0 to 9 (default: 6)
2958+
/// - XZ: 0 to 9 (default: 6)
2959+
/// If not specified, the default level for the compression algorithm is used.
2960+
pub compression_level: Option<u32>, default = None
29362961
pub schema_infer_max_rec: Option<usize>, default = None
29372962
}
29382963
}

datafusion/common/src/file_options/csv_writer.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ pub struct CsvWriterOptions {
3131
/// Compression to apply after ArrowWriter serializes RecordBatches.
3232
/// This compression is applied by DataFusion not the ArrowWriter itself.
3333
pub compression: CompressionTypeVariant,
34+
/// Compression level for the output file.
35+
pub compression_level: Option<u32>,
3436
}
3537

3638
impl CsvWriterOptions {
@@ -41,6 +43,20 @@ impl CsvWriterOptions {
4143
Self {
4244
writer_options,
4345
compression,
46+
compression_level: None,
47+
}
48+
}
49+
50+
/// Create a new `CsvWriterOptions` with the specified compression level.
51+
pub fn new_with_level(
52+
writer_options: WriterBuilder,
53+
compression: CompressionTypeVariant,
54+
compression_level: Option<u32>,
55+
) -> Self {
56+
Self {
57+
writer_options,
58+
compression,
59+
compression_level,
4460
}
4561
}
4662
}
@@ -81,6 +97,7 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
8197
Ok(CsvWriterOptions {
8298
writer_options: builder,
8399
compression: value.compression,
100+
compression_level: value.compression_level,
84101
})
85102
}
86103
}

datafusion/common/src/file_options/json_writer.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,26 @@ use crate::{
2727
#[derive(Clone, Debug)]
2828
pub struct JsonWriterOptions {
2929
pub compression: CompressionTypeVariant,
30+
pub compression_level: Option<u32>,
3031
}
3132

3233
impl JsonWriterOptions {
3334
pub fn new(compression: CompressionTypeVariant) -> Self {
34-
Self { compression }
35+
Self {
36+
compression,
37+
compression_level: None,
38+
}
39+
}
40+
41+
/// Create a new `JsonWriterOptions` with the specified compression and level.
42+
pub fn new_with_level(
43+
compression: CompressionTypeVariant,
44+
compression_level: Option<u32>,
45+
) -> Self {
46+
Self {
47+
compression,
48+
compression_level,
49+
}
3550
}
3651
}
3752

@@ -41,6 +56,7 @@ impl TryFrom<&JsonOptions> for JsonWriterOptions {
4156
fn try_from(value: &JsonOptions) -> Result<Self> {
4257
Ok(JsonWriterOptions {
4358
compression: value.compression,
59+
compression_level: value.compression_level,
4460
})
4561
}
4662
}

datafusion/datasource-csv/src/file_format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,7 @@ impl FileSink for CsvSink {
780780
context,
781781
serializer,
782782
self.writer_options.compression.into(),
783+
self.writer_options.compression_level,
783784
object_store,
784785
demux_task,
785786
file_stream_rx,

datafusion/datasource-json/src/file_format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ impl FileSink for JsonSink {
379379
context,
380380
serializer,
381381
self.writer_options.compression.into(),
382+
self.writer_options.compression_level,
382383
object_store,
383384
demux_task,
384385
file_stream_rx,

datafusion/datasource/src/file_compression_type.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,25 +155,63 @@ impl FileCompressionType {
155155
}
156156

157157
/// Wrap the given `BufWriter` so that it performs compressed writes
158-
/// according to this `FileCompressionType`.
158+
/// according to this `FileCompressionType` using the default compression level.
159159
pub fn convert_async_writer(
160160
&self,
161161
w: BufWriter,
162162
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
163+
self.convert_async_writer_with_level(w, None)
164+
}
165+
166+
/// Wrap the given `BufWriter` so that it performs compressed writes
167+
/// according to this `FileCompressionType`.
168+
///
169+
/// If `compression_level` is `Some`, the encoder will use the specified
170+
/// compression level. If `None`, the default level for each algorithm is used.
171+
pub fn convert_async_writer_with_level(
172+
&self,
173+
w: BufWriter,
174+
compression_level: Option<u32>,
175+
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
176+
#[cfg(feature = "compression")]
177+
use async_compression::Level;
178+
163179
Ok(match self.variant {
164180
#[cfg(feature = "compression")]
165-
GZIP => Box::new(GzipEncoder::new(w)),
181+
GZIP => match compression_level {
182+
Some(level) => {
183+
Box::new(GzipEncoder::with_quality(w, Level::Precise(level as i32)))
184+
}
185+
None => Box::new(GzipEncoder::new(w)),
186+
},
166187
#[cfg(feature = "compression")]
167-
BZIP2 => Box::new(BzEncoder::new(w)),
188+
BZIP2 => match compression_level {
189+
Some(level) => {
190+
Box::new(BzEncoder::with_quality(w, Level::Precise(level as i32)))
191+
}
192+
None => Box::new(BzEncoder::new(w)),
193+
},
168194
#[cfg(feature = "compression")]
169-
XZ => Box::new(XzEncoder::new(w)),
195+
XZ => match compression_level {
196+
Some(level) => {
197+
Box::new(XzEncoder::with_quality(w, Level::Precise(level as i32)))
198+
}
199+
None => Box::new(XzEncoder::new(w)),
200+
},
170201
#[cfg(feature = "compression")]
171-
ZSTD => Box::new(ZstdEncoder::new(w)),
202+
ZSTD => match compression_level {
203+
Some(level) => {
204+
Box::new(ZstdEncoder::with_quality(w, Level::Precise(level as i32)))
205+
}
206+
None => Box::new(ZstdEncoder::new(w)),
207+
},
172208
#[cfg(not(feature = "compression"))]
173209
GZIP | BZIP2 | XZ | ZSTD => {
210+
// compression_level is not used when compression feature is disabled
211+
let _ = compression_level;
174212
return Err(DataFusionError::NotImplemented(
175213
"Compression feature is not enabled".to_owned(),
176-
))
214+
));
177215
}
178216
UNCOMPRESSED => Box::new(w),
179217
})

datafusion/datasource/src/write/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ pub struct ObjectWriterBuilder {
131131
object_store: Arc<dyn ObjectStore>,
132132
/// The size of the buffer for the object writer.
133133
buffer_size: Option<usize>,
134+
/// The compression level for the object writer.
135+
compression_level: Option<u32>,
134136
}
135137

136138
impl ObjectWriterBuilder {
@@ -145,6 +147,7 @@ impl ObjectWriterBuilder {
145147
location: location.clone(),
146148
object_store,
147149
buffer_size: None,
150+
compression_level: None,
148151
}
149152
}
150153

@@ -202,6 +205,22 @@ impl ObjectWriterBuilder {
202205
self.buffer_size
203206
}
204207

208+
/// Set compression level for object writer.
209+
pub fn set_compression_level(&mut self, compression_level: Option<u32>) {
210+
self.compression_level = compression_level;
211+
}
212+
213+
/// Set compression level for object writer, returning the builder.
214+
pub fn with_compression_level(mut self, compression_level: Option<u32>) -> Self {
215+
self.compression_level = compression_level;
216+
self
217+
}
218+
219+
/// Currently specified compression level.
220+
pub fn get_compression_level(&self) -> Option<u32> {
221+
self.compression_level
222+
}
223+
205224
/// Return a writer object that writes to the object store location.
206225
///
207226
/// If a buffer size has not been set, the default buffer buffer size will
@@ -215,13 +234,15 @@ impl ObjectWriterBuilder {
215234
location,
216235
object_store,
217236
buffer_size,
237+
compression_level,
218238
} = self;
219239

220240
let buf_writer = match buffer_size {
221241
Some(size) => BufWriter::with_capacity(object_store, location, size),
222242
None => BufWriter::new(object_store, location),
223243
};
224244

225-
file_compression_type.convert_async_writer(buf_writer)
245+
file_compression_type
246+
.convert_async_writer_with_level(buf_writer, compression_level)
226247
}
227248
}

datafusion/datasource/src/write/orchestration.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ pub async fn spawn_writer_tasks_and_join(
240240
context: &Arc<TaskContext>,
241241
serializer: Arc<dyn BatchSerializer>,
242242
compression: FileCompressionType,
243+
compression_level: Option<u32>,
243244
object_store: Arc<dyn ObjectStore>,
244245
demux_task: SpawnedTask<Result<()>>,
245246
mut file_stream_rx: DemuxedStreamReceiver,
@@ -265,6 +266,7 @@ pub async fn spawn_writer_tasks_and_join(
265266
.execution
266267
.objectstore_writer_buffer_size,
267268
))
269+
.with_compression_level(compression_level)
268270
.build()?;
269271

270272
if tx_file_bundle

datafusion/expr/src/udf.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@ use crate::udf_eq::UdfEq;
2525
use crate::{ColumnarValue, Documentation, Expr, Signature};
2626
use arrow::datatypes::{DataType, Field, FieldRef};
2727
use datafusion_common::config::ConfigOptions;
28-
use datafusion_common::{
29-
assert_or_internal_err, not_impl_err, DataFusionError, ExprSchema, Result,
30-
ScalarValue,
31-
};
28+
#[cfg(debug_assertions)]
29+
use datafusion_common::{assert_or_internal_err, DataFusionError};
30+
use datafusion_common::{not_impl_err, ExprSchema, Result, ScalarValue};
3231
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
3332
use datafusion_expr_common::interval_arithmetic::Interval;
3433
use std::any::Any;

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,12 +461,14 @@ message CsvOptions {
461461
bytes newlines_in_values = 16; // Indicates if newlines are supported in values
462462
bytes terminator = 17; // Optional terminator character as a byte
463463
bytes truncated_rows = 18; // Indicates if truncated rows are allowed
464+
optional uint32 compression_level = 19; // Optional compression level
464465
}
465466

466467
// Options controlling CSV format
467468
message JsonOptions {
468469
CompressionTypeVariant compression = 1; // Compression type
469470
optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
471+
optional uint32 compression_level = 3; // Optional compression level
470472
}
471473

472474
message TableParquetOptions {

0 commit comments

Comments (0)