Skip to content

Commit 5c5fa9d

Browse files
Standing-Man and adriangb
authored and committed
feat(spark): implement Spark datetime function last_day (apache#16828)
* feat(spark): implement Spark datetime function last_day Signed-off-by: Alan Tang <jmtangcs@gmail.com> * chore: fix the export function name Signed-off-by: Alan Tang <jmtangcs@gmail.com> * chore: Fix Cargo.toml formatting Signed-off-by: Alan Tang <jmtangcs@gmail.com> * test: add more tests for spark function last_day Signed-off-by: Alan Tang <jmtangcs@gmail.com> * feat(spark): set the signature to be taking exactly one Date32 type Signed-off-by: Alan Tang <jmtangcs@gmail.com> * test(spark): add more bad cases Signed-off-by: Alan Tang <jmtangcs@gmail.com> * chore: clean up redundant package Signed-off-by: Alan Tang <jmtangcs@gmail.com> --------- Signed-off-by: Alan Tang <jmtangcs@gmail.com>
1 parent 17ddda4 commit 5c5fa9d

File tree

5 files changed

+238
-6
lines changed

5 files changed

+238
-6
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/spark/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ name = "datafusion_spark"
3737

3838
[dependencies]
3939
arrow = { workspace = true }
40+
chrono = { workspace = true }
4041
datafusion-catalog = { workspace = true }
4142
datafusion-common = { workspace = true }
4243
datafusion-execution = { workspace = true }
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{ArrayRef, AsArray, Date32Array};
22+
use arrow::datatypes::{DataType, Date32Type};
23+
use chrono::{Datelike, Duration, NaiveDate};
24+
use datafusion_common::{exec_datafusion_err, internal_err, Result, ScalarValue};
25+
use datafusion_expr::{
26+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27+
};
28+
29+
#[derive(Debug)]
30+
pub struct SparkLastDay {
31+
signature: Signature,
32+
}
33+
34+
impl Default for SparkLastDay {
35+
fn default() -> Self {
36+
Self::new()
37+
}
38+
}
39+
40+
impl SparkLastDay {
41+
pub fn new() -> Self {
42+
Self {
43+
signature: Signature::exact(vec![DataType::Date32], Volatility::Immutable),
44+
}
45+
}
46+
}
47+
48+
impl ScalarUDFImpl for SparkLastDay {
49+
fn as_any(&self) -> &dyn Any {
50+
self
51+
}
52+
53+
fn name(&self) -> &str {
54+
"last_day"
55+
}
56+
57+
fn signature(&self) -> &Signature {
58+
&self.signature
59+
}
60+
61+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
62+
Ok(DataType::Date32)
63+
}
64+
65+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
66+
let ScalarFunctionArgs { args, .. } = args;
67+
let [arg] = args.as_slice() else {
68+
return internal_err!(
69+
"Spark `last_day` function requires 1 argument, got {}",
70+
args.len()
71+
);
72+
};
73+
match arg {
74+
ColumnarValue::Scalar(ScalarValue::Date32(days)) => {
75+
if let Some(days) = days {
76+
Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some(
77+
spark_last_day(*days)?,
78+
))))
79+
} else {
80+
Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
81+
}
82+
}
83+
ColumnarValue::Array(array) => {
84+
let result = match array.data_type() {
85+
DataType::Date32 => {
86+
let result: Date32Array = array
87+
.as_primitive::<Date32Type>()
88+
.try_unary(spark_last_day)?
89+
.with_data_type(DataType::Date32);
90+
Ok(Arc::new(result) as ArrayRef)
91+
}
92+
other => {
93+
internal_err!("Unsupported data type {other:?} for Spark function `last_day`")
94+
}
95+
}?;
96+
Ok(ColumnarValue::Array(result))
97+
}
98+
other => {
99+
internal_err!("Unsupported arg {other:?} for Spark function `last_day")
100+
}
101+
}
102+
}
103+
}
104+
105+
fn spark_last_day(days: i32) -> Result<i32> {
106+
let date = Date32Type::to_naive_date(days);
107+
108+
let (year, month) = (date.year(), date.month());
109+
let (next_year, next_month) = if month == 12 {
110+
(year + 1, 1)
111+
} else {
112+
(year, month + 1)
113+
};
114+
115+
let first_day_next_month = NaiveDate::from_ymd_opt(next_year, next_month, 1)
116+
.ok_or_else(|| {
117+
exec_datafusion_err!(
118+
"Spark `last_day`: Unable to parse date from {next_year}, {next_month}, 1"
119+
)
120+
})?;
121+
122+
Ok(Date32Type::from_naive_date(
123+
first_day_next_month - Duration::days(1),
124+
))
125+
}

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,24 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub mod last_day;
19+
1820
use datafusion_expr::ScalarUDF;
21+
use datafusion_functions::make_udf_function;
1922
use std::sync::Arc;
2023

21-
pub mod expr_fn {}
24+
make_udf_function!(last_day::SparkLastDay, last_day);
25+
26+
pub mod expr_fn {
27+
use datafusion_functions::export_functions;
28+
29+
export_functions!((
30+
last_day,
31+
"Returns the last day of the month which the date belongs to.",
32+
arg1
33+
));
34+
}
2235

2336
pub fn functions() -> Vec<Arc<ScalarUDF>> {
24-
vec![]
37+
vec![last_day()]
2538
}

datafusion/sqllogictest/test_files/spark/datetime/last_day.slt

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,99 @@
2121
# For more information, please see:
2222
# https://github.com/apache/datafusion/issues/15914
2323

24-
## Original Query: SELECT last_day('2009-01-12');
25-
## PySpark 3.5.5 Result: {'last_day(2009-01-12)': datetime.date(2009, 1, 31), 'typeof(last_day(2009-01-12))': 'date', 'typeof(2009-01-12)': 'string'}
26-
#query
27-
#SELECT last_day('2009-01-12'::string);
24+
query D
25+
SELECT last_day('2009-01-12'::DATE);
26+
----
27+
2009-01-31
28+
29+
30+
query D
31+
SELECT last_day('2015-02-28'::DATE);
32+
----
33+
2015-02-28
34+
35+
query D
36+
SELECT last_day('2015-03-27'::DATE);
37+
----
38+
2015-03-31
39+
40+
query D
41+
SELECT last_day('2015-04-26'::DATE);
42+
----
43+
2015-04-30
44+
45+
query D
46+
SELECT last_day('2015-05-25'::DATE);
47+
----
48+
2015-05-31
49+
50+
query D
51+
SELECT last_day('2015-06-24'::DATE);
52+
----
53+
2015-06-30
54+
55+
query D
56+
SELECT last_day('2015-07-23'::DATE);
57+
----
58+
2015-07-31
59+
60+
query D
61+
SELECT last_day('2015-08-01'::DATE);
62+
----
63+
2015-08-31
64+
65+
query D
66+
SELECT last_day('2015-09-02'::DATE);
67+
----
68+
2015-09-30
69+
70+
query D
71+
SELECT last_day('2015-10-03'::DATE);
72+
----
73+
2015-10-31
74+
75+
query D
76+
SELECT last_day('2015-11-04'::DATE);
77+
----
78+
2015-11-30
79+
80+
query D
81+
SELECT last_day('2015-12-05'::DATE);
82+
----
83+
2015-12-31
84+
85+
86+
query D
87+
SELECT last_day('2016-01-06'::DATE);
88+
----
89+
2016-01-31
90+
91+
query D
92+
SELECT last_day('2016-02-07'::DATE);
93+
----
94+
2016-02-29
95+
96+
97+
query D
98+
SELECT last_day(null::DATE);
99+
----
100+
NULL
101+
102+
103+
statement error Failed to coerce arguments to satisfy a call to 'last_day' function
104+
select last_day('foo');
105+
106+
107+
statement error Failed to coerce arguments to satisfy a call to 'last_day' function
108+
select last_day(123);
109+
110+
111+
statement error 'last_day' does not support zero arguments
112+
select last_day();
113+
114+
statement error Failed to coerce arguments to satisfy a call to 'last_day' function
115+
select last_day(last_day('2016-02-07'::string, 'foo'));
116+
117+
statement error Failed to coerce arguments to satisfy a call to 'last_day' function
118+
select last_day(last_day('2016-02-31'::string));
119+

0 commit comments

Comments
 (0)