From e1e21e3ebce3e2365de1117cef6f9cf0cc9117ca Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Dec 2025 14:42:00 -0600 Subject: [PATCH 01/20] add c extension crate --- Cargo.lock | 9 ++ Cargo.toml | 1 + c/sedona-extension/Cargo.toml | 33 ++++ c/sedona-extension/src/extension.rs | 49 ++++++ c/sedona-extension/src/lib.rs | 19 +++ c/sedona-extension/src/scalar_kernel.rs | 30 ++++ c/sedona-extension/src/sedona_extension.h | 181 ++++++++++++++++++++++ 7 files changed, 322 insertions(+) create mode 100644 c/sedona-extension/Cargo.toml create mode 100644 c/sedona-extension/src/extension.rs create mode 100644 c/sedona-extension/src/lib.rs create mode 100644 c/sedona-extension/src/scalar_kernel.rs create mode 100644 c/sedona-extension/src/sedona_extension.h diff --git a/Cargo.lock b/Cargo.lock index e0b82186a..e9d04c19a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4938,6 +4938,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "sedona-extension" +version = "0.2.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "sedona-expr", +] + [[package]] name = "sedona-functions" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 2f7e60571..a0c62fa9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ # under the License. [workspace] members = [ + "c/sedona-extension", "c/sedona-geoarrow-c", "c/sedona-geos", "c/sedona-proj", diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml new file mode 100644 index 000000000..af3137025 --- /dev/null +++ b/c/sedona-extension/Cargo.toml @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "sedona-extension" +version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description.workspace = true +readme.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +arrow-array = { workspace = true, features = ["ffi"]} +arrow-schema = { workspace = true, features = ["ffi"]} +sedona-expr = { workspace = true } diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs new file mode 100644 index 000000000..ff70f06d1 --- /dev/null +++ b/c/sedona-extension/src/extension.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::os::raw::{c_char, c_void}; + +use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; + +#[repr(C)] +pub struct SedonaCScalarUdf { + pub init: Option< + unsafe extern "C" fn( + self_: *mut SedonaCScalarUdf, + arg_types: *const *const FFI_ArrowSchema, + scalar_args: *mut *mut FFI_ArrowArray, + n_args: i64, + out: *mut FFI_ArrowSchema, + ) -> i32, + >, + + pub execute: Option< + unsafe extern "C" fn( + self_: *mut SedonaCScalarUdf, + args: *mut *mut FFI_ArrowArray, + n_args: i64, + n_rows: i64, + out: *mut FFI_ArrowArray, + ) -> i32, + >, + + pub get_last_error: Option *const c_char>, + + pub release: Option, + + pub private_data: *mut c_void, +} diff --git a/c/sedona-extension/src/lib.rs b/c/sedona-extension/src/lib.rs new file mode 100644 index 000000000..073ed939b --- /dev/null +++ b/c/sedona-extension/src/lib.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +pub mod extension; +pub mod scalar_kernel; diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs new file mode 100644 index 000000000..830a039c8 --- /dev/null +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::extension::SedonaCScalarUdf; + +pub struct ExtensionSedonaScalarKernel { + inner: SedonaCScalarUdf, +} + +impl Drop for ExtensionSedonaScalarKernel { + fn drop(&mut self) { + if let Some(releaser) = self.inner.release { + unsafe { releaser(&mut self.inner) } + } + } +} diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h new file mode 100644 index 000000000..4f8fe58c8 --- /dev/null +++ b/c/sedona-extension/src/sedona_extension.h @@ -0,0 +1,181 @@ + +#include + +#ifndef SEDONA_EXTENSION_H +#define SEDONA_EXTENSION_H + +#ifdef __cplusplus +extern "C" { +#endif + +// Extra guard for versions of Arrow without the canonical guard +#ifndef ARROW_FLAG_DICTIONARY_ORDERED + +#ifndef ARROW_C_DATA_INTERFACE +#define ARROW_C_DATA_INTERFACE + +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +struct ArrowSchema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema** children; + struct ArrowSchema* dictionary; + + // Release callback + void (*release)(struct ArrowSchema*); + // Opaque producer-specific data + void* private_data; +}; + +struct ArrowArray { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct ArrowArray** children; + struct ArrowArray* dictionary; + + // Release callback + void (*release)(struct ArrowArray*); + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_DATA_INTERFACE + +#ifndef ARROW_C_STREAM_INTERFACE +#define ARROW_C_STREAM_INTERFACE + +struct ArrowArrayStream { + // Callback to get the stream type + // (will be the same for all arrays in the stream). + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. 
+ // + // If successful, the ArrowSchema must be released independently from the stream. + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowArray must be released independently from the stream. + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback: release the stream's own resources. + // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowArrayStream*); + + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_STREAM_INTERFACE +#endif // ARROW_FLAG_DICTIONARY_ORDERED + +/// \brief Simple ABI-stable scalar function implementation +/// +/// This object is not thread safe: callers must take care to serialize +/// access to methods if an instance is shared across threads. In general, +/// constructing and initializing this structure should be sufficiently +/// cheap that it shouldn't need to be shared in this way. +struct SedonaCScalarUdf { + /// \brief Initialize the state of this UDF instance and calculate a return + /// type + /// + /// The init callback either computes a return ArrowSchema or initializes the + /// return ArrowSchema to an explicitly released value to indicate that this + /// implementation does not apply to the arguments passed. 
An implementation + /// that does not apply to the arguments passed is not necessarily an error + /// (there may be another implementation prepared to handle such a case). + /// + /// \param arg_types Argument types + /// \param scalar_args An optional array of scalar arguments. The entire + /// array may be null to indicate that none of the arguments are scalars, or + /// individual items in the array may be NULL to indicate that a particular + /// argument is not a scalar. Any non-NULL arrays must be of length 1. + /// Implementations MAY take ownership over the elements of scalar_args but + /// are not required to do so (i.e., caller must check if these elements were + /// released, and must release them if needed). + /// \param n_args Number of elements in the arg_types and/or scalar_args arrays. + /// \param out Will be populated with the return type on success, or initialized + /// to a released value if this implementation does not apply to the arguments + /// passed. + /// + /// \return An errno-compatible error code, or zero on success. + int (*init)(struct SedonaCScalarUdf* self, const struct ArrowSchema** arg_types, + struct ArrowArray** scalar_args, int64_t n_args, struct ArrowSchema* out); + + /// \brief Execute a single batch + /// + /// \param args Input arguments. Input must be length one (e.g., a scalar) + /// or the size of the batch. Implementations must handle scalar or array + /// inputs. + /// \param n_args The number of pointers in args + /// \param out Will be populated with the result on success. + int (*execute)(struct SedonaCScalarUdf* self, struct ArrowArray** args, int64_t n_args, + int64_t n_rows, struct ArrowArray* out); + + /// \brief Get the last error message + /// + /// The result is valid until the next call to a UDF method. + const char* (*get_last_error)(struct SedonaCScalarUdf* self); + + /// \brief Release this instance + /// + /// Implementations of this callback must set self->release to NULL. 
+ void (*release)(struct SedonaCScalarUdf* self); + + /// \brief Opaque implementation-specific data + void* private_data; +}; + +/// \brief Scalar function initializer +/// +/// Usually a GeoArrowScalarUdf will be used to execute a single batch +/// (although it may be reused if a caller can serialize callback use). This +/// structure is a factory object that initializes such objects. +struct SedonaCScalarUdfFactory { + /// \brief Initialize a new implementation struct + /// + /// This callback is thread safe and may be called concurrently from any + /// thread at any time (as long as this object is valid). + void (*new_scalar_udf_impl)(struct SedonaCScalarUdfFactory* self, + struct SedonaCScalarUdf* out); + + /// \brief Release this instance + /// + /// Implementations of this callback must set self->release to NULL. + void (*release)(struct SedonaCScalarUdfFactory* self); + + /// \brief Opaque implementation-specific data + void* private_data; +}; + +#ifdef __cplusplus +} +#endif + +#endif From fbba325d594428437c36584451e895940eebd69d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Dec 2025 17:01:04 -0600 Subject: [PATCH 02/20] maybe extension --- Cargo.lock | 4 + c/sedona-extension/Cargo.toml | 4 + c/sedona-extension/src/extension.rs | 41 ++++- c/sedona-extension/src/scalar_kernel.rs | 234 +++++++++++++++++++++++- 4 files changed, 273 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9d04c19a..165b19de7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4944,7 +4944,11 @@ version = "0.2.0" dependencies = [ "arrow-array", "arrow-schema", + "datafusion-common", + "datafusion-expr", + "sedona-common", "sedona-expr", + "sedona-schema", ] [[package]] diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml index af3137025..3c031d40f 100644 --- a/c/sedona-extension/Cargo.toml +++ b/c/sedona-extension/Cargo.toml @@ -30,4 +30,8 @@ rust-version.workspace = true [dependencies] arrow-array = { workspace = true, features = 
["ffi"]} arrow-schema = { workspace = true, features = ["ffi"]} +datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } +sedona-common = { workspace = true } sedona-expr = { workspace = true } +sedona-schema = { workspace = true } diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index ff70f06d1..25f2c3d21 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -15,10 +15,33 @@ // specific language governing permissions and limitations // under the License. -use std::os::raw::{c_char, c_void}; +use std::{ffi::c_int, os::raw::{c_char, c_void}}; use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +#[derive(Default)] +#[repr(C)] +pub struct SedonaCScalarUdfFactory { + pub new_scalar_udf_impl: Option< + unsafe extern "C" fn(self_: *const SedonaCScalarUdfFactory, out: *mut SedonaCScalarUdf), + >, + + release: Option, + private_data: *mut c_void, +} + +unsafe impl Send for SedonaCScalarUdfFactory {} +unsafe impl Sync for SedonaCScalarUdfFactory {} + +impl Drop for SedonaCScalarUdfFactory { + fn drop(&mut self) { + if let Some(releaser) = self.release { + unsafe { releaser(self) } + } + } +} + +#[derive(Default)] #[repr(C)] pub struct SedonaCScalarUdf { pub init: Option< @@ -28,7 +51,7 @@ pub struct SedonaCScalarUdf { scalar_args: *mut *mut FFI_ArrowArray, n_args: i64, out: *mut FFI_ArrowSchema, - ) -> i32, + ) -> c_int, >, pub execute: Option< @@ -38,12 +61,20 @@ pub struct SedonaCScalarUdf { n_args: i64, n_rows: i64, out: *mut FFI_ArrowArray, - ) -> i32, + ) -> c_int, >, pub get_last_error: Option *const c_char>, - pub release: Option, + release: Option, + + private_data: *mut c_void, +} - pub private_data: *mut c_void, +impl Drop for SedonaCScalarUdf { + fn drop(&mut self) { + if let Some(releaser) = self.release { + unsafe { releaser(self) } + } + } } diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 
830a039c8..37ff37db9 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -15,16 +15,240 @@ // specific language governing permissions and limitations // under the License. -use crate::extension::SedonaCScalarUdf; +use arrow_array::{ + ffi::{FFI_ArrowArray, FFI_ArrowSchema}, + ArrayRef, +}; +use arrow_schema::{ArrowError, Field}; +use datafusion_common::{plan_err, Result, ScalarValue}; +use datafusion_expr::ColumnarValue; +use sedona_common::sedona_internal_err; +use sedona_expr::scalar_udf::SedonaScalarKernel; +use sedona_schema::datatypes::SedonaType; +use std::{ + ffi::{c_int, CStr}, + fmt::Debug, + ptr::null_mut, +}; + +use crate::extension::{SedonaCScalarUdf, SedonaCScalarUdfFactory}; pub struct ExtensionSedonaScalarKernel { + inner: SedonaCScalarUdfFactory, +} + +impl Debug for ExtensionSedonaScalarKernel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExtensionSedonaScalarKernel") + .field("inner", &"") + .finish() + } +} + +impl SedonaScalarKernel for ExtensionSedonaScalarKernel { + fn return_type_from_args_and_scalars( + &self, + args: &[SedonaType], + scalar_args: &[Option<&ScalarValue>], + ) -> Result> { + let mut inner_impl = CScalarUdfWrapper::try_new(&self.inner)?; + inner_impl.init(args, scalar_args) + } + + fn invoke_batch_from_args( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + return_type: &SedonaType, + num_rows: usize, + ) -> Result { + let arg_scalars = args + .iter() + .map(|arg| { + if let ColumnarValue::Scalar(scalar) = arg { + Some(scalar) + } else { + None + } + }) + .collect::>(); + + let mut inner_impl = CScalarUdfWrapper::try_new(&self.inner)?; + inner_impl.init(arg_types, &arg_scalars)?; + let result_array = inner_impl.execute(args, return_type, num_rows)?; + for arg in args { + if let ColumnarValue::Array(_) = arg { + return Ok(ColumnarValue::Array(result_array)); + } + } + + if result_array.len() != 1 { + 
sedona_internal_err!( + "Expected scalar result but got result with length {}", + result_array.len() + ) + } else { + Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( + &result_array, + 0, + )?)) + } + } + + fn return_type(&self, _args: &[SedonaType]) -> Result> { + sedona_internal_err!( + "Should not be called because return_type_from_args_and_scalars() is implemented" + ) + } + + fn invoke_batch( + &self, + _arg_types: &[SedonaType], + _args: &[ColumnarValue], + ) -> Result { + sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented") + } +} + +struct CScalarUdfWrapper { inner: SedonaCScalarUdf, } -impl Drop for ExtensionSedonaScalarKernel { - fn drop(&mut self) { - if let Some(releaser) = self.inner.release { - unsafe { releaser(&mut self.inner) } +impl CScalarUdfWrapper { + fn try_new(factory: &SedonaCScalarUdfFactory) -> Result { + if let Some(init) = factory.new_scalar_udf_impl { + let mut inner = SedonaCScalarUdf::default(); + unsafe { init(factory, &mut inner) }; + Ok(Self { inner }) + } else { + sedona_internal_err!("SedonaCScalarUdfFactory is not valid") + } + } + + fn init( + &mut self, + arg_types: &[SedonaType], + arg_scalars: &[Option<&ScalarValue>], + ) -> Result> { + if arg_types.len() != arg_scalars.len() { + return sedona_internal_err!("field/scalar lengths must be identical"); + } + + let arg_fields = arg_types + .iter() + .map(|sedona_type| sedona_type.to_storage_field("", true)) + .collect::>>()?; + let ffi_fields = arg_fields + .iter() + .map(FFI_ArrowSchema::try_from) + .collect::, ArrowError>>()?; + let ffi_field_ptrs = ffi_fields + .iter() + .map(|ffi_field| ffi_field as *const FFI_ArrowSchema) + .collect::>(); + + let mut ffi_scalars = arg_scalars + .iter() + .map(|maybe_scalar| -> Result> { + if let Some(scalar) = maybe_scalar { + let array = scalar.to_array()?; + Ok(Some(FFI_ArrowArray::new(&array.to_data()))) + } else { + Ok(None) + } + }) + .collect::>>()?; + + let mut ffi_scalar_ptrs = 
ffi_scalars + .iter_mut() + .map(|maybe_ffi_scalar| match maybe_ffi_scalar { + Some(ffi_scalar) => ffi_scalar as *mut FFI_ArrowArray, + None => null_mut(), + }) + .collect::>(); + + if let Some(init) = self.inner.init { + let mut ffi_out = FFI_ArrowSchema::empty(); + let code = unsafe { + init( + &mut self.inner, + ffi_field_ptrs.as_ptr(), + ffi_scalar_ptrs.as_mut_ptr(), + arg_types.len() as i64, + &mut ffi_out, + ) + }; + if code == 0 { + match Field::try_from(&ffi_out) { + Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)), + Err(_) => Ok(None), + } + } else { + plan_err!("SedonaCScalarUdf::init failed: {}", self.last_error(code)) + } + } else { + sedona_internal_err!("Invalid SedonaCScalarUdf") + } + } + + fn execute( + &mut self, + args: &[ColumnarValue], + return_type: &SedonaType, + num_rows: usize, + ) -> Result { + let arg_arrays = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(array) => Ok(array.clone()), + ColumnarValue::Scalar(scalar_value) => scalar_value.to_array(), + }) + .collect::>>()?; + let mut ffi_args = arg_arrays + .iter() + .map(|arg| FFI_ArrowArray::new(&arg.to_data())) + .collect::>(); + + if let Some(execute) = self.inner.execute { + let mut ffi_out = FFI_ArrowArray::empty(); + let code = unsafe { + execute( + &mut self.inner, + &mut ffi_args.as_mut_ptr(), + args.len() as i64, + num_rows as i64, + &mut ffi_out, + ) + }; + + if code == 0 { + let data = unsafe { + arrow_array::ffi::from_ffi_and_data_type( + ffi_out, + return_type.storage_type().clone(), + )? 
+ }; + Ok(arrow_array::make_array(data)) + } else { + plan_err!("SedonaCScalarUdf::init failed: {}", self.last_error(code)) + } + } else { + sedona_internal_err!("Invalid SedonaCScalarUdf") + } + } + + fn last_error(&mut self, code: c_int) -> String { + if let Some(get_last_error) = self.inner.get_last_error { + let c_err = unsafe { get_last_error(&mut self.inner) }; + if c_err.is_null() { + format!("({code})") + } else { + unsafe { CStr::from_ptr(c_err) } + .to_string_lossy() + .into_owned() + } + } else { + "Invalid SedonaCScalarUdf".to_string() } } } From 66ce1a2ce1c5273ae5e2a4eea77e463d7d9b3744 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Dec 2025 22:56:08 -0600 Subject: [PATCH 03/20] maybe init --- Cargo.lock | 5 +- c/sedona-extension/Cargo.toml | 1 + c/sedona-extension/src/extension.rs | 4 +- c/sedona-extension/src/scalar_kernel.rs | 162 +++++++++++++++++++++++- 4 files changed, 163 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 165b19de7..dc41c9a92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3447,9 +3447,9 @@ checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" [[package]] name = "libc" -version = "0.2.176" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libloading" @@ -4946,6 +4946,7 @@ dependencies = [ "arrow-schema", "datafusion-common", "datafusion-expr", + "libc", "sedona-common", "sedona-expr", "sedona-schema", diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml index 3c031d40f..25bfffc31 100644 --- a/c/sedona-extension/Cargo.toml +++ b/c/sedona-extension/Cargo.toml @@ -32,6 +32,7 @@ arrow-array = { workspace = true, features = ["ffi"]} arrow-schema = { workspace = true, features = ["ffi"]} datafusion-common = { workspace = true } datafusion-expr = { 
workspace = true } +libc = "0.2.178" sedona-common = { workspace = true } sedona-expr = { workspace = true } sedona-schema = { workspace = true } diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index 25f2c3d21..6b9868196 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -66,9 +66,9 @@ pub struct SedonaCScalarUdf { pub get_last_error: Option *const c_char>, - release: Option, + pub release: Option, - private_data: *mut c_void, + pub private_data: *mut c_void, } impl Drop for SedonaCScalarUdf { diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 37ff37db9..e1e3e0929 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -16,19 +16,21 @@ // under the License. use arrow_array::{ - ffi::{FFI_ArrowArray, FFI_ArrowSchema}, - ArrayRef, + ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema}, + make_array, ArrayRef, }; use arrow_schema::{ArrowError, Field}; use datafusion_common::{plan_err, Result, ScalarValue}; use datafusion_expr::ColumnarValue; use sedona_common::sedona_internal_err; -use sedona_expr::scalar_udf::SedonaScalarKernel; +use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel}; use sedona_schema::datatypes::SedonaType; use std::{ - ffi::{c_int, CStr}, + ffi::{c_char, c_int, CStr, CString}, fmt::Debug, - ptr::null_mut, + iter::zip, + ptr::{null_mut, swap_nonoverlapping}, + str::FromStr, }; use crate::extension::{SedonaCScalarUdf, SedonaCScalarUdfFactory}; @@ -252,3 +254,153 @@ impl CScalarUdfWrapper { } } } + +struct ExportedScalarKernel { + inner: ScalarKernelRef, + last_arg_types: Option>, + last_return_type: Option, + last_error: CString, +} + +impl ExportedScalarKernel { + fn init( + &mut self, + ffi_types: &[*const FFI_ArrowSchema], + ffi_scalar_args: &[*mut FFI_ArrowArray], + ) -> Result> { + let arg_fields = ffi_types + .iter() + .map(|ptr| { + if let 
Some(ffi_schema) = unsafe { ptr.as_ref() } { + Field::try_from(ffi_schema) + } else { + Err(ArrowError::CDataInterface( + "FFI_ArrowSchema is NULL".to_string(), + )) + } + }) + .collect::, ArrowError>>()?; + let args = arg_fields + .iter() + .map(SedonaType::from_storage_field) + .collect::>>()?; + + let arg_arrays = zip(ffi_scalar_args, &args) + .map(|(ptr, arg)| { + if ptr.is_null() { + Ok(None) + } else { + let owned_ffi_array = unsafe { FFI_ArrowArray::from_raw(*ptr) }; + let data = unsafe { + from_ffi_and_data_type(owned_ffi_array, arg.storage_type().clone())? + }; + Ok(Some(make_array(data))) + } + }) + .collect::, ArrowError>>()?; + + let scalar_args = arg_arrays + .iter() + .map(|maybe_array| { + if let Some(array) = maybe_array { + Ok(Some(ScalarValue::try_from_array(array, 0)?)) + } else { + Ok(None) + } + }) + .collect::>>()?; + let scalar_arg_refs = scalar_args + .iter() + .map(|arg| arg.as_ref()) + .collect::>(); + + let maybe_return_type = self + .inner + .return_type_from_args_and_scalars(&args, &scalar_arg_refs)?; + let return_ffi_schema = if let Some(return_type) = &maybe_return_type { + let return_field = return_type.to_storage_field("", true)?; + let return_ffi_schema = FFI_ArrowSchema::try_from(&return_field)?; + Some(return_ffi_schema) + } else { + None + }; + + self.last_arg_types.replace(args); + self.last_return_type = maybe_return_type; + + Ok(return_ffi_schema) + } + + fn execute( + &self, + ffi_scalar_args: &[*mut FFI_ArrowArray], + num_rows: i64, + ) -> Result { + todo!() + } +} + +unsafe fn c_init( + self_: *mut SedonaCScalarUdf, + arg_types: *const *const FFI_ArrowSchema, + scalar_args: *mut *mut FFI_ArrowArray, + n_args: i64, + out: *mut FFI_ArrowSchema, +) -> c_int { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + .as_mut() + .unwrap(); + + let ffi_types = 
std::slice::from_raw_parts(arg_types, n_args as usize); + let ffi_scalar_args = std::slice::from_raw_parts(scalar_args, n_args as usize); + + match private_data.init(ffi_types, ffi_scalar_args) { + Ok(Some(mut return_ffi_schema)) => { + swap_nonoverlapping(&mut return_ffi_schema as *mut _, out, 1); + 0 + } + Ok(None) => { + *out = FFI_ArrowSchema::empty(); + 0 + } + Err(err) => { + private_data.last_error = + CString::from_str(&err.message()).unwrap_or(CString::default()); + libc::EINVAL + } + } +} + +fn c_execute( + self_: *mut SedonaCScalarUdf, + args: *mut *mut FFI_ArrowArray, + n_args: i64, + n_rows: i64, + out: *mut FFI_ArrowArray, +) -> c_int { + todo!() +} + +unsafe fn c_last_error(self_: *mut SedonaCScalarUdf) -> *const c_char { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + .as_ref() + .unwrap(); + private_data.last_error.as_ptr() +} + +unsafe fn c_release(self_: *mut SedonaCScalarUdf) { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + (self_ref.private_data as *mut ExportedScalarKernel).drop_in_place(); + *self_ = SedonaCScalarUdf::default(); +} From c04a51bd9128c51bbde3e61130bf2ee1376d0ea1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 2 Dec 2025 23:26:46 -0600 Subject: [PATCH 04/20] maybe working --- c/sedona-extension/src/scalar_kernel.rs | 105 +++++++++++++++++++++--- 1 file changed, 92 insertions(+), 13 deletions(-) diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index e1e3e0929..8ec223020 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -26,7 +26,7 @@ use sedona_common::sedona_internal_err; use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel}; use sedona_schema::datatypes::SedonaType; use std::{ - ffi::{c_char, 
c_int, CStr, CString}, + ffi::{c_char, c_int, c_void, CStr, CString}, fmt::Debug, iter::zip, ptr::{null_mut, swap_nonoverlapping}, @@ -255,14 +255,36 @@ impl CScalarUdfWrapper { } } -struct ExportedScalarKernel { +pub struct ExportedScalarKernel { inner: ScalarKernelRef, last_arg_types: Option>, last_return_type: Option, last_error: CString, } +impl From for SedonaCScalarUdf { + fn from(value: ExportedScalarKernel) -> Self { + let box_value = Box::new(value); + Self { + init: Some(c_init), + execute: Some(c_execute), + get_last_error: Some(c_last_error), + release: Some(c_release), + private_data: Box::leak(box_value) as *mut ExportedScalarKernel as *mut c_void, + } + } +} + impl ExportedScalarKernel { + pub fn new(kernel: ScalarKernelRef) -> Self { + Self { + inner: kernel, + last_arg_types: None, + last_return_type: None, + last_error: CString::default(), + } + } + fn init( &mut self, ffi_types: &[*const FFI_ArrowSchema], @@ -331,16 +353,54 @@ impl ExportedScalarKernel { Ok(return_ffi_schema) } - fn execute( - &self, - ffi_scalar_args: &[*mut FFI_ArrowArray], - num_rows: i64, - ) -> Result { - todo!() + fn execute(&self, ffi_args: &[*mut FFI_ArrowArray], num_rows: i64) -> Result { + match (&self.last_arg_types, &self.last_return_type) { + (Some(arg_types), Some(return_type)) => { + let arg_arrays = zip(ffi_args, arg_types) + .map(|(ptr, arg)| { + let owned_ffi_array = unsafe { FFI_ArrowArray::from_raw(*ptr) }; + let data = unsafe { + from_ffi_and_data_type(owned_ffi_array, arg.storage_type().clone())? 
+ }; + Ok(make_array(data)) + }) + .collect::, ArrowError>>()?; + + let args = arg_arrays + .into_iter() + .map(|array| { + if array.len() as i64 == num_rows { + Ok(ColumnarValue::Array(array)) + } else { + Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( + &array, 0, + )?)) + } + }) + .collect::>>()?; + + let result_value = self.inner.invoke_batch_from_args( + arg_types, + &args, + return_type, + num_rows as usize, + )?; + let result_array = match result_value { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(scalar_value) => scalar_value.to_array()?, + }; + + let result_ffi_array = FFI_ArrowArray::new(&result_array.to_data()); + Ok(result_ffi_array) + } + _ => { + sedona_internal_err!("Call to ExportedScalarKernel::execute() before init()") + } + } } } -unsafe fn c_init( +unsafe extern "C" fn c_init( self_: *mut SedonaCScalarUdf, arg_types: *const *const FFI_ArrowSchema, scalar_args: *mut *mut FFI_ArrowArray, @@ -375,17 +435,36 @@ unsafe fn c_init( } } -fn c_execute( +unsafe extern "C" fn c_execute( self_: *mut SedonaCScalarUdf, args: *mut *mut FFI_ArrowArray, n_args: i64, n_rows: i64, out: *mut FFI_ArrowArray, ) -> c_int { - todo!() + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + .as_mut() + .unwrap(); + + let ffi_args = std::slice::from_raw_parts(args, n_args as usize); + match private_data.execute(ffi_args, n_rows) { + Ok(mut ffi_array) => { + swap_nonoverlapping(&mut ffi_array as *mut _, out, 1); + 0 + } + Err(err) => { + private_data.last_error = + CString::from_str(&err.message()).unwrap_or(CString::default()); + libc::EINVAL + } + } } -unsafe fn c_last_error(self_: *mut SedonaCScalarUdf) -> *const c_char { +unsafe extern "C" fn c_last_error(self_: *mut SedonaCScalarUdf) -> *const c_char { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); @@ -396,7 +475,7 @@ unsafe fn 
c_last_error(self_: *mut SedonaCScalarUdf) -> *const c_char { private_data.last_error.as_ptr() } -unsafe fn c_release(self_: *mut SedonaCScalarUdf) { +unsafe extern "C" fn c_release(self_: *mut SedonaCScalarUdf) { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); From f4078c66c279dee3ec0de4ed0e79d4a658cd9335 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 09:47:23 -0600 Subject: [PATCH 05/20] it works! --- Cargo.lock | 1 + c/sedona-extension/Cargo.toml | 1 + c/sedona-extension/src/extension.rs | 14 +- c/sedona-extension/src/scalar_kernel.rs | 169 +++++++++++++++++++++--- 4 files changed, 161 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc41c9a92..e5b2d4d50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4950,6 +4950,7 @@ dependencies = [ "sedona-common", "sedona-expr", "sedona-schema", + "sedona-testing", ] [[package]] diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml index 25bfffc31..e8f8c17c5 100644 --- a/c/sedona-extension/Cargo.toml +++ b/c/sedona-extension/Cargo.toml @@ -36,3 +36,4 @@ libc = "0.2.178" sedona-common = { workspace = true } sedona-expr = { workspace = true } sedona-schema = { workspace = true } +sedona-testing = { path = "../../rust/sedona-testing" } diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index 6b9868196..7f0468a40 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-use std::{ffi::c_int, os::raw::{c_char, c_void}}; +use std::{ + ffi::c_int, + os::raw::{c_char, c_void}, + ptr::null_mut, +}; use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; @@ -26,8 +30,8 @@ pub struct SedonaCScalarUdfFactory { unsafe extern "C" fn(self_: *const SedonaCScalarUdfFactory, out: *mut SedonaCScalarUdf), >, - release: Option, - private_data: *mut c_void, + pub release: Option, + pub private_data: *mut c_void, } unsafe impl Send for SedonaCScalarUdfFactory {} @@ -37,6 +41,8 @@ impl Drop for SedonaCScalarUdfFactory { fn drop(&mut self) { if let Some(releaser) = self.release { unsafe { releaser(self) } + self.release = None; + self.private_data = null_mut(); } } } @@ -75,6 +81,8 @@ impl Drop for SedonaCScalarUdf { fn drop(&mut self) { if let Some(releaser) = self.release { unsafe { releaser(self) } + self.release = None; + self.private_data = null_mut(); } } } diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 8ec223020..9982c4cb4 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -20,7 +20,7 @@ use arrow_array::{ make_array, ArrayRef, }; use arrow_schema::{ArrowError, Field}; -use datafusion_common::{plan_err, Result, ScalarValue}; +use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::ColumnarValue; use sedona_common::sedona_internal_err; use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel}; @@ -35,11 +35,11 @@ use std::{ use crate::extension::{SedonaCScalarUdf, SedonaCScalarUdfFactory}; -pub struct ExtensionSedonaScalarKernel { +pub struct ImportedScalarKernel { inner: SedonaCScalarUdfFactory, } -impl Debug for ExtensionSedonaScalarKernel { +impl Debug for ImportedScalarKernel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExtensionSedonaScalarKernel") .field("inner", &"") @@ -47,7 +47,24 @@ impl Debug for ExtensionSedonaScalarKernel { } } -impl 
SedonaScalarKernel for ExtensionSedonaScalarKernel { +impl TryFrom for ImportedScalarKernel { + type Error = DataFusionError; + + fn try_from(value: SedonaCScalarUdfFactory) -> Result { + match ( + &value.new_scalar_udf_impl, + &value.release, + value.private_data.is_null(), + ) { + (Some(_), Some(_), false) => Ok(Self { inner: value }), + _ => sedona_internal_err!( + "Can't import released or uninitialized SedonaCScalarUdfFactory" + ), + } + } +} + +impl SedonaScalarKernel for ImportedScalarKernel { fn return_type_from_args_and_scalars( &self, args: &[SedonaType], @@ -257,25 +274,74 @@ impl CScalarUdfWrapper { pub struct ExportedScalarKernel { inner: ScalarKernelRef, - last_arg_types: Option>, - last_return_type: Option, - last_error: CString, } -impl From for SedonaCScalarUdf { +impl From for ExportedScalarKernel { + fn from(value: ScalarKernelRef) -> Self { + ExportedScalarKernel { inner: value } + } +} + +impl From for SedonaCScalarUdfFactory { fn from(value: ExportedScalarKernel) -> Self { let box_value = Box::new(value); Self { - init: Some(c_init), - execute: Some(c_execute), - get_last_error: Some(c_last_error), - release: Some(c_release), + new_scalar_udf_impl: Some(c_factory_new_impl), + release: Some(c_factory_release), private_data: Box::leak(box_value) as *mut ExportedScalarKernel as *mut c_void, } } } impl ExportedScalarKernel { + fn new_impl(&self) -> ExportedScalarKernelImpl { + ExportedScalarKernelImpl::new(self.inner.clone()) + } +} + +unsafe extern "C" fn c_factory_new_impl( + self_: *const SedonaCScalarUdfFactory, + out: *mut SedonaCScalarUdf, +) { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + .as_ref() + .unwrap(); + *out = SedonaCScalarUdf::from(private_data.new_impl()) +} + +unsafe extern "C" fn c_factory_release(self_: *mut SedonaCScalarUdfFactory) { + assert!(!self_.is_null()); + let self_ref = 
self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + (self_ref.private_data as *mut ExportedScalarKernel).drop_in_place(); +} + +struct ExportedScalarKernelImpl { + inner: ScalarKernelRef, + last_arg_types: Option>, + last_return_type: Option, + last_error: CString, +} + +impl From for SedonaCScalarUdf { + fn from(value: ExportedScalarKernelImpl) -> Self { + let box_value = Box::new(value); + Self { + init: Some(c_kernel_init), + execute: Some(c_kernel_execute), + get_last_error: Some(c_kernel_last_error), + release: Some(c_kernel_release), + private_data: Box::leak(box_value) as *mut ExportedScalarKernelImpl as *mut c_void, + } + } +} + +impl ExportedScalarKernelImpl { pub fn new(kernel: ScalarKernelRef) -> Self { Self { inner: kernel, @@ -400,7 +466,7 @@ impl ExportedScalarKernel { } } -unsafe extern "C" fn c_init( +unsafe extern "C" fn c_kernel_init( self_: *mut SedonaCScalarUdf, arg_types: *const *const FFI_ArrowSchema, scalar_args: *mut *mut FFI_ArrowArray, @@ -411,7 +477,7 @@ unsafe extern "C" fn c_init( let self_ref = self_.as_ref().unwrap(); assert!(!self_ref.private_data.is_null()); - let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl) .as_mut() .unwrap(); @@ -435,7 +501,7 @@ unsafe extern "C" fn c_init( } } -unsafe extern "C" fn c_execute( +unsafe extern "C" fn c_kernel_execute( self_: *mut SedonaCScalarUdf, args: *mut *mut FFI_ArrowArray, n_args: i64, @@ -446,7 +512,7 @@ unsafe extern "C" fn c_execute( let self_ref = self_.as_ref().unwrap(); assert!(!self_ref.private_data.is_null()); - let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl) .as_mut() .unwrap(); @@ -464,22 +530,83 @@ unsafe extern "C" fn c_execute( } } -unsafe extern "C" fn c_last_error(self_: *mut SedonaCScalarUdf) -> *const c_char { +unsafe extern "C" fn 
c_kernel_last_error(self_: *mut SedonaCScalarUdf) -> *const c_char { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); assert!(!self_ref.private_data.is_null()); - let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl) .as_ref() .unwrap(); private_data.last_error.as_ptr() } -unsafe extern "C" fn c_release(self_: *mut SedonaCScalarUdf) { +unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarUdf) { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); assert!(!self_ref.private_data.is_null()); - (self_ref.private_data as *mut ExportedScalarKernel).drop_in_place(); - *self_ = SedonaCScalarUdf::default(); + (self_ref.private_data as *mut ExportedScalarKernelImpl).drop_in_place(); +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use datafusion_expr::Volatility; + use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel}; + use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher}; + use sedona_testing::{create::create_array, testers::ScalarUdfTester}; + + use super::*; + + #[test] + fn ffi_roundtrip() { + let kernel = SimpleSedonaScalarKernel::new_ref( + ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), + Arc::new(|_, args| Ok(args[0].clone())), + ); + + let array_value = create_array(&[Some("POINT (0 1)"), None], &WKB_GEOMETRY); + + let udf_native = SedonaScalarUDF::new( + "simple_udf", + vec![kernel.clone()], + Volatility::Immutable, + None, + ); + + let tester = ScalarUdfTester::new(udf_native.into(), vec![WKB_GEOMETRY]); + tester.assert_return_type(WKB_GEOMETRY); + + let result = tester.invoke_scalar("POINT (0 1)").unwrap(); + tester.assert_scalar_result_equals(result, "POINT (0 1)"); + + assert_eq!( + &tester.invoke_array(array_value.clone()).unwrap(), + &array_value + ); + + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = 
SedonaCScalarUdfFactory::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.into(), vec![WKB_GEOMETRY]); + ffi_tester.assert_return_type(WKB_GEOMETRY); + + let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap(); + ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)"); + + assert_eq!( + &ffi_tester.invoke_array(array_value.clone()).unwrap(), + &array_value + ); + } } From 700fec4f72f9bedc9ac686b477d78edfcfdcf0ae Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 10:13:22 -0600 Subject: [PATCH 06/20] maybe better release --- c/sedona-extension/src/scalar_kernel.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 9982c4cb4..f7990e8f3 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -318,7 +318,8 @@ unsafe extern "C" fn c_factory_release(self_: *mut SedonaCScalarUdfFactory) { let self_ref = self_.as_ref().unwrap(); assert!(!self_ref.private_data.is_null()); - (self_ref.private_data as *mut ExportedScalarKernel).drop_in_place(); + let boxed = Box::from_raw(self_ref.private_data as *mut ExportedScalarKernel); + drop(boxed); } struct ExportedScalarKernelImpl { @@ -546,7 +547,8 @@ unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarUdf) { let self_ref = self_.as_ref().unwrap(); assert!(!self_ref.private_data.is_null()); - (self_ref.private_data as *mut ExportedScalarKernelImpl).drop_in_place(); + let boxed = Box::from_raw(self_ref.private_data as *mut ExportedScalarKernelImpl); + drop(boxed); } #[cfg(test)] From 06b550c29505beb3a50cd2f35cca49a5b146c09a Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 
11:37:47 -0600 Subject: [PATCH 07/20] fix invalid schema case --- c/sedona-extension/src/extension.rs | 26 +++++++++++++++++++++++++ c/sedona-extension/src/scalar_kernel.rs | 21 +++++++++++++++----- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index 7f0468a40..fa44447d3 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -86,3 +86,29 @@ impl Drop for SedonaCScalarUdf { } } } + +/// Check if a schema is valid +/// +/// The [FFI_ArrowSchema] doesn't have the ability to check for a NULL release callback, +/// so we provide a mechanism to do so here. +pub fn ffi_arrow_schema_is_valid(ffi_schema: *const FFI_ArrowSchema) -> bool { + let ffi_schema_internal = ffi_schema as *const c_void as *const ArrowSchemaInternal; + if let Some(schema_ref) = unsafe { ffi_schema_internal.as_ref() } { + schema_ref.release.is_some() + } else { + false + } +} + +#[repr(C)] +struct ArrowSchemaInternal { + format: *const c_char, + name: *const c_char, + metadata: *const c_char, + flags: i64, + n_children: i64, + children: *mut *mut ArrowSchemaInternal, + dictionary: *mut ArrowSchemaInternal, + release: Option, + private_data: *mut c_void, +} diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index f7990e8f3..c9a2abad2 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -33,7 +33,7 @@ use std::{ str::FromStr, }; -use crate::extension::{SedonaCScalarUdf, SedonaCScalarUdfFactory}; +use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarUdf, SedonaCScalarUdfFactory}; pub struct ImportedScalarKernel { inner: SedonaCScalarUdfFactory, @@ -197,10 +197,13 @@ impl CScalarUdfWrapper { &mut ffi_out, ) }; + if code == 0 { - match Field::try_from(&ffi_out) { - Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)), - Err(_) => Ok(None), + if 
ffi_arrow_schema_is_valid(&ffi_out) { + let field = Field::try_from(&ffi_out)?; + Ok(Some(SedonaType::from_storage_field(&field)?)) + } else { + Ok(None) } } else { plan_err!("SedonaCScalarUdf::init failed: {}", self.last_error(code)) @@ -600,7 +603,7 @@ mod test { None, ); - let ffi_tester = ScalarUdfTester::new(udf_from_ffi.into(), vec![WKB_GEOMETRY]); + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]); ffi_tester.assert_return_type(WKB_GEOMETRY); let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap(); @@ -610,5 +613,13 @@ mod test { &ffi_tester.invoke_array(array_value.clone()).unwrap(), &array_value ); + + // Check the case of a kernel that does not apply to input arguments + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![]); + let err = ffi_tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "simple_udf_from_ffi([]): No kernel matching arguments" + ); } } From d0403d2d462eef4ce8fa8274acf3e9e49983ccb9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 11:47:31 -0600 Subject: [PATCH 08/20] more tests --- c/sedona-extension/src/scalar_kernel.rs | 29 +++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index c9a2abad2..aee2fcda0 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -558,6 +558,7 @@ unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarUdf) { mod test { use std::sync::Arc; + use datafusion_common::exec_err; use datafusion_expr::Volatility; use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel}; use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher}; @@ -622,4 +623,32 @@ mod test { "simple_udf_from_ffi([]): No kernel matching arguments" ); } + + #[test] + fn erroring_invoke_batch() { + let kernel = SimpleSedonaScalarKernel::new_ref( + 
ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), + Arc::new(|_, _args| exec_err!("this invoke_batch() always errors")), + ); + + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarUdfFactory::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]); + ffi_tester.assert_return_type(WKB_GEOMETRY); + + let err = ffi_tester.invoke_scalar("POINT (0 1)").unwrap_err(); + assert_eq!( + err.message(), + "SedonaCScalarUdf::init failed: this invoke_batch() always errors" + ); + } } From 7d5cdadb6ec42ec92e3dc3700f4e0aa2507461c2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 11:48:20 -0600 Subject: [PATCH 09/20] remove old impl --- rust/sedona/src/ffi.rs | 515 ----------------------------------------- rust/sedona/src/lib.rs | 1 - 2 files changed, 516 deletions(-) delete mode 100644 rust/sedona/src/ffi.rs diff --git a/rust/sedona/src/ffi.rs b/rust/sedona/src/ffi.rs deleted file mode 100644 index e1cb1f7bf..000000000 --- a/rust/sedona/src/ffi.rs +++ /dev/null @@ -1,515 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -use std::{any::Any, sync::Arc}; - -use abi_stable::StableAbi; -use arrow_schema::{DataType, Field, FieldRef, Schema}; -use datafusion::physical_plan::{expressions::Column, PhysicalExpr}; -use datafusion_common::{config::ConfigOptions, DataFusionError, Result, ScalarValue}; -use datafusion_expr::{ - function::{AccumulatorArgs, StateFieldsArgs}, - Accumulator, AggregateUDF, AggregateUDFImpl, ColumnarValue, ReturnFieldArgs, - ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, -}; -use datafusion_ffi::{ - udaf::{FFI_AggregateUDF, ForeignAggregateUDF}, - udf::{FFI_ScalarUDF, ForeignScalarUDF}, -}; -use sedona_common::sedona_internal_err; -use sedona_schema::datatypes::SedonaType; - -use sedona_expr::{ - aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef}, - scalar_udf::{ScalarKernelRef, SedonaScalarKernel}, -}; - -/// A stable struct for sharing [SedonaScalarKernel]s across FFI boundaries -/// -/// The primary interface for importing or exporting these is `.from()` -/// and `.into()` between the [FFI_SedonaScalarKernel] and the [ScalarKernelRef]. -/// -/// Internally this struct uses the [FFI_ScalarUDF] from DataFusion's FFI -/// library to avoid having to invent an FFI ourselves. Like the [FFI_ScalarUDF], -/// this struct is only convenient to use when the libraries on both sides of -/// a boundary are written in Rust. 
Because Rust makes it relatively easy to -/// wrap C or C++ libraries, this should not be a barrier for most types of -/// kernels we might want to implement; however, it is also an option to -/// create our own FFI using simpler primitives if using DataFusion's -/// introduces performance or implementation issues. -#[repr(C)] -#[derive(Debug, StableAbi)] -#[allow(non_camel_case_types)] -pub struct FFI_SedonaScalarKernel { - inner: FFI_ScalarUDF, -} - -impl From for FFI_SedonaScalarKernel { - fn from(value: ScalarKernelRef) -> Self { - let exported = ScalarUDF::new_from_impl(ExportedScalarKernel::from(value)); - FFI_SedonaScalarKernel { - inner: Arc::new(exported).into(), - } - } -} - -impl TryFrom<&FFI_SedonaScalarKernel> for ScalarKernelRef { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaScalarKernel) -> Result { - Ok(Arc::new(ImportedScalarKernel::try_from(value)?)) - } -} - -impl TryFrom for ScalarKernelRef { - type Error = DataFusionError; - - fn try_from(value: FFI_SedonaScalarKernel) -> Result { - Self::try_from(&value) - } -} - -#[derive(Debug)] -struct ExportedScalarKernel { - name: String, - signature: Signature, - sedona_impl: ScalarKernelRef, -} - -impl PartialEq for ExportedScalarKernel { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - } -} - -impl Eq for ExportedScalarKernel {} - -impl std::hash::Hash for ExportedScalarKernel { - fn hash(&self, state: &mut H) { - self.name.hash(state); - } -} - -impl From for ExportedScalarKernel { - fn from(value: ScalarKernelRef) -> Self { - Self { - name: "ExportedScalarKernel".to_string(), - signature: Signature::any(0, datafusion_expr::Volatility::Volatile), - sedona_impl: value, - } - } -} - -impl ScalarUDFImpl for ExportedScalarKernel { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn name(&self) -> &str { - &self.name - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - 
sedona_internal_err!("should not be called") - } - - fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { - let sedona_types = args - .arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - match self.sedona_impl.return_type(&sedona_types)? { - Some(output_type) => Ok(output_type.to_storage_field("", true)?.into()), - // Sedona kernels return None to indicate the kernel doesn't apply to the inputs, - // but the ScalarUDFImpl doesn't have a way to natively indicate that. We use - // NotImplemented with a special message and catch it on the other side. - None => Err(DataFusionError::NotImplemented( - "::kernel does not match input args::".to_string(), - )), - } - } - - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let sedona_types = args - .arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - self.sedona_impl.invoke_batch(&sedona_types, &args.args) - } -} - -#[derive(Debug)] -struct ImportedScalarKernel { - udf_impl: ScalarUDF, -} - -impl TryFrom<&FFI_SedonaScalarKernel> for ImportedScalarKernel { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaScalarKernel) -> Result { - let wrapped = ForeignScalarUDF::try_from(&value.inner)?; - Ok(Self { - udf_impl: ScalarUDF::new_from_impl(wrapped), - }) - } -} - -impl SedonaScalarKernel for ImportedScalarKernel { - fn return_type(&self, args: &[SedonaType]) -> Result> { - let df_args = ReturnFieldArgs { - arg_fields: &args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?, - scalar_arguments: &[], - }; - match self.udf_impl.return_field_from_args(df_args) { - Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)), - Err(err) => { - if matches!(err, DataFusionError::NotImplemented(_)) { - Ok(None) - } else { - Err(err) - } - } - } - } - - fn invoke_batch( - &self, - arg_types: &[SedonaType], - args: &[ColumnarValue], - ) -> Result { - let arg_rows = 
Self::output_size(args); - - let scalar_fn_args = ScalarFunctionArgs { - args: args.to_vec(), - arg_fields: arg_types - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?, - number_rows: arg_rows.unwrap_or(1), - // Wrapper code on the other side of this doesn't use this value - return_field: Field::new("", DataType::Null, true).into(), - config_options: Arc::new(ConfigOptions::default()), - }; - - // DataFusion's FFI_ScalarUDF always returns array output but - // our original UDFs were careful to return ScalarValues. - match self.udf_impl.invoke_with_args(scalar_fn_args)? { - ColumnarValue::Array(array) => match arg_rows { - Some(_) => Ok(ColumnarValue::Array(array)), - None => Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( - &array, 0, - )?)), - }, - ColumnarValue::Scalar(scalar_value) => { - // This branch is probably never taken but may in the future - Ok(ColumnarValue::Scalar(scalar_value)) - } - } - } -} - -impl ImportedScalarKernel { - fn output_size(args: &[ColumnarValue]) -> Option { - for original_arg in args { - if let ColumnarValue::Array(array) = original_arg { - return Some(array.len()); - } - } - None - } -} - -/// A stable struct for sharing [SedonaAccumulator]s across FFI boundaries -/// -/// The primary interface for importing or exporting these is `.from()` -/// and `.into()` between the [FFI_SedonaAggregateKernel] and the [SedonaAccumulatorRef]. -/// -/// Internally this struct uses the [FFI_AggregateUDF] from DataFusion's FFI -/// library to avoid having to invent an FFI ourselves. See [FFI_SedonaScalarKernel] -/// for general information about the rationale and usage of FFI implementations. 
-#[repr(C)] -#[derive(Debug, StableAbi)] -#[allow(non_camel_case_types)] -pub struct FFI_SedonaAggregateKernel { - inner: FFI_AggregateUDF, -} - -impl From for FFI_SedonaAggregateKernel { - fn from(value: SedonaAccumulatorRef) -> Self { - let exported: AggregateUDF = ExportedSedonaAccumulator::from(value).into(); - FFI_SedonaAggregateKernel { - inner: Arc::new(exported).into(), - } - } -} - -impl TryFrom<&FFI_SedonaAggregateKernel> for SedonaAccumulatorRef { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaAggregateKernel) -> Result { - Ok(Arc::new(ImportedSedonaAccumulator::try_from(value)?)) - } -} - -impl TryFrom for SedonaAccumulatorRef { - type Error = DataFusionError; - - fn try_from(value: FFI_SedonaAggregateKernel) -> Result { - Self::try_from(&value) - } -} - -#[derive(Debug)] -struct ExportedSedonaAccumulator { - name: String, - signature: Signature, - sedona_impl: SedonaAccumulatorRef, -} - -impl PartialEq for ExportedSedonaAccumulator { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - } -} - -impl Eq for ExportedSedonaAccumulator {} - -impl std::hash::Hash for ExportedSedonaAccumulator { - fn hash(&self, state: &mut H) { - self.name.hash(state); - } -} - -impl From for ExportedSedonaAccumulator { - fn from(value: SedonaAccumulatorRef) -> Self { - Self { - name: "ExportedSedonaAccumulator".to_string(), - signature: Signature::any(0, datafusion_expr::Volatility::Volatile), - sedona_impl: value, - } - } -} - -impl AggregateUDFImpl for ExportedSedonaAccumulator { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - &self.name - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_field(&self, arg_fields: &[FieldRef]) -> Result { - let sedona_types = arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - match self.sedona_impl.return_type(&sedona_types)? 
{ - Some(output_type) => Ok(Arc::new(output_type.to_storage_field("", true)?)), - // Sedona kernels return None to indicate the kernel doesn't apply to the inputs, - // but the ScalarUDFImpl doesn't have a way to natively indicate that. We use - // NotImplemented with a special message and catch it on the other side. - None => Err(DataFusionError::NotImplemented( - "::kernel does not match input args::".to_string(), - )), - } - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - sedona_internal_err!("This should not be called (use return_field())") - } - - fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - let arg_fields = acc_args - .exprs - .iter() - .map(|expr| expr.return_field(acc_args.schema)) - .collect::>>()?; - let sedona_types = arg_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - if let Some(output_type) = self.sedona_impl.return_type(&sedona_types)? { - self.sedona_impl.accumulator(&sedona_types, &output_type) - } else { - Err(DataFusionError::NotImplemented( - "::kernel does not match input args::".to_string(), - )) - } - } - - fn state_fields(&self, args: StateFieldsArgs) -> Result> { - let sedona_types = args - .input_fields - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect::>>()?; - self.sedona_impl.state_fields(&sedona_types) - } -} - -#[derive(Debug)] -struct ImportedSedonaAccumulator { - aggregate_impl: AggregateUDF, -} - -impl TryFrom<&FFI_SedonaAggregateKernel> for ImportedSedonaAccumulator { - type Error = DataFusionError; - - fn try_from(value: &FFI_SedonaAggregateKernel) -> Result { - let wrapped = ForeignAggregateUDF::try_from(&value.inner)?; - Ok(Self { - aggregate_impl: wrapped.into(), - }) - } -} - -impl SedonaAccumulator for ImportedSedonaAccumulator { - fn return_type(&self, args: &[SedonaType]) -> Result> { - let arg_fields = args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?; - - match 
self.aggregate_impl.return_field(&arg_fields) { - Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)), - Err(err) => { - if matches!(err, DataFusionError::NotImplemented(_)) { - Ok(None) - } else { - Err(err) - } - } - } - } - - fn accumulator( - &self, - args: &[SedonaType], - output_type: &SedonaType, - ) -> Result> { - let arg_fields = args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?; - let mock_schema = Schema::new(arg_fields); - let exprs = (0..mock_schema.fields().len()) - .map(|i| -> Arc { Arc::new(Column::new("col", i)) }) - .collect::>(); - - let return_field = output_type.to_storage_field("", true)?; - - let args = AccumulatorArgs { - return_field: return_field.into(), - schema: &mock_schema, - ignore_nulls: true, - order_bys: &[], - is_reversed: false, - name: "", - is_distinct: false, - exprs: &exprs, - }; - - self.aggregate_impl.accumulator(args) - } - - fn state_fields(&self, args: &[SedonaType]) -> Result> { - let arg_fields = args - .iter() - .map(|arg| arg.to_storage_field("", true).map(Arc::new)) - .collect::>>()?; - - let state_field_args = StateFieldsArgs { - name: "", - input_fields: &arg_fields, - return_field: Arc::new(Field::new("", DataType::Null, false)), - ordering_fields: &[], - is_distinct: false, - }; - - self.aggregate_impl.state_fields(state_field_args) - } -} - -#[cfg(test)] -mod test { - use datafusion_expr::Volatility; - use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel}; - use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher}; - use sedona_testing::{create::create_array, testers::ScalarUdfTester}; - - use super::*; - - #[test] - fn ffi_roundtrip() { - let kernel = SimpleSedonaScalarKernel::new_ref( - ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), - Arc::new(|_, args| Ok(args[0].clone())), - ); - - let array_value = create_array(&[Some("POINT (0 1)"), None], &WKB_GEOMETRY); - - let udf_native = SedonaScalarUDF::new( - 
"simple_udf", - vec![kernel.clone()], - Volatility::Immutable, - None, - ); - - let tester = ScalarUdfTester::new(udf_native.into(), vec![WKB_GEOMETRY]); - tester.assert_return_type(WKB_GEOMETRY); - - let result = tester.invoke_scalar("POINT (0 1)").unwrap(); - tester.assert_scalar_result_equals(result, "POINT (0 1)"); - - assert_eq!( - &tester.invoke_array(array_value.clone()).unwrap(), - &array_value - ); - - let ffi_kernel = FFI_SedonaScalarKernel::from(kernel.clone()); - let udf_from_ffi = SedonaScalarUDF::new( - "simple_udf_from_ffi", - vec![ffi_kernel.try_into().unwrap()], - Volatility::Immutable, - None, - ); - - let ffi_tester = ScalarUdfTester::new(udf_from_ffi.into(), vec![WKB_GEOMETRY]); - ffi_tester.assert_return_type(WKB_GEOMETRY); - - let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap(); - ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)"); - - assert_eq!( - &ffi_tester.invoke_array(array_value.clone()).unwrap(), - &array_value - ); - } -} diff --git a/rust/sedona/src/lib.rs b/rust/sedona/src/lib.rs index 52b543874..9d18064cc 100644 --- a/rust/sedona/src/lib.rs +++ b/rust/sedona/src/lib.rs @@ -17,7 +17,6 @@ mod catalog; pub mod context; mod exec; -pub mod ffi; mod object_storage; pub mod random_geometry_provider; pub mod reader; From ae390ea1ead7f87b37e8afb73553926ebd307e88 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 13:09:04 -0600 Subject: [PATCH 10/20] license header --- c/sedona-extension/src/sedona_extension.h | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h index 4f8fe58c8..9d037be00 100644 --- a/c/sedona-extension/src/sedona_extension.h +++ b/c/sedona-extension/src/sedona_extension.h @@ -1,9 +1,25 @@ - -#include +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. #ifndef SEDONA_EXTENSION_H #define SEDONA_EXTENSION_H +#include + #ifdef __cplusplus extern "C" { #endif From 5f08520127359ac649f68000b153e9b5f7c18bbb Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 13:16:35 -0600 Subject: [PATCH 11/20] rename some things --- c/sedona-extension/src/extension.rs | 27 +++++++------- c/sedona-extension/src/scalar_kernel.rs | 44 +++++++++++------------ c/sedona-extension/src/sedona_extension.h | 19 +++++----- 3 files changed, 45 insertions(+), 45 deletions(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index fa44447d3..e251bf41e 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -25,19 +25,19 @@ use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; #[derive(Default)] #[repr(C)] -pub struct SedonaCScalarUdfFactory { - pub new_scalar_udf_impl: Option< - unsafe extern "C" fn(self_: *const SedonaCScalarUdfFactory, out: *mut SedonaCScalarUdf), +pub struct SedonaCScalarKernel { + pub new_impl: Option< + unsafe extern "C" fn(self_: *const SedonaCScalarKernel, out: *mut SedonaCScalarKernelImpl), >, - pub release: Option, + pub release: Option, pub private_data: *mut c_void, } -unsafe impl Send for SedonaCScalarUdfFactory {} -unsafe impl Sync 
for SedonaCScalarUdfFactory {} +unsafe impl Send for SedonaCScalarKernel {} +unsafe impl Sync for SedonaCScalarKernel {} -impl Drop for SedonaCScalarUdfFactory { +impl Drop for SedonaCScalarKernel { fn drop(&mut self) { if let Some(releaser) = self.release { unsafe { releaser(self) } @@ -49,10 +49,10 @@ impl Drop for SedonaCScalarUdfFactory { #[derive(Default)] #[repr(C)] -pub struct SedonaCScalarUdf { +pub struct SedonaCScalarKernelImpl { pub init: Option< unsafe extern "C" fn( - self_: *mut SedonaCScalarUdf, + self_: *mut SedonaCScalarKernelImpl, arg_types: *const *const FFI_ArrowSchema, scalar_args: *mut *mut FFI_ArrowArray, n_args: i64, @@ -62,7 +62,7 @@ pub struct SedonaCScalarUdf { pub execute: Option< unsafe extern "C" fn( - self_: *mut SedonaCScalarUdf, + self_: *mut SedonaCScalarKernelImpl, args: *mut *mut FFI_ArrowArray, n_args: i64, n_rows: i64, @@ -70,14 +70,15 @@ pub struct SedonaCScalarUdf { ) -> c_int, >, - pub get_last_error: Option *const c_char>, + pub get_last_error: + Option *const c_char>, - pub release: Option, + pub release: Option, pub private_data: *mut c_void, } -impl Drop for SedonaCScalarUdf { +impl Drop for SedonaCScalarKernelImpl { fn drop(&mut self) { if let Some(releaser) = self.release { unsafe { releaser(self) } diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index aee2fcda0..83fca951b 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -33,10 +33,10 @@ use std::{ str::FromStr, }; -use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarUdf, SedonaCScalarUdfFactory}; +use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarKernel, SedonaCScalarKernelImpl}; pub struct ImportedScalarKernel { - inner: SedonaCScalarUdfFactory, + inner: SedonaCScalarKernel, } impl Debug for ImportedScalarKernel { @@ -47,12 +47,12 @@ impl Debug for ImportedScalarKernel { } } -impl TryFrom for ImportedScalarKernel { +impl TryFrom for 
ImportedScalarKernel { type Error = DataFusionError; - fn try_from(value: SedonaCScalarUdfFactory) -> Result { + fn try_from(value: SedonaCScalarKernel) -> Result { match ( - &value.new_scalar_udf_impl, + &value.new_impl, &value.release, value.private_data.is_null(), ) { @@ -130,13 +130,13 @@ impl SedonaScalarKernel for ImportedScalarKernel { } struct CScalarUdfWrapper { - inner: SedonaCScalarUdf, + inner: SedonaCScalarKernelImpl, } impl CScalarUdfWrapper { - fn try_new(factory: &SedonaCScalarUdfFactory) -> Result { - if let Some(init) = factory.new_scalar_udf_impl { - let mut inner = SedonaCScalarUdf::default(); + fn try_new(factory: &SedonaCScalarKernel) -> Result { + if let Some(init) = factory.new_impl { + let mut inner = SedonaCScalarKernelImpl::default(); unsafe { init(factory, &mut inner) }; Ok(Self { inner }) } else { @@ -285,11 +285,11 @@ impl From for ExportedScalarKernel { } } -impl From for SedonaCScalarUdfFactory { +impl From for SedonaCScalarKernel { fn from(value: ExportedScalarKernel) -> Self { let box_value = Box::new(value); Self { - new_scalar_udf_impl: Some(c_factory_new_impl), + new_impl: Some(c_factory_new_impl), release: Some(c_factory_release), private_data: Box::leak(box_value) as *mut ExportedScalarKernel as *mut c_void, } @@ -303,8 +303,8 @@ impl ExportedScalarKernel { } unsafe extern "C" fn c_factory_new_impl( - self_: *const SedonaCScalarUdfFactory, - out: *mut SedonaCScalarUdf, + self_: *const SedonaCScalarKernel, + out: *mut SedonaCScalarKernelImpl, ) { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); @@ -313,10 +313,10 @@ unsafe extern "C" fn c_factory_new_impl( let private_data = (self_ref.private_data as *mut ExportedScalarKernel) .as_ref() .unwrap(); - *out = SedonaCScalarUdf::from(private_data.new_impl()) + *out = SedonaCScalarKernelImpl::from(private_data.new_impl()) } -unsafe extern "C" fn c_factory_release(self_: *mut SedonaCScalarUdfFactory) { +unsafe extern "C" fn c_factory_release(self_: *mut 
SedonaCScalarKernel) { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); @@ -332,7 +332,7 @@ struct ExportedScalarKernelImpl { last_error: CString, } -impl From for SedonaCScalarUdf { +impl From for SedonaCScalarKernelImpl { fn from(value: ExportedScalarKernelImpl) -> Self { let box_value = Box::new(value); Self { @@ -471,7 +471,7 @@ impl ExportedScalarKernelImpl { } unsafe extern "C" fn c_kernel_init( - self_: *mut SedonaCScalarUdf, + self_: *mut SedonaCScalarKernelImpl, arg_types: *const *const FFI_ArrowSchema, scalar_args: *mut *mut FFI_ArrowArray, n_args: i64, @@ -506,7 +506,7 @@ unsafe extern "C" fn c_kernel_init( } unsafe extern "C" fn c_kernel_execute( - self_: *mut SedonaCScalarUdf, + self_: *mut SedonaCScalarKernelImpl, args: *mut *mut FFI_ArrowArray, n_args: i64, n_rows: i64, @@ -534,7 +534,7 @@ unsafe extern "C" fn c_kernel_execute( } } -unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarUdf) -> *const c_char { +unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarKernelImpl) -> *const c_char { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); @@ -545,7 +545,7 @@ unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarUdf) -> *const private_data.last_error.as_ptr() } -unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarUdf) { +unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarKernelImpl) { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); @@ -594,7 +594,7 @@ mod test { ); let exported_kernel = ExportedScalarKernel::from(kernel.clone()); - let ffi_kernel = SedonaCScalarUdfFactory::from(exported_kernel); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); let udf_from_ffi = SedonaScalarUDF::new( @@ -632,7 +632,7 @@ mod test { ); let exported_kernel = ExportedScalarKernel::from(kernel.clone()); - let ffi_kernel = SedonaCScalarUdfFactory::from(exported_kernel); + 
let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); let udf_from_ffi = SedonaScalarUDF::new( diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h index 9d037be00..4412d142d 100644 --- a/c/sedona-extension/src/sedona_extension.h +++ b/c/sedona-extension/src/sedona_extension.h @@ -117,7 +117,7 @@ struct ArrowArrayStream { /// access to methods if an instance is shared across threads. In general, /// constructing and initializing this structure should be sufficiently /// cheap that it shouldn't need to be shared in this way. -struct SedonaCScalarUdf { +struct SedonaCScalarKernelImpl { /// \brief Initialize the state of this UDF instance and calculate a return /// type /// @@ -141,7 +141,7 @@ struct SedonaCScalarUdf { /// passed. /// /// \return An errno-compatible error code, or zero on success. - int (*init)(struct SedonaCScalarUdf* self, const struct ArrowSchema** arg_types, + int (*init)(struct SedonaCScalarKernelImpl* self, const struct ArrowSchema** arg_types, struct ArrowArray** scalar_args, int64_t n_args, struct ArrowSchema* out); /// \brief Execute a single batch @@ -151,18 +151,18 @@ struct SedonaCScalarUdf { /// inputs. /// \param n_args The number of pointers in args /// \param out Will be populated with the result on success. - int (*execute)(struct SedonaCScalarUdf* self, struct ArrowArray** args, int64_t n_args, - int64_t n_rows, struct ArrowArray* out); + int (*execute)(struct SedonaCScalarKernelImpl* self, struct ArrowArray** args, + int64_t n_args, int64_t n_rows, struct ArrowArray* out); /// \brief Get the last error message /// /// The result is valid until the next call to a UDF method. 
- const char* (*get_last_error)(struct SedonaCScalarUdf* self); + const char* (*get_last_error)(struct SedonaCScalarKernelImpl* self); /// \brief Release this instance /// /// Implementations of this callback must set self->release to NULL. - void (*release)(struct SedonaCScalarUdf* self); + void (*release)(struct SedonaCScalarKernelImpl* self); /// \brief Opaque implementation-specific data void* private_data; @@ -173,18 +173,17 @@ struct SedonaCScalarUdf { /// Usually a GeoArrowScalarUdf will be used to execute a single batch /// (although it may be reused if a caller can serialize callback use). This /// structure is a factory object that initializes such objects. -struct SedonaCScalarUdfFactory { +struct SedonaCScalarKernel { /// \brief Initialize a new implementation struct /// /// This callback is thread safe and may be called concurrently from any /// thread at any time (as long as this object is valid). - void (*new_scalar_udf_impl)(struct SedonaCScalarUdfFactory* self, - struct SedonaCScalarUdf* out); + void (*new_impl)(struct SedonaCScalarKernel* self, struct SedonaCScalarKernelImpl* out); /// \brief Release this instance /// /// Implementations of this callback must set self->release to NULL. 
- void (*release)(struct SedonaCScalarUdfFactory* self); + void (*release)(struct SedonaCScalarKernel* self); /// \brief Opaque implementation-specific data void* private_data; From 3c0ceb42ce3e223d3bbc19fb63c2347f2aa5d2ba Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 14:07:02 -0600 Subject: [PATCH 12/20] rename in error messages --- c/sedona-extension/src/scalar_kernel.rs | 73 ++++++++++++++++++++----- 1 file changed, 58 insertions(+), 15 deletions(-) diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 83fca951b..007956cfd 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -42,7 +42,7 @@ pub struct ImportedScalarKernel { impl Debug for ImportedScalarKernel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExtensionSedonaScalarKernel") - .field("inner", &"") + .field("inner", &"") .finish() } } @@ -57,9 +57,7 @@ impl TryFrom for ImportedScalarKernel { value.private_data.is_null(), ) { (Some(_), Some(_), false) => Ok(Self { inner: value }), - _ => sedona_internal_err!( - "Can't import released or uninitialized SedonaCScalarUdfFactory" - ), + _ => sedona_internal_err!("Can't import released or uninitialized SedonaCScalarKernel"), } } } @@ -70,7 +68,7 @@ impl SedonaScalarKernel for ImportedScalarKernel { args: &[SedonaType], scalar_args: &[Option<&ScalarValue>], ) -> Result> { - let mut inner_impl = CScalarUdfWrapper::try_new(&self.inner)?; + let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?; inner_impl.init(args, scalar_args) } @@ -92,7 +90,7 @@ impl SedonaScalarKernel for ImportedScalarKernel { }) .collect::>(); - let mut inner_impl = CScalarUdfWrapper::try_new(&self.inner)?; + let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?; inner_impl.init(arg_types, &arg_scalars)?; let result_array = inner_impl.execute(args, return_type, num_rows)?; for arg in args { @@ -129,18
+127,18 @@ impl SedonaScalarKernel for ImportedScalarKernel { } } -struct CScalarUdfWrapper { +struct CScalarKernelImplWrapper { inner: SedonaCScalarKernelImpl, } -impl CScalarUdfWrapper { +impl CScalarKernelImplWrapper { fn try_new(factory: &SedonaCScalarKernel) -> Result { if let Some(init) = factory.new_impl { let mut inner = SedonaCScalarKernelImpl::default(); unsafe { init(factory, &mut inner) }; Ok(Self { inner }) } else { - sedona_internal_err!("SedonaCScalarUdfFactory is not valid") + sedona_internal_err!("SedonaCScalarKernel is not valid") } } @@ -206,10 +204,13 @@ impl CScalarUdfWrapper { Ok(None) } } else { - plan_err!("SedonaCScalarUdf::init failed: {}", self.last_error(code)) + plan_err!( + "SedonaCScalarKernelImpl::init failed: {}", + self.last_error(code) + ) } } else { - sedona_internal_err!("Invalid SedonaCScalarUdf") + sedona_internal_err!("Invalid SedonaCScalarKernelImpl") } } @@ -252,10 +253,13 @@ impl CScalarUdfWrapper { }; Ok(arrow_array::make_array(data)) } else { - plan_err!("SedonaCScalarUdf::init failed: {}", self.last_error(code)) + plan_err!( + "SedonaCScalarKernelImpl::init failed: {}", + self.last_error(code) + ) } } else { - sedona_internal_err!("Invalid SedonaCScalarUdf") + sedona_internal_err!("Invalid SedonaCScalarKernelImpl") } } @@ -270,7 +274,7 @@ impl CScalarUdfWrapper { .into_owned() } } else { - "Invalid SedonaCScalarUdf".to_string() + "Invalid SedonaCScalarKernelImpl".to_string() } } } @@ -648,7 +652,46 @@ mod test { let err = ffi_tester.invoke_scalar("POINT (0 1)").unwrap_err(); assert_eq!( err.message(), - "SedonaCScalarUdf::init failed: this invoke_batch() always errors" + "SedonaCScalarKernelImpl::init failed: this invoke_batch() always errors" ); } + + #[test] + fn erroring_return_type() { + let kernel = Arc::new(ErroringReturnType {}) as ScalarKernelRef; + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]); + let err = ffi_tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "SedonaCScalarKernelImpl::init failed: this implementation of return_type always errors" + ); + } + + #[derive(Debug)] + struct ErroringReturnType {} + + impl SedonaScalarKernel for ErroringReturnType { + fn return_type(&self, _args: &[SedonaType]) -> Result> { + plan_err!("this implementation of return_type always errors") + } + + fn invoke_batch( + &self, + _arg_types: &[SedonaType], + _args: &[ColumnarValue], + ) -> Result { + unreachable!() + } + } } From cbda56264ce9123626a64f7b5a1c6271a248a5f4 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 14:30:59 -0600 Subject: [PATCH 13/20] add name --- c/sedona-extension/src/extension.rs | 2 + c/sedona-extension/src/scalar_kernel.rs | 59 +++++++++++++++++++++-- c/sedona-extension/src/sedona_extension.h | 8 ++- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index e251bf41e..91b40ac20 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -26,6 +26,8 @@ use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; #[derive(Default)] #[repr(C)] pub struct SedonaCScalarKernel { + pub function_name: + Option *const c_char>, pub new_impl: Option< unsafe extern "C" fn(self_: *const SedonaCScalarKernel, out: *mut SedonaCScalarKernelImpl), >, diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 007956cfd..a32f0894b 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -29,7 +29,7 @@ use std::{ ffi::{c_char, c_int, c_void, CStr, 
CString}, fmt::Debug, iter::zip, - ptr::{null_mut, swap_nonoverlapping}, + ptr::{null, null_mut, swap_nonoverlapping}, str::FromStr, }; @@ -37,11 +37,12 @@ use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarKernel, SedonaCSc pub struct ImportedScalarKernel { inner: SedonaCScalarKernel, + function_name: Option, } impl Debug for ImportedScalarKernel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ExtensionSedonaScalarKernel") + f.debug_struct("ImportedScalarKernel") .field("inner", &"") .finish() } @@ -52,16 +53,39 @@ impl TryFrom for ImportedScalarKernel { fn try_from(value: SedonaCScalarKernel) -> Result { match ( + &value.function_name, &value.new_impl, &value.release, value.private_data.is_null(), ) { - (Some(_), Some(_), false) => Ok(Self { inner: value }), + (Some(function_name), Some(_), Some(_), false) => { + let name_ptr = unsafe { function_name(&value) }; + let name = if name_ptr.is_null() { + None + } else { + Some( + unsafe { CStr::from_ptr(name_ptr) } + .to_string_lossy() + .into_owned(), + ) + }; + + Ok(Self { + inner: value, + function_name: name, + }) + } _ => sedona_internal_err!("Can't import released or uninitialized SedonaCScalarKernel"), } } } +impl ImportedScalarKernel { + pub fn function_name(&self) -> Option<&str> { + self.function_name.as_deref() + } +} + impl SedonaScalarKernel for ImportedScalarKernel { fn return_type_from_args_and_scalars( &self, @@ -281,11 +305,15 @@ impl CScalarKernelImplWrapper { pub struct ExportedScalarKernel { inner: ScalarKernelRef, + function_name: Option, } impl From for ExportedScalarKernel { fn from(value: ScalarKernelRef) -> Self { - ExportedScalarKernel { inner: value } + ExportedScalarKernel { + inner: value, + function_name: None, + } } } @@ -293,6 +321,7 @@ impl From for SedonaCScalarKernel { fn from(value: ExportedScalarKernel) -> Self { let box_value = Box::new(value); Self { + function_name: Some(c_factory_function_name), new_impl: 
Some(c_factory_new_impl), release: Some(c_factory_release), private_data: Box::leak(box_value) as *mut ExportedScalarKernel as *mut c_void, @@ -301,11 +330,33 @@ impl From for SedonaCScalarKernel { } impl ExportedScalarKernel { + pub fn with_function_name(self, function_name: String) -> Self { + Self { + inner: self.inner, + function_name: Some(CString::from_str(&function_name).unwrap()), + } + } + fn new_impl(&self) -> ExportedScalarKernelImpl { ExportedScalarKernelImpl::new(self.inner.clone()) } } +unsafe extern "C" fn c_factory_function_name(self_: *const SedonaCScalarKernel) -> *const c_char { + assert!(!self_.is_null()); + let self_ref = self_.as_ref().unwrap(); + + assert!(!self_ref.private_data.is_null()); + let private_data = (self_ref.private_data as *mut ExportedScalarKernel) + .as_ref() + .unwrap(); + if let Some(function_name) = &private_data.function_name { + function_name.as_ptr() + } else { + null() + } +} + unsafe extern "C" fn c_factory_new_impl( self_: *const SedonaCScalarKernel, out: *mut SedonaCScalarKernelImpl, diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h index 4412d142d..0afac515e 100644 --- a/c/sedona-extension/src/sedona_extension.h +++ b/c/sedona-extension/src/sedona_extension.h @@ -174,11 +174,17 @@ struct SedonaCScalarKernelImpl { /// (although it may be reused if a caller can serialize callback use). This /// structure is a factory object that initializes such objects. struct SedonaCScalarKernel { + /// \brief Function name + /// + /// Optional function name. This is used to register the kernal with the + /// appropriate function when passing this kernel across a boundary. + const char* (*function_name)(const struct SedonaCScalarKernel* self); + /// \brief Initialize a new implementation struct /// /// This callback is thread safe and may be called concurrently from any /// thread at any time (as long as this object is valid). 
- void (*new_impl)(struct SedonaCScalarKernel* self, struct SedonaCScalarKernelImpl* out); + void (*new_impl)(const struct SedonaCScalarKernel* self, struct SedonaCScalarKernelImpl* out); /// \brief Release this instance /// From 64614280014a8ad625696e17780593539e01145e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 14:34:55 -0600 Subject: [PATCH 14/20] names --- c/sedona-extension/src/scalar_kernel.rs | 25 +++++++++++++++++++++-- c/sedona-extension/src/sedona_extension.h | 5 +++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index a32f0894b..d894603e5 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -330,10 +330,10 @@ impl From for SedonaCScalarKernel { } impl ExportedScalarKernel { - pub fn with_function_name(self, function_name: String) -> Self { + pub fn with_function_name(self, function_name: impl AsRef) -> Self { Self { inner: self.inner, - function_name: Some(CString::from_str(&function_name).unwrap()), + function_name: Some(CString::from_str(function_name.as_ref()).unwrap()), } } @@ -679,6 +679,27 @@ mod test { ); } + #[test] + fn named_kernel() { + let kernel = SimpleSedonaScalarKernel::new_ref( + ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY), + Arc::new(|_, args| Ok(args[0].clone())), + ); + + // Without intervening, we have a None name + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + assert!(imported_kernel.function_name().is_none()); + + // If we set a function name, it should be roundtripped + let exported_kernel = + ExportedScalarKernel::from(kernel.clone()).with_function_name("foofy"); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + assert_eq!(imported_kernel.function_name(), Some("foofy")); + } + #[test] fn erroring_invoke_batch() { let kernel = SimpleSedonaScalarKernel::new_ref( diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h index 0afac515e..4f251a4d3 100644 --- a/c/sedona-extension/src/sedona_extension.h +++ b/c/sedona-extension/src/sedona_extension.h @@ -176,7 +176,7 @@ struct SedonaCScalarKernelImpl { struct SedonaCScalarKernel { /// \brief Function name /// - /// Optional function name. This is used to register the kernal with the + /// Optional function name. This is used to register the kernel with the /// appropriate function when passing this kernel across a boundary. const char* (*function_name)(const struct SedonaCScalarKernel* self); @@ -184,7 +184,8 @@ struct SedonaCScalarKernel { /// /// This callback is thread safe and may be called concurrently from any /// thread at any time (as long as this object is valid). - void (*new_impl)(const struct SedonaCScalarKernel* self, struct SedonaCScalarKernelImpl* out); + void (*new_impl)(const struct SedonaCScalarKernel* self, + struct SedonaCScalarKernelImpl* out); /// \brief Release this instance /// From ac295efb024b879cfc6d061f556bd3cdc6f58803 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 14:37:53 -0600 Subject: [PATCH 15/20] docs --- c/sedona-extension/src/sedona_extension.h | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h index 4f251a4d3..5f4c32ebd 100644 --- a/c/sedona-extension/src/sedona_extension.h +++ b/c/sedona-extension/src/sedona_extension.h @@ -117,9 +117,12 @@ struct ArrowArrayStream { /// access to methods if an instance is shared across threads. 
In general, /// constructing and initializing this structure should be sufficiently /// cheap that it shouldn't need to be shared in this way. +/// +/// Briefly, the SedonaCScalarKernelImpl is typically the stack-allocated +/// structure that is not thread safe and the SedonaCScalarKernel is the +/// value that lives in a registry (whose job it is to initialize implementations). struct SedonaCScalarKernelImpl { - /// \brief Initialize the state of this UDF instance and calculate a return - /// type + /// \brief Initialize the state of this instance and calculate a return type /// /// The init callback either computes a return ArrowSchema or initializes the /// return ArrowSchema to an explicitly released value to indicate that this @@ -168,11 +171,12 @@ struct SedonaCScalarKernelImpl { void* private_data; }; -/// \brief Scalar function initializer +/// \brief Scalar function/kernel initializer /// /// Usually a GeoArrowScalarUdf will be used to execute a single batch /// (although it may be reused if a caller can serialize callback use). This -/// structure is a factory object that initializes such objects. +/// structure is a factory object that initializes such objects. The +/// SedonaCScalarKernel is designed to be thread-safe and live in a registry. 
struct SedonaCScalarKernel { /// \brief Function name /// From 4588ffc8e34217d12360f192e6252ded92a9fef5 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 14:44:17 -0600 Subject: [PATCH 16/20] test from scalar --- c/sedona-extension/src/scalar_kernel.rs | 52 +++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index d894603e5..44201271b 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -613,6 +613,7 @@ unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarKernelImpl) { mod test { use std::sync::Arc; + use arrow_schema::DataType; use datafusion_common::exec_err; use datafusion_expr::Volatility; use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel}; @@ -700,6 +701,57 @@ mod test { assert_eq!(imported_kernel.function_name(), Some("foofy")); } + #[test] + fn invoke_batch_from_scalar() { + let kernel = Arc::new(ReturnTypeFromScalars {}) as ScalarKernelRef; + let exported_kernel = ExportedScalarKernel::from(kernel.clone()); + let ffi_kernel = SedonaCScalarKernel::from(exported_kernel); + let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap(); + + let udf_from_ffi = SedonaScalarUDF::new( + "simple_udf_from_ffi", + vec![Arc::new(imported_kernel)], + Volatility::Immutable, + None, + ); + + let ffi_tester = ScalarUdfTester::new( + udf_from_ffi.clone().into(), + vec![SedonaType::Arrow(DataType::Utf8)], + ); + let return_type = ffi_tester.return_type_with_scalar(Some("foofy")).unwrap(); + assert_eq!(return_type, SedonaType::Arrow(DataType::Utf8)); + } + + #[derive(Debug)] + struct ReturnTypeFromScalars {} + + impl SedonaScalarKernel for ReturnTypeFromScalars { + fn return_type_from_args_and_scalars( + &self, + _args: &[SedonaType], + scalar_args: &[Option<&ScalarValue>], + ) -> Result> { + if let Some(arg0) = scalar_args[0] { + 
Ok(Some(SedonaType::Arrow(arg0.data_type()))) + } else { + Ok(None) + } + } + + fn return_type(&self, _args: &[SedonaType]) -> Result> { + unreachable!() + } + + fn invoke_batch( + &self, + _arg_types: &[SedonaType], + _args: &[ColumnarValue], + ) -> Result { + unreachable!() + } + } + #[test] fn erroring_invoke_batch() { let kernel = SimpleSedonaScalarKernel::new_ref( From 0b412b66c461f471344246c09e88286e85e39dd8 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 16:52:34 -0600 Subject: [PATCH 17/20] document --- c/sedona-extension/src/scalar_kernel.rs | 70 +++++++++++++++++++++-- c/sedona-extension/src/sedona_extension.h | 5 +- 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 44201271b..17467dc70 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -35,6 +35,10 @@ use std::{ use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarKernel, SedonaCScalarKernelImpl}; +/// Wrapper around a [SedonaCScalarKernel] that implements [SedonaScalarKernel] +/// +/// This is the means by which a kernel implementation may be imported from a +/// C implementation. 
pub struct ImportedScalarKernel { inner: SedonaCScalarKernel, function_name: Option, @@ -151,6 +155,8 @@ impl SedonaScalarKernel for ImportedScalarKernel { } } +/// Wrapper class handling the verbose details of preparing and executing FFI calls +/// for the [SedonaCScalarKernelImpl] struct CScalarKernelImplWrapper { inner: SedonaCScalarKernelImpl, } @@ -175,22 +181,28 @@ impl CScalarKernelImplWrapper { return sedona_internal_err!("field/scalar lengths must be identical"); } + // Convert arg_types to Vec let arg_fields = arg_types .iter() .map(|sedona_type| sedona_type.to_storage_field("", true)) .collect::>>()?; + + // Convert arg types to Vec let ffi_fields = arg_fields .iter() .map(FFI_ArrowSchema::try_from) .collect::, ArrowError>>()?; + + // Convert arg types to Vec<*const FFI_ArrowSchema> let ffi_field_ptrs = ffi_fields .iter() .map(|ffi_field| ffi_field as *const FFI_ArrowSchema) .collect::>(); + // Convert arg_scalars to Vec> let mut ffi_scalars = arg_scalars .iter() - .map(|maybe_scalar| -> Result> { + .map(|maybe_scalar| { if let Some(scalar) = maybe_scalar { let array = scalar.to_array()?; Ok(Some(FFI_ArrowArray::new(&array.to_data()))) @@ -200,6 +212,7 @@ impl CScalarKernelImplWrapper { }) .collect::>>()?; + // Convert arg_scalars to Vec<*mut FFI_ArrowArray> let mut ffi_scalar_ptrs = ffi_scalars .iter_mut() .map(|maybe_ffi_scalar| match maybe_ffi_scalar { @@ -208,6 +221,7 @@ impl CScalarKernelImplWrapper { }) .collect::>(); + // Call the FFI implementation of init if let Some(init) = self.inner.init { let mut ffi_out = FFI_ArrowSchema::empty(); let code = unsafe { @@ -220,6 +234,10 @@ impl CScalarKernelImplWrapper { ) }; + // On success, convert the output to SedonaType. If the implementation + // returned a "released" schema, this is the equivalent to our return_type() + // returning None (for "this kernel doesn't apply"). + // On error, query the FFI implementation for the last error string. 
if code == 0 { if ffi_arrow_schema_is_valid(&ffi_out) { let field = Field::try_from(&ffi_out)?; @@ -244,6 +262,7 @@ impl CScalarKernelImplWrapper { return_type: &SedonaType, num_rows: usize, ) -> Result { + // Convert args to Vec let arg_arrays = args .iter() .map(|arg| match arg { @@ -251,11 +270,14 @@ impl CScalarKernelImplWrapper { ColumnarValue::Scalar(scalar_value) => scalar_value.to_array(), }) .collect::>>()?; + + // Convert args to Vec let mut ffi_args = arg_arrays .iter() .map(|arg| FFI_ArrowArray::new(&arg.to_data())) .collect::>(); + // Call the FFI implementation of execute() if let Some(execute) = self.inner.execute { let mut ffi_out = FFI_ArrowArray::empty(); let code = unsafe { @@ -268,6 +290,8 @@ impl CScalarKernelImplWrapper { ) }; + // On success, convert the result to an ArrayRef. + // On error, query the FFI implementation for the last error string. if code == 0 { let data = unsafe { arrow_array::ffi::from_ffi_and_data_type( @@ -287,6 +311,7 @@ impl CScalarKernelImplWrapper { } } + /// Helper to get the last error from the FFI implementation as a Rust String fn last_error(&mut self, code: c_int) -> String { if let Some(get_last_error) = self.inner.get_last_error { let c_err = unsafe { get_last_error(&mut self.inner) }; @@ -303,6 +328,8 @@ impl CScalarKernelImplWrapper { } } +/// Wrapper around a [ScalarKernelRef] that may be used to export an existing +/// kernel across an FFI boundary using the [SedonaCScalarKernel] pub struct ExportedScalarKernel { inner: ScalarKernelRef, function_name: Option, @@ -330,6 +357,10 @@ impl From for SedonaCScalarKernel { } impl ExportedScalarKernel { + /// Add a function name to this exported kernel + /// + /// This ensures the kernel will be registered with the appropriate function + /// when passed across a boundary. 
pub fn with_function_name(self, function_name: impl AsRef) -> Self { Self { inner: self.inner, @@ -342,6 +373,7 @@ impl ExportedScalarKernel { } } +/// C callable wrapper to expose [ExportedScalarKernel::function_name] unsafe extern "C" fn c_factory_function_name(self_: *const SedonaCScalarKernel) -> *const c_char { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); @@ -357,6 +389,7 @@ unsafe extern "C" fn c_factory_function_name(self_: *const SedonaCScalarKernel) } } +/// C callable wrapper around [ExportedScalarKernel::new_impl] unsafe extern "C" fn c_factory_new_impl( self_: *const SedonaCScalarKernel, out: *mut SedonaCScalarKernelImpl, @@ -371,15 +404,20 @@ unsafe extern "C" fn c_factory_new_impl( *out = SedonaCScalarKernelImpl::from(private_data.new_impl()) } +/// C Callable wrapper called when this value is dropped via FFI unsafe extern "C" fn c_factory_release(self_: *mut SedonaCScalarKernel) { assert!(!self_.is_null()); - let self_ref = self_.as_ref().unwrap(); + let self_ref = self_.as_mut().unwrap(); assert!(!self_ref.private_data.is_null()); let boxed = Box::from_raw(self_ref.private_data as *mut ExportedScalarKernel); drop(boxed); + + self_ref.private_data = null_mut(); + self_ref.release = None; } +/// Rust-backed implementation of [SedonaCScalarKernelImpl] struct ExportedScalarKernelImpl { inner: ScalarKernelRef, last_arg_types: Option>, @@ -401,7 +439,7 @@ impl From for SedonaCScalarKernelImpl { } impl ExportedScalarKernelImpl { - pub fn new(kernel: ScalarKernelRef) -> Self { + fn new(kernel: ScalarKernelRef) -> Self { Self { inner: kernel, last_arg_types: None, @@ -415,6 +453,7 @@ impl ExportedScalarKernelImpl { ffi_types: &[*const FFI_ArrowSchema], ffi_scalar_args: &[*mut FFI_ArrowArray], ) -> Result> { + // Convert the input types to Vec let arg_fields = ffi_types .iter() .map(|ptr| { @@ -427,11 +466,14 @@ impl ExportedScalarKernelImpl { } }) .collect::, ArrowError>>()?; + + // Convert the input types to Vec let args = arg_fields 
.iter() .map(SedonaType::from_storage_field) .collect::>>()?; + // Convert the scalar arguments to Vec> let arg_arrays = zip(ffi_scalar_args, &args) .map(|(ptr, arg)| { if ptr.is_null() { @@ -446,6 +488,7 @@ impl ExportedScalarKernelImpl { }) .collect::, ArrowError>>()?; + // Convert the scalar arguments to Vec> let scalar_args = arg_arrays .iter() .map(|maybe_array| { @@ -456,14 +499,19 @@ impl ExportedScalarKernelImpl { } }) .collect::>>()?; + + // Convert the scalar arguments to Vec> let scalar_arg_refs = scalar_args .iter() .map(|arg| arg.as_ref()) .collect::>(); + // Call the implementation let maybe_return_type = self .inner .return_type_from_args_and_scalars(&args, &scalar_arg_refs)?; + + // Convert the result to FFI_ArrowSchema (if not None) let return_ffi_schema = if let Some(return_type) = &maybe_return_type { let return_field = return_type.to_storage_field("", true)?; let return_ffi_schema = FFI_ArrowSchema::try_from(&return_field)?; @@ -472,6 +520,7 @@ impl ExportedScalarKernelImpl { None }; + // Save the argument types and return type for following calls to execute() self.last_arg_types.replace(args); self.last_return_type = maybe_return_type; @@ -481,6 +530,7 @@ impl ExportedScalarKernelImpl { fn execute(&self, ffi_args: &[*mut FFI_ArrowArray], num_rows: i64) -> Result { match (&self.last_arg_types, &self.last_return_type) { (Some(arg_types), Some(return_type)) => { + // Resolve args as Vec let arg_arrays = zip(ffi_args, arg_types) .map(|(ptr, arg)| { let owned_ffi_array = unsafe { FFI_ArrowArray::from_raw(*ptr) }; @@ -491,6 +541,7 @@ impl ExportedScalarKernelImpl { }) .collect::, ArrowError>>()?; + // Resolve args as Vec let args = arg_arrays .into_iter() .map(|array| { @@ -504,17 +555,21 @@ impl ExportedScalarKernelImpl { }) .collect::>>()?; + // Call the implementation let result_value = self.inner.invoke_batch_from_args( arg_types, &args, return_type, num_rows as usize, )?; + + // Convert the result to an ArrayRef let result_array = match 
result_value { ColumnarValue::Array(array) => array, ColumnarValue::Scalar(scalar_value) => scalar_value.to_array()?, }; + // Convert the result to a FFI_ArrowArray let result_ffi_array = FFI_ArrowArray::new(&result_array.to_data()); Ok(result_ffi_array) } @@ -525,6 +580,7 @@ impl ExportedScalarKernelImpl { } } +/// C callable wrapper around [ExportedScalarKernelImpl::init] unsafe extern "C" fn c_kernel_init( self_: *mut SedonaCScalarKernelImpl, arg_types: *const *const FFI_ArrowSchema, @@ -560,6 +616,7 @@ unsafe extern "C" fn c_kernel_init( } } +/// C callable wrapper around [ExportedScalarKernelImpl::execute] unsafe extern "C" fn c_kernel_execute( self_: *mut SedonaCScalarKernelImpl, args: *mut *mut FFI_ArrowArray, @@ -589,6 +646,7 @@ unsafe extern "C" fn c_kernel_execute( } } +/// C Callable wrapper to retrieve the last error string unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarKernelImpl) -> *const c_char { assert!(!self_.is_null()); let self_ref = self_.as_ref().unwrap(); @@ -600,13 +658,17 @@ unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarKernelImpl) -> private_data.last_error.as_ptr() } +/// C Callable wrapper called when this value is dropped via FFI unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarKernelImpl) { assert!(!self_.is_null()); - let self_ref = self_.as_ref().unwrap(); + let self_ref = self_.as_mut().unwrap(); assert!(!self_ref.private_data.is_null()); let boxed = Box::from_raw(self_ref.private_data as *mut ExportedScalarKernelImpl); drop(boxed); + + self_ref.private_data = null_mut(); + self_ref.release = None; } #[cfg(test)] diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h index 5f4c32ebd..8e3d9363b 100644 --- a/c/sedona-extension/src/sedona_extension.h +++ b/c/sedona-extension/src/sedona_extension.h @@ -120,7 +120,8 @@ struct ArrowArrayStream { /// /// Briefly, the SedonaCScalarKernelImpl is typically the stack-allocated /// structure that is 
not thread safe and the SedonaCScalarKernel is the -/// value that lives in a registry (whose job it is to initialize implementations). +/// value that lives in a registry (whose job it is to initialize implementations +/// on each stack that needs one). struct SedonaCScalarKernelImpl { /// \brief Initialize the state of this instance and calculate a return type /// @@ -173,7 +174,7 @@ struct SedonaCScalarKernelImpl { /// \brief Scalar function/kernel initializer /// -/// Usually a GeoArrowScalarUdf will be used to execute a single batch +/// Usually a SedonaCScalarKernelImpl will be used to execute a single batch /// (although it may be reused if a caller can serialize callback use). This /// structure is a factory object that initializes such objects. The /// SedonaCScalarKernel is designed to be thread-safe and live in a registry. From 6854e70891a9b822020005af21f799be5341dc0e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 16:57:24 -0600 Subject: [PATCH 18/20] docs --- c/sedona-extension/src/extension.rs | 5 +++++ c/sedona-extension/src/lib.rs | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index 91b40ac20..d2d992448 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -23,6 +23,10 @@ use std::{ use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +/// Raw FFI representation of the SedonaCScalarKernel +/// +/// See the ImportedScalarKernel and ExportedScalarKernel for high-level +/// APIs to import and export implementations using this struct. 
#[derive(Default)] #[repr(C)] pub struct SedonaCScalarKernel { @@ -49,6 +53,7 @@ impl Drop for SedonaCScalarKernel { } } +/// Raw FFI representation of the SedonaCScalarKernelImpl #[derive(Default)] #[repr(C)] pub struct SedonaCScalarKernelImpl { diff --git a/c/sedona-extension/src/lib.rs b/c/sedona-extension/src/lib.rs index 073ed939b..adc0a9213 100644 --- a/c/sedona-extension/src/lib.rs +++ b/c/sedona-extension/src/lib.rs @@ -15,5 +15,5 @@ // specific language governing permissions and limitations // under the License. -pub mod extension; +pub(crate) mod extension; pub mod scalar_kernel; From d2b5fd1570dae66ca81e25880f70ecc519367f14 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 3 Dec 2025 19:45:10 -0600 Subject: [PATCH 19/20] comments --- c/sedona-extension/src/extension.rs | 6 +++--- c/sedona-extension/src/scalar_kernel.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index d2d992448..4b3680750 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -99,9 +99,9 @@ impl Drop for SedonaCScalarKernelImpl { /// /// The [FFI_ArrowSchema] doesn't have the ability to check for a NULL release callback, /// so we provide a mechanism to do so here. 
-pub fn ffi_arrow_schema_is_valid(ffi_schema: *const FFI_ArrowSchema) -> bool { - let ffi_schema_internal = ffi_schema as *const c_void as *const ArrowSchemaInternal; - if let Some(schema_ref) = unsafe { ffi_schema_internal.as_ref() } { +pub fn ffi_arrow_schema_is_valid(schema: *const FFI_ArrowSchema) -> bool { + let schema_internal = schema as *const c_void as *const ArrowSchemaInternal; + if let Some(schema_ref) = unsafe { schema_internal.as_ref() } { schema_ref.release.is_some() } else { false diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 17467dc70..6fc5f9402 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -302,7 +302,7 @@ impl CScalarKernelImplWrapper { Ok(arrow_array::make_array(data)) } else { plan_err!( - "SedonaCScalarKernelImpl::init failed: {}", + "SedonaCScalarKernelImpl::execute failed: {}", self.last_error(code) ) } @@ -838,7 +838,7 @@ mod test { let err = ffi_tester.invoke_scalar("POINT (0 1)").unwrap_err(); assert_eq!( err.message(), - "SedonaCScalarKernelImpl::init failed: this invoke_batch() always errors" + "SedonaCScalarKernelImpl::execute failed: this invoke_batch() always errors" ); } From c9d676fce72ecc1a48e8b506352caadfac2e5384 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 9 Dec 2025 10:19:26 -0600 Subject: [PATCH 20/20] more restrictive args, allow null private data --- c/sedona-extension/src/extension.rs | 4 ++-- c/sedona-extension/src/scalar_kernel.rs | 23 +++++++++++------------ c/sedona-extension/src/sedona_extension.h | 8 +++++--- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/c/sedona-extension/src/extension.rs b/c/sedona-extension/src/extension.rs index 4b3680750..7d957a57c 100644 --- a/c/sedona-extension/src/extension.rs +++ b/c/sedona-extension/src/extension.rs @@ -61,7 +61,7 @@ pub struct SedonaCScalarKernelImpl { unsafe extern "C" fn( self_: *mut SedonaCScalarKernelImpl, arg_types: *const 
*const FFI_ArrowSchema, - scalar_args: *mut *mut FFI_ArrowArray, + scalar_args: *const *mut FFI_ArrowArray, n_args: i64, out: *mut FFI_ArrowSchema, ) -> c_int, @@ -70,7 +70,7 @@ pub struct SedonaCScalarKernelImpl { pub execute: Option< unsafe extern "C" fn( self_: *mut SedonaCScalarKernelImpl, - args: *mut *mut FFI_ArrowArray, + args: *const *mut FFI_ArrowArray, n_args: i64, n_rows: i64, out: *mut FFI_ArrowArray, diff --git a/c/sedona-extension/src/scalar_kernel.rs b/c/sedona-extension/src/scalar_kernel.rs index 6fc5f9402..17972356e 100644 --- a/c/sedona-extension/src/scalar_kernel.rs +++ b/c/sedona-extension/src/scalar_kernel.rs @@ -56,13 +56,8 @@ impl TryFrom for ImportedScalarKernel { type Error = DataFusionError; fn try_from(value: SedonaCScalarKernel) -> Result { - match ( - &value.function_name, - &value.new_impl, - &value.release, - value.private_data.is_null(), - ) { - (Some(function_name), Some(_), Some(_), false) => { + match (&value.function_name, &value.new_impl, &value.release) { + (Some(function_name), Some(_), Some(_)) => { let name_ptr = unsafe { function_name(&value) }; let name = if name_ptr.is_null() { None @@ -213,7 +208,7 @@ impl CScalarKernelImplWrapper { .collect::>>()?; // Convert arg_scalars to Vec<*mut FFI_ArrowArray> - let mut ffi_scalar_ptrs = ffi_scalars + let ffi_scalar_ptrs = ffi_scalars .iter_mut() .map(|maybe_ffi_scalar| match maybe_ffi_scalar { Some(ffi_scalar) => ffi_scalar as *mut FFI_ArrowArray, @@ -228,7 +223,7 @@ impl CScalarKernelImplWrapper { init( &mut self.inner, ffi_field_ptrs.as_ptr(), - ffi_scalar_ptrs.as_mut_ptr(), + ffi_scalar_ptrs.as_ptr(), arg_types.len() as i64, &mut ffi_out, ) @@ -276,6 +271,10 @@ impl CScalarKernelImplWrapper { .iter() .map(|arg| FFI_ArrowArray::new(&arg.to_data())) .collect::>(); + let ffi_arg_ptrs = ffi_args + .iter_mut() + .map(|arg| arg as *mut FFI_ArrowArray) + .collect::>(); // Call the FFI implementation of execute() if let Some(execute) = self.inner.execute { @@ -283,7 +282,7 @@ impl 
CScalarKernelImplWrapper { let code = unsafe { execute( &mut self.inner, - &mut ffi_args.as_mut_ptr(), + ffi_arg_ptrs.as_ptr(), args.len() as i64, num_rows as i64, &mut ffi_out, @@ -584,7 +583,7 @@ impl ExportedScalarKernelImpl { unsafe extern "C" fn c_kernel_init( self_: *mut SedonaCScalarKernelImpl, arg_types: *const *const FFI_ArrowSchema, - scalar_args: *mut *mut FFI_ArrowArray, + scalar_args: *const *mut FFI_ArrowArray, n_args: i64, out: *mut FFI_ArrowSchema, ) -> c_int { @@ -619,7 +618,7 @@ unsafe extern "C" fn c_kernel_init( /// C callable wrapper around [ExportedScalarKernelImpl::execute] unsafe extern "C" fn c_kernel_execute( self_: *mut SedonaCScalarKernelImpl, - args: *mut *mut FFI_ArrowArray, + args: *const *mut FFI_ArrowArray, n_args: i64, n_rows: i64, out: *mut FFI_ArrowArray, diff --git a/c/sedona-extension/src/sedona_extension.h b/c/sedona-extension/src/sedona_extension.h index 8e3d9363b..7191af21f 100644 --- a/c/sedona-extension/src/sedona_extension.h +++ b/c/sedona-extension/src/sedona_extension.h @@ -145,8 +145,10 @@ struct SedonaCScalarKernelImpl { /// passed. /// /// \return An errno-compatible error code, or zero on success. - int (*init)(struct SedonaCScalarKernelImpl* self, const struct ArrowSchema** arg_types, - struct ArrowArray** scalar_args, int64_t n_args, struct ArrowSchema* out); + int (*init)(struct SedonaCScalarKernelImpl* self, + const struct ArrowSchema* const* arg_types, + struct ArrowArray* const* scalar_args, int64_t n_args, + struct ArrowSchema* out); /// \brief Execute a single batch /// @@ -155,7 +157,7 @@ struct SedonaCScalarKernelImpl { /// inputs. /// \param n_args The number of pointers in args /// \param out Will be populated with the result on success. - int (*execute)(struct SedonaCScalarKernelImpl* self, struct ArrowArray** args, + int (*execute)(struct SedonaCScalarKernelImpl* self, struct ArrowArray* const* args, int64_t n_args, int64_t n_rows, struct ArrowArray* out); /// \brief Get the last error message