diff --git a/Cargo.lock b/Cargo.lock index e0b82186a..e5b2d4d50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3447,9 +3447,9 @@ checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" [[package]] name = "libc" -version = "0.2.176" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libloading" @@ -4938,6 +4938,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "sedona-extension" +version = "0.2.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "datafusion-common", + "datafusion-expr", + "libc", + "sedona-common", + "sedona-expr", + "sedona-schema", + "sedona-testing", +] + [[package]] name = "sedona-functions" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 2f7e60571..a0c62fa9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ # under the License. [workspace] members = [ + "c/sedona-extension", "c/sedona-geoarrow-c", "c/sedona-geos", "c/sedona-proj", diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml new file mode 100644 index 000000000..e8f8c17c5 --- /dev/null +++ b/c/sedona-extension/Cargo.toml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "sedona-extension" +version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description.workspace = true +readme.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +arrow-array = { workspace = true, features = ["ffi"]} +arrow-schema = { workspace = true, features = ["ffi"]} +datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } +libc = "0.2.178" +sedona-common = { workspace = true } +sedona-expr = { workspace = true } +sedona-schema = { workspace = true } +sedona-testing = { path = "../../rust/sedona-testing" } diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs new file mode 100644 index 000000000..7d957a57c --- /dev/null +++ b/c/sedona-extension/src/extension.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + ffi::c_int, + os::raw::{c_char, c_void}, + ptr::null_mut, +}; + +use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; + +/// Raw FFI representation of the SedonaCScalarKernel +/// +/// See the ImportedScalarKernel and ExportedScalarKernel for high-level +/// APIs to import and export implementations using this struct. +#[derive(Default)] +#[repr(C)] +pub struct SedonaCScalarKernel { + pub function_name: + Option *const c_char>, + pub new_impl: Option< + unsafe extern "C" fn(self_: *const SedonaCScalarKernel, out: *mut SedonaCScalarKernelImpl), + >, + + pub release: Option, + pub private_data: *mut c_void, +} + +unsafe impl Send for SedonaCScalarKernel {} +unsafe impl Sync for SedonaCScalarKernel {} + +impl Drop for SedonaCScalarKernel { + fn drop(&mut self) { + if let Some(releaser) = self.release { + unsafe { releaser(self) } + self.release = None; + self.private_data = null_mut(); + } + } +} + +/// Raw FFI representation of the SedonaCScalarKernelImpl +#[derive(Default)] +#[repr(C)] +pub struct SedonaCScalarKernelImpl { + pub init: Option< + unsafe extern "C" fn( + self_: *mut SedonaCScalarKernelImpl, + arg_types: *const *const FFI_ArrowSchema, + scalar_args: *const *mut FFI_ArrowArray, + n_args: i64, + out: *mut FFI_ArrowSchema, + ) -> c_int, + >, + + pub execute: Option< + unsafe extern "C" fn( + self_: *mut SedonaCScalarKernelImpl, + args: *const *mut FFI_ArrowArray, + n_args: i64, + n_rows: i64, + out: *mut FFI_ArrowArray, + ) -> c_int, + >, + + pub get_last_error: + Option *const c_char>, + + pub release: Option, + + pub private_data: *mut c_void, +} + +impl Drop for SedonaCScalarKernelImpl { + fn drop(&mut self) { + if let Some(releaser) = self.release { + unsafe { releaser(self) } + self.release = None; + self.private_data = null_mut(); + } + } +} + +/// Check if a schema is valid +/// +/// The [FFI_ArrowSchema] doesn't have the ability to check for a NULL release callback, +/// so we provide a mechanism to do so here. +pub fn ffi_arrow_schema_is_valid(schema: *const FFI_ArrowSchema) -> bool { + let schema_internal = schema as *const c_void as *const ArrowSchemaInternal; + if let Some(schema_ref) = unsafe { schema_internal.as_ref() } { + schema_ref.release.is_some() + } else { + false + } +} + +#[repr(C)] +struct ArrowSchemaInternal { + format: *const c_char, + name: *const c_char, + metadata: *const c_char, + flags: i64, + n_children: i64, + children: *mut *mut ArrowSchemaInternal, + dictionary: *mut ArrowSchemaInternal, + release: Option, + private_data: *mut c_void, +} diff --git a/c/sedona-extension/src/lib.rs b/c/sedona-extension/src/lib.rs new file mode 100644 index 000000000..adc0a9213 --- /dev/null +++ b/c/sedona-extension/src/lib.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod extension; +pub mod scalar_kernel; diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs new file mode 100644 index 000000000..17972356e --- /dev/null +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -0,0 +1,882 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ + ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema}, + make_array, ArrayRef, +}; +use arrow_schema::{ArrowError, Field}; +use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue}; +use datafusion_expr::ColumnarValue; +use sedona_common::sedona_internal_err; +use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel}; +use sedona_schema::datatypes::SedonaType; +use std::{ + ffi::{c_char, c_int, c_void, CStr, CString}, + fmt::Debug, + iter::zip, + ptr::{null, null_mut, swap_nonoverlapping}, + str::FromStr, +}; + +use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarKernel, SedonaCScalarKernelImpl}; + +/// Wrapper around a [SedonaCScalarKernel] that implements [SedonaScalarKernel] +/// +/// This is the means by which a kernel implementation may be imported from a +/// C implementation. +pub struct ImportedScalarKernel { + inner: SedonaCScalarKernel, + function_name: Option, +} + +impl Debug for ImportedScalarKernel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ImportedScalarKernel") + .field("inner", &"") + .finish() + } +} + +impl TryFrom for ImportedScalarKernel { + type Error = DataFusionError; + + fn try_from(value: SedonaCScalarKernel) -> Result { + match (&value.function_name, &value.new_impl, &value.release) { + (Some(function_name), Some(_), Some(_)) => { + let name_ptr = unsafe { function_name(&value) }; + let name = if name_ptr.is_null() { + None + } else { + Some( + unsafe { CStr::from_ptr(name_ptr) } + .to_string_lossy() + .into_owned(), + ) + }; + + Ok(Self { + inner: value, + function_name: name, + }) + } + _ => sedona_internal_err!("Can't import released or uninitialized SedonaCScalarKernel"), + } + } +} + +impl ImportedScalarKernel { + pub fn function_name(&self) -> Option<&str> { + self.function_name.as_deref() + } +} + +impl SedonaScalarKernel for ImportedScalarKernel { + fn return_type_from_args_and_scalars( + &self, + args: &[SedonaType], + scalar_args: &[Option<&ScalarValue>], + ) -> Result> { + let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?; + inner_impl.init(args, scalar_args) + } + + fn invoke_batch_from_args( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + return_type: &SedonaType, + num_rows: usize, + ) -> Result { + let arg_scalars = args + .iter() + .map(|arg| { + if let ColumnarValue::Scalar(scalar) = arg { + Some(scalar) + } else { + None + } + }) + .collect::>(); + + let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?; + inner_impl.init(arg_types, &arg_scalars)?; + let result_array = inner_impl.execute(args, return_type, num_rows)?; + for arg in args { + if let ColumnarValue::Array(_) = arg { + return Ok(ColumnarValue::Array(result_array)); + } + } + + if result_array.len() != 1 { + sedona_internal_err!( + "Expected scalar result but got result with length {}", + result_array.len() + ) + } else { + Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( + &result_array, + 0, + )?)) + } + } + + fn return_type(&self, _args: &[SedonaType]) -> Result> { + sedona_internal_err!( + "Should not be called because return_type_from_args_and_scalars() is implemented" + ) + } + + fn invoke_batch( + &self, + _arg_types: &[SedonaType], + _args: &[ColumnarValue], + ) -> Result { + sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented") + } +} + +/// Wrapper class handling the verbose details of preparing and executing FFI calls +/// for the [SedonaCScalarKernelImpl] +struct CScalarKernelImplWrapper { + inner: SedonaCScalarKernelImpl, +} + +impl CScalarKernelImplWrapper { + fn try_new(factory: &SedonaCScalarKernel) -> Result { + if let Some(init) = factory.new_impl { + let mut inner = SedonaCScalarKernelImpl::default(); + unsafe { init(factory, &mut inner) }; + Ok(Self { inner }) + } else { + sedona_internal_err!("SedonaCScalarKernel is not valid") + } + } + + fn init( + &mut self, + arg_types: &[SedonaType], + arg_scalars: &[Option<&ScalarValue>], + ) -> Result> { + if arg_types.len() != arg_scalars.len() { + return sedona_internal_err!("field/scalar lengths must be identical"); + } + + // Convert arg_types to Vec + let arg_fields = arg_types + .iter() + .map(|sedona_type| sedona_type.to_storage_field("", true)) + .collect::>>()?; + + // Convert arg types to Vec + let ffi_fields = arg_fields + .iter() + .map(FFI_ArrowSchema::try_from) + .collect::, ArrowError>>()?; + + // Convert arg types to Vec<*const FFI_ArrowSchema> + let ffi_field_ptrs = ffi_fields + .iter() + .map(|ffi_field| ffi_field as *const FFI_ArrowSchema) + .collect::>(); + + // Convert arg_scalars to Vec> + let mut ffi_scalars = arg_scalars + .iter() + .map(|maybe_scalar| { + if let Some(scalar) = maybe_scalar { + let array = scalar.to_array()?; + Ok(Some(FFI_ArrowArray::new(&array.to_data()))) + } else { + Ok(None) + } + }) + .collect::>>()?; + + // Convert arg_scalars to Vec<*mut FFI_ArrowArray> + let ffi_scalar_ptrs = ffi_scalars + .iter_mut() + .map(|maybe_ffi_scalar| match maybe_ffi_scalar { + Some(ffi_scalar) => ffi_scalar as *mut FFI_ArrowArray, + None => null_mut(), + }) + .collect::>(); + + // Call the FFI implementation of init + if let Some(init) = self.inner.init { + let mut ffi_out = FFI_ArrowSchema::empty(); + let code = unsafe { + init( + &mut self.inner, + ffi_field_ptrs.as_ptr(), + ffi_scalar_ptrs.as_ptr(), + arg_types.len() as i64, + &mut ffi_out, + ) + }; + + // On success, convert the output to SedonaType. If the implementation + // returned a "released" schema, this is the equivalent to our return_type() + // returning None (for "this kernel doesn't apply"). + // On error, query the FFI implementation for the last error string. + if code == 0 { + if ffi_arrow_schema_is_valid(&ffi_out) { + let field = Field::try_from(&ffi_out)?; + Ok(Some(SedonaType::from_storage_field(&field)?)) + } else { + Ok(None) + } + } else { + plan_err!( + "SedonaCScalarKernelImpl::init failed: {}", + self.last_error(code) + ) + } + } else { + sedona_internal_err!("Invalid SedonaCScalarKernelImpl") + } + } + + fn execute( + &mut self, + args: &[ColumnarValue], + return_type: &SedonaType, + num_rows: usize, + ) -> Result { + // Convert args to Vec + let arg_arrays = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(array) => Ok(array.clone()), + ColumnarValue::Scalar(scalar_value) => scalar_value.to_array(), + }) + .collect::>>()?; + + // Convert args to Vec + let mut ffi_args = arg_arrays + .iter() + .map(|arg| FFI_ArrowArray::new(&arg.to_data())) + .collect::>(); + let ffi_arg_ptrs = ffi_args + .iter_mut() + .map(|arg| arg as *mut FFI_ArrowArray) + .collect::>(); + + // Call the FFI implementation of execute() + if let Some(execute) = self.inner.execute { + let mut ffi_out = FFI_ArrowArray::empty(); + let code = unsafe { + execute( + &mut self.inner, + ffi_arg_ptrs.as_ptr(), + args.len() as i64, + num_rows as i64, + &mut ffi_out, + ) + }; + + // On success, convert the result to an ArrayRef. + // On error, query the FFI implementation for the last error string. + if code == 0 { + let data = unsafe { + arrow_array::ffi::from_ffi_and_data_type( + ffi_out, + return_type.storage_type().clone(), + )? + }; + Ok(arrow_array::make_array(data)) + } else { + plan_err!( + "SedonaCScalarKernelImpl::execute failed: {}", + self.last_error(code) + ) + } + } else { + sedona_internal_err!("Invalid SedonaCScalarKernelImpl") + } + } + + /// Helper to get the last error from the FFI implementation as a Rust String + fn last_error(&mut self, code: c_int) -> String { + if let Some(get_last_error) = self.inner.get_last_error { + let c_err = unsafe { get_last_error(&mut self.inner) }; + if c_err.is_null() { + format!("({code})") + } else { + unsafe { CStr::from_ptr(c_err) } + .to_string_lossy() + .into_owned() + } + } else { + "Invalid SedonaCScalarKernelImpl".to_string() + } + } +} + +/// Wrapper around a [ScalarKernelRef] that may be used to export an existing +/// kernel across an FFI boundary using the [SedonaCScalarKernel] +pub struct ExportedScalarKernel { + inner: ScalarKernelRef, + function_name: Option, +} + +impl From for ExportedScalarKernel { + fn from(value: ScalarKernelRef) -> Self { + ExportedScalarKernel { + inner: value, + function_name: None, + } + } +} + +impl From for SedonaCScalarKernel { + fn from(value: ExportedScalarKernel) -> Self { + let box_value = Box::new(value); + Self { + function_name: Some(c_factory_function_name), + new_impl: Some(c_factory_new_impl), + release: Some(c_factory_release), + private_data: Box::leak(box_value) as *mut ExportedScalarKernel as *mut c_void, + } + } +} + +impl ExportedScalarKernel { + /// Add a function name to this exported kernel + /// + /// This ensures the kernel will be registered with the appropriate function + /// when passed across a boundary. + pub fn with_function_name(self, function_name: impl AsRef) -> Self { + Self { + inner: self.inner, + function_name: Some(CString::from_str(function_name.as_ref()).unwrap()), + } + } + + fn new_impl(&self) -> ExportedScalarKernelImpl { + ExportedScalarKernelImpl::new(self.inner.clone()) + } +} + +/// C callable wrapper to expose [ExportedScalarKernel::function_name] +unsafe extern "C" fn c_factory_function_name(self_: *const SedonaCScalarKernel) -> *const c_char { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + .as_ref() + .unwrap(); + if let Some(function_name) = &private_data.function_name { + function_name.as_ptr() + } else { + null() + } +} + +/// C callable wrapper around [ExportedScalarKernel::new_impl] +unsafe extern "C" fn c_factory_new_impl( + self_: *const SedonaCScalarKernel, + out: *mut SedonaCScalarKernelImpl, +) { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + .as_ref() + .unwrap(); + *out = SedonaCScalarKernelImpl::from(private_data.new_impl()) +} + +/// C Callable wrapper called when this value is dropped via FFI +unsafe extern "C" fn c_factory_release(self_: *mut SedonaCScalarKernel) { + assert!(!self_.is_null()); + let self_ref = self_.as_mut().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let boxed = Box::from_raw(self_ref.private_data as *mut ExportedScalarKernel); + drop(boxed); + + self_ref.private_data = null_mut(); + self_ref.release = None; +} + +/// Rust-backed implementation of [SedonaCScalarKernelImpl] +struct ExportedScalarKernelImpl { + inner: ScalarKernelRef, + last_arg_types: Option>, + last_return_type: Option, + last_error: CString, +} + +impl From for SedonaCScalarKernelImpl { + fn from(value: ExportedScalarKernelImpl) -> Self { + let box_value = Box::new(value); + Self { + init: Some(c_kernel_init), + execute: Some(c_kernel_execute), + get_last_error: Some(c_kernel_last_error), + release: Some(c_kernel_release), + private_data: Box::leak(box_value) as *mut ExportedScalarKernelImpl as *mut c_void, + } + } +} + +impl ExportedScalarKernelImpl { + fn new(kernel: ScalarKernelRef) -> Self { + Self { + inner: kernel, + last_arg_types: None, + last_return_type: None, + last_error: CString::default(), + } + } + + fn init( + &mut self, + ffi_types: &[*const FFI_ArrowSchema], + ffi_scalar_args: &[*mut FFI_ArrowArray], + ) -> Result> { + // Convert the input types to Vec + let arg_fields = ffi_types + .iter() + .map(|ptr| { + if let Some(ffi_schema) = unsafe { ptr.as_ref() } { + Field::try_from(ffi_schema) + } else { + Err(ArrowError::CDataInterface( + "FFI_ArrowSchema is NULL".to_string(), + )) + } + }) + .collect::, ArrowError>>()?; + + // Convert the input types to Vec + let args = arg_fields + .iter() + .map(SedonaType::from_storage_field) + .collect::>>()?; + + // Convert the scalar arguments to Vec> + let arg_arrays = zip(ffi_scalar_args, &args) + .map(|(ptr, arg)| { + if ptr.is_null() { + Ok(None) + } else { + let owned_ffi_array = unsafe { FFI_ArrowArray::from_raw(*ptr) }; + let data = unsafe { + from_ffi_and_data_type(owned_ffi_array, arg.storage_type().clone())? + }; + Ok(Some(make_array(data))) + } + }) + .collect::, ArrowError>>()?; + + // Convert the scalar arguments to Vec> + let scalar_args = arg_arrays + .iter() + .map(|maybe_array| { + if let Some(array) = maybe_array { + Ok(Some(ScalarValue::try_from_array(array, 0)?)) + } else { + Ok(None) + } + }) + .collect::>>()?; + + // Convert the scalar arguments to Vec> + let scalar_arg_refs = scalar_args + .iter() + .map(|arg| arg.as_ref()) + .collect::>(); + + // Call the implementation + let maybe_return_type = self + .inner + .return_type_from_args_and_scalars(&args, &scalar_arg_refs)?; + + // Convert the result to FFI_ArrowSchema (if not None) + let return_ffi_schema = if let Some(return_type) = &maybe_return_type { + let return_field = return_type.to_storage_field("", true)?; + let return_ffi_schema = FFI_ArrowSchema::try_from(&return_field)?; + Some(return_ffi_schema) + } else { + None + }; + + // Save the argument types and return type for following calls to execute() + self.last_arg_types.replace(args); + self.last_return_type = maybe_return_type; + + Ok(return_ffi_schema) + } + + fn execute(&self, ffi_args: &[*mut FFI_ArrowArray], num_rows: i64) -> Result { + match (&self.last_arg_types, &self.last_return_type) { + (Some(arg_types), Some(return_type)) => { + // Resolve args as Vec + let arg_arrays = zip(ffi_args, arg_types) + .map(|(ptr, arg)| { + let owned_ffi_array = unsafe { FFI_ArrowArray::from_raw(*ptr) }; + let data = unsafe { + from_ffi_and_data_type(owned_ffi_array, arg.storage_type().clone())? + }; + Ok(make_array(data)) + }) + .collect::, ArrowError>>()?; + + // Resolve args as Vec + let args = arg_arrays + .into_iter() + .map(|array| { + if array.len() as i64 == num_rows { + Ok(ColumnarValue::Array(array)) + } else { + Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( + &array, 0, + )?)) + } + }) + .collect::>>()?; + + // Call the implementation + let result_value = self.inner.invoke_batch_from_args( + arg_types, + &args, + return_type, + num_rows as usize, + )?; + + // Convert the result to an ArrayRef + let result_array = match result_value { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(scalar_value) => scalar_value.to_array()?, + }; + + // Convert the result to a FFI_ArrowArray + let result_ffi_array = FFI_ArrowArray::new(&result_array.to_data()); + Ok(result_ffi_array) + } + _ => { + sedona_internal_err!("Call to ExportedScalarKernel::execute() before init()") + } + } + } +} + +/// C callable wrapper around [ExportedScalarKernelImpl::init] +unsafe extern "C" fn c_kernel_init( + self_: *mut SedonaCScalarKernelImpl, + arg_types: *const *const FFI_ArrowSchema, + scalar_args: *const *mut FFI_ArrowArray, + n_args: i64, + out: *mut FFI_ArrowSchema, +) -> c_int { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl) + .as_mut() + .unwrap(); + + let ffi_types = std::slice::from_raw_parts(arg_types, n_args as usize); + let ffi_scalar_args = std::slice::from_raw_parts(scalar_args, n_args as usize); + + match private_data.init(ffi_types, ffi_scalar_args) { + Ok(Some(mut return_ffi_schema)) => { + swap_nonoverlapping(&mut return_ffi_schema as *mut _, out, 1); + 0 + } + Ok(None) => { + *out = FFI_ArrowSchema::empty(); + 0 + } + Err(err) => { + private_data.last_error = + CString::from_str(&err.message()).unwrap_or(CString::default()); + libc::EINVAL + } + } +} + +/// C callable wrapper around [ExportedScalarKernelImpl::execute] +unsafe extern "C" fn c_kernel_execute( + self_: *mut SedonaCScalarKernelImpl, + args: *const *mut FFI_ArrowArray, + n_args: i64, + n_rows: i64, + out: *mut FFI_ArrowArray, +) -> c_int { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl) + .as_mut() + .unwrap(); + + let ffi_args = std::slice::from_raw_parts(args, n_args as usize); + match private_data.execute(ffi_args, n_rows) { + Ok(mut ffi_array) => { + swap_nonoverlapping(&mut ffi_array as *mut _, out, 1); + 0 + } + Err(err) => { + private_data.last_error = + CString::from_str(&err.message()).unwrap_or(CString::default()); + libc::EINVAL + } + } +} + +/// C Callable wrapper to retrieve the last error string +unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarKernelImpl) -> *const c_char { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl) + .as_ref() + .unwrap(); + private_data.last_error.as_ptr() +} + +/// C Callable wrapper called when this value is dropped via FFI +unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarKernelImpl) { + assert!(!self_.is_null()); + let self_ref = self_.as_mut().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let boxed = Box::from_raw(self_ref.private_data as *mut ExportedScalarKernelImpl); + drop(boxed); + + self_ref.private_data = null_mut(); + self_ref.release = None; +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_schema::DataType; + use datafusion_common::exec_err; + use datafusion_expr::Volatility; + use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel}; + use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher}; + use sedona_testing::{create::create_array, testers::ScalarUdfTester}; + + use super::*; + + #[test] + fn ffi_roundtrip() { + let kernel = SimpleSedonaScalarKernel::new_ref( + ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), + Arc::new(|_, args| Ok(args[0].clone())), + ); + + let array_value = create_array(&[Some("POINT (0 1)"), None], &WKB_GEOMETRY); + + let udf_native = SedonaScalarUDF::new( + "simple_udf", + vec![kernel.clone()], + Volatility::Immutable, + None, + ); + + let tester = ScalarUdfTester::new(udf_native.into(), vec![WKB_GEOMETRY]); + tester.assert_return_type(WKB_GEOMETRY); + + let result = tester.invoke_scalar("POINT (0 1)").unwrap(); + tester.assert_scalar_result_equals(result, "POINT (0 1)"); + + assert_eq!( + &tester.invoke_array(array_value.clone()).unwrap(), + &array_value + ); + + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]); + ffi_tester.assert_return_type(WKB_GEOMETRY); + + let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap(); + ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)"); + + assert_eq!( + &ffi_tester.invoke_array(array_value.clone()).unwrap(), + &array_value + ); + + // Check the case of a kernel that does not apply to input arguments + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![]); + let err = ffi_tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "simple_udf_from_ffi([]): No kernel matching arguments" + ); + } + + #[test] + fn named_kernel() { + let kernel = SimpleSedonaScalarKernel::new_ref( + ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), + Arc::new(|_, args| Ok(args[0].clone())), + ); + + // Without intervening, we have a None name + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + assert!(imported_kernel.function_name().is_none()); + + // If we set a function name, it should be roundtripped + let exported_kernel = + ExportedScalarKernel::from(kernel.clone()).with_function_name("foofy"); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + assert_eq!(imported_kernel.function_name(), Some("foofy")); + } + + #[test] + fn invoke_batch_from_scalar() { + let kernel = Arc::new(ReturnTypeFromScalars {}) as ScalarKernelRef; + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new( + udf_from_ffi.clone().into(), + vec![SedonaType::Arrow(DataType::Utf8)], + ); + let return_type = ffi_tester.return_type_with_scalar(Some("foofy")).unwrap(); + assert_eq!(return_type, SedonaType::Arrow(DataType::Utf8)); + } + + #[derive(Debug)] + struct ReturnTypeFromScalars {} + + impl SedonaScalarKernel for ReturnTypeFromScalars { + fn return_type_from_args_and_scalars( + &self, + _args: &[SedonaType], + scalar_args: &[Option<&ScalarValue>], + ) -> Result> { + if let Some(arg0) = scalar_args[0] { + Ok(Some(SedonaType::Arrow(arg0.data_type()))) + } else { + Ok(None) + } + } + + fn return_type(&self, _args: &[SedonaType]) -> Result> { + unreachable!() + } + + fn invoke_batch( + &self, + _arg_types: &[SedonaType], + _args: &[ColumnarValue], + ) -> Result { + unreachable!() + } + } + + #[test] + fn erroring_invoke_batch() { + let kernel = SimpleSedonaScalarKernel::new_ref( + ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), + Arc::new(|_, _args| exec_err!("this invoke_batch() always errors")), + ); + + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]); + ffi_tester.assert_return_type(WKB_GEOMETRY); + + let err = ffi_tester.invoke_scalar("POINT (0 1)").unwrap_err(); + assert_eq!( + err.message(), + "SedonaCScalarKernelImpl::execute failed: this invoke_batch() always errors" + ); + } + + #[test] + fn erroring_return_type() { + let kernel = Arc::new(ErroringReturnType {}) as ScalarKernelRef; + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]); + let err = ffi_tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "SedonaCScalarKernelImpl::init failed: this implementation of return_type always errors" + ); + } + + #[derive(Debug)] + struct ErroringReturnType {} + + impl SedonaScalarKernel for ErroringReturnType { + fn return_type(&self, _args: &[SedonaType]) -> Result> { + plan_err!("this implementation of return_type always errors") + } + + fn invoke_batch( + &self, + _arg_types: &[SedonaType], + _args: &[ColumnarValue], + ) -> Result { + unreachable!() + } + } +} diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h new file mode 100644 index 000000000..7191af21f --- /dev/null +++ b/c/sedona-extension/src/sedona_extension.h @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef SEDONA_EXTENSION_H +#define SEDONA_EXTENSION_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// Extra guard for versions of Arrow without the canonical guard +#ifndef ARROW_FLAG_DICTIONARY_ORDERED + +#ifndef ARROW_C_DATA_INTERFACE +#define ARROW_C_DATA_INTERFACE + +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +struct ArrowSchema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema** children; + struct ArrowSchema* dictionary; + + // Release callback + void (*release)(struct ArrowSchema*); + // Opaque producer-specific data + void* private_data; +}; + +struct ArrowArray { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct ArrowArray** children; + struct ArrowArray* dictionary; + + // Release callback + void (*release)(struct ArrowArray*); + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_DATA_INTERFACE + +#ifndef ARROW_C_STREAM_INTERFACE +#define ARROW_C_STREAM_INTERFACE + +struct ArrowArrayStream { + // Callback to get the stream type + // (will be the same for all arrays in the stream). + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowSchema must be released independently from the stream. + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowArray must be released independently from the stream. + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback: release the stream's own resources. + // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowArrayStream*); + + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_STREAM_INTERFACE +#endif // ARROW_FLAG_DICTIONARY_ORDERED + +/// \brief Simple ABI-stable scalar function implementation +/// +/// This object is not thread safe: callers must take care to serialize +/// access to methods if an instance is shared across threads. In general, +/// constructing and initializing this structure should be sufficiently +/// cheap that it shouldn't need to be shared in this way. +/// +/// Briefly, the SedonaCScalarKernelImpl is typically the stack-allocated +/// structure that is not thread safe and the SedonaCScalarKernel is the +/// value that lives in a registry (whose job it is to initialize implementations +/// on each stack that needs one). +struct SedonaCScalarKernelImpl { + /// \brief Initialize the state of this instance and calculate a return type + /// + /// The init callback either computes a return ArrowSchema or initializes the + /// return ArrowSchema to an explicitly released value to indicate that this + /// implementation does not apply to the arguments passed. An implementation + /// that does not apply to the arguments passed is not necessarily an error + /// (there may be another implementation prepared to handle such a case). + /// + /// \param arg_types Argument types + /// \param scalar_args An optional array of scalar arguments. The entire + /// array may be null to indicate that none of the arguments are scalars, or + /// individual items in the array may be NULL to indicate that a particular + /// argument is not a scalar. Any non-NULL arrays must be of length 1. + /// Implementations MAY take ownership over the elements of scalar_args but + /// are not required to do so (i.e., caller must check if these elements were + /// released, and must release them if needed). + /// \param n_args Number of elements in the arg_types and/or scalar_args arrays. + /// \param out Will be populated with the return type on success, or initialized + /// to a released value if this implementation does not apply to the arguments + /// passed. + /// + /// \return An errno-compatible error code, or zero on success. + int (*init)(struct SedonaCScalarKernelImpl* self, + const struct ArrowSchema* const* arg_types, + struct ArrowArray* const* scalar_args, int64_t n_args, + struct ArrowSchema* out); + + /// \brief Execute a single batch + /// + /// \param args Input arguments. Input must be length one (e.g., a scalar) + /// or the size of the batch. Implementations must handle scalar or array + /// inputs. + /// \param n_args The number of pointers in args + /// \param out Will be populated with the result on success. + int (*execute)(struct SedonaCScalarKernelImpl* self, struct ArrowArray* const* args, + int64_t n_args, int64_t n_rows, struct ArrowArray* out); + + /// \brief Get the last error message + /// + /// The result is valid until the next call to a UDF method. + const char* (*get_last_error)(struct SedonaCScalarKernelImpl* self); + + /// \brief Release this instance + /// + /// Implementations of this callback must set self->release to NULL. + void (*release)(struct SedonaCScalarKernelImpl* self); + + /// \brief Opaque implementation-specific data + void* private_data; +}; + +/// \brief Scalar function/kernel initializer +/// +/// Usually a SedonaCScalarKernelImpl will be used to execute a single batch +/// (although it may be reused if a caller can serialize callback use). This +/// structure is a factory object that initializes such objects. The +/// SedonaCScalarKernel is designed to be thread-safe and live in a registry. +struct SedonaCScalarKernel { + /// \brief Function name + /// + /// Optional function name. This is used to register the kernel with the + /// appropriate function when passing this kernel across a boundary. + const char* (*function_name)(const struct SedonaCScalarKernel* self); + + /// \brief Initialize a new implementation struct + /// + /// This callback is thread safe and may be called concurrently from any + /// thread at any time (as long as this object is valid). + void (*new_impl)(const struct SedonaCScalarKernel* self, + struct SedonaCScalarKernelImpl* out); + + /// \brief Release this instance + /// + /// Implementations of this callback must set self->release to NULL. + void (*release)(struct SedonaCScalarKernel* self); + + /// \brief Opaque implementation-specific data + void* private_data; +}; + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/rust/sedona/src/ffi.rs b/rust/sedona/src/ffi.rs deleted file mode 100644 index e1cb1f7bf..000000000 --- a/rust/sedona/src/ffi.rs +++ /dev/null @@ -1,515 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -use std::{any::Any, sync::Arc}; - -use abi_stable::StableAbi; -use arrow_schema::{DataType, Field, FieldRef, Schema}; -use datafusion::physical_plan::{expressions::Column, PhysicalExpr}; -use datafusion_common::{config::ConfigOptions, DataFusionError, Result, ScalarValue}; -use datafusion_expr::{ - function::{AccumulatorArgs, StateFieldsArgs}, - Accumulator, AggregateUDF, AggregateUDFImpl, ColumnarValue, ReturnFieldArgs, - ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, -}; -use datafusion_ffi::{ - udaf::{FFI_AggregateUDF, ForeignAggregateUDF}, - udf::{FFI_ScalarUDF, ForeignScalarUDF}, -}; -use sedona_common::sedona_internal_err; -use sedona_schema::datatypes::SedonaType; - -use sedona_expr::{ - aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef}, - scalar_udf::{ScalarKernelRef, SedonaScalarKernel}, -}; - -/// A stable struct for sharing [SedonaScalarKernel]s across FFI boundaries -/// -/// The primary interface for importing or exporting these is `.from()` -/// and `.into()` between the [FFI_SedonaScalarKernel] and the [ScalarKernelRef]. -/// -/// Internally this struct uses the [FFI_ScalarUDF] from DataFusion's FFI -/// library to avoid having to invent an FFI ourselves. Like the [FFI_ScalarUDF], -/// this struct is only convenient to use when the libraries on both sides of -/// a boundary are written in Rust. Because Rust makes it relatively easy to -/// wrap C or C++ libraries, this should not be a barrier for most types of -/// kernels we might want to implement; however, it is also an option to -/// create our own FFI using simpler primitives if using DataFusion's -/// introduces performance or implementation issues. -#[repr(C)] -#[derive(Debug, StableAbi)] -#[allow(non_camel_case_types)] -pub struct FFI_SedonaScalarKernel { - inner: FFI_ScalarUDF, -} - -impl From for FFI_SedonaScalarKernel { - fn from(value: ScalarKernelRef) -> Self { - let exported = ScalarUDF::new_from_impl(ExportedScalarKernel::from(value)); - FFI_SedonaScalarKernel { - inner: Arc::new(exported).into(), - } - } -} - -impl TryFrom<&FFI_SedonaScalarKernel> for ScalarKernelRef { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaScalarKernel) -> Result { - Ok(Arc::new(ImportedScalarKernel::try_from(value)?)) - } -} - -impl TryFrom for ScalarKernelRef { - type Error = DataFusionError; - - fn try_from(value: FFI_SedonaScalarKernel) -> Result { - Self::try_from(&value) - } -} - -#[derive(Debug)] -struct ExportedScalarKernel { - name: String, - signature: Signature, - sedona_impl: ScalarKernelRef, -} - -impl PartialEq for ExportedScalarKernel { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - } -} - -impl Eq for ExportedScalarKernel {} - -impl std::hash::Hash for ExportedScalarKernel { - fn hash(&self, state: &mut H) { - self.name.hash(state); - } -} - -impl From for ExportedScalarKernel { - fn from(value: ScalarKernelRef) -> Self { - Self { - name: "ExportedScalarKernel".to_string(), - signature: Signature::any(0, datafusion_expr::Volatility::Volatile), - sedona_impl: value, - } - } -} - -impl ScalarUDFImpl for ExportedScalarKernel { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn name(&self) -> &str { - &self.name - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - sedona_internal_err!("should not be called") - } - - fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { - let sedona_types = args - .arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - match self.sedona_impl.return_type(&sedona_types)? { - Some(output_type) => Ok(output_type.to_storage_field("", true)?.into()), - // Sedona kernels return None to indicate the kernel doesn't apply to the inputs, - // but the ScalarUDFImpl doesn't have a way to natively indicate that. We use - // NotImplemented with a special message and catch it on the other side. - None => Err(DataFusionError::NotImplemented( - "::kernel does not match input args::".to_string(), - )), - } - } - - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let sedona_types = args - .arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - self.sedona_impl.invoke_batch(&sedona_types, &args.args) - } -} - -#[derive(Debug)] -struct ImportedScalarKernel { - udf_impl: ScalarUDF, -} - -impl TryFrom<&FFI_SedonaScalarKernel> for ImportedScalarKernel { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaScalarKernel) -> Result { - let wrapped = ForeignScalarUDF::try_from(&value.inner)?; - Ok(Self { - udf_impl: ScalarUDF::new_from_impl(wrapped), - }) - } -} - -impl SedonaScalarKernel for ImportedScalarKernel { - fn return_type(&self, args: &[SedonaType]) -> Result> { - let df_args = ReturnFieldArgs { - arg_fields: &args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?, - scalar_arguments: &[], - }; - match self.udf_impl.return_field_from_args(df_args) { - Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)), - Err(err) => { - if matches!(err, DataFusionError::NotImplemented(_)) { - Ok(None) - } else { - Err(err) - } - } - } - } - - fn invoke_batch( - &self, - arg_types: &[SedonaType], - args: &[ColumnarValue], - ) -> Result { - let arg_rows = Self::output_size(args); - - let scalar_fn_args = ScalarFunctionArgs { - args: args.to_vec(), - arg_fields: arg_types - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?, - number_rows: arg_rows.unwrap_or(1), - // Wrapper code on the other side of this doesn't use this value - return_field: Field::new("", DataType::Null, true).into(), - config_options: Arc::new(ConfigOptions::default()), - }; - - // DataFusion's FFI_ScalarUDF always returns array output but - // our original UDFs were careful to return ScalarValues. - match self.udf_impl.invoke_with_args(scalar_fn_args)? { - ColumnarValue::Array(array) => match arg_rows { - Some(_) => Ok(ColumnarValue::Array(array)), - None => Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( - &array, 0, - )?)), - }, - ColumnarValue::Scalar(scalar_value) => { - // This branch is probably never taken but may in the future - Ok(ColumnarValue::Scalar(scalar_value)) - } - } - } -} - -impl ImportedScalarKernel { - fn output_size(args: &[ColumnarValue]) -> Option { - for original_arg in args { - if let ColumnarValue::Array(array) = original_arg { - return Some(array.len()); - } - } - None - } -} - -/// A stable struct for sharing [SedonaAccumulator]s across FFI boundaries -/// -/// The primary interface for importing or exporting these is `.from()` -/// and `.into()` between the [FFI_SedonaAggregateKernel] and the [SedonaAccumulatorRef]. -/// -/// Internally this struct uses the [FFI_AggregateUDF] from DataFusion's FFI -/// library to avoid having to invent an FFI ourselves. See [FFI_SedonaScalarKernel] -/// for general information about the rationale and usage of FFI implementations. -#[repr(C)] -#[derive(Debug, StableAbi)] -#[allow(non_camel_case_types)] -pub struct FFI_SedonaAggregateKernel { - inner: FFI_AggregateUDF, -} - -impl From for FFI_SedonaAggregateKernel { - fn from(value: SedonaAccumulatorRef) -> Self { - let exported: AggregateUDF = ExportedSedonaAccumulator::from(value).into(); - FFI_SedonaAggregateKernel { - inner: Arc::new(exported).into(), - } - } -} - -impl TryFrom<&FFI_SedonaAggregateKernel> for SedonaAccumulatorRef { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaAggregateKernel) -> Result { - Ok(Arc::new(ImportedSedonaAccumulator::try_from(value)?)) - } -} - -impl TryFrom for SedonaAccumulatorRef { - type Error = DataFusionError; - - fn try_from(value: FFI_SedonaAggregateKernel) -> Result { - Self::try_from(&value) - } -} - -#[derive(Debug)] -struct ExportedSedonaAccumulator { - name: String, - signature: Signature, - sedona_impl: SedonaAccumulatorRef, -} - -impl PartialEq for ExportedSedonaAccumulator { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - } -} - -impl Eq for ExportedSedonaAccumulator {} - -impl std::hash::Hash for ExportedSedonaAccumulator { - fn hash(&self, state: &mut H) { - self.name.hash(state); - } -} - -impl From for ExportedSedonaAccumulator { - fn from(value: SedonaAccumulatorRef) -> Self { - Self { - name: "ExportedSedonaAccumulator".to_string(), - signature: Signature::any(0, datafusion_expr::Volatility::Volatile), - sedona_impl: value, - } - } -} - -impl AggregateUDFImpl for ExportedSedonaAccumulator { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - &self.name - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_field(&self, arg_fields: &[FieldRef]) -> Result { - let sedona_types = arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - match self.sedona_impl.return_type(&sedona_types)? { - Some(output_type) => Ok(Arc::new(output_type.to_storage_field("", true)?)), - // Sedona kernels return None to indicate the kernel doesn't apply to the inputs, - // but the ScalarUDFImpl doesn't have a way to natively indicate that. We use - // NotImplemented with a special message and catch it on the other side. - None => Err(DataFusionError::NotImplemented( - "::kernel does not match input args::".to_string(), - )), - } - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - sedona_internal_err!("This should not be called (use return_field())") - } - - fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - let arg_fields = acc_args - .exprs - .iter() - .map(|expr| expr.return_field(acc_args.schema)) - .collect::>>()?; - let sedona_types = arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - if let Some(output_type) = self.sedona_impl.return_type(&sedona_types)? { - self.sedona_impl.accumulator(&sedona_types, &output_type) - } else { - Err(DataFusionError::NotImplemented( - "::kernel does not match input args::".to_string(), - )) - } - } - - fn state_fields(&self, args: StateFieldsArgs) -> Result> { - let sedona_types = args - .input_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - self.sedona_impl.state_fields(&sedona_types) - } -} - -#[derive(Debug)] -struct ImportedSedonaAccumulator { - aggregate_impl: AggregateUDF, -} - -impl TryFrom<&FFI_SedonaAggregateKernel> for ImportedSedonaAccumulator { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaAggregateKernel) -> Result { - let wrapped = ForeignAggregateUDF::try_from(&value.inner)?; - Ok(Self { - aggregate_impl: wrapped.into(), - }) - } -} - -impl SedonaAccumulator for ImportedSedonaAccumulator { - fn return_type(&self, args: &[SedonaType]) -> Result> { - let arg_fields = args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?; - - match self.aggregate_impl.return_field(&arg_fields) { - Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)), - Err(err) => { - if matches!(err, DataFusionError::NotImplemented(_)) { - Ok(None) - } else { - Err(err) - } - } - } - } - - fn accumulator( - &self, - args: &[SedonaType], - output_type: &SedonaType, - ) -> Result> { - let arg_fields = args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?; - let mock_schema = Schema::new(arg_fields); - let exprs = (0..mock_schema.fields().len()) - .map(|i| -> Arc { Arc::new(Column::new("col", i)) }) - .collect::>(); - - let return_field = output_type.to_storage_field("", true)?; - - let args = AccumulatorArgs { - return_field: return_field.into(), - schema: &mock_schema, - ignore_nulls: true, - order_bys: &[], - is_reversed: false, - name: "", - is_distinct: false, - exprs: &exprs, - }; - - self.aggregate_impl.accumulator(args) - } - - fn state_fields(&self, args: &[SedonaType]) -> Result> { - let arg_fields = args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?; - - let state_field_args = StateFieldsArgs { - name: "", - input_fields: &arg_fields, - return_field: Arc::new(Field::new("", DataType::Null, false)), - ordering_fields: &[], - is_distinct: false, - }; - - self.aggregate_impl.state_fields(state_field_args) - } -} - -#[cfg(test)] -mod test { - use datafusion_expr::Volatility; - use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel}; - use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher}; - use sedona_testing::{create::create_array, testers::ScalarUdfTester}; - - use super::*; - - #[test] - fn ffi_roundtrip() { - let kernel = SimpleSedonaScalarKernel::new_ref( - ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), - Arc::new(|_, args| Ok(args[0].clone())), - ); - - let array_value = create_array(&[Some("POINT (0 1)"), None], &WKB_GEOMETRY); - - let udf_native = SedonaScalarUDF::new( - "simple_udf", - vec![kernel.clone()], - Volatility::Immutable, - None, - ); - - let tester = ScalarUdfTester::new(udf_native.into(), vec![WKB_GEOMETRY]); - tester.assert_return_type(WKB_GEOMETRY); - - let result = tester.invoke_scalar("POINT (0 1)").unwrap(); - tester.assert_scalar_result_equals(result, "POINT (0 1)"); - - assert_eq!( - &tester.invoke_array(array_value.clone()).unwrap(), - &array_value - ); - - let ffi_kernel = FFI_SedonaScalarKernel::from(kernel.clone()); - let udf_from_ffi = SedonaScalarUDF::new( - "simple_udf_from_ffi", - vec![ffi_kernel.try_into().unwrap()], - Volatility::Immutable, - None, - ); - - let ffi_tester = ScalarUdfTester::new(udf_from_ffi.into(), vec![WKB_GEOMETRY]); - ffi_tester.assert_return_type(WKB_GEOMETRY); - - let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap(); - ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)"); - - assert_eq!( - &ffi_tester.invoke_array(array_value.clone()).unwrap(), - &array_value - ); - } -} diff --git a/rust/sedona/src/lib.rs b/rust/sedona/src/lib.rs index 52b543874..9d18064cc 100644 --- a/rust/sedona/src/lib.rs +++ b/rust/sedona/src/lib.rs @@ -17,7 +17,6 @@ mod catalog; pub mod context; mod exec; -pub mod ffi; mod object_storage; pub mod random_geometry_provider; pub mod reader;