Skip to content

Commit 675b96c

Browse files
authored
Ensure Substrait consumer can handle expressions in VirtualTable (#16857)
* Handle expression and value elements in Substrait VirtualTable * Added test * Modified test plan, changed conditional check for clarity * Consume expressions with empty input schema
1 parent 070517a commit 675b96c

File tree

3 files changed

+117
-3
lines changed

3 files changed

+117
-3
lines changed

datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,37 @@ pub async fn from_read_rel(
114114
.await
115115
}
116116
Some(ReadType::VirtualTable(vt)) => {
117-
if vt.values.is_empty() {
117+
if vt.values.is_empty() && vt.expressions.is_empty() {
118118
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
119119
produce_one_row: false,
120120
schema: DFSchemaRef::new(substrait_schema),
121121
}));
122122
}
123123

124-
let values = vt
124+
let values = if !vt.expressions.is_empty() {
125+
let mut exprs = vec![];
126+
for row in &vt.expressions {
127+
let mut name_idx = 0;
128+
let mut row_exprs = vec![];
129+
for expression in &row.fields {
130+
name_idx += 1;
131+
let expr = consumer
132+
.consume_expression(expression, &DFSchema::empty())
133+
.await?;
134+
row_exprs.push(expr);
135+
}
136+
if name_idx != named_struct.names.len() {
137+
return substrait_err!(
138+
"Names list must match exactly to nested schema, but found {} uses for {} names",
139+
name_idx,
140+
named_struct.names.len()
141+
);
142+
}
143+
exprs.push(row_exprs);
144+
}
145+
exprs
146+
} else {
147+
vt
125148
.values
126149
.iter()
127150
.map(|row| {
@@ -148,7 +171,8 @@ pub async fn from_read_rel(
148171
}
149172
Ok(lits)
150173
})
151-
.collect::<datafusion::common::Result<_>>()?;
174+
.collect::<datafusion::common::Result<_>>()?
175+
};
152176

153177
Ok(LogicalPlan::Values(Values {
154178
schema: DFSchemaRef::new(substrait_schema),

datafusion/substrait/tests/cases/consumer_integration.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,21 @@ mod tests {
519519
Ok(())
520520
}
521521

522+
#[tokio::test]
523+
async fn test_expressions_in_virtual_table() -> Result<()> {
524+
let plan_str =
525+
test_plan_to_string("virtual_table_with_expressions.substrait.json").await?;
526+
527+
assert_snapshot!(
528+
plan_str,
529+
@r#"
530+
Projection: dummy1 AS result1, dummy2 AS result2
531+
Values: (Int64(0), Utf8("temp")), (Int64(1), Utf8("test"))
532+
"#
533+
);
534+
Ok(())
535+
}
536+
522537
#[tokio::test]
523538
async fn test_multiple_joins() -> Result<()> {
524539
let plan_str = test_plan_to_string("multiple_joins.json").await?;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
{
2+
"relations": [
3+
{
4+
"root": {
5+
"input": {
6+
"read": {
7+
"common": {
8+
"direct": {
9+
}
10+
},
11+
"baseSchema": {
12+
"names": [
13+
"dummy1", "dummy2"
14+
],
15+
"struct": {
16+
"types": [
17+
{
18+
"i64": {
19+
"nullability": "NULLABILITY_REQUIRED"
20+
}
21+
},
22+
{
23+
"string": {
24+
"nullability": "NULLABILITY_REQUIRED"
25+
}
26+
}
27+
],
28+
"nullability": "NULLABILITY_REQUIRED"
29+
}
30+
},
31+
"virtualTable": {
32+
"expressions": [
33+
{
34+
"fields": [
35+
{
36+
"literal": {
37+
"i64": "0",
38+
"nullable": false
39+
}
40+
},
41+
{
42+
"literal": {
43+
"string": "temp",
44+
"nullable": false
45+
}
46+
}
47+
]
48+
},
49+
{
50+
"fields": [
51+
{
52+
"literal": {
53+
"i64": "1",
54+
"nullable": false
55+
}
56+
},
57+
{
58+
"literal": {
59+
"string": "test",
60+
"nullable": false
61+
}
62+
}
63+
]
64+
}
65+
]
66+
}
67+
}
68+
},
69+
"names": [
70+
"result1", "result2"
71+
]
72+
}
73+
}
74+
]
75+
}

0 commit comments

Comments
 (0)