Skip to content

Commit e01fa0c

Browse files
authored
Cut Parquet over to PhysicalExprAdapter, remove SchemaAdapter (#18998)
Chips away at #14993 and #16800. Changes made in this PR: - Update DefaultPhysicalExprAdapter to handle struct column evolution. - Remove use of SchemaAdapter from row filter / predicate pushdown / late materialization; I believe it was already not doing much here, aside from cases where tests checked specific behavior and had not been updated to use PhysicalExprAdapter. - Changed projection handling to use `PhysicalExprAdapter` instead of `SchemaAdapter`. - Kept an intermediary `Vec<usize>` so we can use `ProjectionMask::roots` and defer the complexity of implementing `ProjectionExprs` -> `ProjectionMask` until a later PR (there is a draft in #18966 of what that might look like).
1 parent a3a020f commit e01fa0c

File tree

14 files changed

+1020
-855
lines changed

14 files changed

+1020
-855
lines changed

datafusion/core/src/datasource/mod.rs

Lines changed: 54 additions & 68 deletions
Original file line number · Diff line number · Diff line change
@@ -55,30 +55,35 @@ mod tests {
5555
use crate::prelude::SessionContext;
5656
use ::object_store::{path::Path, ObjectMeta};
5757
use arrow::{
58-
array::{Int32Array, StringArray},
59-
datatypes::{DataType, Field, Schema, SchemaRef},
58+
array::Int32Array,
59+
datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
6060
record_batch::RecordBatch,
6161
};
62-
use datafusion_common::{record_batch, test_util::batches_to_sort_string};
62+
use datafusion_common::{
63+
record_batch,
64+
test_util::batches_to_sort_string,
65+
tree_node::{Transformed, TransformedResult, TreeNode},
66+
Result, ScalarValue,
67+
};
6368
use datafusion_datasource::{
64-
file::FileSource,
6569
file_scan_config::FileScanConfigBuilder,
66-
schema_adapter::{
67-
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
68-
SchemaMapper,
69-
},
70-
source::DataSourceExec,
70+
schema_adapter::DefaultSchemaAdapterFactory, source::DataSourceExec,
7171
PartitionedFile,
7272
};
7373
use datafusion_datasource_parquet::source::ParquetSource;
74+
use datafusion_physical_expr::expressions::{Column, Literal};
75+
use datafusion_physical_expr_adapter::{
76+
PhysicalExprAdapter, PhysicalExprAdapterFactory,
77+
};
78+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
7479
use datafusion_physical_plan::collect;
7580
use std::{fs, sync::Arc};
7681
use tempfile::TempDir;
7782

7883
#[tokio::test]
79-
async fn can_override_schema_adapter() {
80-
// Test shows that SchemaAdapter can add a column that doesn't existing in the
81-
// record batches returned from parquet. This can be useful for schema evolution
84+
async fn can_override_physical_expr_adapter() {
85+
// Test shows that PhysicalExprAdapter can add a column that doesn't exist in the
86+
// record batches returned from parquet. This can be useful for schema evolution
8287
// where older files may not have all columns.
8388

8489
use datafusion_execution::object_store::ObjectStoreUrl;
@@ -124,12 +129,11 @@ mod tests {
124129
let f2 = Field::new("extra_column", DataType::Utf8, true);
125130

126131
let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
127-
let source = ParquetSource::new(Arc::clone(&schema))
128-
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
129-
.unwrap();
132+
let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));
130133
let base_conf =
131134
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
132135
.with_file(partitioned_file)
136+
.with_expr_adapter(Some(Arc::new(TestPhysicalExprAdapterFactory)))
133137
.build();
134138

135139
let parquet_exec = DataSourceExec::from_data_source(base_conf);
@@ -200,72 +204,54 @@ mod tests {
200204
}
201205

202206
#[derive(Debug)]
203-
struct TestSchemaAdapterFactory;
207+
struct TestPhysicalExprAdapterFactory;
204208

205-
impl SchemaAdapterFactory for TestSchemaAdapterFactory {
209+
impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory {
206210
fn create(
207211
&self,
208-
projected_table_schema: SchemaRef,
209-
_table_schema: SchemaRef,
210-
) -> Box<dyn SchemaAdapter> {
211-
Box::new(TestSchemaAdapter {
212-
table_schema: projected_table_schema,
212+
logical_file_schema: SchemaRef,
213+
physical_file_schema: SchemaRef,
214+
) -> Arc<dyn PhysicalExprAdapter> {
215+
Arc::new(TestPhysicalExprAdapter {
216+
logical_file_schema,
217+
physical_file_schema,
213218
})
214219
}
215220
}
216221

217-
struct TestSchemaAdapter {
218-
/// Schema for the table
219-
table_schema: SchemaRef,
222+
#[derive(Debug)]
223+
struct TestPhysicalExprAdapter {
224+
logical_file_schema: SchemaRef,
225+
physical_file_schema: SchemaRef,
220226
}
221227

222-
impl SchemaAdapter for TestSchemaAdapter {
223-
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
224-
let field = self.table_schema.field(index);
225-
Some(file_schema.fields.find(field.name())?.0)
226-
}
227-
228-
fn map_schema(
229-
&self,
230-
file_schema: &Schema,
231-
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
232-
let mut projection = Vec::with_capacity(file_schema.fields().len());
233-
234-
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
235-
if self.table_schema.fields().find(file_field.name()).is_some() {
236-
projection.push(file_idx);
228+
impl PhysicalExprAdapter for TestPhysicalExprAdapter {
229+
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
230+
expr.transform(|e| {
231+
if let Some(column) = e.as_any().downcast_ref::<Column>() {
232+
// If column is "extra_column" and missing from physical schema, inject "foo"
233+
if column.name() == "extra_column"
234+
&& self.physical_file_schema.index_of("extra_column").is_err()
235+
{
236+
return Ok(Transformed::yes(Arc::new(Literal::new(
237+
ScalarValue::Utf8(Some("foo".to_string())),
238+
))
239+
as Arc<dyn PhysicalExpr>));
240+
}
237241
}
238-
}
239-
240-
Ok((Arc::new(TestSchemaMapping {}), projection))
241-
}
242-
}
243-
244-
#[derive(Debug)]
245-
struct TestSchemaMapping {}
246-
247-
impl SchemaMapper for TestSchemaMapping {
248-
fn map_batch(
249-
&self,
250-
batch: RecordBatch,
251-
) -> datafusion_common::Result<RecordBatch> {
252-
let f1 = Field::new("id", DataType::Int32, true);
253-
let f2 = Field::new("extra_column", DataType::Utf8, true);
254-
255-
let schema = Arc::new(Schema::new(vec![f1, f2]));
256-
257-
let extra_column = Arc::new(StringArray::from(vec!["foo"]));
258-
let mut new_columns = batch.columns().to_vec();
259-
new_columns.push(extra_column);
260-
261-
Ok(RecordBatch::try_new(schema, new_columns).unwrap())
242+
Ok(Transformed::no(e))
243+
})
244+
.data()
262245
}
263246

264-
fn map_column_statistics(
247+
fn with_partition_values(
265248
&self,
266-
_file_col_statistics: &[datafusion_common::ColumnStatistics],
267-
) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
268-
unimplemented!()
249+
_partition_values: Vec<(FieldRef, ScalarValue)>,
250+
) -> Arc<dyn PhysicalExprAdapter> {
251+
Arc::new(TestPhysicalExprAdapter {
252+
logical_file_schema: self.logical_file_schema.clone(),
253+
physical_file_schema: self.physical_file_schema.clone(),
254+
})
269255
}
270256
}
271257
}

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -1257,7 +1257,7 @@ mod tests {
12571257
("c3", c3.clone()),
12581258
]);
12591259

1260-
// batch2: c3(int8), c2(int64), c1(string), c4(string)
1260+
// batch2: c3(date64), c2(int64), c1(string)
12611261
let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
12621262

12631263
let table_schema = Schema::new(vec![
@@ -1272,7 +1272,7 @@ mod tests {
12721272
.round_trip_to_batches(vec![batch1, batch2])
12731273
.await;
12741274
assert_contains!(read.unwrap_err().to_string(),
1275-
"Cannot cast file schema field c3 of type Date64 to table schema field of type Int8");
1275+
"Cannot cast column 'c3' from 'Date64' (physical data type) to 'Int8' (logical data type)");
12761276
}
12771277

12781278
#[tokio::test]

0 commit comments

Comments (0)