Skip to content

Commit 6e76292

Browse files
committed
Long term dream #433 continuation of pull request #644
1 parent d6da8af commit 6e76292

File tree

6 files changed

+315
-76
lines changed

6 files changed

+315
-76
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
Added CLI Arguments (server/src/config.rs:95-116)
2+
3+
- --storage-backends: Comma-separated list of backends to enable (sled, dashmap, rocksdb,
4+
redb, fs)
5+
- --prefer-memory: Prioritize in-memory storage for better performance
6+
- --rocksdb-path: Custom path for RocksDB storage
7+
- --redb-path: Custom path for ReDB storage
8+
- --fs-path: Custom path for filesystem storage
9+
10+
2. Extended Config Struct (server/src/config.rs:215-225)
11+
12+
Added fields to store the storage configuration in the Config struct.
13+
14+
3. Created StorageConfig Struct (lib/src/db.rs:86-111)
15+
16+
New configuration struct with:
17+
- List of enabled backends
18+
- Memory preference flag
19+
- Custom paths for each backend type
20+
- Default configuration (sled + dashmap)
21+
22+
4. Refactored Db::init (lib/src/db.rs:158-165)
23+
24+
- Added init_with_config method that accepts StorageConfig
25+
- Original init method now calls init_with_config with defaults
26+
- Dynamic backend initialization based on configuration
27+
- Respects prefer_memory flag for fastest operator selection
28+
29+
5. Wired Configuration (server/src/appstate.rs:47-56)
30+
31+
Connected server config to database initialization with the new storage configuration.
32+
33+
Usage Examples:
34+
35+
# Use default backends (sled + dashmap)
36+
atomic-server
37+
38+
# Enable specific backends
39+
atomic-server --storage-backends sled,dashmap,rocksdb
40+
41+
# Prefer in-memory storage
42+
atomic-server --prefer-memory
43+
44+
# Custom paths
45+
atomic-server --rocksdb-path /custom/rocksdb --redb-path /custom/redb
46+
47+
# Environment variables
48+
ATOMIC_STORAGE_BACKENDS=sled,dashmap,fs atomic-server
49+
ATOMIC_PREFER_MEMORY=true atomic-server

lib/src/db.rs

Lines changed: 151 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,33 @@ impl Drop for DropSafeRuntime {
8383
// A function called by the Store when a Commit is accepted
8484
type HandleCommit = Box<dyn Fn(&CommitResponse) + Send + Sync>;
8585

86+
/// Configuration for OpenDAL storage backends
87+
#[derive(Clone, Debug)]
88+
pub struct StorageConfig {
89+
/// List of enabled storage backends (e.g., ["sled", "dashmap", "rocksdb"])
90+
pub enabled_backends: Vec<String>,
91+
/// Whether to prefer in-memory storage for better performance
92+
pub prefer_memory: bool,
93+
/// Custom path for RocksDB storage
94+
pub rocksdb_path: Option<std::path::PathBuf>,
95+
/// Custom path for ReDB storage
96+
pub redb_path: Option<std::path::PathBuf>,
97+
/// Custom path for filesystem storage
98+
pub fs_path: Option<std::path::PathBuf>,
99+
}
100+
101+
impl Default for StorageConfig {
102+
fn default() -> Self {
103+
Self {
104+
enabled_backends: vec!["sled".to_string(), "dashmap".to_string()],
105+
prefer_memory: false,
106+
rocksdb_path: None,
107+
redb_path: None,
108+
fs_path: None,
109+
}
110+
}
111+
}
112+
86113
/// Inside the reference_index, each value is mapped to this type.
87114
/// The String on the left represents a Property URL, and the second one is the set of subjects.
88115
pub type PropSubjectMap = HashMap<String, HashSet<String>>;
@@ -133,62 +160,103 @@ impl Db {
133160
/// The server_url is the domain where the db will be hosted, e.g. http://localhost/
134161
/// It is used for distinguishing locally defined items from externally defined ones.
135162
pub fn init(path: &std::path::Path, server_url: String) -> AtomicResult<Db> {
163+
Self::init_with_config(path, server_url, StorageConfig::default())
164+
}
165+
166+
/// Creates a new store with custom storage configuration.
167+
/// The server_url is the domain where the db will be hosted, e.g. http://localhost/
168+
/// It is used for distinguishing locally defined items from externally defined ones.
169+
pub fn init_with_config(path: &std::path::Path, server_url: String, config: StorageConfig) -> AtomicResult<Db> {
136170
// Local runtime for async OpenDAL ops during initialization.
137171
// Use a current-thread runtime to avoid nested multi-thread runtimes inside Actix/Tokio tests.
138172
let raw_rt = tokio::runtime::Builder::new_current_thread()
139173
.enable_all()
140174
.build()?;
141175

142-
// OpenDAL operator: Sled (on-disk)
143-
let mut dal_sled = opendal::services::Sled::default();
144-
let dal_path = path.join("opendal");
145-
dal_sled
146-
.datadir(dal_path.to_str().expect("wrong data dir string"))
147-
.tree("resources_v1");
148-
let sled_op = opendal::Operator::new(dal_sled)
149-
.map_err(|_e| format!("Error operator: {}", _e))?
150-
.layer(opendal::layers::LoggingLayer::default())
151-
.finish();
176+
let mut dal_ops = std::collections::HashMap::new();
177+
let mut all_ops = Vec::new();
178+
179+
// OpenDAL operator: Sled (on-disk) - always included as base storage
180+
if config.enabled_backends.contains(&"sled".to_string()) {
181+
let mut dal_sled = opendal::services::Sled::default();
182+
let dal_path = path.join("opendal");
183+
dal_sled
184+
.datadir(dal_path.to_str().expect("wrong data dir string"))
185+
.tree("resources_v1");
186+
let sled_op = opendal::Operator::new(dal_sled)
187+
.map_err(|_e| format!("Error operator: {}", _e))?
188+
.layer(opendal::layers::LoggingLayer::default())
189+
.finish();
190+
dal_ops.insert("sled".to_string(), sled_op.clone());
191+
all_ops.push(("sled".to_string(), sled_op));
192+
}
152193

153194
// OpenDAL operator: DashMap (in-memory, fast)
154-
let dash_op = opendal::Operator::new(opendal::services::Dashmap::default())
155-
.map_err(|_e| format!("Error operator: {}", _e))?
156-
.layer(opendal::layers::LoggingLayer::default())
157-
.finish();
195+
if config.enabled_backends.contains(&"dashmap".to_string()) {
196+
let dash_op = opendal::Operator::new(opendal::services::Dashmap::default())
197+
.map_err(|_e| format!("Error operator: {}", _e))?
198+
.layer(opendal::layers::LoggingLayer::default())
199+
.finish();
200+
dal_ops.insert("dashmap".to_string(), dash_op.clone());
201+
all_ops.push(("dashmap".to_string(), dash_op));
202+
}
158203

159204
// OpenDAL operator: RocksDB (on-disk kv)
160205
#[cfg(feature = "persist-rocksdb")]
161-
let rocks_op = {
162-
let mut b = opendal::services::Rocksdb::default();
163-
b.datadir(path.join("opendal_rocksdb").to_str().expect("rocks path"));
164-
opendal::Operator::new(b)
165-
.map_err(|_e| format!("Error operator: {}", _e))?
166-
.layer(opendal::layers::LoggingLayer::default())
167-
.finish()
168-
};
206+
if config.enabled_backends.contains(&"rocksdb".to_string()) {
207+
let rocks_op = {
208+
let mut b = opendal::services::Rocksdb::default();
209+
let rocks_path = config.rocksdb_path.clone()
210+
.unwrap_or_else(|| path.join("opendal_rocksdb"));
211+
b.datadir(rocks_path.to_str().expect("rocks path"));
212+
opendal::Operator::new(b)
213+
.map_err(|_e| format!("Error operator: {}", _e))?
214+
.layer(opendal::layers::LoggingLayer::default())
215+
.finish()
216+
};
217+
dal_ops.insert("rocksdb".to_string(), rocks_op.clone());
218+
all_ops.push(("rocksdb".to_string(), rocks_op));
219+
}
169220

170221
// OpenDAL operator: ReDB (embedded db)
171222
#[cfg(feature = "persist-redb")]
172-
let redb_op = {
173-
let mut b = opendal::services::Redb::default();
174-
b.datadir(path.join("opendal_redb").to_str().expect("redb path"));
175-
b.table("resources_v1");
176-
opendal::Operator::new(b)
177-
.map_err(|_e| format!("Error operator: {}", _e))?
178-
.layer(opendal::layers::LoggingLayer::default())
179-
.finish()
180-
};
223+
if config.enabled_backends.contains(&"redb".to_string()) {
224+
let redb_op = {
225+
let mut b = opendal::services::Redb::default();
226+
let redb_path = config.redb_path.clone()
227+
.unwrap_or_else(|| path.join("opendal_redb"));
228+
b.datadir(redb_path.to_str().expect("redb path"));
229+
b.table("resources_v1");
230+
opendal::Operator::new(b)
231+
.map_err(|_e| format!("Error operator: {}", _e))?
232+
.layer(opendal::layers::LoggingLayer::default())
233+
.finish()
234+
};
235+
dal_ops.insert("redb".to_string(), redb_op.clone());
236+
all_ops.push(("redb".to_string(), redb_op));
237+
}
181238

182239
// OpenDAL operator: FS (filesystem flat files)
183240
#[cfg(feature = "persist-fs")]
184-
let fs_op = {
185-
let mut b = opendal::services::Fs::default();
186-
b.root(path.join("opendal_fs").to_str().expect("fs path"));
187-
opendal::Operator::new(b)
188-
.map_err(|_e| format!("Error operator: {}", _e))?
189-
.layer(opendal::layers::LoggingLayer::default())
190-
.finish()
191-
};
241+
if config.enabled_backends.contains(&"fs".to_string()) {
242+
let fs_op = {
243+
let mut b = opendal::services::Fs::default();
244+
let fs_path = config.fs_path.clone()
245+
.unwrap_or_else(|| path.join("opendal_fs"));
246+
b.root(fs_path.to_str().expect("fs path"));
247+
opendal::Operator::new(b)
248+
.map_err(|_e| format!("Error operator: {}", _e))?
249+
.layer(opendal::layers::LoggingLayer::default())
250+
.finish()
251+
};
252+
dal_ops.insert("fs".to_string(), fs_op.clone());
253+
all_ops.push(("fs".to_string(), fs_op));
254+
}
255+
256+
// Ensure we have at least one storage backend
257+
if dal_ops.is_empty() {
258+
return Err("No storage backends enabled. Please enable at least one backend.".into());
259+
}
192260

193261
// Simple speed check: write+read small payload and measure read latency
194262
fn measure_read_ns(
@@ -225,43 +293,52 @@ impl Db {
225293
}
226294
}
227295

228-
let sled_ns = measure_read_ns(&raw_rt, sled_op.clone())?;
229-
let dash_ns = measure_read_ns(&raw_rt, dash_op.clone())?;
230-
#[cfg(feature = "persist-rocksdb")]
231-
let rocks_ns = measure_read_ns(&raw_rt, rocks_op.clone())?;
232-
#[cfg(feature = "persist-redb")]
233-
let redb_ns = measure_read_ns(&raw_rt, redb_op.clone())?;
234-
#[cfg(feature = "persist-fs")]
235-
let fs_ns = measure_read_ns(&raw_rt, fs_op.clone())?;
236-
237-
#[allow(unused_mut)]
238-
let mut fastest_op = if dash_ns <= sled_ns {
239-
dash_op.clone()
296+
// Determine the fastest operator based on configuration preference or benchmarking
297+
let fastest_op = if config.prefer_memory {
298+
// If prefer_memory is set, use dashmap if available, otherwise benchmark
299+
if let Some(dashmap_op) = dal_ops.get("dashmap") {
300+
info!("Using dashmap as fastest operator due to prefer_memory setting");
301+
dashmap_op.clone()
302+
} else {
303+
// Fall back to benchmarking
304+
info!("Benchmarking storage backends to find fastest...");
305+
let mut fastest_name = String::new();
306+
let mut fastest_op = None;
307+
let mut fastest_ns = u128::MAX;
308+
309+
for (name, op) in &all_ops {
310+
let ns = measure_read_ns(&raw_rt, op.clone())?;
311+
info!("Backend '{}' read latency: {} ns", name, ns);
312+
if ns < fastest_ns {
313+
fastest_ns = ns;
314+
fastest_name = name.clone();
315+
fastest_op = Some(op.clone());
316+
}
317+
}
318+
319+
info!("Selected '{}' as fastest backend ({} ns)", fastest_name, fastest_ns);
320+
fastest_op.expect("No operators available")
321+
}
240322
} else {
241-
sled_op.clone()
323+
// Benchmark all available operators
324+
info!("Benchmarking storage backends to find fastest...");
325+
let mut fastest_name = String::new();
326+
let mut fastest_op = None;
327+
let mut fastest_ns = u128::MAX;
328+
329+
for (name, op) in &all_ops {
330+
let ns = measure_read_ns(&raw_rt, op.clone())?;
331+
info!("Backend '{}' read latency: {} ns", name, ns);
332+
if ns < fastest_ns {
333+
fastest_ns = ns;
334+
fastest_name = name.clone();
335+
fastest_op = Some(op.clone());
336+
}
337+
}
338+
339+
info!("Selected '{}' as fastest backend ({} ns)", fastest_name, fastest_ns);
340+
fastest_op.expect("No operators available")
242341
};
243-
#[cfg(feature = "persist-rocksdb")]
244-
if rocks_ns < measure_read_ns(&raw_rt, fastest_op.clone())? {
245-
fastest_op = rocks_op.clone();
246-
}
247-
#[cfg(feature = "persist-redb")]
248-
if redb_ns < measure_read_ns(&raw_rt, fastest_op.clone())? {
249-
fastest_op = redb_op.clone();
250-
}
251-
#[cfg(feature = "persist-fs")]
252-
if fs_ns < measure_read_ns(&raw_rt, fastest_op.clone())? {
253-
fastest_op = fs_op.clone();
254-
}
255-
256-
let mut dal_ops = std::collections::HashMap::new();
257-
dal_ops.insert("sled".to_string(), sled_op);
258-
dal_ops.insert("dashmap".to_string(), dash_op);
259-
#[cfg(feature = "persist-rocksdb")]
260-
dal_ops.insert("rocksdb".to_string(), rocks_op);
261-
#[cfg(feature = "persist-redb")]
262-
dal_ops.insert("redb".to_string(), redb_op);
263-
#[cfg(feature = "persist-fs")]
264-
dal_ops.insert("fs".to_string(), fs_op);
265342

266343
let db = sled::open(path).map_err(|e|format!("Failed opening DB at this location: {:?} . Is another instance of Atomic Server running? {}", path, e))?;
267344
let resources = db.open_tree("resources_v1").map_err(|e|format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?;

lib/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub mod values;
9393
pub use atoms::Atom;
9494
pub use commit::Commit;
9595
#[cfg(feature = "db")]
96-
pub use db::Db;
96+
pub use db::{Db, StorageConfig};
9797
pub use errors::AtomicError;
9898
pub use errors::AtomicErrorType;
9999
pub use resources::Resource;

server/src/appstate.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,18 @@ pub fn init(config: Config) -> AtomicServerResult<AppState> {
4242

4343
tracing::info!("Opening database at {:?}", &config.store_path);
4444
let should_init = !&config.store_path.exists() || config.initialize;
45-
let mut store = atomic_lib::Db::init(&config.store_path, config.server_url.clone())?;
45+
46+
// Create storage configuration from server config
47+
let storage_config = atomic_lib::StorageConfig {
48+
enabled_backends: config.storage_backends.clone(),
49+
prefer_memory: config.prefer_memory,
50+
rocksdb_path: Some(config.rocksdb_path.clone()),
51+
redb_path: Some(config.redb_path.clone()),
52+
fs_path: Some(config.fs_path.clone()),
53+
};
54+
55+
tracing::info!("Initializing database with storage backends: {:?}", storage_config.enabled_backends);
56+
let mut store = atomic_lib::Db::init_with_config(&config.store_path, config.server_url.clone(), storage_config)?;
4657
if should_init {
4758
tracing::info!("Initialize: creating and populating new Database...");
4859
atomic_lib::populate::populate_default_store(&store)

0 commit comments

Comments
 (0)