From 0a09291547db7e2e7bf33e955f83a3cab8763cff Mon Sep 17 00:00:00 2001 From: if0ne Date: Fri, 14 Nov 2025 10:52:21 +0300 Subject: [PATCH 01/14] add: async traits for driver, db, connection, stmt and implement it for DataFusion driver Signed-off-by: if0ne --- rust/core/src/lib.rs | 1 + rust/core/src/non_blocking.rs | 470 ++++++++++++++++++ rust/driver/datafusion/src/lib.rs | 305 +++++------- rust/driver/datafusion/src/syncify.rs | 338 +++++++++++++ .../datafusion/tests/test_datafusion.rs | 182 ++++--- 5 files changed, 1044 insertions(+), 252 deletions(-) create mode 100644 rust/core/src/non_blocking.rs create mode 100644 rust/driver/datafusion/src/syncify.rs diff --git a/rust/core/src/lib.rs b/rust/core/src/lib.rs index c8df8a9c18..c9e599c920 100644 --- a/rust/core/src/lib.rs +++ b/rust/core/src/lib.rs @@ -41,6 +41,7 @@ pub mod constants; pub mod error; +pub mod non_blocking; pub mod options; pub mod schemas; diff --git a/rust/core/src/non_blocking.rs b/rust/core/src/non_blocking.rs new file mode 100644 index 0000000000..96683cc9af --- /dev/null +++ b/rust/core/src/non_blocking.rs @@ -0,0 +1,470 @@ +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_schema::Schema; + +use std::collections::HashSet; +use std::future::Future; + +use crate::error::Result; +use crate::options::{self, *}; +use crate::PartitionedResult; + +/// Ability to configure an object by setting/getting options. +pub trait AsyncOptionable { + type Option: AsRef + Send; + + /// Set a post-init option. + fn set_option( + &mut self, + key: Self::Option, + value: OptionValue, + ) -> impl Future> + Send; + + /// Get a string option value by key. + fn get_option_string(&self, key: Self::Option) -> impl Future>; + + /// Get a bytes option value by key. + fn get_option_bytes(&self, key: Self::Option) -> impl Future>>; + + /// Get an integer option value by key. + fn get_option_int(&self, key: Self::Option) -> impl Future>; + + /// Get a float option value by key. + fn get_option_double(&self, key: Self::Option) -> impl Future>; +} + +/// A handle to an async ADBC driver. +pub trait AsyncDriver { + type DatabaseType: AsyncDatabase + Send; + + /// Allocate and initialize a new database without pre-init options. + fn new_database(&mut self) -> impl Future> + Send; + + /// Allocate and initialize a new database with pre-init options. + fn new_database_with_opts( + &mut self, + opts: Vec<(options::OptionDatabase, OptionValue)>, + ) -> impl Future> + Send; +} + +/// A handle to an async ADBC database. +/// +/// Databases hold state shared by multiple connections. This typically means +/// configuration and caches. For in-memory databases, it provides a place to +/// hold ownership of the in-memory database. +/// +/// Databases must be kept alive as long as any connections exist. +pub trait AsyncDatabase: AsyncOptionable