diff --git a/Cargo.lock b/Cargo.lock index 5e4aff24c07a..10e0621c191f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2288,6 +2288,7 @@ dependencies = [ "async-trait", "datafusion", "datafusion-common", + "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate-common", "datafusion-physical-expr", diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d9e6452d1db4..8849615927fb 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1860,6 +1860,12 @@ impl FunctionRegistry for SessionContext { } } +impl datafusion_execution::TaskContextProvider for SessionContext { + fn task_ctx(&self) -> Arc { + SessionContext::task_ctx(self) + } +} + /// Create a new task context instance from SessionContext impl From<&SessionContext> for TaskContext { fn from(session: &SessionContext) -> Self { diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 37ec27ef7537..c9ab98099324 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -1988,6 +1988,12 @@ impl FunctionRegistry for SessionState { } } +impl datafusion_execution::TaskContextProvider for SessionState { + fn task_ctx(&self) -> Arc { + SessionState::task_ctx(self) + } +} + impl OptimizerConfig for SessionState { fn query_execution_start_time(&self) -> DateTime { self.execution_props.query_execution_start_time diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 727e1702741e..1a8da9459ae1 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -47,4 +47,4 @@ pub mod registry { pub use disk_manager::DiskManager; pub use registry::FunctionRegistry; pub use stream::{RecordBatchStream, SendableRecordBatchStream}; -pub use task::TaskContext; +pub use task::{TaskContext, TaskContextProvider}; diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index c2a6cfe2c833..0be4f290a829 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -211,6 +211,11 @@ impl FunctionRegistry for TaskContext { } } +/// Produce the [`TaskContext`]. +pub trait TaskContextProvider { + fn task_ctx(&self) -> Arc; +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 126b59a16880..a06a9cf1839d 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -48,6 +48,7 @@ async-ffi = { version = "0.5.0", features = ["abi_stable"] } async-trait = { workspace = true } datafusion = { workspace = true, default-features = false } datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } datafusion-physical-expr = { workspace = true } diff --git a/datafusion/ffi/README.md b/datafusion/ffi/README.md index f8db1ac484a1..304ebb90f49d 100644 --- a/datafusion/ffi/README.md +++ b/datafusion/ffi/README.md @@ -170,6 +170,29 @@ your unit tests you should override this with `crate::mock_foreign_marker_id` to force your test to create the foreign variant of your struct. +## Task Context Provider + +Many of the FFI structs in this crate contain a `FFI_TaskContextProvider`. The +purpose of this struct is to _weakly_ hold a reference to a method to +access the current `TaskContext`. The reason we need this accessor is because +we use the `datafusion-proto` crate to serialize and deserialize data across +the FFI boundary. In particular, we need to serialize and deserialize +functions using a `TaskContext`, which implements `FunctionRegistry`. + +This becomes difficult because we may need to register multiple user defined +functions, table or catalog providers, etc with a `Session`, and each of these +will need the `TaskContext` to perform the processing. For this reason we +cannot simply include the `TaskContext` at the time of registration because +it would not have knowledge of anything registered afterward. + +The `FFI_TaskContextProvider` is built from a trait that provides a method +to get the current `TaskContext`. `FFI_TaskContextProvider` only holds a +`Weak` reference to the `TaskContextProvider`, because otherwise we could +create a circular dependency at runtime. It is imperative that if you use +these methods that your provider remains valid for the lifetime of the +calls. The `FFI_TaskContextProvider` is implemented on `SessionContext` +and it is easy to implement on any struct that implements `Session`. + [apache datafusion]: https://datafusion.apache.org/ [api docs]: http://docs.rs/datafusion-ffi/latest [rust abi]: https://doc.rust-lang.org/reference/abi.html diff --git a/datafusion/ffi/src/execution/mod.rs b/datafusion/ffi/src/execution/mod.rs new file mode 100644 index 000000000000..41107947fff0 --- /dev/null +++ b/datafusion/ffi/src/execution/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod task_ctx; +pub mod task_ctx_provider; + +pub use task_ctx::FFI_TaskContext; +pub use task_ctx_provider::FFI_TaskContextProvider; diff --git a/datafusion/ffi/src/execution/task_ctx.rs b/datafusion/ffi/src/execution/task_ctx.rs new file mode 100644 index 000000000000..bf57a2607a07 --- /dev/null +++ b/datafusion/ffi/src/execution/task_ctx.rs @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ffi::c_void; +use std::sync::Arc; + +use abi_stable::pmr::ROption; +use abi_stable::std_types::{RHashMap, RString}; +use abi_stable::StableAbi; +use datafusion_execution::config::SessionConfig; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::TaskContext; +use datafusion_expr::{ + AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl, +}; + +use crate::session_config::FFI_SessionConfig; +use crate::udaf::FFI_AggregateUDF; +use crate::udf::FFI_ScalarUDF; +use crate::udwf::FFI_WindowUDF; + +/// A stable struct for sharing [`TaskContext`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_TaskContext { + /// Return the session ID. + pub session_id: unsafe extern "C" fn(&Self) -> RString, + + /// Return the task ID. + pub task_id: unsafe extern "C" fn(&Self) -> ROption, + + /// Return the session configuration. + pub session_config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig, + + /// Returns a hashmap of names to scalar functions. + pub scalar_functions: unsafe extern "C" fn(&Self) -> RHashMap, + + /// Returns a hashmap of names to aggregate functions. + pub aggregate_functions: + unsafe extern "C" fn(&Self) -> RHashMap, + + /// Returns a hashmap of names to window functions. + pub window_functions: unsafe extern "C" fn(&Self) -> RHashMap, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the plan. + /// The foreign library should never attempt to access this data. + pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, +} + +struct TaskContextPrivateData { + ctx: Arc, +} + +impl FFI_TaskContext { + unsafe fn inner(&self) -> &Arc { + let private_data = self.private_data as *const TaskContextPrivateData; + &(*private_data).ctx + } +} + +unsafe extern "C" fn session_id_fn_wrapper(ctx: &FFI_TaskContext) -> RString { + let ctx = ctx.inner(); + ctx.session_id().into() +} + +unsafe extern "C" fn task_id_fn_wrapper(ctx: &FFI_TaskContext) -> ROption { + let ctx = ctx.inner(); + ctx.task_id().map(|s| s.as_str().into()).into() +} + +unsafe extern "C" fn session_config_fn_wrapper( + ctx: &FFI_TaskContext, +) -> FFI_SessionConfig { + let ctx = ctx.inner(); + ctx.session_config().into() +} + +unsafe extern "C" fn scalar_functions_fn_wrapper( + ctx: &FFI_TaskContext, +) -> RHashMap { + let ctx = ctx.inner(); + ctx.scalar_functions() + .iter() + .map(|(name, udf)| (name.to_owned().into(), Arc::clone(udf).into())) + .collect() +} + +unsafe extern "C" fn aggregate_functions_fn_wrapper( + ctx: &FFI_TaskContext, +) -> RHashMap { + let ctx = ctx.inner(); + ctx.aggregate_functions() + .iter() + .map(|(name, udaf)| { + ( + name.to_owned().into(), + FFI_AggregateUDF::from(Arc::clone(udaf)), + ) + }) + .collect() +} + +unsafe extern "C" fn window_functions_fn_wrapper( + ctx: &FFI_TaskContext, +) -> RHashMap { + let ctx = ctx.inner(); + ctx.window_functions() + .iter() + .map(|(name, udf)| (name.to_owned().into(), FFI_WindowUDF::from(Arc::clone(udf)))) + .collect() +} + +unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_TaskContext) { + let private_data = Box::from_raw(ctx.private_data as *mut TaskContextPrivateData); + drop(private_data); +} + +impl Drop for FFI_TaskContext { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl From> for FFI_TaskContext { + fn from(ctx: Arc) -> Self { + let private_data = Box::new(TaskContextPrivateData { ctx }); + + FFI_TaskContext { + session_id: session_id_fn_wrapper, + task_id: task_id_fn_wrapper, + session_config: session_config_fn_wrapper, + scalar_functions: scalar_functions_fn_wrapper, + aggregate_functions: aggregate_functions_fn_wrapper, + window_functions: window_functions_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, + } + } +} + +impl From for Arc { + fn from(ffi_ctx: FFI_TaskContext) -> Self { + unsafe { + if (ffi_ctx.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(ffi_ctx.inner()); + } + + let task_id = (ffi_ctx.task_id)(&ffi_ctx).map(|s| s.to_string()).into(); + let session_id = (ffi_ctx.session_id)(&ffi_ctx).into(); + let session_config = (ffi_ctx.session_config)(&ffi_ctx); + let session_config = + SessionConfig::try_from(&session_config).unwrap_or_default(); + + let scalar_functions = (ffi_ctx.scalar_functions)(&ffi_ctx) + .into_iter() + .map(|kv_pair| { + let udf = >::from(&kv_pair.1); + + ( + kv_pair.0.into_string(), + Arc::new(ScalarUDF::new_from_shared_impl(udf)), + ) + }) + .collect(); + let aggregate_functions = (ffi_ctx.aggregate_functions)(&ffi_ctx) + .into_iter() + .map(|kv_pair| { + let udaf = >::from(&kv_pair.1); + + ( + kv_pair.0.into_string(), + Arc::new(AggregateUDF::new_from_shared_impl(udaf)), + ) + }) + .collect(); + let window_functions = (ffi_ctx.window_functions)(&ffi_ctx) + .into_iter() + .map(|kv_pair| { + let udwf = >::from(&kv_pair.1); + + ( + kv_pair.0.into_string(), + Arc::new(WindowUDF::new_from_shared_impl(udwf)), + ) + }) + .collect(); + + let runtime = Arc::new(RuntimeEnv::default()); + + Arc::new(TaskContext::new( + task_id, + session_id, + session_config, + scalar_functions, + aggregate_functions, + window_functions, + runtime, + )) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::prelude::SessionContext; + use datafusion_common::Result; + use datafusion_execution::TaskContext; + + use crate::execution::FFI_TaskContext; + + #[test] + fn ffi_task_ctx_round_trip() -> Result<()> { + let session_ctx = SessionContext::new(); + let original = session_ctx.task_ctx(); + let mut ffi_task_ctx = FFI_TaskContext::from(Arc::clone(&original)); + ffi_task_ctx.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_task_ctx: Arc = ffi_task_ctx.into(); + + // TaskContext doesn't implement Eq (nor should it) so check some of the + // data is round tripping correctly. + + assert_eq!( + original.scalar_functions(), + foreign_task_ctx.scalar_functions() + ); + assert_eq!( + original.aggregate_functions(), + foreign_task_ctx.aggregate_functions() + ); + assert_eq!( + original.window_functions(), + foreign_task_ctx.window_functions() + ); + assert_eq!(original.task_id(), foreign_task_ctx.task_id()); + assert_eq!(original.session_id(), foreign_task_ctx.session_id()); + assert_eq!( + format!("{:?}", original.session_config()), + format!("{:?}", foreign_task_ctx.session_config()) + ); + + Ok(()) + } +} diff --git a/datafusion/ffi/src/execution/task_ctx_provider.rs b/datafusion/ffi/src/execution/task_ctx_provider.rs new file mode 100644 index 000000000000..b9861a68d0ac --- /dev/null +++ b/datafusion/ffi/src/execution/task_ctx_provider.rs @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ffi::c_void; +use std::sync::{Arc, Weak}; + +use abi_stable::StableAbi; +use datafusion_common::{ffi_datafusion_err, DataFusionError}; +use datafusion_execution::{TaskContext, TaskContextProvider}; + +use crate::execution::task_ctx::FFI_TaskContext; +use crate::util::FFIResult; +use crate::{df_result, rresult}; + +/// Struct for accessing the [`TaskContext`]. This method contains a weak +/// reference, so there are no guarantees that the [`TaskContext`] remains +/// valid. This is used primarily for protobuf encoding and decoding of +/// data passed across the FFI boundary. See the crate README for +/// additional information. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_TaskContextProvider { + /// Retrieve the current [`TaskContext`] provided the provider has not + /// gone out of scope. This function will return an error if the weakly + /// held reference to the underlying [`TaskContextProvider`] is no longer + /// available. + pub task_ctx: unsafe extern "C" fn(&Self) -> FFIResult, + + /// Used to create a clone on the task context accessor. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the plan. + /// The foreign library should never attempt to access this data. + pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, +} + +unsafe impl Send for FFI_TaskContextProvider {} +unsafe impl Sync for FFI_TaskContextProvider {} + +struct TaskContextProviderPrivateData { + ctx: Weak, +} + +impl FFI_TaskContextProvider { + unsafe fn inner(&self) -> Option> { + let private_data = self.private_data as *const TaskContextProviderPrivateData; + (*private_data).ctx.upgrade().map(|ctx| ctx.task_ctx()) + } +} + +unsafe extern "C" fn task_ctx_fn_wrapper( + ctx_provider: &FFI_TaskContextProvider, +) -> FFIResult { + rresult!(ctx_provider + .inner() + .map(FFI_TaskContext::from) + .ok_or_else(|| { + ffi_datafusion_err!( + "TaskContextProvider went out of scope over FFI boundary." + ) + })) +} + +unsafe extern "C" fn clone_fn_wrapper( + provider: &FFI_TaskContextProvider, +) -> FFI_TaskContextProvider { + let private_data = provider.private_data as *const TaskContextProviderPrivateData; + let ctx = Weak::clone(&(*private_data).ctx); + + let private_data = Box::new(TaskContextProviderPrivateData { ctx }); + + FFI_TaskContextProvider { + task_ctx: task_ctx_fn_wrapper, + release: release_fn_wrapper, + clone: clone_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, + } +} +unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_TaskContextProvider) { + let private_data = + Box::from_raw(ctx.private_data as *mut TaskContextProviderPrivateData); + drop(private_data); +} +impl Drop for FFI_TaskContextProvider { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl Clone for FFI_TaskContextProvider { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl From<&Arc> for FFI_TaskContextProvider { + fn from(ctx: &Arc) -> Self { + let ctx = Arc::downgrade(ctx); + let private_data = Box::new(TaskContextProviderPrivateData { ctx }); + + FFI_TaskContextProvider { + task_ctx: task_ctx_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, + } + } +} + +impl TryFrom<&FFI_TaskContextProvider> for Arc { + type Error = DataFusionError; + fn try_from(ffi_ctx: &FFI_TaskContextProvider) -> Result { + unsafe { + if (ffi_ctx.library_marker_id)() == crate::get_library_marker_id() { + return ffi_ctx.inner().ok_or_else(|| { + ffi_datafusion_err!( + "TaskContextProvider went out of scope over FFI boundary." + ) + }); + } + + df_result!((ffi_ctx.task_ctx)(ffi_ctx)).map(Into::into) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion_common::{DataFusionError, Result}; + use datafusion_execution::{TaskContext, TaskContextProvider}; + + use crate::execution::FFI_TaskContextProvider; + + #[derive(Default)] + struct TestCtxProvider { + ctx: Arc, + } + + impl TaskContextProvider for TestCtxProvider { + fn task_ctx(&self) -> Arc { + Arc::clone(&self.ctx) + } + } + + #[test] + fn ffi_task_context_provider_round_trip() -> Result<()> { + let ctx = Arc::new(TestCtxProvider::default()) as Arc; + let mut ffi_ctx_provider: FFI_TaskContextProvider = (&Arc::clone(&ctx)).into(); + ffi_ctx_provider.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_task_ctx: Arc = (&ffi_ctx_provider).try_into()?; + + assert_eq!( + format!("{foreign_task_ctx:?}"), + format!("{:?}", ctx.task_ctx()) + ); + + Ok(()) + } + + #[test] + fn ffi_task_context_provider_clone() -> Result<()> { + let ctx = Arc::new(TestCtxProvider::default()) as Arc; + let first_provider: FFI_TaskContextProvider = (&ctx).into(); + + let second_provider = first_provider.clone(); + + let first_ctx: Arc = (&first_provider).try_into()?; + let second_ctx: Arc = (&second_provider).try_into()?; + + assert!(Arc::ptr_eq(&first_ctx, &second_ctx)); + + Ok(()) + } + + #[test] + fn ffi_task_context_provider_out_of_scope() { + fn create_ffi_out_of_scope() -> FFI_TaskContextProvider { + let ctx = + Arc::new(TestCtxProvider::default()) as Arc; + (&ctx).into() + } + + let provider = create_ffi_out_of_scope(); + let failed_ctx = >::try_from(&provider); + + let Err(DataFusionError::Ffi(_)) = failed_ctx else { + panic!("Expected out of scope error") + }; + } +} diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index c4aa9c9d1aaa..fbb45a8028fc 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -28,6 +28,7 @@ pub mod arrow_wrappers; pub mod catalog_provider; pub mod catalog_provider_list; +pub mod execution; pub mod execution_plan; pub mod expr; pub mod insert_op; diff --git a/datafusion/ffi/src/session_config.rs b/datafusion/ffi/src/session_config.rs index ae28d3ac472d..5fbd6175e0b7 100644 --- a/datafusion/ffi/src/session_config.rs +++ b/datafusion/ffi/src/session_config.rs @@ -15,17 +15,13 @@ // specific language governing permissions and limitations // under the License. -use abi_stable::{ - std_types::{RHashMap, RString}, - StableAbi, -}; -use datafusion::{config::ConfigOptions, error::Result}; -use datafusion::{error::DataFusionError, prelude::SessionConfig}; -use std::sync::Arc; -use std::{ - collections::HashMap, - ffi::{c_char, c_void, CString}, -}; +use std::collections::HashMap; +use std::ffi::c_void; + +use abi_stable::std_types::{RHashMap, RString}; +use abi_stable::StableAbi; +use datafusion_common::error::{DataFusionError, Result}; +use datafusion_execution::config::SessionConfig; /// A stable struct for sharing [`SessionConfig`] across FFI boundaries. /// Instead of attempting to expose the entire SessionConfig interface, we @@ -54,18 +50,28 @@ pub struct FFI_SessionConfig { pub release: unsafe extern "C" fn(arg: &mut Self), /// Internal data. This is only to be accessed by the provider of the plan. - /// A [`ForeignSessionConfig`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_SessionConfig {} unsafe impl Sync for FFI_SessionConfig {} +impl FFI_SessionConfig { + fn inner(&self) -> &SessionConfig { + let private_data = self.private_data as *mut SessionConfigPrivateData; + unsafe { &(*private_data).config } + } +} + unsafe extern "C" fn config_options_fn_wrapper( config: &FFI_SessionConfig, ) -> RHashMap { - let private_data = config.private_data as *mut SessionConfigPrivateData; - let config_options = &(*private_data).config; + let config_options = config.inner().options(); let mut options = RHashMap::default(); for config_entry in config_options.entries() { @@ -87,7 +93,7 @@ unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) { unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_SessionConfig { let old_private_data = config.private_data as *mut SessionConfigPrivateData; - let old_config = Arc::clone(&(*old_private_data).config); + let old_config = (*old_private_data).config.clone(); let private_data = Box::new(SessionConfigPrivateData { config: old_config }); @@ -96,31 +102,18 @@ unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_Session private_data: Box::into_raw(private_data) as *mut c_void, clone: clone_fn_wrapper, release: release_fn_wrapper, + library_marker_id: crate::get_library_marker_id, } } struct SessionConfigPrivateData { - pub config: Arc, + pub config: SessionConfig, } impl From<&SessionConfig> for FFI_SessionConfig { fn from(session: &SessionConfig) -> Self { - let mut config_keys = Vec::new(); - let mut config_values = Vec::new(); - for config_entry in session.options().entries() { - if let Some(value) = config_entry.value { - let key_cstr = CString::new(config_entry.key).unwrap_or_default(); - let key_ptr = key_cstr.into_raw() as *const c_char; - config_keys.push(key_ptr); - - config_values - .push(CString::new(value).unwrap_or_default().into_raw() - as *const c_char); - } - } - let private_data = Box::new(SessionConfigPrivateData { - config: Arc::clone(session.options()), + config: session.clone(), }); Self { @@ -128,6 +121,7 @@ impl From<&SessionConfig> for FFI_SessionConfig { private_data: Box::into_raw(private_data) as *mut c_void, clone: clone_fn_wrapper, release: release_fn_wrapper, + library_marker_id: crate::get_library_marker_id, } } } @@ -144,16 +138,14 @@ impl Drop for FFI_SessionConfig { } } -/// A wrapper struct for accessing [`SessionConfig`] across a FFI boundary. -/// The [`SessionConfig`] will be generated from a hash map of the config -/// options in the provider and will be reconstructed on this side of the -/// interface.s -pub struct ForeignSessionConfig(pub SessionConfig); - -impl TryFrom<&FFI_SessionConfig> for ForeignSessionConfig { +impl TryFrom<&FFI_SessionConfig> for SessionConfig { type Error = DataFusionError; fn try_from(config: &FFI_SessionConfig) -> Result { + if (config.library_marker_id)() == crate::get_library_marker_id() { + return Ok(config.inner().clone()); + } + let config_options = unsafe { (config.config_options)(config) }; let mut options_map = HashMap::new(); @@ -161,7 +153,7 @@ impl TryFrom<&FFI_SessionConfig> for ForeignSessionConfig { options_map.insert(kv_pair.0.to_string(), kv_pair.1.to_string()); }); - Ok(Self(SessionConfig::from_string_hash_map(&options_map)?)) + SessionConfig::from_string_hash_map(&options_map) } } @@ -174,13 +166,15 @@ mod tests { let session_config = SessionConfig::new(); let original_options = session_config.options().entries(); - let ffi_config: FFI_SessionConfig = (&session_config).into(); + let mut ffi_config: FFI_SessionConfig = (&session_config).into(); + let _ = ffi_config.clone(); + ffi_config.library_marker_id = crate::mock_foreign_marker_id; - let foreign_config: ForeignSessionConfig = (&ffi_config).try_into()?; + let foreign_config: SessionConfig = (&ffi_config).try_into()?; - let returned_options = foreign_config.0.options().entries(); + let returned_options = foreign_config.options().entries(); - assert!(original_options.len() == returned_options.len()); + assert_eq!(original_options.len(), returned_options.len()); Ok(()) } diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index bc98eab518ae..f20ae3659706 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -45,7 +45,6 @@ use tokio::runtime::Handle; use crate::{ arrow_wrappers::WrappedSchema, df_result, rresult_return, - session_config::ForeignSessionConfig, table_source::{FFI_TableProviderFilterPushDown, FFI_TableType}, }; @@ -55,6 +54,7 @@ use super::{ }; use crate::util::FFIResult; use datafusion::error::Result; +use datafusion_execution::config::SessionConfig; /// A stable struct for sharing [`TableProvider`] across FFI boundaries. /// @@ -239,10 +239,10 @@ unsafe extern "C" fn scan_fn_wrapper( let session_config = session_config.clone(); async move { - let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); + let config = rresult_return!(SessionConfig::try_from(&session_config)); let session = SessionStateBuilder::new() .with_default_features() - .with_config(config.0) + .with_config(config) .build(); let ctx = SessionContext::new_with_state(session); @@ -292,10 +292,10 @@ unsafe extern "C" fn insert_into_fn_wrapper( let input = input.clone(); async move { - let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); + let config = rresult_return!(SessionConfig::try_from(&session_config)); let session = SessionStateBuilder::new() .with_default_features() - .with_config(config.0) + .with_config(config) .build(); let ctx = SessionContext::new_with_state(session); diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index ec7ebd29e571..838c3f909b02 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -410,22 +410,20 @@ impl Hash for ForeignAggregateUDF { } } -impl TryFrom<&FFI_AggregateUDF> for Arc { - type Error = DataFusionError; - - fn try_from(udaf: &FFI_AggregateUDF) -> Result { +impl From<&FFI_AggregateUDF> for Arc { + fn from(udaf: &FFI_AggregateUDF) -> Self { if (udaf.library_marker_id)() == crate::get_library_marker_id() { - return Ok(Arc::clone(unsafe { udaf.inner().inner() })); + return Arc::clone(unsafe { udaf.inner().inner() }); } let signature = Signature::user_defined((&udaf.volatility).into()); let aliases = udaf.aliases.iter().map(|s| s.to_string()).collect(); - Ok(Arc::new(ForeignAggregateUDF { + Arc::new(ForeignAggregateUDF { udaf: udaf.clone(), signature, aliases, - })) + }) } } @@ -561,9 +559,7 @@ impl AggregateUDFImpl for ForeignAggregateUDF { ))? .into_option(); - let result = result - .map(|func| >::try_from(&func)) - .transpose()?; + let result = result.map(|func| >::from(&func)); Ok(result) } @@ -671,7 +667,7 @@ mod tests { let mut local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); local_udaf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udaf: Arc = (&local_udaf).try_into()?; + let foreign_udaf: Arc = (&local_udaf).into(); Ok(AggregateUDF::new_from_shared_impl(foreign_udaf)) } @@ -686,7 +682,7 @@ mod tests { local_udaf.library_marker_id = crate::mock_foreign_marker_id; // Convert back to native format - let foreign_udaf: Arc = (&local_udaf).try_into()?; + let foreign_udaf: Arc = (&local_udaf).into(); let foreign_udaf = AggregateUDF::new_from_shared_impl(foreign_udaf); assert_eq!(original_name, foreign_udaf.name()); @@ -740,7 +736,7 @@ mod tests { let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); // Convert back to native format - let foreign_udaf: Arc = (&local_udaf).try_into()?; + let foreign_udaf: Arc = (&local_udaf).into(); let foreign_udaf = AggregateUDF::new_from_shared_impl(foreign_udaf); let metadata: HashMap = @@ -833,12 +829,12 @@ mod tests { let mut ffi_udaf = FFI_AggregateUDF::from(original_udaf); // Verify local libraries can be downcast to their original - let foreign_udaf: Arc = (&ffi_udaf).try_into()?; + let foreign_udaf: Arc = (&ffi_udaf).into(); assert!(foreign_udaf.as_any().downcast_ref::().is_some()); // Verify different library markers generate foreign providers ffi_udaf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udaf: Arc = (&ffi_udaf).try_into()?; + let foreign_udaf: Arc = (&ffi_udaf).into(); assert!(foreign_udaf .as_any() .downcast_ref::() diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index dcb6afb6fd1a..7ed3105e63e3 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -305,24 +305,22 @@ impl Hash for ForeignScalarUDF { } } -impl TryFrom<&FFI_ScalarUDF> for Arc { - type Error = DataFusionError; - - fn try_from(udf: &FFI_ScalarUDF) -> Result { +impl From<&FFI_ScalarUDF> for Arc { + fn from(udf: &FFI_ScalarUDF) -> Self { if (udf.library_marker_id)() == crate::get_library_marker_id() { - Ok(Arc::clone(udf.inner().inner())) + Arc::clone(udf.inner().inner()) } else { let name = udf.name.to_owned().into(); let signature = Signature::user_defined((&udf.volatility).into()); let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); - Ok(Arc::new(ForeignScalarUDF { + Arc::new(ForeignScalarUDF { name, udf: udf.clone(), aliases, signature, - })) + }) } } } @@ -440,7 +438,7 @@ mod tests { let mut local_udf: FFI_ScalarUDF = Arc::clone(&original_udf).into(); local_udf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udf: Arc = (&local_udf).try_into()?; + let foreign_udf: Arc = (&local_udf).into(); assert_eq!(original_udf.name(), foreign_udf.name()); @@ -456,12 +454,12 @@ mod tests { let mut ffi_udf = FFI_ScalarUDF::from(original_udf); // Verify local libraries can be downcast to their original - let foreign_udf: Arc = (&ffi_udf).try_into()?; + let foreign_udf: Arc = (&ffi_udf).into(); assert!(foreign_udf.as_any().downcast_ref::().is_some()); // Verify different library markers generate foreign providers ffi_udf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udf: Arc = (&ffi_udf).try_into()?; + let foreign_udf: Arc = (&ffi_udf).into(); assert!(foreign_udf .as_any() .downcast_ref::() diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index 0d90c90ce181..7a65a4784f65 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -15,29 +15,22 @@ // specific language governing permissions and limitations // under the License. -use std::{ - ffi::c_void, - hash::{Hash, Hasher}, - sync::Arc, -}; - -use abi_stable::{ - std_types::{ROption, RResult, RString, RVec}, - StableAbi, -}; -use arrow::{ - compute::SortOptions, - datatypes::{DataType, Schema, SchemaRef}, -}; +use std::ffi::c_void; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use abi_stable::std_types::{ROption, RResult, RString, RVec}; +use abi_stable::StableAbi; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow_schema::{Field, FieldRef}; -use datafusion::{ - error::{DataFusionError, Result}, - logical_expr::{ - function::WindowUDFFieldArgs, type_coercion::functions::fields_with_window_udf, - LimitEffect, PartitionEvaluator, Signature, WindowUDF, WindowUDFImpl, - }, - physical_expr::PhysicalExpr, +use datafusion::error::Result; +use datafusion::logical_expr::function::WindowUDFFieldArgs; +use datafusion::logical_expr::type_coercion::functions::fields_with_window_udf; +use datafusion::logical_expr::{ + LimitEffect, PartitionEvaluator, Signature, WindowUDF, WindowUDFImpl, }; +use datafusion::physical_expr::PhysicalExpr; use datafusion_common::ffi_err; use partition_evaluator::FFI_PartitionEvaluator; use partition_evaluator_args::{ @@ -48,16 +41,13 @@ mod partition_evaluator; mod partition_evaluator_args; mod range; -use crate::util::FFIResult; -use crate::{ - arrow_wrappers::WrappedSchema, - df_result, rresult, rresult_return, - util::{ - rvec_wrapped_to_vec_datatype, rvec_wrapped_to_vec_fieldref, - vec_datatype_to_rvec_wrapped, vec_fieldref_to_rvec_wrapped, - }, - volatility::FFI_Volatility, +use crate::arrow_wrappers::WrappedSchema; +use crate::util::{ + rvec_wrapped_to_vec_datatype, rvec_wrapped_to_vec_fieldref, + vec_datatype_to_rvec_wrapped, vec_fieldref_to_rvec_wrapped, FFIResult, }; +use crate::volatility::FFI_Volatility; +use crate::{df_result, rresult, rresult_return}; /// A stable struct for sharing a [`WindowUDF`] across FFI boundaries. #[repr(C)] @@ -280,24 +270,22 @@ impl Hash for ForeignWindowUDF { } } -impl TryFrom<&FFI_WindowUDF> for Arc { - type Error = DataFusionError; - - fn try_from(udf: &FFI_WindowUDF) -> Result { +impl From<&FFI_WindowUDF> for Arc { + fn from(udf: &FFI_WindowUDF) -> Self { if (udf.library_marker_id)() == crate::get_library_marker_id() { - Ok(Arc::clone(unsafe { udf.inner().inner() })) + Arc::clone(unsafe { udf.inner().inner() }) } else { let name = udf.name.to_owned().into(); let signature = Signature::user_defined((&udf.volatility).into()); let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); - Ok(Arc::new(ForeignWindowUDF { + Arc::new(ForeignWindowUDF { name, udf: udf.clone(), aliases, signature, - })) + }) } } } @@ -400,16 +388,13 @@ mod tests { use std::sync::Arc; use arrow::array::{create_array, ArrayRef}; - use datafusion::{ - functions_window::lead_lag::{lag_udwf, WindowShift}, - logical_expr::{col, expr::Sort, ExprFunctionExt, WindowUDF, WindowUDFImpl}, - prelude::SessionContext, - }; + use datafusion::functions_window::lead_lag::{lag_udwf, WindowShift}; + use datafusion::logical_expr::expr::Sort; + use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF, WindowUDFImpl}; + use datafusion::prelude::SessionContext; - use crate::{ - tests::create_record_batch, - udwf::{FFI_WindowUDF, ForeignWindowUDF}, - }; + use crate::tests::create_record_batch; + use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; fn create_test_foreign_udwf( original_udwf: impl WindowUDFImpl + 'static, @@ -419,7 +404,7 @@ mod tests { let mut local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); local_udwf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udwf: Arc = (&local_udwf).try_into()?; + let foreign_udwf: Arc = (&local_udwf).into(); Ok(WindowUDF::new_from_shared_impl(foreign_udwf)) } @@ -433,7 +418,7 @@ mod tests { local_udwf.library_marker_id = crate::mock_foreign_marker_id; // Convert back to native format - let foreign_udwf: Arc = (&local_udwf).try_into()?; + let foreign_udwf: Arc = (&local_udwf).into(); let foreign_udwf = WindowUDF::new_from_shared_impl(foreign_udwf); assert_eq!(original_name, foreign_udwf.name()); @@ -476,7 +461,7 @@ mod tests { let mut ffi_udwf = FFI_WindowUDF::from(original_udwf); // Verify local libraries can be downcast to their original - let foreign_udwf: Arc = (&ffi_udwf).try_into()?; + let foreign_udwf: Arc = (&ffi_udwf).into(); assert!(foreign_udwf .as_any() .downcast_ref::() @@ -484,7 +469,7 @@ mod tests { // Verify different library markers generate foreign providers ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udwf: Arc = (&ffi_udwf).try_into()?; + let foreign_udwf: Arc = (&ffi_udwf).into(); assert!(foreign_udwf .as_any() .downcast_ref::() diff --git a/datafusion/ffi/tests/ffi_udaf.rs b/datafusion/ffi/tests/ffi_udaf.rs index 9d1823ded076..23ca913fcaf1 100644 --- a/datafusion/ffi/tests/ffi_udaf.rs +++ b/datafusion/ffi/tests/ffi_udaf.rs @@ -38,7 +38,7 @@ mod tests { .ok_or(DataFusionError::NotImplemented( "External table provider failed to implement create_udaf".to_string(), ))?(); - let foreign_sum_func: Arc = (&ffi_sum_func).try_into()?; + let foreign_sum_func: Arc = (&ffi_sum_func).into(); let udaf = AggregateUDF::new_from_shared_impl(foreign_sum_func); @@ -80,8 +80,7 @@ mod tests { .ok_or(DataFusionError::NotImplemented( "External table provider failed to implement create_udaf".to_string(), ))?(); - let foreign_stddev_func: Arc = - (&ffi_stddev_func).try_into()?; + let foreign_stddev_func: Arc = (&ffi_stddev_func).into(); let udaf = AggregateUDF::new_from_shared_impl(foreign_stddev_func); diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs index d50739be9975..5f9630835530 100644 --- a/datafusion/ffi/tests/ffi_udf.rs +++ b/datafusion/ffi/tests/ffi_udf.rs @@ -43,7 +43,7 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_abs_func: Arc = (&ffi_abs_func).try_into()?; + let foreign_abs_func: Arc = (&ffi_abs_func).into(); let udf = ScalarUDF::new_from_shared_impl(foreign_abs_func); @@ -81,7 +81,7 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_abs_func: Arc = (&ffi_abs_func).try_into()?; + let foreign_abs_func: Arc = (&ffi_abs_func).into(); let udf = ScalarUDF::new_from_shared_impl(foreign_abs_func); diff --git a/datafusion/ffi/tests/ffi_udwf.rs b/datafusion/ffi/tests/ffi_udwf.rs index e87c65ca8907..e2a823cb162b 100644 --- a/datafusion/ffi/tests/ffi_udwf.rs +++ b/datafusion/ffi/tests/ffi_udwf.rs @@ -39,7 +39,7 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_rank_func: Arc = (&ffi_rank_func).try_into()?; + let foreign_rank_func: Arc = (&ffi_rank_func).into(); let udwf = WindowUDF::new_from_shared_impl(foreign_rank_func); diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 7fb12353b922..88a1ddfc7363 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -209,7 +209,7 @@ Previously you may write: Instead this should now be: ```rust,ignore - let foreign_udf: Arc = ffi_udf.try_into()?; + let foreign_udf: Arc = ffi_udf.into(); let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf); ```