From 4ccd398e7907367749e9d383650c040b1e65995d Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Fri, 1 Dec 2023 12:43:28 +0100 Subject: [PATCH 01/16] libsql-server: use LRU cache to store active namespaces This is a port of Marin's https://github.com/libsql/sqld/pull/689/ This PR replaces the namespace store dumb hashmap with an LRU cache to bound the maximum number of namespaces loaded into memory while unbounding the number of namespaces allocated on a sqld instance. Additionally, this enables fully concurrent access to namespaces by removing the global lock on the namespace store. Co-authored-by: Piotr Sarna --- libsql-server/Cargo.toml | 1 + libsql-server/src/error.rs | 14 +- libsql-server/src/namespace/mod.rs | 287 +++++++++++++++++++---------- 3 files changed, 207 insertions(+), 95 deletions(-) diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index af7ca172e0..8c40429455 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -41,6 +41,7 @@ metrics = "0.21.1" metrics-util = "0.15" metrics-exporter-prometheus = "0.12.2" mimalloc = { version = "0.1.36", default-features = false } +moka = { version = "0.12.1", features = ["future"] } nix = { version = "0.26.2", features = ["fs"] } once_cell = "1.17.0" parking_lot = "0.12.1" diff --git a/libsql-server/src/error.rs b/libsql-server/src/error.rs index 87696c9397..e15461214a 100644 --- a/libsql-server/src/error.rs +++ b/libsql-server/src/error.rs @@ -96,6 +96,9 @@ pub enum Error { NamespaceStoreShutdown, #[error("Unable to update metastore: {0}")] MetaStoreUpdateFailure(Box), + // This is for errors returned by moka + #[error(transparent)] + Ref(#[from] std::sync::Arc), } trait ResponseError: std::error::Error { @@ -109,6 +112,12 @@ trait ResponseError: std::error::Error { impl ResponseError for Error {} impl IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + (&self).into_response() + } +} + +impl IntoResponse for &Error { fn into_response(self) -> axum::response::Response { use Error::*; @@ -156,6 +165,7 @@ impl IntoResponse for Error { UrlParseError(_) => self.format_err(StatusCode::BAD_REQUEST), NamespaceStoreShutdown => self.format_err(StatusCode::SERVICE_UNAVAILABLE), MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR), + Ref(this) => this.as_ref().into_response(), } } } @@ -230,7 +240,7 @@ pub enum LoadDumpError { impl ResponseError for LoadDumpError {} -impl IntoResponse for LoadDumpError { +impl IntoResponse for &LoadDumpError { fn into_response(self) -> axum::response::Response { use LoadDumpError::*; @@ -250,7 +260,7 @@ impl IntoResponse for LoadDumpError { impl ResponseError for ForkError {} -impl IntoResponse for ForkError { +impl IntoResponse for &ForkError { fn into_response(self) -> axum::response::Response { match self { ForkError::Internal(_) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index e1d74068be..d7ca65749a 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -10,16 +10,18 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use anyhow::Context as _; -use async_lock::{RwLock, RwLockUpgradableReadGuard}; +use async_lock::RwLock; use bottomless::bottomless_wal::CreateBottomlessWal; use bottomless::replicator::Options; use bytes::Bytes; use chrono::NaiveDateTime; use enclose::enclose; -use futures_core::Stream; +use futures::TryFutureExt; +use futures_core::{Future, Stream}; use hyper::Uri; use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient; use libsql_sys::wal::{Sqlite3WalManager, WalManager}; +use moka::future::Cache; use parking_lot::Mutex; use rusqlite::ErrorCode; use serde::de::Visitor; @@ -30,7 +32,6 @@ use tokio::task::JoinSet; use tokio::time::{Duration, Instant}; use tokio_util::io::StreamReader; use tonic::transport::Channel; -use tracing::trace; use uuid::Uuid; use crate::auth::Authenticated; @@ -379,6 +380,8 @@ impl MakeNamespace for ReplicaNamespaceMaker { } } +type NamespaceEntry = Arc>>>; + /// Stores and manage a set of namespaces. pub struct NamespaceStore { inner: Arc>, @@ -393,7 +396,7 @@ impl Clone for NamespaceStore { } struct NamespaceStoreInner { - store: RwLock>>, + store: Cache>, metadata: MetaStore, /// The namespace factory, to create new namespaces. make_namespace: M, @@ -411,9 +414,25 @@ impl NamespaceStore { ) -> crate::Result { let metadata = MetaStore::new(meta_store_path).await?; + let store = Cache::>::builder() + .async_eviction_listener(|name, ns, _| { + Box::pin(async move { + tracing::info!("namespace `{name}` deallocated"); + // shutdown namespace + if let Some(ns) = ns.write().await.take() { + if let Err(e) = ns.destroy().await { + tracing::error!("error deallocating `{name}`: {e}") + } + } + }) + }) + // TODO(marin): configurable capacity + .max_capacity(25) + .build(); + Ok(Self { inner: Arc::new(NamespaceStoreInner { - store: Default::default(), + store, metadata, make_namespace, allow_lazy_creation, @@ -427,18 +446,19 @@ impl NamespaceStore { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); } - let mut lock = self.inner.store.write().await; let mut bottomless_db_id_init = NamespaceBottomlessDbIdInit::FetchFromConfig; - if let Some(ns) = lock.remove(&namespace) { - bottomless_db_id_init = NamespaceBottomlessDbIdInit::Provided( - NamespaceBottomlessDbId::from_config(&ns.db_config_store.get()), - ); + if let Some(ns) = self.inner.store.remove(&namespace).await { // FIXME: when destroying, we are waiting for all the tasks associated with the - // allocation to finnish, which create a lot of contention on the lock. Need to use a + // allocation to finish, which create a lot of contention on the lock. Need to use a // conccurent hashmap to deal with this issue. // deallocate in-memory resources - ns.destroy().await?; + if let Some(ns) = ns.write().await.take() { + bottomless_db_id_init = NamespaceBottomlessDbIdInit::Provided( + NamespaceBottomlessDbId::from_config(&ns.db_config_store.get()), + ); + ns.destroy().await?; + } } // destroy on-disk database and backups @@ -457,24 +477,29 @@ impl NamespaceStore { Ok(()) } - async fn reset( + pub async fn reset( &self, namespace: NamespaceName, restore_option: RestoreOption, - ) -> crate::Result<()> { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let mut lock = self.inner.store.write().await; - if let Some(ns) = lock.remove(&namespace) { + ) -> anyhow::Result<()> { + // The process for reseting is as follow: + // - get a lock on the namespace entry, if the entry exists, then it's a lock on the entry, + // if it doesn't exist, insert an empty entry and take a lock on it + // - destroy the old namespace + // - create a new namespace and insert it in the held lock + let entry = self + .inner + .store + .get_with(namespace.clone(), async { Default::default() }) + .await; + let mut lock = entry.write().await; + if let Some(ns) = lock.take() { // FIXME: when destroying, we are waiting for all the tasks associated with the // allocation to finnish, which create a lot of contention on the lock. Need to use a // conccurent hashmap to deal with this issue. - // deallocate in-memory resources ns.destroy().await?; } - // destroy on-disk database self.inner .make_namespace @@ -497,7 +522,8 @@ impl NamespaceStore { &self.inner.metadata, ) .await?; - lock.insert(namespace, ns); + + lock.replace(ns); Ok(()) } @@ -534,18 +560,22 @@ impl NamespaceStore { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); } - let mut lock = self.inner.store.write().await; - if lock.contains_key(&to) { - return Err(crate::error::Error::NamespaceAlreadyExist( - to.as_str().to_string(), - )); + + let to_entry = self + .inner + .store + .get_with(to.clone(), async { Default::default() }) + .await; + let mut to_lock = to_entry.write().await; + if to_lock.is_some() { + return Err(crate::error::Error::NamespaceAlreadyExist(to.to_string())); } // check that the source namespace exists - let from_ns = match lock.entry(from.clone()) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // we just want to load the namespace into memory, so we refuse creation. + let from_entry = self + .inner + .store + .try_get_with(from.clone(), async { let ns = self .inner .make_namespace @@ -558,16 +588,25 @@ impl NamespaceStore { &self.inner.metadata, ) .await?; - e.insert(ns) - } + tracing::info!("loaded namespace: `{to}`"); + Ok::<_, crate::error::Error>(Arc::new(RwLock::new(Some(ns)))) + }) + .await + // FIXME: find how to deal with Arc + .unwrap(); + + let from_lock = from_entry.read().await; + let Some(from_ns) = &*from_lock else { + return Err(crate::error::Error::NamespaceDoesntExist(to.to_string())); }; - let forked = self + let to_ns = self .inner .make_namespace .fork(from_ns, to.clone(), timestamp, &self.inner.metadata) .await?; - lock.insert(to.clone(), forked); + + to_lock.replace(to_ns); Ok(()) } @@ -579,7 +618,7 @@ impl NamespaceStore { f: Fun, ) -> crate::Result where - Fun: FnOnce(&Namespace) -> R, + Fun: FnOnce(&Namespace) -> R + 'static, { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); @@ -593,82 +632,144 @@ impl NamespaceStore { pub async fn with(&self, namespace: NamespaceName, f: Fun) -> crate::Result where - Fun: FnOnce(&Namespace) -> R, + Fun: FnOnce(&Namespace) -> R + 'static, { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let before_load = Instant::now(); - let lock = self.inner.store.upgradable_read().await; - if let Some(ns) = lock.get(&namespace) { - Ok(f(ns)) - } else { - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - let ns = self - .inner - .make_namespace - .create( - namespace.clone(), - RestoreOption::Latest, - NamespaceBottomlessDbId::NotProvided, - self.inner.allow_lazy_creation, - self.make_reset_cb(), - &self.inner.metadata, - ) - .await?; - let ret = f(&ns); - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); + let init = { + let namespace = namespace.clone(); + async move { + let ns = self + .inner + .make_namespace + .create( + namespace.clone(), + RestoreOption::Latest, + NamespaceBottomlessDbId::NotProvided, + self.inner.allow_lazy_creation, + self.make_reset_cb(), + ) + .await?; + tracing::info!("loaded namespace: `{namespace}`"); - NAMESPACE_LOAD_LATENCY.record(before_load.elapsed()); + Ok(Some(ns)) + } + }; - Ok(ret) - } + let f = { + let name = namespace.clone(); + move |ns: NamespaceEntry| async move { + let lock = ns.read().await; + match &*lock { + Some(ns) => Ok(f(ns)), + // the namespace was taken out of the entry + None => Err(Error::NamespaceDoesntExist(name.to_string())), + } + } + }; + + self.with_lock_or_init(namespace, f, init).await? } - pub async fn create( + async fn with_lock_or_init( &self, namespace: NamespaceName, - restore_option: RestoreOption, - bottomless_db_id: NamespaceBottomlessDbId, - ) -> crate::Result<()> { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let lock = self.inner.store.upgradable_read().await; - if lock.contains_key(&namespace) { - return Err(crate::error::Error::NamespaceAlreadyExist( - namespace.as_str().to_owned(), - )); - } - + f: Fun, + init: Init, + ) -> crate::Result + where + Fun: FnOnce(NamespaceEntry) -> Fut, + Fut: Future, + Init: Future>>>, + { + let before_load = Instant::now(); let ns = self .inner - .make_namespace - .create( + .store + .try_get_with( namespace.clone(), - restore_option, - bottomless_db_id, - true, - self.make_reset_cb(), - &self.inner.metadata, + init.map_ok(|ns| Arc::new(RwLock::new(ns))), ) .await?; + NAMESPACE_LOAD_LATENCY.record(before_load.elapsed()); + Ok(f(ns).await) + } - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); + pub async fn create( + &self, + namespace: NamespaceName, + restore_option: RestoreOption, + bottomless_db_id: NamespaceBottomlessDbId, + ) -> crate::Result<()> { + let name = namespace.clone(); + let bottomless_db_id_for_init = bottomless_db_id.clone(); + let init = async { + let ns = self + .inner + .make_namespace + .create( + name.clone(), + RestoreOption::Latest, + bottomless_db_id_for_init, + false, + self.make_reset_cb(), + ) + .await; + match ns { + // the namespace already exist, load it, and let the `f` function fail + Ok(ns) => { + tracing::info!("loaded namespace: `{name}`"); + Ok(Some(ns)) + } + // return an empty slot to put the new namespace in + Err(Error::NamespaceDoesntExist(_)) => Ok(None), + Err(e) => Err(e), + } + }; - Ok(()) + let f = { + let name = namespace.clone(); + move |ns: NamespaceEntry| { + let ns = ns.clone(); + let name = name.clone(); + async move { + let mut lock = ns.write().await; + if lock.is_some() { + return Err(Error::NamespaceAlreadyExist(name.to_string())); + } + let ns = self + .inner + .make_namespace + .create( + name.clone(), + restore_option, + bottomless_db_id, + true, + self.make_reset_cb(), + &self.inner.metadata + ) + .await?; + + tracing::info!("loaded namespace: `{name}`"); + + lock.replace(ns); + + Ok(()) + } + } + }; + + self.with_lock_or_init(namespace, f, init).await? } pub async fn shutdown(self) -> crate::Result<()> { self.inner.has_shutdown.store(true, Ordering::Relaxed); - let mut lock = self.inner.store.write().await; - for (name, ns) in lock.drain() { - ns.shutdown(self.inner.snapshot_at_shutdown).await?; - trace!("shutdown namespace: `{}`", name); + for (_name, entry) in self.inner.store.iter() { + let mut lock = entry.write().await; + if let Some(ns) = lock.take() { + ns.shutdown(self.inner.snapshot_at_shutdown).await?; + } } + self.inner.store.invalidate_all(); + self.inner.store.run_pending_tasks().await; Ok(()) } From 58be66bcd3e5d83ef7caf16a7fa6b7c62f518a2c Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 1 Dec 2023 17:18:30 +0100 Subject: [PATCH 02/16] namespace: make namespace cache configurable ... specifically, half-configurable, since we could also configure the time-to-idle period. On the other hand, that's another burden for the user to decide, and 5 minutes sound generous enough. --- libsql-server/src/lib.rs | 19 +++++++++++--- libsql-server/src/main.rs | 5 ++++ libsql-server/src/namespace/mod.rs | 39 ++++++++++++++-------------- libsql-server/src/test/bottomless.rs | 1 + 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index 846f28a76b..fc0424e1d3 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -100,6 +100,7 @@ pub struct Server, pub disable_namespaces: bool, pub shutdown: Arc, + pub max_active_namespaces: usize, } impl Default for Server { @@ -117,6 +118,7 @@ impl Default for Server { heartbeat_config: Default::default(), disable_namespaces: true, shutdown: Default::default(), + max_active_namespaces: 100, } } } @@ -384,6 +386,7 @@ where db_config: self.db_config.clone(), base_path: self.path.clone(), auth: auth.clone(), + max_active_namespaces: self.max_active_namespaces, }; let (namespaces, proxy_service, replication_service) = replica.configure().await?; self.rpc_client_config = None; @@ -422,6 +425,7 @@ where extensions, base_path: self.path.clone(), disable_namespaces: self.disable_namespaces, + max_active_namespaces: self.max_active_namespaces, join_set: &mut join_set, auth: auth.clone(), }; @@ -487,6 +491,7 @@ struct Primary<'a, A> { extensions: Arc<[PathBuf]>, base_path: Arc, disable_namespaces: bool, + max_active_namespaces: usize, auth: Arc, join_set: &'a mut JoinSet>, } @@ -520,14 +525,13 @@ where let meta_store_path = conf.base_path.join("metastore"); let factory = PrimaryNamespaceMaker::new(conf); - let namespaces = NamespaceStore::new( factory, false, self.db_config.snapshot_at_shutdown, meta_store_path, - ) - .await?; + self.max_active_namespaces, + ); // eagerly load the default namespace when namespaces are disabled if self.disable_namespaces { @@ -602,6 +606,7 @@ struct Replica { db_config: DbConfig, base_path: Arc, auth: Arc, + max_active_namespaces: usize, } impl Replica { @@ -627,7 +632,13 @@ impl Replica { let meta_store_path = conf.base_path.join("metastore"); let factory = ReplicaNamespaceMaker::new(conf); - let namespaces = NamespaceStore::new(factory, true, false, meta_store_path).await?; + let namespaces = NamespaceStore::new( + factory, + true, + false, + meta_store_path, + self.max_active_namespaces, + ); let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone()); let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone()); diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index 57c557324a..4f850ff6b6 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -195,6 +195,10 @@ struct Cli { /// Enable snapshot at shutdown #[clap(long)] snapshot_at_shutdown: bool, + + /// Max active namespaces kept in-memory + #[clap(long, env = "SQLD_MAX_ACTIVE_NAMESPACES", default_value = "100")] + max_active_namespaces: usize, } #[derive(clap::Subcommand, Debug)] @@ -506,6 +510,7 @@ async fn build_server(config: &Cli) -> anyhow::Result { disable_default_namespace: config.disable_default_namespace, disable_namespaces: !config.enable_namespaces, shutdown, + max_active_namespaces: config.max_active_namespaces, }) } diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index d7ca65749a..955042f010 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -406,31 +406,30 @@ struct NamespaceStoreInner { } impl NamespaceStore { - pub async fn new( + pub fn new( make_namespace: M, allow_lazy_creation: bool, snapshot_at_shutdown: bool, meta_store_path: impl AsRef, - ) -> crate::Result { - let metadata = MetaStore::new(meta_store_path).await?; - + max_active_namespaces: usize, + ) -> Self { let store = Cache::>::builder() - .async_eviction_listener(|name, ns, _| { - Box::pin(async move { - tracing::info!("namespace `{name}` deallocated"); - // shutdown namespace - if let Some(ns) = ns.write().await.take() { - if let Err(e) = ns.destroy().await { - tracing::error!("error deallocating `{name}`: {e}") + .async_eviction_listener(|name, ns, _| { + Box::pin(async move { + tracing::info!("namespace `{name}` deallocated"); + // shutdown namespace + if let Some(ns) = ns.write().await.take() { + if let Err(e) = ns.destroy().await { + tracing::error!("error deallocating `{name}`: {e}") + } } - } + }) }) - }) - // TODO(marin): configurable capacity - .max_capacity(25) - .build(); - - Ok(Self { + // TODO(marin): configurable capacity + .max_capacity(max_active_namespaces as u64) + .time_to_idle(Duration::from_secs(300)) + .build(); + Self { inner: Arc::new(NamespaceStoreInner { store, metadata, @@ -439,7 +438,7 @@ impl NamespaceStore { has_shutdown: AtomicBool::new(false), snapshot_at_shutdown, }), - }) + } } pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> { @@ -744,7 +743,7 @@ impl NamespaceStore { bottomless_db_id, true, self.make_reset_cb(), - &self.inner.metadata + &self.inner.metadata, ) .await?; diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index e8f1523c83..dc0a39e5ad 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -105,6 +105,7 @@ async fn configure_server( }, path: path.into().into(), disable_default_namespace: false, + max_active_namespaces: 100, heartbeat_config: None, idle_shutdown_timeout: None, initial_idle_shutdown_timeout: None, From fff3d7a8225aa58b5a6c229aafe00390accdf028 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Sun, 3 Dec 2023 19:23:57 +0100 Subject: [PATCH 03/16] review fixes: no unwrap and no outdated comments --- libsql-server/src/namespace/mod.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 955042f010..2aa147bb48 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -447,10 +447,6 @@ impl NamespaceStore { } let mut bottomless_db_id_init = NamespaceBottomlessDbIdInit::FetchFromConfig; if let Some(ns) = self.inner.store.remove(&namespace).await { - // FIXME: when destroying, we are waiting for all the tasks associated with the - // allocation to finish, which create a lot of contention on the lock. Need to use a - // conccurent hashmap to deal with this issue. - // deallocate in-memory resources if let Some(ns) = ns.write().await.take() { bottomless_db_id_init = NamespaceBottomlessDbIdInit::Provided( @@ -493,10 +489,6 @@ impl NamespaceStore { .await; let mut lock = entry.write().await; if let Some(ns) = lock.take() { - // FIXME: when destroying, we are waiting for all the tasks associated with the - // allocation to finnish, which create a lot of contention on the lock. Need to use a - // conccurent hashmap to deal with this issue. - // deallocate in-memory resources ns.destroy().await?; } // destroy on-disk database @@ -590,9 +582,7 @@ impl NamespaceStore { tracing::info!("loaded namespace: `{to}`"); Ok::<_, crate::error::Error>(Arc::new(RwLock::new(Some(ns)))) }) - .await - // FIXME: find how to deal with Arc - .unwrap(); + .await?; let from_lock = from_entry.read().await; let Some(from_ns) = &*from_lock else { From 619d59b2422931acc1fe29064b0d0f7c65530985 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Sun, 3 Dec 2023 19:25:50 +0100 Subject: [PATCH 04/16] namespace: add a comment regarding snapshot-on-cache-eviction --- libsql-server/src/namespace/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 2aa147bb48..f3fa6a68ec 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -415,6 +415,11 @@ impl NamespaceStore { ) -> Self { let store = Cache::>::builder() .async_eviction_listener(|name, ns, _| { + // TODO(sarna): not clear if we should snapshot-on-evict... + // On the one hand, better to do so, because we have no idea + // for how long we're evicting a namespace. + // On the other, if there's lots of cache pressure, snapshotting + // very often will kill the machine's I/O. Box::pin(async move { tracing::info!("namespace `{name}` deallocated"); // shutdown namespace @@ -425,7 +430,6 @@ impl NamespaceStore { } }) }) - // TODO(marin): configurable capacity .max_capacity(max_active_namespaces as u64) .time_to_idle(Duration::from_secs(300)) .build(); From eee8d1956029c6aebe58cba079dd8bda69dcbe3d Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 4 Dec 2023 11:26:12 +0100 Subject: [PATCH 05/16] namespace: check if a namespace exists early Originally the test was performed in the f() function, but that changed after we allowed default namespace creation. --- libsql-server/src/namespace/mod.rs | 45 +++++++----------------------- 1 file changed, 10 insertions(+), 35 deletions(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index f3fa6a68ec..686b67542b 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -674,6 +674,7 @@ impl NamespaceStore { Init: Future>>>, { let before_load = Instant::now(); + tracing::warn!("starting with_lock_or_init: {namespace}"); let ns = self .inner .store @@ -692,6 +693,12 @@ impl NamespaceStore { restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, ) -> crate::Result<()> { + if self.inner.store.get(&namespace).await.is_some() { + return Err(crate::error::Error::NamespaceAlreadyExist( + namespace.to_string(), + )); + } + let name = namespace.clone(); let bottomless_db_id_for_init = bottomless_db_id.clone(); let init = async { @@ -700,14 +707,13 @@ impl NamespaceStore { .make_namespace .create( name.clone(), - RestoreOption::Latest, + restore_option, bottomless_db_id_for_init, false, self.make_reset_cb(), ) .await; match ns { - // the namespace already exist, load it, and let the `f` function fail Ok(ns) => { tracing::info!("loaded namespace: `{name}`"); Ok(Some(ns)) @@ -718,39 +724,8 @@ impl NamespaceStore { } }; - let f = { - let name = namespace.clone(); - move |ns: NamespaceEntry| { - let ns = ns.clone(); - let name = name.clone(); - async move { - let mut lock = ns.write().await; - if lock.is_some() { - return Err(Error::NamespaceAlreadyExist(name.to_string())); - } - let ns = self - .inner - .make_namespace - .create( - name.clone(), - restore_option, - bottomless_db_id, - true, - self.make_reset_cb(), - &self.inner.metadata, - ) - .await?; - - tracing::info!("loaded namespace: `{name}`"); - - lock.replace(ns); - - Ok(()) - } - } - }; - - self.with_lock_or_init(namespace, f, init).await? + self.with_lock_or_init(namespace, |_| async { Ok(()) }, init) + .await? } pub async fn shutdown(self) -> crate::Result<()> { From 7f531b592b58e5d2b8fa396d64fda36a7ce3cc62 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 4 Dec 2023 11:31:54 +0100 Subject: [PATCH 06/16] namespace: snapshot on evict TODO: snapshotting on eviction will kill I/O if we get n+1 active namespaces, given a cache of n entries... We need a more robust mechanism for snapshot tracking to avoid: 1. snapshotting the same namespace more than once, if it was evicted, then recreated, and then evicted again 2. snapshotting a namespace if the previous snapshot happened not long ago (esp. if there's no new data) --- libsql-server/src/namespace/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 686b67542b..a190e64886 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -414,7 +414,8 @@ impl NamespaceStore { max_active_namespaces: usize, ) -> Self { let store = Cache::>::builder() - .async_eviction_listener(|name, ns, _| { + .async_eviction_listener(move |name, ns, _| { + tracing::info!("evicting namespace `{name}` asynchronously"); // TODO(sarna): not clear if we should snapshot-on-evict... // On the one hand, better to do so, because we have no idea // for how long we're evicting a namespace. @@ -424,7 +425,7 @@ impl NamespaceStore { tracing::info!("namespace `{name}` deallocated"); // shutdown namespace if let Some(ns) = ns.write().await.take() { - if let Err(e) = ns.destroy().await { + if let Err(e) = ns.shutdown(snapshot_at_shutdown).await { tracing::error!("error deallocating `{name}`: {e}") } } From 3d017103eec02344f7c2ea6520b880de73c4cd71 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 4 Dec 2023 15:20:04 +0100 Subject: [PATCH 07/16] namespace: add a function for verifying if a namespace exists Now that we have cache, it's better to consult the MakeNamespace implementation to check if the namespace has already been created. --- libsql-server/src/namespace/mod.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index a190e64886..ccf81eccbf 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -198,6 +198,8 @@ pub trait MakeNamespace: Sync + Send + 'static { timestamp: Option, meta_store: &MetaStore, ) -> crate::Result>; + + fn exists(&self, namespace: &NamespaceName) -> bool; } /// Creates new primary `Namespace` @@ -315,6 +317,11 @@ impl MakeNamespace for PrimaryNamespaceMaker { let ns = fork_task.fork().await?; Ok(ns) } + + fn exists(&self, namespace: &NamespaceName) -> bool { + let ns_path = self.config.base_path.join("dbs").join(namespace.as_str()); + ns_path.try_exists().unwrap_or(false) + } } /// Creates new replica `Namespace` @@ -378,6 +385,11 @@ impl MakeNamespace for ReplicaNamespaceMaker { ) -> crate::Result> { return Err(ForkError::ForkReplica.into()); } + + fn exists(&self, namespace: &NamespaceName) -> bool { + let ns_path = self.config.base_path.join("dbs").join(namespace.as_str()); + ns_path.try_exists().unwrap_or(false) + } } type NamespaceEntry = Arc>>>; @@ -675,7 +687,6 @@ impl NamespaceStore { Init: Future>>>, { let before_load = Instant::now(); - tracing::warn!("starting with_lock_or_init: {namespace}"); let ns = self .inner .store @@ -694,10 +705,8 @@ impl NamespaceStore { restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, ) -> crate::Result<()> { - if self.inner.store.get(&namespace).await.is_some() { - return Err(crate::error::Error::NamespaceAlreadyExist( - namespace.to_string(), - )); + if self.inner.make_namespace.exists(&namespace) { + return Err(Error::NamespaceAlreadyExist(namespace.to_string())); } let name = namespace.clone(); From b3beda3565b589e6ac83d835f69e6450b0b9d753 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 5 Dec 2023 12:42:20 +0100 Subject: [PATCH 08/16] namespace: auto-create namespace in create() Now that we abstracted out exists() for MakeNamespace, we no longer need to pass a boolean that allows or disallows auto-creation. --- libsql-server/src/namespace/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index ccf81eccbf..6d80e399bd 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -719,7 +719,7 @@ impl NamespaceStore { name.clone(), restore_option, bottomless_db_id_for_init, - false, + true, self.make_reset_cb(), ) .await; From 008eeab355d8e00d3ba0f172a405abc676647a20 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 5 Dec 2023 13:40:26 +0100 Subject: [PATCH 09/16] namespace: drop allow_creation boolean With MakeNamespace::exists(), the logic for checking if a namespace already exists is shifted to a separate call. --- libsql-server/src/namespace/fork.rs | 1 - libsql-server/src/namespace/mod.rs | 58 +++++------------------------ 2 files changed, 10 insertions(+), 49 deletions(-) diff --git a/libsql-server/src/namespace/fork.rs b/libsql-server/src/namespace/fork.rs index 8373c4d4ae..28efd7f62c 100644 --- a/libsql-server/src/namespace/fork.rs +++ b/libsql-server/src/namespace/fork.rs @@ -109,7 +109,6 @@ impl ForkTask<'_> { self.dest_namespace.clone(), RestoreOption::Latest, self.bottomless_db_id, - true, // Forking works only on primary and // PrimaryNamespaceMaker::create ignores // reset_cb param diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 6d80e399bd..5437851493 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -175,7 +175,6 @@ pub trait MakeNamespace: Sync + Send + 'static { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, reset: ResetCb, meta_store: &MetaStore, ) -> crate::Result>; @@ -223,7 +222,6 @@ impl MakeNamespace for PrimaryNamespaceMaker { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, _reset: ResetCb, meta_store: &MetaStore, ) -> crate::Result> { @@ -232,7 +230,6 @@ impl MakeNamespace for PrimaryNamespaceMaker { name.clone(), restore_option, bottomless_db_id, - allow_creation, meta_store.handle(name), ) .await @@ -345,7 +342,6 @@ impl MakeNamespace for ReplicaNamespaceMaker { name: NamespaceName, restore_option: RestoreOption, _bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, reset: ResetCb, meta_store: &MetaStore, ) -> crate::Result> { @@ -354,14 +350,7 @@ impl MakeNamespace for ReplicaNamespaceMaker { _ => Err(LoadDumpError::ReplicaLoadDump)?, } - Namespace::new_replica( - &self.config, - name.clone(), - allow_creation, - reset, - meta_store.handle(name), - ) - .await + Namespace::new_replica(&self.config, name.clone(), reset, meta_store.handle(name)).await } async fn destroy( @@ -525,7 +514,6 @@ impl NamespaceStore { namespace.clone(), restore_option, NamespaceBottomlessDbId::NotProvided, - true, self.make_reset_cb(), &self.inner.metadata, ) @@ -580,6 +568,10 @@ impl NamespaceStore { } // check that the source namespace exists + if !self.inner.make_namespace.exists(&from) { + return Err(crate::error::Error::NamespaceDoesntExist(from.to_string())); + } + let from_entry = self .inner .store @@ -591,7 +583,6 @@ impl NamespaceStore { from.clone(), RestoreOption::Latest, NamespaceBottomlessDbId::NotProvided, - false, self.make_reset_cb(), &self.inner.metadata, ) @@ -650,7 +641,6 @@ impl NamespaceStore { namespace.clone(), RestoreOption::Latest, NamespaceBottomlessDbId::NotProvided, - self.inner.allow_lazy_creation, self.make_reset_cb(), ) .await?; @@ -705,7 +695,11 @@ impl NamespaceStore { restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, ) -> crate::Result<()> { - if self.inner.make_namespace.exists(&namespace) { + // With namespaces disabled, the default namespace can be auto-created, + // otherwise it's an error. + if self.inner.allow_lazy_creation && namespace == NamespaceName::default() { + tracing::trace!("auto-creating default namespace"); + } else if self.inner.make_namespace.exists(&namespace) { return Err(Error::NamespaceAlreadyExist(namespace.to_string())); } @@ -719,7 +713,6 @@ impl NamespaceStore { name.clone(), restore_option, bottomless_db_id_for_init, - true, self.make_reset_cb(), ) .await; @@ -821,20 +814,12 @@ impl Namespace { async fn new_replica( config: &ReplicaNamespaceConfig, name: NamespaceName, - allow_creation: bool, reset: ResetCb, meta_store_handle: MetaStoreHandle, ) -> crate::Result { tracing::debug!("creating replica namespace"); let db_path = config.base_path.join("dbs").join(name.as_str()); - // there isn't a database folder for this database, and we're not allowed to create it. - if !allow_creation && !db_path.exists() { - return Err(crate::error::Error::NamespaceDoesntExist( - name.as_str().to_owned(), - )); - } - let rpc_client = ReplicationLogClient::with_origin(config.channel.clone(), config.uri.clone()); let client = @@ -991,7 +976,6 @@ impl Namespace { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, meta_store_handle: MetaStoreHandle, ) -> crate::Result { // FIXME: make that truly atomic. explore the idea of using temp directories, and it's implications @@ -1000,7 +984,6 @@ impl Namespace { name.clone(), restore_option, bottomless_db_id, - allow_creation, meta_store_handle, ) .await @@ -1021,21 +1004,11 @@ impl Namespace { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, meta_store_handle: MetaStoreHandle, ) -> crate::Result { - // if namespaces are disabled, then we allow creation for the default namespace. - let allow_creation = - allow_creation || (config.disable_namespace && name == NamespaceName::default()); - let mut join_set = JoinSet::new(); let db_path = config.base_path.join("dbs").join(name.as_str()); - // The database folder doesn't exist, bottomless replication is disabled (no db to recover) - // and we're not allowed to create a new database, return an error. - if !allow_creation && config.bottomless_replication.is_none() && !db_path.try_exists()? { - return Err(crate::error::Error::NamespaceDoesntExist(name.to_string())); - } let mut is_dirty = config.db_is_dirty; tokio::fs::create_dir_all(&db_path).await?; @@ -1072,17 +1045,6 @@ impl Namespace { let options = make_bottomless_options(options, bottomless_db_id, name.clone()); let (replicator, did_recover) = init_bottomless_replicator(db_path.join("data"), options, &restore_option).await?; - - // There wasn't any database to recover from bottomless, and we are not allowed to - // create a new database - if !did_recover && !allow_creation && !db_path.try_exists()? { - // clean stale directory - // FIXME: this is not atomic, we could be left with a stale directory. Maybe do - // setup in a temp directory and then atomically rename it? - let _ = tokio::fs::remove_dir_all(&db_path).await; - return Err(crate::error::Error::NamespaceDoesntExist(name.to_string())); - } - is_dirty |= did_recover; Some(replicator) } else { From ff0b851cf17f50387c6114a75fb9508867b3e55a Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 5 Dec 2023 14:23:57 +0100 Subject: [PATCH 10/16] namespace: fix honoring allow_lazy_creation --- libsql-server/src/namespace/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 5437851493..f2f91389e8 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -634,6 +634,13 @@ impl NamespaceStore { let init = { let namespace = namespace.clone(); async move { + if !self.inner.make_namespace.exists(&namespace) { + if self.inner.allow_lazy_creation { + return Ok(None); + } else { + return Err(Error::NamespaceDoesntExist(namespace.to_string())); + } + } let ns = self .inner .make_namespace From e2d8fa548fb3e08706c08439f55f11130c44e6c3 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 5 Dec 2023 14:34:27 +0100 Subject: [PATCH 11/16] namespace: move checking if database is created to init() --- libsql-server/src/namespace/mod.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index f2f91389e8..b4eeb95310 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -634,12 +634,9 @@ impl NamespaceStore { let init = { let namespace = namespace.clone(); async move { - if !self.inner.make_namespace.exists(&namespace) { - if self.inner.allow_lazy_creation { - return Ok(None); - } else { - return Err(Error::NamespaceDoesntExist(namespace.to_string())); - } + if !self.inner.make_namespace.exists(&namespace) && !self.inner.allow_lazy_creation + { + return Err(Error::NamespaceDoesntExist(namespace.to_string())); } let ns = self .inner From 9a481eea78151dd79cbf08f95495c6ce8b861c3b Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 5 Dec 2023 14:52:03 +0100 Subject: [PATCH 12/16] namespace: allow auto-creation of the default namespace --- libsql-server/src/namespace/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index b4eeb95310..a87c6858a5 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -634,7 +634,9 @@ impl NamespaceStore { let init = { let namespace = namespace.clone(); async move { - if !self.inner.make_namespace.exists(&namespace) && !self.inner.allow_lazy_creation + if namespace != NamespaceName::default() + && !self.inner.make_namespace.exists(&namespace) + && !self.inner.allow_lazy_creation { return Err(Error::NamespaceDoesntExist(namespace.to_string())); } @@ -701,8 +703,8 @@ impl NamespaceStore { ) -> crate::Result<()> { // With namespaces disabled, the default namespace can be auto-created, // otherwise it's an error. - if self.inner.allow_lazy_creation && namespace == NamespaceName::default() { - tracing::trace!("auto-creating default namespace"); + if self.inner.allow_lazy_creation || namespace == NamespaceName::default() { + tracing::trace!("auto-creating the namespace"); } else if self.inner.make_namespace.exists(&namespace) { return Err(Error::NamespaceAlreadyExist(namespace.to_string())); } From c82d96c0f5f1f51f35b3d8ca545997dbb61be28e Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 5 Dec 2023 17:45:12 +0100 Subject: [PATCH 13/16] namespace: bump time_to_idle to a very high value ... for testing purposes only --- libsql-server/src/namespace/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index a87c6858a5..79c1522943 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -433,7 +433,7 @@ impl NamespaceStore { }) }) .max_capacity(max_active_namespaces as u64) - .time_to_idle(Duration::from_secs(300)) + .time_to_idle(Duration::from_secs(86400)) .build(); Self { inner: Arc::new(NamespaceStoreInner { From 355e2988981c2d7fc6f8a6fff3b9d92af23d56e4 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 5 Dec 2023 18:07:14 +0100 Subject: [PATCH 14/16] namespace: print cache eviction reason in debug logs --- libsql-server/src/namespace/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 79c1522943..00dfdfd86e 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -414,9 +414,10 @@ impl NamespaceStore { meta_store_path: impl AsRef, max_active_namespaces: usize, ) -> Self { + tracing::trace!("Max active namespaces: {max_active_namespaces}"); let store = Cache::>::builder() - .async_eviction_listener(move |name, ns, _| { - tracing::info!("evicting namespace `{name}` asynchronously"); + .async_eviction_listener(move |name, ns, cause| { + tracing::debug!("evicting namespace `{name}` asynchronously: {cause:?}"); // TODO(sarna): not clear if we should snapshot-on-evict... // On the one hand, better to do so, because we have no idea // for how long we're evicting a namespace. From 51de23b04b0b8413333482a1b8fb6ef1986de861 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 11 Dec 2023 16:25:27 +0100 Subject: [PATCH 15/16] namespace: add bottomless checks in exists() Even if the namespace doesn't exist locally, it might have a functional backup. --- bottomless/src/replicator.rs | 27 +++++++++++++++++++ libsql-server/src/namespace/mod.rs | 43 +++++++++++++++++++++++++----- 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 864cdfb0f9..1be2a43fe6 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -376,6 +376,33 @@ impl Replicator { }) } + /// Checks if there exists any backup of given database + pub async fn has_backup_of(db_name: impl AsRef, options: &Options) -> Result { + let prefix = match &options.db_id { + Some(db_id) => format!("{db_id}-"), + None => format!("ns-:{}-", db_name.as_ref()), + }; + let config = options.client_config().await?; + let client = Client::from_conf(config); + let bucket = options.bucket_name.clone(); + + match client.head_bucket().bucket(&bucket).send().await { + Ok(_) => tracing::trace!("Bucket {bucket} exists and is accessible"), + Err(e) => { + tracing::trace!("Bucket checking error: {e}"); + return Err(e.into()); + } + } + + let mut last_frame = 0; + let list_objects = client.list_objects().bucket(&bucket).prefix(&prefix); + let response = list_objects.send().await?; + let _ = Self::try_get_last_frame_no(response, &mut last_frame); + tracing::trace!("Last frame of {prefix}: {last_frame}"); + + Ok(last_frame > 0) + } + pub async fn shutdown_gracefully(&mut self) -> Result<()> { tracing::info!("bottomless replicator: shutting down..."); // 1. wait for all committed WAL frames to be committed locally diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 00dfdfd86e..c9b0545e2c 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -76,6 +76,12 @@ impl Default for NamespaceName { } } +impl AsRef for NamespaceName { + fn as_ref(&self) -> &str { + self.as_str() + } +} + impl NamespaceName { pub fn from_string(s: String) -> crate::Result { Self::validate(&s)?; @@ -198,7 +204,7 @@ pub trait MakeNamespace: Sync + Send + 'static { meta_store: &MetaStore, ) -> crate::Result>; - fn exists(&self, namespace: &NamespaceName) -> bool; + async fn exists(&self, namespace: &NamespaceName) -> bool; } /// Creates new primary `Namespace` @@ -315,9 +321,32 @@ impl MakeNamespace for PrimaryNamespaceMaker { Ok(ns) } - fn exists(&self, namespace: &NamespaceName) -> bool { + async fn exists(&self, namespace: &NamespaceName) -> bool { let ns_path = self.config.base_path.join("dbs").join(namespace.as_str()); - ns_path.try_exists().unwrap_or(false) + if let Ok(true) = ns_path.try_exists() { + return true; + } + + if let Some(replication_options) = self.config.bottomless_replication.as_ref() { + tracing::info!("Bottomless: {:?}", replication_options); + match bottomless::replicator::Replicator::has_backup_of(namespace, replication_options) + .await + { + Ok(true) => { + tracing::debug!("Bottomless: Backup found"); + return true; + } + Ok(false) => { + tracing::debug!("Bottomless: No backup found"); + } + Err(err) => { + tracing::debug!("Bottomless: Error checking backup: {}", err); + } + } + } else { + tracing::debug!("Bottomless: No backup configured"); + } + false } } @@ -375,7 +404,7 @@ impl MakeNamespace for ReplicaNamespaceMaker { return Err(ForkError::ForkReplica.into()); } - fn exists(&self, namespace: &NamespaceName) -> bool { + async fn exists(&self, namespace: &NamespaceName) -> bool { let ns_path = self.config.base_path.join("dbs").join(namespace.as_str()); ns_path.try_exists().unwrap_or(false) } @@ -569,7 +598,7 @@ impl NamespaceStore { } // check that the source namespace exists - if !self.inner.make_namespace.exists(&from) { + if !self.inner.make_namespace.exists(&from).await { return Err(crate::error::Error::NamespaceDoesntExist(from.to_string())); } @@ -636,7 +665,7 @@ impl NamespaceStore { let namespace = namespace.clone(); async move { if namespace != NamespaceName::default() - && !self.inner.make_namespace.exists(&namespace) + && !self.inner.make_namespace.exists(&namespace).await && !self.inner.allow_lazy_creation { return Err(Error::NamespaceDoesntExist(namespace.to_string())); @@ -706,7 +735,7 @@ impl NamespaceStore { // otherwise it's an error. if self.inner.allow_lazy_creation || namespace == NamespaceName::default() { tracing::trace!("auto-creating the namespace"); - } else if self.inner.make_namespace.exists(&namespace) { + } else if self.inner.make_namespace.exists(&namespace).await { return Err(Error::NamespaceAlreadyExist(namespace.to_string())); } From 59ee522becc40d36414c8d3d9082a5b9450c4b83 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 15 Dec 2023 10:21:33 +0100 Subject: [PATCH 16/16] one of those painful rebases: part I --- libsql-server/src/lib.rs | 6 ++- libsql-server/src/namespace/meta_store.rs | 7 +++ libsql-server/src/namespace/mod.rs | 62 +++++------------------ 3 files changed, 25 insertions(+), 50 deletions(-) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index fc0424e1d3..387f1110c1 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -531,7 +531,8 @@ where self.db_config.snapshot_at_shutdown, meta_store_path, self.max_active_namespaces, - ); + ) + .await?; // eagerly load the default namespace when namespaces are disabled if self.disable_namespaces { @@ -638,7 +639,8 @@ impl Replica { false, meta_store_path, self.max_active_namespaces, - ); + ) + .await?; let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone()); let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone()); diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs index feffff1281..c2cbb7f3e9 100644 --- a/libsql-server/src/namespace/meta_store.rs +++ b/libsql-server/src/namespace/meta_store.rs @@ -177,6 +177,13 @@ impl MetaStore { inner: HandleState::External(change_tx, rx), } } + + // TODO: we need to either make sure that the metastore is restored + // before we start accepting connections or we need to contact bottomless + // here to check if a namespace exists. Preferably the former. + pub fn exists(&self, namespace: &NamespaceName) -> bool { + self.inner.lock().configs.contains_key(namespace) + } } impl MetaStoreHandle { diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index c9b0545e2c..4af96543a4 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -2,8 +2,6 @@ mod fork; pub mod meta_store; pub mod replication_wal; -use std::collections::hash_map::Entry; -use std::collections::HashMap; use std::fmt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -203,8 +201,6 @@ pub trait MakeNamespace: Sync + Send + 'static { timestamp: Option, meta_store: &MetaStore, ) -> crate::Result>; - - async fn exists(&self, namespace: &NamespaceName) -> bool; } /// Creates new primary `Namespace` @@ -320,34 +316,6 @@ impl MakeNamespace for PrimaryNamespaceMaker { let ns = fork_task.fork().await?; Ok(ns) } - - async fn exists(&self, namespace: &NamespaceName) -> bool { - let ns_path = self.config.base_path.join("dbs").join(namespace.as_str()); - if let Ok(true) = ns_path.try_exists() { - return true; - } - - if let Some(replication_options) = self.config.bottomless_replication.as_ref() { - tracing::info!("Bottomless: {:?}", replication_options); - match bottomless::replicator::Replicator::has_backup_of(namespace, replication_options) - .await - { - Ok(true) => { - tracing::debug!("Bottomless: Backup found"); - return true; - } - Ok(false) => { - tracing::debug!("Bottomless: No backup found"); - } - Err(err) => { - tracing::debug!("Bottomless: Error checking backup: {}", err); - } - } - } else { - tracing::debug!("Bottomless: No backup configured"); - } - false - } } /// Creates new replica `Namespace` @@ -403,11 +371,6 @@ impl MakeNamespace for ReplicaNamespaceMaker { ) -> crate::Result> { return Err(ForkError::ForkReplica.into()); } - - async fn exists(&self, namespace: &NamespaceName) -> bool { - let ns_path = self.config.base_path.join("dbs").join(namespace.as_str()); - ns_path.try_exists().unwrap_or(false) - } } type NamespaceEntry = Arc>>>; @@ -436,13 +399,14 @@ struct NamespaceStoreInner { } impl NamespaceStore { - pub fn new( + pub async fn new( make_namespace: M, allow_lazy_creation: bool, snapshot_at_shutdown: bool, meta_store_path: impl AsRef, max_active_namespaces: usize, - ) -> Self { + ) -> crate::Result { + let metadata = MetaStore::new(meta_store_path).await?; tracing::trace!("Max active namespaces: {max_active_namespaces}"); let store = Cache::>::builder() .async_eviction_listener(move |name, ns, cause| { @@ -465,7 +429,7 @@ impl NamespaceStore { .max_capacity(max_active_namespaces as u64) .time_to_idle(Duration::from_secs(86400)) .build(); - Self { + Ok(Self { inner: Arc::new(NamespaceStoreInner { store, metadata, @@ -474,7 +438,7 @@ impl NamespaceStore { has_shutdown: AtomicBool::new(false), snapshot_at_shutdown, }), - } + }) } pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> { @@ -587,6 +551,11 @@ impl NamespaceStore { return Err(Error::NamespaceStoreShutdown); } + // check that the source namespace exists + if !self.inner.metadata.exists(&from) { + return Err(crate::error::Error::NamespaceDoesntExist(from.to_string())); + } + let to_entry = self .inner .store @@ -597,11 +566,6 @@ impl NamespaceStore { return Err(crate::error::Error::NamespaceAlreadyExist(to.to_string())); } - // check that the source namespace exists - if !self.inner.make_namespace.exists(&from).await { - return Err(crate::error::Error::NamespaceDoesntExist(from.to_string())); - } - let from_entry = self .inner .store @@ -665,7 +629,7 @@ impl NamespaceStore { let namespace = namespace.clone(); async move { if namespace != NamespaceName::default() - && !self.inner.make_namespace.exists(&namespace).await + && !self.inner.metadata.exists(&namespace) && !self.inner.allow_lazy_creation { return Err(Error::NamespaceDoesntExist(namespace.to_string())); @@ -678,6 +642,7 @@ impl NamespaceStore { RestoreOption::Latest, NamespaceBottomlessDbId::NotProvided, self.make_reset_cb(), + &self.inner.metadata, ) .await?; tracing::info!("loaded namespace: `{namespace}`"); @@ -735,7 +700,7 @@ impl NamespaceStore { // otherwise it's an error. if self.inner.allow_lazy_creation || namespace == NamespaceName::default() { tracing::trace!("auto-creating the namespace"); - } else if self.inner.make_namespace.exists(&namespace).await { + } else if self.inner.metadata.exists(&namespace) { return Err(Error::NamespaceAlreadyExist(namespace.to_string())); } @@ -750,6 +715,7 @@ impl NamespaceStore { restore_option, bottomless_db_id_for_init, self.make_reset_cb(), + &self.inner.metadata, ) .await; match ns {