Skip to content

Commit f70f28c

Browse files
committed
feat: impl UDTF Server
1 parent a6ac9a5 commit f70f28c

File tree

27 files changed

+1038
-157
lines changed

27 files changed

+1038
-157
lines changed

src/meta/app/src/principal/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ pub use user_defined_function::UDAFScript;
116116
pub use user_defined_function::UDFDefinition;
117117
pub use user_defined_function::UDFScript;
118118
pub use user_defined_function::UDFServer;
119+
pub use user_defined_function::UDTFServer;
119120
pub use user_defined_function::UserDefinedFunction;
120121
pub use user_defined_function::UDTF;
121122
pub use user_grant::GrantEntry;

src/meta/app/src/principal/user_defined_function.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ pub struct UDFScript {
5252
pub immutable: Option<bool>,
5353
}
5454

55+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
56+
pub struct UDTFServer {
57+
pub address: String,
58+
pub handler: String,
59+
pub headers: BTreeMap<String, String>,
60+
pub language: String,
61+
pub arg_names: Vec<String>,
62+
pub arg_types: Vec<DataType>,
63+
pub return_types: Vec<(String, DataType)>,
64+
pub immutable: Option<bool>,
65+
}
66+
5567
/// User Defined Table Function (UDTF)
5668
///
5769
/// # Fields
@@ -98,6 +110,7 @@ pub enum UDFDefinition {
98110
UDFServer(UDFServer),
99111
UDFScript(UDFScript),
100112
UDAFScript(UDAFScript),
113+
UDTFServer(UDTFServer),
101114
UDTF(UDTF),
102115
ScalarUDF(ScalarUDF),
103116
}
@@ -110,7 +123,8 @@ impl UDFDefinition {
110123
Self::UDFScript(_) => "UDFScript",
111124
Self::UDAFScript(_) => "UDAFScript",
112125
Self::UDTF(_) => "UDTF",
113-
UDFDefinition::ScalarUDF(_) => "ScalarUDF",
126+
Self::UDTFServer(_) => "UDTFServer",
127+
Self::ScalarUDF(_) => "ScalarUDF",
114128
}
115129
}
116130

@@ -120,6 +134,7 @@ impl UDFDefinition {
120134
Self::UDFServer(_) => false,
121135
Self::UDFScript(_) => false,
122136
Self::UDTF(_) => false,
137+
Self::UDTFServer(_) => false,
123138
Self::ScalarUDF(_) => false,
124139
Self::UDAFScript(_) => true,
125140
}
@@ -130,6 +145,7 @@ impl UDFDefinition {
130145
Self::LambdaUDF(_) => "SQL",
131146
Self::UDTF(_) => "SQL",
132147
Self::ScalarUDF(_) => "SQL",
148+
Self::UDTFServer(x) => x.language.as_str(),
133149
Self::UDFServer(x) => x.language.as_str(),
134150
Self::UDFScript(x) => x.language.as_str(),
135151
Self::UDAFScript(x) => x.language.as_str(),
@@ -220,6 +236,13 @@ impl UserDefinedFunction {
220236
created_on: Utc::now(),
221237
}
222238
}
239+
240+
pub fn as_udtf_server(self) -> Option<UDTFServer> {
241+
if let UDFDefinition::UDTFServer(udtf_server) = self.definition {
242+
return Some(udtf_server);
243+
}
244+
None
245+
}
223246
}
224247

225248
impl Display for UDFDefinition {
@@ -353,6 +376,50 @@ impl Display for UDFDefinition {
353376
}
354377
write!(f, ") AS $${sql}$$")?;
355378
}
379+
UDFDefinition::UDTFServer(UDTFServer {
380+
address,
381+
handler,
382+
headers,
383+
language,
384+
arg_names,
385+
arg_types,
386+
return_types,
387+
immutable,
388+
}) => {
389+
for (i, (name, ty)) in arg_names.iter().zip(arg_types.iter()).enumerate() {
390+
if i > 0 {
391+
write!(f, ", ")?;
392+
}
393+
write!(f, "{name} {ty}")?;
394+
}
395+
write!(f, ") RETURNS (")?;
396+
for (i, (name, ty)) in return_types.iter().enumerate() {
397+
if i > 0 {
398+
write!(f, ", ")?;
399+
}
400+
write!(f, "{name} {ty}")?;
401+
}
402+
write!(f, ") LANGUAGE {language}")?;
403+
if let Some(immutable) = immutable {
404+
if *immutable {
405+
write!(f, " IMMUTABLE")?;
406+
} else {
407+
write!(f, " VOLATILE")?;
408+
}
409+
}
410+
write!(f, " HANDLER = {handler}")?;
411+
if !headers.is_empty() {
412+
write!(f, " HEADERS = (")?;
413+
for (i, (key, value)) in headers.iter().enumerate() {
414+
if i > 0 {
415+
write!(f, ", ")?;
416+
}
417+
write!(f, "{key} = {value}")?;
418+
}
419+
write!(f, ")")?;
420+
}
421+
write!(f, " ADDRESS = {address}")?;
422+
}
356423
UDFDefinition::ScalarUDF(ScalarUDF {
357424
arg_types,
358425
return_type,

src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,88 @@ impl FromToProto for mt::UDTF {
359359
}
360360
}
361361

362+
impl FromToProto for mt::UDTFServer {
363+
type PB = pb::UdtfServer;
364+
365+
fn get_pb_ver(p: &Self::PB) -> u64 {
366+
p.ver
367+
}
368+
369+
fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
370+
where Self: Sized {
371+
reader_check_msg(p.ver, p.min_reader_ver)?;
372+
373+
let mut arg_types = Vec::with_capacity(p.arg_types.len());
374+
for arg_type in p.arg_types {
375+
let arg_type = DataType::from(&TableDataType::from_pb(arg_type)?);
376+
arg_types.push(arg_type);
377+
}
378+
let mut return_types = Vec::new();
379+
for return_ty in p.return_types {
380+
let ty_pb = return_ty.ty.ok_or_else(|| {
381+
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
382+
})?;
383+
let ty = TableDataType::from_pb(ty_pb)?;
384+
385+
return_types.push((return_ty.name, (&ty).into()));
386+
}
387+
388+
Ok(mt::UDTFServer {
389+
address: p.address,
390+
arg_types,
391+
return_types,
392+
handler: p.handler,
393+
headers: p.headers,
394+
language: p.language,
395+
immutable: p.immutable,
396+
arg_names: p.arg_names,
397+
})
398+
}
399+
400+
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
401+
let mut arg_types = Vec::with_capacity(self.arg_types.len());
402+
for arg_type in self.arg_types.iter() {
403+
let arg_type = infer_schema_type(arg_type)
404+
.map_err(|e| {
405+
Incompatible::new(format!(
406+
"Convert DataType to TableDataType failed: {}",
407+
e.message()
408+
))
409+
})?
410+
.to_pb()?;
411+
arg_types.push(arg_type);
412+
}
413+
let mut return_types = Vec::with_capacity(self.return_types.len());
414+
for (return_name, return_type) in self.return_types.iter() {
415+
let return_type = infer_schema_type(return_type)
416+
.map_err(|e| {
417+
Incompatible::new(format!(
418+
"Convert DataType to TableDataType failed: {}",
419+
e.message()
420+
))
421+
})?
422+
.to_pb()?;
423+
return_types.push(UdtfArg {
424+
name: return_name.clone(),
425+
ty: Some(return_type),
426+
});
427+
}
428+
429+
Ok(pb::UdtfServer {
430+
ver: VER,
431+
min_reader_ver: MIN_READER_VER,
432+
address: self.address.clone(),
433+
handler: self.handler.clone(),
434+
headers: self.headers.clone(),
435+
language: self.language.clone(),
436+
arg_types,
437+
return_types,
438+
immutable: self.immutable,
439+
arg_names: self.arg_names.clone(),
440+
})
441+
}
442+
}
443+
362444
impl FromToProto for mt::ScalarUDF {
363445
type PB = pb::ScalarUdf;
364446

@@ -454,6 +536,9 @@ impl FromToProto for mt::UserDefinedFunction {
454536
Some(pb::user_defined_function::Definition::ScalarUdf(scalar_udf)) => {
455537
mt::UDFDefinition::ScalarUDF(mt::ScalarUDF::from_pb(scalar_udf)?)
456538
}
539+
Some(pb::user_defined_function::Definition::UdtfServer(udtf_server)) => {
540+
mt::UDFDefinition::UDTFServer(mt::UDTFServer::from_pb(udtf_server)?)
541+
}
457542
None => {
458543
return Err(Incompatible::new(
459544
"UserDefinedFunction.definition cannot be None".to_string(),
@@ -492,6 +577,9 @@ impl FromToProto for mt::UserDefinedFunction {
492577
mt::UDFDefinition::ScalarUDF(scalar_udf) => {
493578
pb::user_defined_function::Definition::ScalarUdf(scalar_udf.to_pb()?)
494579
}
580+
mt::UDFDefinition::UDTFServer(udtf_server) => {
581+
pb::user_defined_function::Definition::UdtfServer(udtf_server.to_pb()?)
582+
}
495583
};
496584

497585
Ok(pb::UserDefinedFunction {

src/meta/protos/proto/udf.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,20 @@ message UDTF {
8686
string sql = 3;
8787
}
8888

89+
message UDTFServer {
90+
uint64 ver = 100;
91+
uint64 min_reader_ver = 101;
92+
93+
string address = 1;
94+
string handler = 2;
95+
string language = 3;
96+
repeated DataType arg_types = 4;
97+
repeated UDTFArg return_types = 5;
98+
map<string, string> headers = 6;
99+
optional bool immutable = 7;
100+
repeated string arg_names = 8;
101+
}
102+
89103
message ScalarUDF {
90104
uint64 ver = 100;
91105
uint64 min_reader_ver = 101;
@@ -112,6 +126,7 @@ message UserDefinedFunction {
112126
UDAFScript udaf_script = 7;
113127
UDTF udtf = 8;
114128
ScalarUDF scalar_udf = 9;
129+
UDTFServer udtf_server = 10;
115130
}
116131
// The time udf created.
117132
optional string created_on = 5;

src/query/ast/src/ast/statements/udf.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,15 @@ pub enum UDFDefinition {
8888
return_types: Vec<(Identifier, TypeName)>,
8989
sql: String,
9090
},
91+
UDTFServer {
92+
arg_types: Vec<(Identifier, TypeName)>,
93+
return_types: Vec<(Identifier, TypeName)>,
94+
address: String,
95+
handler: String,
96+
headers: BTreeMap<String, String>,
97+
language: String,
98+
immutable: Option<bool>,
99+
},
91100
ScalarUDF {
92101
arg_types: Vec<(Identifier, TypeName)>,
93102
definition: String,
@@ -282,6 +291,46 @@ impl Display for UDFDefinition {
282291
)?;
283292
write!(f, ") AS $$\n{sql}\n$$")?;
284293
}
294+
UDFDefinition::UDTFServer {
295+
arg_types,
296+
return_types,
297+
address,
298+
handler,
299+
headers,
300+
language,
301+
immutable,
302+
} => {
303+
write!(f, "(")?;
304+
write_comma_separated_list(
305+
f,
306+
arg_types.iter().map(|(name, ty)| format!("{name} {ty}")),
307+
)?;
308+
write!(f, ") RETURNS TABLE (")?;
309+
write_comma_separated_list(
310+
f,
311+
return_types.iter().map(|(name, ty)| format!("{name} {ty}")),
312+
)?;
313+
write!(f, ") LANGUAGE {language}")?;
314+
if let Some(immutable) = immutable {
315+
if *immutable {
316+
write!(f, " IMMUTABLE")?;
317+
} else {
318+
write!(f, " VOLATILE")?;
319+
}
320+
}
321+
write!(f, " HANDLER = '{handler}'")?;
322+
if !headers.is_empty() {
323+
write!(f, " HEADERS = (")?;
324+
for (i, (key, value)) in headers.iter().enumerate() {
325+
if i > 0 {
326+
write!(f, ", ")?;
327+
}
328+
write!(f, "'{key}' = '{value}'")?;
329+
}
330+
write!(f, ")")?;
331+
}
332+
write!(f, " ADDRESS = '{address}'")?;
333+
}
285334
UDFDefinition::ScalarUDF {
286335
arg_types,
287336
definition,

0 commit comments

Comments
 (0)