Skip to content

Commit d00db21

Browse files
committed
feat(spark): implement Spark datetime function last_day
Signed-off-by: Alan Tang <jmtangcs@gmail.com>
1 parent eb25e8d commit d00db21

File tree

5 files changed

+168
-4
lines changed

5 files changed

+168
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/spark/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ datafusion-expr = { workspace = true }
4444
datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
4545
datafusion-macros = { workspace = true }
4646
log = { workspace = true }
47+
chrono = { workspace = true }
4748

4849
[dev-dependencies]
4950
criterion = { workspace = true }
5051
rand = { workspace = true }
52+
chrono = { workspace = true }
5153

5254
[[bench]]
5355
harness = false
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{ArrayRef, AsArray, Date32Array};
22+
use arrow::datatypes::{DataType, Date32Type};
23+
use chrono::{Datelike, Duration, NaiveDate};
24+
use datafusion_common::types::NativeType;
25+
use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result, ScalarValue};
26+
use datafusion_expr::{
27+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
28+
};
29+
30+
/// Spark-compatible `last_day` scalar UDF: given a date, returns the last
/// day of the month that the date belongs to (e.g. 2009-01-12 → 2009-01-31).
#[derive(Debug)]
pub struct SparkLastDay {
    // User-defined signature; argument coercion is implemented in
    // `ScalarUDFImpl::coerce_types` rather than declared here.
    signature: Signature,
}
34+
35+
impl Default for SparkLastDay {
    // Delegates to `new()` so both construction paths stay in sync.
    fn default() -> Self {
        Self::new()
    }
}
40+
41+
impl SparkLastDay {
    /// Creates the UDF with an immutable, user-defined signature
    /// (coercion rules live in `coerce_types`).
    pub fn new() -> Self {
        Self {
            signature: Signature::user_defined(Volatility::Immutable),
        }
    }
}
48+
49+
impl ScalarUDFImpl for SparkLastDay {
50+
fn as_any(&self) -> &dyn Any {
51+
self
52+
}
53+
54+
fn name(&self) -> &str {
55+
"spark_last_day"
56+
}
57+
58+
fn signature(&self) -> &Signature {
59+
&self.signature
60+
}
61+
62+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
63+
Ok(DataType::Date32)
64+
}
65+
66+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
67+
let ScalarFunctionArgs { args, .. } = args;
68+
let [arg] = args.as_slice() else {
69+
return exec_err!(
70+
"Spark `last_day` function requires 1 argument, got {}",
71+
args.len()
72+
);
73+
};
74+
match arg {
75+
ColumnarValue::Scalar(ScalarValue::Date32(days)) => {
76+
if let Some(days) = days {
77+
Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some(
78+
spark_last_day(*days)?,
79+
))))
80+
} else {
81+
Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
82+
}
83+
}
84+
ColumnarValue::Array(array) => {
85+
let result = match array.data_type() {
86+
DataType::Date32 => {
87+
let result: Date32Array = array
88+
.as_primitive::<Date32Type>()
89+
.try_unary(spark_last_day)?
90+
.with_data_type(DataType::Date32);
91+
Ok(Arc::new(result) as ArrayRef)
92+
}
93+
other => {
94+
exec_err!("Unsupported data type {other:?} for Spark function `last_day`")
95+
}
96+
}?;
97+
Ok(ColumnarValue::Array(result))
98+
}
99+
other => exec_err!("Unsupported arg {other:?} for Spark function `last_day"),
100+
}
101+
}
102+
103+
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
104+
if arg_types.len() != 1 {
105+
return exec_err!(
106+
"Spark `last_day` function requires 1 argument, got {}",
107+
arg_types.len()
108+
);
109+
}
110+
111+
let current_native_type: NativeType = (&arg_types[0]).into();
112+
if matches!(current_native_type, NativeType::Date)
113+
|| matches!(current_native_type, NativeType::String)
114+
|| matches!(current_native_type, NativeType::Null)
115+
{
116+
Ok(vec![DataType::Date32])
117+
} else {
118+
plan_err!(
119+
"The first argument of the Spark `last_day` function can only be a date or string, but got {}", &arg_types[0]
120+
)
121+
}
122+
}
123+
}
124+
125+
fn spark_last_day(days: i32) -> Result<i32> {
126+
let date = Date32Type::to_naive_date(days);
127+
128+
let (year, month) = (date.year(), date.month());
129+
let (next_year, next_month) = if month == 12 {
130+
(year + 1, 1)
131+
} else {
132+
(year, month + 1)
133+
};
134+
135+
let first_day_next_month = NaiveDate::from_ymd_opt(next_year, next_month, 1)
136+
.ok_or_else(|| {
137+
exec_datafusion_err!(
138+
"Spark `last_day`: Unable to parse date from {next_year}, {next_month}, 1"
139+
)
140+
})?;
141+
142+
Ok(Date32Type::from_naive_date(
143+
first_day_next_month - Duration::days(1),
144+
))
145+
}

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,24 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub mod last_day;

use datafusion_expr::ScalarUDF;
use datafusion_functions::make_udf_function;
use std::sync::Arc;

// Generates a lazily-initialized singleton accessor `last_day()` that
// returns the `SparkLastDay` UDF wrapped in an `Arc<ScalarUDF>`.
make_udf_function!(last_day::SparkLastDay, last_day);
25+
26+
// Fluent expression builders for use in logical plans, e.g.
// `expr_fn::last_day(col("d"))`.
pub mod expr_fn {
    use datafusion_functions::export_functions;

    // Exports a `last_day(arg1)` function with the given doc string.
    export_functions!((
        last_day,
        "Returns the last day of the month which the date belongs to.",
        arg1
    ));
}
2235

2336
pub fn functions() -> Vec<Arc<ScalarUDF>> {
24-
vec![]
37+
vec![last_day()]
2538
}

datafusion/sqllogictest/test_files/spark/datetime/last_day.slt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,8 @@
2323

2424
## Original Query: SELECT last_day('2009-01-12');
2525
## PySpark 3.5.5 Result: {'last_day(2009-01-12)': datetime.date(2009, 1, 31), 'typeof(last_day(2009-01-12))': 'date', 'typeof(2009-01-12)': 'string'}
26-
#query
27-
#SELECT last_day('2009-01-12'::string);
26+
27+
query D
28+
SELECT last_day('2009-01-12'::string);
29+
----
30+
2009-01-31

0 commit comments

Comments
 (0)