Skip to content

Commit 4bf1527

Browse files
committed
feat: impl UDTF Server
1 parent 39442d6 commit 4bf1527

File tree

33 files changed

+1297
-167
lines changed

33 files changed

+1297
-167
lines changed

src/meta/app/src/principal/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ pub use user_defined_function::UDAFScript;
123123
pub use user_defined_function::UDFDefinition;
124124
pub use user_defined_function::UDFScript;
125125
pub use user_defined_function::UDFServer;
126+
pub use user_defined_function::UDTFServer;
126127
pub use user_defined_function::UserDefinedFunction;
127128
pub use user_defined_function::UDTF;
128129
pub use user_grant::GrantEntry;

src/meta/app/src/principal/user_defined_function.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ pub struct UDFScript {
5252
pub immutable: Option<bool>,
5353
}
5454

55+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
56+
pub struct UDTFServer {
57+
pub address: String,
58+
pub handler: String,
59+
pub headers: BTreeMap<String, String>,
60+
pub language: String,
61+
pub arg_names: Vec<String>,
62+
pub arg_types: Vec<DataType>,
63+
pub return_types: Vec<(String, DataType)>,
64+
pub immutable: Option<bool>,
65+
}
66+
5567
/// User Defined Table Function (UDTF)
5668
///
5769
/// # Fields
@@ -98,6 +110,7 @@ pub enum UDFDefinition {
98110
UDFServer(UDFServer),
99111
UDFScript(UDFScript),
100112
UDAFScript(UDAFScript),
113+
UDTFServer(UDTFServer),
101114
UDTF(UDTF),
102115
ScalarUDF(ScalarUDF),
103116
}
@@ -110,7 +123,8 @@ impl UDFDefinition {
110123
Self::UDFScript(_) => "UDFScript",
111124
Self::UDAFScript(_) => "UDAFScript",
112125
Self::UDTF(_) => "UDTF",
113-
UDFDefinition::ScalarUDF(_) => "ScalarUDF",
126+
Self::UDTFServer(_) => "UDTFServer",
127+
Self::ScalarUDF(_) => "ScalarUDF",
114128
}
115129
}
116130

@@ -120,6 +134,7 @@ impl UDFDefinition {
120134
Self::UDFServer(_) => false,
121135
Self::UDFScript(_) => false,
122136
Self::UDTF(_) => false,
137+
Self::UDTFServer(_) => false,
123138
Self::ScalarUDF(_) => false,
124139
Self::UDAFScript(_) => true,
125140
}
@@ -130,6 +145,7 @@ impl UDFDefinition {
130145
Self::LambdaUDF(_) => "SQL",
131146
Self::UDTF(_) => "SQL",
132147
Self::ScalarUDF(_) => "SQL",
148+
Self::UDTFServer(x) => x.language.as_str(),
133149
Self::UDFServer(x) => x.language.as_str(),
134150
Self::UDFScript(x) => x.language.as_str(),
135151
Self::UDAFScript(x) => x.language.as_str(),
@@ -220,6 +236,13 @@ impl UserDefinedFunction {
220236
created_on: Utc::now(),
221237
}
222238
}
239+
240+
pub fn as_udtf_server(self) -> Option<UDTFServer> {
241+
if let UDFDefinition::UDTFServer(udtf_server) = self.definition {
242+
return Some(udtf_server);
243+
}
244+
None
245+
}
223246
}
224247

225248
impl Display for UDFDefinition {
@@ -353,6 +376,50 @@ impl Display for UDFDefinition {
353376
}
354377
write!(f, ") AS $${sql}$$")?;
355378
}
379+
UDFDefinition::UDTFServer(UDTFServer {
380+
address,
381+
handler,
382+
headers,
383+
language,
384+
arg_names,
385+
arg_types,
386+
return_types,
387+
immutable,
388+
}) => {
389+
for (i, (name, ty)) in arg_names.iter().zip(arg_types.iter()).enumerate() {
390+
if i > 0 {
391+
write!(f, ", ")?;
392+
}
393+
write!(f, "{name} {ty}")?;
394+
}
395+
write!(f, ") RETURNS (")?;
396+
for (i, (name, ty)) in return_types.iter().enumerate() {
397+
if i > 0 {
398+
write!(f, ", ")?;
399+
}
400+
write!(f, "{name} {ty}")?;
401+
}
402+
write!(f, ") LANGUAGE {language}")?;
403+
if let Some(immutable) = immutable {
404+
if *immutable {
405+
write!(f, " IMMUTABLE")?;
406+
} else {
407+
write!(f, " VOLATILE")?;
408+
}
409+
}
410+
write!(f, " HANDLER = {handler}")?;
411+
if !headers.is_empty() {
412+
write!(f, " HEADERS = (")?;
413+
for (i, (key, value)) in headers.iter().enumerate() {
414+
if i > 0 {
415+
write!(f, ", ")?;
416+
}
417+
write!(f, "{key} = {value}")?;
418+
}
419+
write!(f, ")")?;
420+
}
421+
write!(f, " ADDRESS = {address}")?;
422+
}
356423
UDFDefinition::ScalarUDF(ScalarUDF {
357424
arg_types,
358425
return_type,

src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,88 @@ impl FromToProto for mt::UDTF {
359359
}
360360
}
361361

362+
impl FromToProto for mt::UDTFServer {
363+
type PB = pb::UdtfServer;
364+
365+
fn get_pb_ver(p: &Self::PB) -> u64 {
366+
p.ver
367+
}
368+
369+
fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
370+
where Self: Sized {
371+
reader_check_msg(p.ver, p.min_reader_ver)?;
372+
373+
let mut arg_types = Vec::with_capacity(p.arg_types.len());
374+
for arg_type in p.arg_types {
375+
let arg_type = DataType::from(&TableDataType::from_pb(arg_type)?);
376+
arg_types.push(arg_type);
377+
}
378+
let mut return_types = Vec::new();
379+
for return_ty in p.return_types {
380+
let ty_pb = return_ty.ty.ok_or_else(|| {
381+
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
382+
})?;
383+
let ty = TableDataType::from_pb(ty_pb)?;
384+
385+
return_types.push((return_ty.name, (&ty).into()));
386+
}
387+
388+
Ok(mt::UDTFServer {
389+
address: p.address,
390+
arg_types,
391+
return_types,
392+
handler: p.handler,
393+
headers: p.headers,
394+
language: p.language,
395+
immutable: p.immutable,
396+
arg_names: p.arg_names,
397+
})
398+
}
399+
400+
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
401+
let mut arg_types = Vec::with_capacity(self.arg_types.len());
402+
for arg_type in self.arg_types.iter() {
403+
let arg_type = infer_schema_type(arg_type)
404+
.map_err(|e| {
405+
Incompatible::new(format!(
406+
"Convert DataType to TableDataType failed: {}",
407+
e.message()
408+
))
409+
})?
410+
.to_pb()?;
411+
arg_types.push(arg_type);
412+
}
413+
let mut return_types = Vec::with_capacity(self.return_types.len());
414+
for (return_name, return_type) in self.return_types.iter() {
415+
let return_type = infer_schema_type(return_type)
416+
.map_err(|e| {
417+
Incompatible::new(format!(
418+
"Convert DataType to TableDataType failed: {}",
419+
e.message()
420+
))
421+
})?
422+
.to_pb()?;
423+
return_types.push(UdtfArg {
424+
name: return_name.clone(),
425+
ty: Some(return_type),
426+
});
427+
}
428+
429+
Ok(pb::UdtfServer {
430+
ver: VER,
431+
min_reader_ver: MIN_READER_VER,
432+
address: self.address.clone(),
433+
handler: self.handler.clone(),
434+
headers: self.headers.clone(),
435+
language: self.language.clone(),
436+
arg_types,
437+
return_types,
438+
immutable: self.immutable,
439+
arg_names: self.arg_names.clone(),
440+
})
441+
}
442+
}
443+
362444
impl FromToProto for mt::ScalarUDF {
363445
type PB = pb::ScalarUdf;
364446

@@ -454,6 +536,9 @@ impl FromToProto for mt::UserDefinedFunction {
454536
Some(pb::user_defined_function::Definition::ScalarUdf(scalar_udf)) => {
455537
mt::UDFDefinition::ScalarUDF(mt::ScalarUDF::from_pb(scalar_udf)?)
456538
}
539+
Some(pb::user_defined_function::Definition::UdtfServer(udtf_server)) => {
540+
mt::UDFDefinition::UDTFServer(mt::UDTFServer::from_pb(udtf_server)?)
541+
}
457542
None => {
458543
return Err(Incompatible::new(
459544
"UserDefinedFunction.definition cannot be None".to_string(),
@@ -492,6 +577,9 @@ impl FromToProto for mt::UserDefinedFunction {
492577
mt::UDFDefinition::ScalarUDF(scalar_udf) => {
493578
pb::user_defined_function::Definition::ScalarUdf(scalar_udf.to_pb()?)
494579
}
580+
mt::UDFDefinition::UDTFServer(udtf_server) => {
581+
pb::user_defined_function::Definition::UdtfServer(udtf_server.to_pb()?)
582+
}
495583
};
496584

497585
Ok(pb::UserDefinedFunction {

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
187187
(155, "2025-10-24: Add: RowAccessPolicyMeta::RowAccessPolicyArg"),
188188
(156, "2025-10-22: Add: DataMaskMeta add DataMaskArg"),
189189
(157, "2025-10-22: Add: TableDataType TimestampTz"),
190+
(158, "2025-10-22: Add: Server UDTF"),
190191
// Dear developer:
191192
// If you're gonna add a new metadata version, you'll have to add a test for it.
192193
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,4 @@ mod v154_vacuum_watermark;
149149
mod v155_row_access_policy_args;
150150
mod v156_data_mask_args;
151151
mod v157_type_timestamp_tz;
152+
mod v158_udtf_server;
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BTreeMap;
16+
17+
use chrono::DateTime;
18+
use chrono::Utc;
19+
use databend_common_expression::types::DataType;
20+
use databend_common_expression::types::NumberDataType;
21+
use databend_common_meta_app::principal::UDFDefinition;
22+
use databend_common_meta_app::principal::UDTFServer;
23+
use databend_common_meta_app::principal::UserDefinedFunction;
24+
use fastrace::func_name;
25+
26+
use crate::common;
27+
28+
// These bytes are built when a new version in introduced,
29+
// and are kept for backward compatibility test.
30+
//
31+
// *************************************************************
32+
// * These messages should never be updated, *
33+
// * only be added when a new version is added, *
34+
// * or be removed when an old version is no longer supported. *
35+
// *************************************************************
36+
//
37+
// The message bytes are built from the output of `test_pb_from_to()`
38+
#[test]
39+
fn test_decode_v158_server_udtf() -> anyhow::Result<()> {
40+
let bytes = vec![
41+
10, 15, 116, 101, 115, 116, 95, 115, 99, 97, 108, 97, 114, 95, 117, 100, 102, 18, 21, 84,
42+
104, 105, 115, 32, 105, 115, 32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111,
43+
110, 82, 144, 1, 10, 21, 104, 116, 116, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 111,
44+
115, 116, 58, 56, 56, 56, 56, 18, 11, 112, 108, 117, 115, 95, 105, 110, 116, 95, 112, 121,
45+
26, 6, 112, 121, 116, 104, 111, 110, 34, 10, 146, 2, 0, 160, 6, 158, 1, 168, 6, 24, 34, 10,
46+
138, 2, 0, 160, 6, 158, 1, 168, 6, 24, 42, 16, 10, 2, 99, 49, 18, 10, 146, 2, 0, 160, 6,
47+
158, 1, 168, 6, 24, 42, 25, 10, 2, 99, 50, 18, 19, 154, 2, 9, 42, 0, 160, 6, 158, 1, 168,
48+
6, 24, 160, 6, 158, 1, 168, 6, 24, 50, 14, 10, 4, 107, 101, 121, 49, 18, 6, 118, 97, 108,
49+
117, 101, 49, 66, 2, 99, 49, 66, 2, 99, 50, 160, 6, 158, 1, 168, 6, 24, 42, 23, 50, 48, 50,
50+
51, 45, 49, 50, 45, 49, 53, 32, 48, 49, 58, 50, 54, 58, 48, 57, 32, 85, 84, 67, 160, 6,
51+
158, 1, 168, 6, 24,
52+
];
53+
54+
let want = || UserDefinedFunction {
55+
name: "test_scalar_udf".to_string(),
56+
description: "This is a description".to_string(),
57+
definition: UDFDefinition::UDTFServer(UDTFServer {
58+
address: "http://localhost:8888".to_string(),
59+
handler: "plus_int_py".to_string(),
60+
headers: vec![("key1".to_string(), "value1".to_string())]
61+
.into_iter()
62+
.collect(),
63+
language: "python".to_string(),
64+
arg_names: vec![s("c1"), s("c2")],
65+
arg_types: vec![DataType::String, DataType::Boolean],
66+
return_types: vec![
67+
(s("c1"), DataType::String),
68+
(s("c2"), DataType::Number(NumberDataType::Int8)),
69+
],
70+
immutable: None,
71+
}),
72+
created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(),
73+
};
74+
75+
common::test_pb_from_to(func_name!(), want())?;
76+
common::test_load_old(func_name!(), bytes.as_slice(), 158, want())
77+
}
78+
79+
fn s(ss: impl ToString) -> String {
80+
ss.to_string()
81+
}

src/meta/protos/proto/udf.proto

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,21 @@ message UDTF {
8686
string sql = 3;
8787
}
8888

89+
message UDTFServer {
90+
uint64 ver = 100;
91+
uint64 min_reader_ver = 101;
92+
93+
string address = 1;
94+
string handler = 2;
95+
string language = 3;
96+
repeated DataType arg_types = 4;
97+
// return column name with data type
98+
repeated UDTFArg return_types = 5;
99+
map<string, string> headers = 6;
100+
optional bool immutable = 7;
101+
repeated string arg_names = 8;
102+
}
103+
89104
message ScalarUDF {
90105
uint64 ver = 100;
91106
uint64 min_reader_ver = 101;
@@ -112,6 +127,7 @@ message UserDefinedFunction {
112127
UDAFScript udaf_script = 7;
113128
UDTF udtf = 8;
114129
ScalarUDF scalar_udf = 9;
130+
UDTFServer udtf_server = 10;
115131
}
116132
// The time udf created.
117133
optional string created_on = 5;

0 commit comments

Comments
 (0)