diff --git a/src/abstract.rs b/src/abstract.rs index 7d3de5d4..394548cb 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -4,8 +4,7 @@ use crate::{ iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree, - Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree, - UserKey, UserValue, + Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree, UserKey, UserValue, }; use std::{ ops::RangeBounds, @@ -143,27 +142,6 @@ pub trait AbstractTree { index: Option<(Arc, SeqNo)>, ) -> Box + Send + 'static>; - /// Ingests a sorted stream of key-value pairs into the tree. - /// - /// Can only be called on a new fresh, empty tree. - /// - /// # Errors - /// - /// Will return `Err` if an IO error occurs. - /// - /// # Panics - /// - /// Panics if the tree is **not** initially empty. - /// - /// Will panic if the input iterator is not sorted in ascending order. - #[doc(hidden)] - fn ingest( - &self, - iter: impl Iterator, - seqno_generator: &SequenceNumberCounter, - visible_seqno: &SequenceNumberCounter, - ) -> crate::Result<()>; - /// Returns the approximate number of tombstones in the tree. fn tombstone_count(&self) -> u64; diff --git a/src/any_tree.rs b/src/any_tree.rs index 29cfbaf6..46084bc6 100644 --- a/src/any_tree.rs +++ b/src/any_tree.rs @@ -2,7 +2,10 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{BlobTree, Tree}; +use crate::{ + blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey, + UserValue, +}; use enum_dispatch::enum_dispatch; /// May be a standard [`Tree`] or a [`BlobTree`] diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs new file mode 100644 index 00000000..30189daa --- /dev/null +++ b/src/blob_tree/ingest.rs @@ -0,0 +1,254 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use crate::{ + blob_tree::handle::BlobIndirection, + file::BLOBS_FOLDER, + table::Table, + tree::ingest::Ingestion as TableIngestion, + vlog::{BlobFileWriter, ValueHandle}, + SeqNo, UserKey, UserValue, +}; + +/// Bulk ingestion for [`BlobTree`] +/// +/// Items NEED to be added in ascending key order. +/// +/// Uses table ingestion for the index and a blob file writer for large +/// values so both streams advance together. +pub struct BlobIngestion<'a> { + tree: &'a crate::BlobTree, + pub(crate) table: TableIngestion<'a>, + pub(crate) blob: BlobFileWriter, + seqno: SeqNo, + separation_threshold: u32, + last_key: Option, +} + +impl<'a> BlobIngestion<'a> { + /// Creates a new ingestion. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn new(tree: &'a crate::BlobTree) -> crate::Result { + let kv = tree + .index + .config + .kv_separation_opts + .as_ref() + .expect("kv separation options should exist"); + + let blob_file_size = kv.file_target_size; + + let table = TableIngestion::new(&tree.index)?; + let blob = BlobFileWriter::new( + tree.index.0.blob_file_id_counter.clone(), + tree.index.config.path.join(BLOBS_FOLDER), + )? + .use_target_size(blob_file_size) + .use_compression(kv.compression); + + let separation_threshold = kv.separation_threshold; + + Ok(Self { + tree, + table, + blob, + seqno: 0, + separation_threshold, + last_key: None, + }) + } + + /// Writes a key-value pair. 
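+    ///
+    /// Values whose length reaches the configured separation threshold are
+    /// written to a blob file and only an indirection is stored in the index
+    /// table; smaller values are stored inline.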
+ /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> { + // Check order before any blob I/O to avoid partial writes on failure + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + #[expect(clippy::cast_possible_truncation)] + let value_size = value.len() as u32; + + if value_size >= self.separation_threshold { + let offset = self.blob.offset(); + let blob_file_id = self.blob.blob_file_id(); + let on_disk_size = self.blob.write(&key, self.seqno, &value)?; + + let indirection = BlobIndirection { + vhandle: ValueHandle { + blob_file_id, + offset, + on_disk_size, + }, + size: value_size, + }; + + let cloned_key = key.clone(); + let res = self.table.write_indirection(key, indirection); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res + } else { + let cloned_key = key.clone(); + let res = self.table.write(key, value); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res + } + } + + /// Writes a tombstone for a key. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> { + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); + let res = self.table.write_tombstone(key); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res + } + + /// Finishes the ingestion. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + #[allow(clippy::significant_drop_tightening)] + pub fn finish(self) -> crate::Result { + use crate::AbstractTree; + + let index = self.index().clone(); + + // CRITICAL SECTION: Atomic flush + seqno allocation + registration + // + // For BlobTree, we must coordinate THREE components atomically: + // 1. Index tree memtable flush + // 2. Value log blob files + // 3. Index tree tables (with blob indirections) + // + // The sequence ensures all components see the same global_seqno: + // 1. Acquire flush lock on index tree + // 2. Flush index tree active memtable + // 3. Finalize blob writer (creates blob files) + // 4. Finalize table writer (creates index tables) + // 5. Allocate next global seqno + // 6. Recover tables with that seqno + // 7. Register version with same seqno + blob files + // + // This prevents race conditions where blob files and their index + // entries could have mismatched sequence numbers. + let flush_lock = index.get_flush_lock(); + + // Flush any pending index memtable writes to ensure ingestion sees + // a consistent snapshot of the index. + // We call rotate + flush directly because we already hold the lock. + index.rotate_memtable(); + index.flush(&flush_lock, 0)?; + + // Finalize the blob writer first, ensuring all large values are + // written to blob files before we finalize the index tables that + // reference them. + let blob_files = self.blob.finish()?; + + // Finalize the table writer, creating index tables with blob + // indirections pointing to the blob files we just created. + let results = self.table.writer.finish()?; + + // Acquire locks for version registration on the index tree. We must + // hold both the compaction state lock and version history lock to + // safely modify the tree's version. 
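+        // The leading underscore keeps the guard (and therefore the lock) alive
+        // for the rest of this function even though it is never read directly.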
+ let mut _compaction_state = index.compaction_state.lock().expect("lock is poisoned"); + let mut version_lock = index.version_history.write().expect("lock is poisoned"); + + // Allocate the next global sequence number. This seqno will be shared + // by all ingested tables, blob files, and the version that registers + // them, ensuring consistent MVCC snapshots across the value log. + let global_seqno = index.config.seqno.next(); + + // Recover all created index tables, assigning them the global_seqno + // we just allocated. These tables contain indirections to the blob + // files created above, so they must share the same sequence number + // for MVCC correctness. + // + // We intentionally do NOT pin filter/index blocks here. Large ingests + // are typically placed in level 1, and pinning would increase memory + // pressure unnecessarily. + let created_tables = results + .into_iter() + .map(|(table_id, checksum)| -> crate::Result { + Table::recover( + index + .config + .path + .join(crate::file::TABLES_FOLDER) + .join(table_id.to_string()), + checksum, + global_seqno, + index.id, + index.config.cache.clone(), + index.config.descriptor_table.clone(), + false, + false, + #[cfg(feature = "metrics")] + index.metrics.clone(), + ) + }) + .collect::>>()?; + + // Upgrade the version with our ingested tables and blob files, using + // the global_seqno we allocated earlier. This ensures the version, + // tables, and blob files all share the same sequence number, which is + // critical for GC correctness - we must not delete blob files that are + // still referenced by visible snapshots. + // + // We use upgrade_version_with_seqno (instead of upgrade_version) because + // we need precise control over the seqno: it must match the seqno we + // already assigned to the recovered tables. + version_lock.upgrade_version_with_seqno( + &index.config.path, + |current| { + let mut copy = current.clone(); + copy.version = + copy.version + .with_new_l0_run(&created_tables, Some(&blob_files), None); + Ok(copy) + }, + global_seqno, + )?; + + // Perform maintenance on the version history (e.g., clean up old versions). + // We use gc_watermark=0 since ingestion doesn't affect sealed memtables. 
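+        // A failed maintenance pass is not fatal here: the new version has
+        // already been persisted, so we only log a warning.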
+ if let Err(e) = version_lock.maintenance(&index.config.path, 0) { + log::warn!("Version GC failed: {e:?}"); + } + + Ok(global_seqno) + } + + #[inline] + fn index(&self) -> &crate::Tree { + &self.tree.index + } +} diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 4e33524e..b58c3ee7 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -4,6 +4,7 @@ mod gc; pub mod handle; +pub mod ingest; #[doc(hidden)] pub use gc::{FragmentationEntry, FragmentationMap}; @@ -18,8 +19,7 @@ use crate::{ value::InternalValue, version::Version, vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle}, - Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId, - UserKey, UserValue, + Cache, Config, DescriptorTable, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue, }; use handle::BlobIndirection; use std::{ @@ -269,119 +269,6 @@ impl AbstractTree for BlobTree { self.index.drop_range(range) } - fn ingest( - &self, - iter: impl Iterator, - seqno_generator: &SequenceNumberCounter, - visible_seqno: &SequenceNumberCounter, - ) -> crate::Result<()> { - use crate::tree::ingest::Ingestion; - use std::time::Instant; - - let seqno = seqno_generator.next(); - - let blob_file_size = self - .index - .config - .kv_separation_opts - .as_ref() - .expect("kv separation options should exist") - .file_target_size; - - let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno); - let mut blob_writer = BlobFileWriter::new( - self.index.0.blob_file_id_counter.clone(), - blob_file_size, - self.index.config.path.join(BLOBS_FOLDER), - )? - .use_compression( - self.index - .config - .kv_separation_opts - .as_ref() - .expect("blob options should exist") - .compression, - ); - - let start = Instant::now(); - let mut count = 0; - let mut last_key = None; - - let separation_threshold = self - .index - .config - .kv_separation_opts - .as_ref() - .expect("kv separation options should exist") - .separation_threshold; - - for (key, value) in iter { - if let Some(last_key) = &last_key { - assert!( - key > last_key, - "next key in bulk ingest was not greater than last key", - ); - } - last_key = Some(key.clone()); - - #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")] - let value_size = value.len() as u32; - - if value_size >= separation_threshold { - let offset = blob_writer.offset(); - let blob_file_id = blob_writer.blob_file_id(); - let on_disk_size = blob_writer.write(&key, seqno, &value)?; - - let indirection = BlobIndirection { - vhandle: ValueHandle { - blob_file_id, - offset, - on_disk_size, - }, - size: value_size, - }; - - table_writer.write_indirection(key, indirection)?; - } else { - table_writer.write(key, value)?; - } - - count += 1; - } - - let blob_files = blob_writer.finish()?; - let results = table_writer.writer.finish()?; - - let created_tables = results - .into_iter() - .map(|(table_id, checksum)| -> crate::Result
{
-                Table::recover(
-                    self.index
-                        .config
-                        .path
-                        .join(crate::file::TABLES_FOLDER)
-                        .join(table_id.to_string()),
-                    checksum,
-                    self.index.id,
-                    self.index.config.cache.clone(),
-                    self.index.config.descriptor_table.clone(),
-                    false,
-                    false,
-                    #[cfg(feature = "metrics")]
-                    self.index.metrics.clone(),
-                )
-            })
-            .collect::<crate::Result<Vec<_>>>()?;
-
-        self.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;
-
-        visible_seqno.fetch_max(seqno + 1);
-
-        log::info!("Ingested {count} items in {:?}", start.elapsed());
-
-        Ok(())
-    }
-
     fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
         self.index.major_compact(target_size, seqno_threshold)
     }
@@ -508,9 +395,9 @@ impl AbstractTree for BlobTree {
         let mut blob_writer = BlobFileWriter::new(
             self.index.0.blob_file_id_counter.clone(),
-            kv_opts.file_target_size,
             self.index.config.path.join(BLOBS_FOLDER),
         )?
+        .use_target_size(kv_opts.file_target_size)
         .use_compression(
             self.index
                 .config
@@ -580,6 +467,7 @@ impl AbstractTree for BlobTree {
             Table::recover(
                 table_folder.join(table_id.to_string()),
                 checksum,
+                0,
                 self.index.id,
                 self.index.config.cache.clone(),
                 self.index.config.descriptor_table.clone(),
diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs
index fd135fb3..16be2cfc 100644
--- a/src/compaction/flavour.rs
+++ b/src/compaction/flavour.rs
@@ -334,6 +334,7 @@ impl StandardCompaction {
         Table::recover(
             table_base_folder.join(table_id.to_string()),
             checksum,
+            0,
             opts.tree_id,
             opts.config.cache.clone(),
             opts.config.descriptor_table.clone(),
diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs
index fc15b9bd..02d49e70 100644
--- a/src/compaction/worker.rs
+++ b/src/compaction/worker.rs
@@ -424,9 +424,9 @@ fn merge_tables(
     let writer = BlobFileWriter::new(
         opts.blob_file_id_generator.clone(),
-        blob_opts.file_target_size,
         opts.config.path.join(BLOBS_FOLDER),
     )?
+    .use_target_size(blob_opts.file_target_size)
     .use_passthrough_compression(blob_opts.compression);
 
     let inner = StandardCompaction::new(table_writer, tables);
diff --git a/src/ingestion.rs b/src/ingestion.rs
new file mode 100644
index 00000000..3740a446
--- /dev/null
+++ b/src/ingestion.rs
@@ -0,0 +1,76 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+use crate::{
+    blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, AnyTree, SeqNo, UserKey, UserValue,
+};
+
+/// Unified ingestion builder over `AnyTree`
+// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
+// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
+// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
+#[expect(clippy::large_enum_variant)]
+pub enum AnyIngestion<'a> {
+    /// Ingestion for a standard LSM-tree
+    Standard(Ingestion<'a>),
+
+    /// Ingestion for a [`BlobTree`] with KV separation
+    Blob(BlobIngestion<'a>),
+}
+
+impl AnyIngestion<'_> {
+    /// Writes a key-value pair.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    pub fn write<K: Into<UserKey>, V: Into<UserValue>>(
+        &mut self,
+        key: K,
+        value: V,
+    ) -> crate::Result<()> {
+        match self {
+            Self::Standard(i) => i.write(key.into(), value.into()),
+            Self::Blob(b) => b.write(key.into(), value.into()),
+        }
+    }
+
+    /// Writes a tombstone for a key.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
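+    ///
+    /// Tombstones must follow the same strictly ascending key order as
+    /// regular writes.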
+    pub fn write_tombstone<K: Into<UserKey>>(&mut self, key: K) -> crate::Result<()> {
+        match self {
+            Self::Standard(i) => i.write_tombstone(key.into()),
+            Self::Blob(b) => b.write_tombstone(key.into()),
+        }
+    }
+
+    /// Finalizes ingestion and registers created tables (and blob files if present).
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    pub fn finish(self) -> crate::Result<SeqNo> {
+        match self {
+            Self::Standard(i) => i.finish(),
+            Self::Blob(b) => b.finish(),
+        }
+    }
+}
+
+impl AnyTree {
+    /// Starts an ingestion for any tree type (standard or blob).
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
+        match self {
+            Self::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)),
+            Self::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)),
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 8dea7113..89c37944 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -100,6 +100,7 @@ mod error;
 pub mod file;
 mod hash;
+mod ingestion;
 mod iter_guard;
 mod key;
 mod key_range;
@@ -164,7 +165,6 @@ pub use {
     merge::BoxedIterator,
     slice::Builder,
     table::{GlobalTableId, Table, TableId},
-    tree::ingest::Ingestion,
     tree::inner::TreeId,
     tree::Guard as StandardGuard,
     value::InternalValue,
@@ -179,6 +179,7 @@ pub use {
     descriptor_table::DescriptorTable,
     error::{Error, Result},
     format_version::FormatVersion,
+    ingestion::AnyIngestion,
     iter_guard::IterGuard as Guard,
     memtable::Memtable,
     r#abstract::AbstractTree,
diff --git a/src/range.rs b/src/range.rs
index c89f0634..ea9a9402 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -163,15 +163,17 @@ impl TreeIter {
                     range.start_bound().map(|x| &*x.user_key),
                     range.end_bound().map(|x| &*x.user_key),
                 )) {
-                    let reader = table.range((
-                        range.start_bound().map(|x| &x.user_key).cloned(),
-                        range.end_bound().map(|x| &x.user_key).cloned(),
-                    ));
-
-                    iters.push(Box::new(reader.filter(move |item| match item {
-                        Ok(item) => seqno_filter(item.key.seqno, seqno),
-                        Err(_) => true,
-                    })));
+                    let reader = table
+                        .range((
+                            range.start_bound().map(|x| &x.user_key).cloned(),
+                            range.end_bound().map(|x| &x.user_key).cloned(),
+                        ))
+                        .filter(move |item| match item {
+                            Ok(item) => seqno_filter(item.key.seqno, seqno),
+                            Err(_) => true,
+                        });
+
+                    iters.push(Box::new(reader));
                 }
             }
             _ => {
diff --git a/src/table/inner.rs b/src/table/inner.rs
index b77c1206..0d7b0acd 100644
--- a/src/table/inner.rs
+++ b/src/table/inner.rs
@@ -11,7 +11,7 @@ use crate::{
     descriptor_table::DescriptorTable,
     table::{filter::block::FilterBlock, IndexBlock},
     tree::inner::TreeId,
-    Checksum, GlobalTableId,
+    Checksum, GlobalTableId, SeqNo,
 };
 use std::{
     path::PathBuf,
@@ -53,6 +53,8 @@ pub struct Inner {
     pub(super) checksum: Checksum,
 
+    pub(super) global_seqno: SeqNo,
+
     #[cfg(feature = "metrics")]
     pub(crate) metrics: Arc,
 
diff --git a/src/table/iter.rs b/src/table/iter.rs
index 4ac587ed..932b7fab 100644
--- a/src/table/iter.rs
+++ b/src/table/iter.rs
@@ -94,6 +94,8 @@ pub struct Iter {
     table_id: GlobalTableId,
     path: Arc,
 
+    global_seqno: SeqNo,
+
     #[expect(clippy::struct_field_names)]
     index_iter: BlockIndexIterImpl,
 
@@ -118,6 +120,7 @@ pub struct Iter {
 impl Iter {
     pub fn new(
         table_id: GlobalTableId,
+        global_seqno: SeqNo,
         path: Arc,
         index_iter: BlockIndexIterImpl,
         descriptor_table: Arc<DescriptorTable>,
@@ -129,6 +132,8 @@ impl Iter {
             table_id,
             path,
 
+            global_seqno,
+
             index_iter,
             descriptor_table,
             cache,
@@ -165,7 +170,14 @@ impl Iterator for Iter {
         // Always try to keep iterating inside the already-materialized low data block first; this
        // lets
callers consume multiple entries without touching the index or cache again. if let Some(block) = &mut self.lo_data_block { - if let Some(item) = block.next().map(Ok) { + if let Some(item) = block + .next() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -212,7 +224,14 @@ impl Iterator for Iter { // No more block handles coming from the index. Flush any pending items buffered on // the high side (used by reverse iteration) before signalling completion. if let Some(block) = &mut self.hi_data_block { - if let Some(item) = block.next().map(Ok) { + if let Some(item) = block + .next() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -226,7 +245,6 @@ impl Iterator for Iter { // Load the next data block referenced by the index handle. We try the shared block // cache first to avoid hitting the filesystem, and fall back to `load_block` on miss. - #[expect(clippy::single_match_else)] let block = match self.cache.get_block(self.table_id, handle.offset()) { Some(block) => block, None => { @@ -262,7 +280,9 @@ impl Iterator for Iter { self.lo_offset = handle.offset(); self.lo_data_block = Some(reader); - if let Some(item) = item { + if let Some(mut item) = item { + item.key.seqno += self.global_seqno; + // Serving the first item immediately avoids stashing it in a temporary buffer and // keeps block iteration semantics identical to the simple case at the top. return Some(Ok(item)); @@ -276,7 +296,14 @@ impl DoubleEndedIterator for Iter { // Mirror the forward iterator: prefer consuming buffered items from the high data block to // avoid touching the index once a block has been materialized. if let Some(block) = &mut self.hi_data_block { - if let Some(item) = block.next_back().map(Ok) { + if let Some(item) = block + .next_back() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -318,7 +345,14 @@ impl DoubleEndedIterator for Iter { // Once we exhaust the index in reverse order, flush any items that were buffered on // the low side (set when iterating forward first) before signalling completion. if let Some(block) = &mut self.lo_data_block { - if let Some(item) = block.next_back().map(Ok) { + if let Some(item) = block + .next_back() + .map(|mut v| { + v.key.seqno += self.global_seqno; + v + }) + .map(Ok) + { return Some(item); } } @@ -367,7 +401,9 @@ impl DoubleEndedIterator for Iter { self.hi_offset = handle.offset(); self.hi_data_block = Some(reader); - if let Some(item) = item { + if let Some(mut item) = item { + item.key.seqno += self.global_seqno; + // Emit the first materialized entry immediately to match the forward path and avoid // storing it in a temporary buffer. 
return Some(Ok(item)); diff --git a/src/table/meta.rs b/src/table/meta.rs index abfd43fa..18004af0 100644 --- a/src/table/meta.rs +++ b/src/table/meta.rs @@ -38,7 +38,7 @@ pub struct ParsedMeta { pub data_block_count: u64, pub index_block_count: u64, pub key_range: KeyRange, - pub seqnos: (SeqNo, SeqNo), + pub(super) seqnos: (SeqNo, SeqNo), pub file_size: u64, pub item_count: u64, pub tombstone_count: u64, diff --git a/src/table/mod.rs b/src/table/mod.rs index 6800f1dc..dc9a87bd 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -96,6 +96,11 @@ impl std::fmt::Debug for Table { } impl Table { + #[must_use] + pub fn global_seqno(&self) -> SeqNo { + self.0.global_seqno + } + pub fn referenced_blob_bytes(&self) -> crate::Result { if let Some(v) = self.0.cached_blob_bytes.get() { return Ok(*v); @@ -239,7 +244,7 @@ impl Table { #[cfg(feature = "metrics")] use std::sync::atomic::Ordering::Relaxed; - if self.metadata.seqnos.0 >= seqno { + if (self.metadata.seqnos.0 + self.global_seqno()) >= seqno { return Ok(None); } @@ -302,6 +307,8 @@ impl Table { return Ok(None); }; + let seqno = seqno.saturating_sub(self.global_seqno()); + for block_handle in iter { let block_handle = block_handle?; @@ -344,6 +351,7 @@ impl Table { &self.path, block_count, self.metadata.data_block_compression, + self.global_seqno(), ) } @@ -373,6 +381,7 @@ impl Table { let mut iter = Iter::new( self.global_id(), + self.global_seqno(), self.path.clone(), index_iter, self.descriptor_table.clone(), @@ -421,6 +430,7 @@ impl Table { pub fn recover( file_path: PathBuf, checksum: Checksum, + global_seqno: SeqNo, tree_id: TreeId, cache: Arc, descriptor_table: Arc, @@ -545,6 +555,7 @@ impl Table { is_deleted: AtomicBool::default(), checksum, + global_seqno, #[cfg(feature = "metrics")] metrics, diff --git a/src/table/scanner.rs b/src/table/scanner.rs index 25f01318..dc46858b 100644 --- a/src/table/scanner.rs +++ b/src/table/scanner.rs @@ -5,7 +5,7 @@ use super::{Block, DataBlock}; use crate::{ table::{block::BlockType, iter::OwnedDataBlockIter}, - CompressionType, InternalValue, + CompressionType, InternalValue, SeqNo, }; use std::{fs::File, io::BufReader, path::Path}; @@ -17,6 +17,8 @@ pub struct Scanner { compression: CompressionType, block_count: usize, read_count: usize, + + global_seqno: SeqNo, } impl Scanner { @@ -24,6 +26,7 @@ impl Scanner { path: &Path, block_count: usize, compression: CompressionType, + global_seqno: SeqNo, ) -> crate::Result { // TODO: a larger buffer size may be better for HDD, maybe make this configurable let mut reader = BufReader::with_capacity(8 * 4_096, File::open(path)?); @@ -38,6 +41,8 @@ impl Scanner { compression, block_count, read_count: 1, + + global_seqno, }) } @@ -68,7 +73,8 @@ impl Iterator for Scanner { fn next(&mut self) -> Option { loop { - if let Some(item) = self.iter.next() { + if let Some(mut item) = self.iter.next() { + item.key.seqno += self.global_seqno; return Some(Ok(item)); } diff --git a/src/table/tests.rs b/src/table/tests.rs index 59b76f4d..31f43272 100644 --- a/src/table/tests.rs +++ b/src/table/tests.rs @@ -49,6 +49,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), false, @@ -74,6 +75,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, @@ -99,6 +101,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), 
false, @@ -124,6 +127,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, @@ -169,6 +173,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), false, @@ -193,6 +198,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, @@ -217,6 +223,7 @@ fn test_with_table( file.clone(), checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), false, @@ -242,6 +249,7 @@ fn test_with_table( file, checksum, 0, + 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Arc::new(DescriptorTable::new(10)), true, diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index ee3d09ac..48f427a0 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -4,21 +4,25 @@ use super::Tree; use crate::{ - compaction::MoveDown, config::FilterPolicyEntry, table::multi_writer::MultiWriter, - AbstractTree, BlobIndirection, SeqNo, UserKey, UserValue, + config::FilterPolicyEntry, table::multi_writer::MultiWriter, AbstractTree, BlobIndirection, + SeqNo, UserKey, UserValue, }; -use std::{path::PathBuf, sync::Arc}; +use std::path::PathBuf; pub const INITIAL_CANONICAL_LEVEL: usize = 1; /// Bulk ingestion /// /// Items NEED to be added in ascending key order. +/// +/// Ingested data bypasses memtables and is written directly into new tables, +/// using the same table writer configuration that is used for flush and compaction. pub struct Ingestion<'a> { folder: PathBuf, tree: &'a Tree, pub(crate) writer: MultiWriter, seqno: SeqNo, + last_key: Option, } impl<'a> Ingestion<'a> { @@ -100,16 +104,10 @@ impl<'a> Ingestion<'a> { tree, writer, seqno: 0, + last_key: None, }) } - /// Sets the ingestion seqno. - #[must_use] - pub fn with_seqno(mut self, seqno: SeqNo) -> Self { - self.seqno = seqno; - self - } - /// Writes a key-value pair. /// /// # Errors @@ -122,6 +120,14 @@ impl<'a> Ingestion<'a> { ) -> crate::Result<()> { use crate::coding::Encode; + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); self.writer.write(crate::InternalValue::from_components( key, indirection.encode_into_vec(), @@ -131,6 +137,9 @@ impl<'a> Ingestion<'a> { self.writer.register_blob(indirection); + // Remember the last user key to validate the next call's ordering + self.last_key = Some(cloned_key); + Ok(()) } @@ -140,12 +149,26 @@ impl<'a> Ingestion<'a> { /// /// Will return `Err` if an IO error occurs. pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> { + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); + self.writer.write(crate::InternalValue::from_components( key, value, self.seqno, crate::ValueType::Value, - )) + ))?; + + // Remember the last user key to validate the next call's ordering + self.last_key = Some(cloned_key); + + Ok(()) } /// Writes a key-value pair. @@ -153,14 +176,25 @@ impl<'a> Ingestion<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. 
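+    ///
+    /// Keys must be passed in strictly ascending order, just like with [`Ingestion::write`].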
- #[doc(hidden)] pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> { - self.writer.write(crate::InternalValue::from_components( + if let Some(prev) = &self.last_key { + assert!( + key > *prev, + "next key in ingestion must be greater than last key" + ); + } + + let cloned_key = key.clone(); + let res = self.writer.write(crate::InternalValue::from_components( key, crate::UserValue::empty(), self.seqno, crate::ValueType::Tombstone, - )) + )); + if res.is_ok() { + self.last_key = Some(cloned_key); + } + res } /// Finishes the ingestion. @@ -168,25 +202,65 @@ impl<'a> Ingestion<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn finish(self) -> crate::Result<()> { - use crate::Table; + #[allow(clippy::significant_drop_tightening)] + pub fn finish(self) -> crate::Result { + use crate::{AbstractTree, Table}; + + // CRITICAL SECTION: Atomic flush + seqno allocation + registration + // + // We must ensure no concurrent writes interfere between flushing the + // active memtable and registering the ingested tables. The sequence is: + // 1. Acquire flush lock (prevents concurrent flushes) + // 2. Flush active memtable (ensures no pending writes) + // 3. Finish ingestion writer (creates table files) + // 4. Allocate next global seqno (atomic timestamp) + // 5. Recover tables with that seqno + // 6. Register version with same seqno + // + // Why not flush in new()? + // If we flushed in new(), there would be a race condition: + // new() -> flush -> [TIME PASSES + OTHER WRITES] -> finish() -> seqno + // The seqno would be disconnected from the flush, violating MVCC. + // + // By holding the flush lock throughout, we guarantee atomicity. + let flush_lock = self.tree.get_flush_lock(); + + // Flush any pending memtable writes to ensure ingestion sees a + // consistent snapshot and lookup order remains correct. + // We call rotate + flush directly because we already hold the lock. + self.tree.rotate_memtable(); + self.tree.flush(&flush_lock, 0)?; + // Finalize the ingestion writer, writing all buffered data to disk. let results = self.writer.finish()?; log::info!("Finished ingestion writer"); + // Acquire locks for version registration. We must hold both the + // compaction state lock and version history lock to safely modify + // the tree's version. + let mut _compaction_state = self.tree.compaction_state.lock().expect("lock is poisoned"); + let mut version_lock = self.tree.version_history.write().expect("lock is poisoned"); + + // Allocate the next global sequence number. This seqno will be shared + // by all ingested tables and the version that registers them, ensuring + // consistent MVCC snapshots. + let global_seqno = self.tree.config.seqno.next(); + + // Recover all created tables, assigning them the global_seqno we just + // allocated. This ensures all ingested tables share the same sequence + // number, which is critical for MVCC correctness. + // + // We intentionally do NOT pin filter/index blocks here. Large ingests + // are typically placed in level 1, and pinning would increase memory + // pressure unnecessarily. let created_tables = results .into_iter() .map(|(table_id, checksum)| -> crate::Result
{ - // TODO: table recoverer struct w/ builder pattern - // Table::recover() - // .pin_filters(true) - // .with_metrics(metrics) - // .run(path, tree_id, cache, descriptor_table); - Table::recover( self.folder.join(table_id.to_string()), checksum, + global_seqno, self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), @@ -198,9 +272,29 @@ impl<'a> Ingestion<'a> { }) .collect::>>()?; - self.tree - .register_tables(&created_tables, None, None, &[], 0)?; + // Upgrade the version with our ingested tables, using the global_seqno + // we allocated earlier. This ensures the version and all tables share + // the same sequence number. + // + // We use upgrade_version_with_seqno (instead of upgrade_version) because + // we need precise control over the seqno: it must match the seqno we + // already assigned to the recovered tables. + version_lock.upgrade_version_with_seqno( + &self.tree.config.path, + |current| { + let mut copy = current.clone(); + copy.version = copy.version.with_new_l0_run(&created_tables, None, None); + Ok(copy) + }, + global_seqno, + )?; - Ok(()) + // Perform maintenance on the version history (e.g., clean up old versions). + // We use gc_watermark=0 since ingestion doesn't affect sealed memtables. + if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0) { + log::warn!("Version GC failed: {e:?}"); + } + + Ok(global_seqno) } } diff --git a/src/tree/inner.rs b/src/tree/inner.rs index 6bbd3b46..499dba17 100644 --- a/src/tree/inner.rs +++ b/src/tree/inner.rs @@ -63,8 +63,8 @@ pub struct TreeInner { /// can be concurrent next to each other. pub(crate) major_compaction_lock: RwLock<()>, + /// Serializes flush operations. pub(crate) flush_lock: Mutex<()>, - #[doc(hidden)] #[cfg(feature = "metrics")] pub metrics: Arc, diff --git a/src/tree/mod.rs b/src/tree/mod.rs index ea03ed6d..aec50a36 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -7,7 +7,7 @@ pub mod inner; pub mod sealed; use crate::{ - compaction::{drop_range::OwnedBounds, CompactionStrategy}, + compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy}, config::Config, file::CURRENT_VERSION_FILE, format_version::FormatVersion, @@ -188,48 +188,6 @@ impl AbstractTree for Tree { .sum() } - fn ingest( - &self, - iter: impl Iterator, - seqno_generator: &SequenceNumberCounter, - visible_seqno: &SequenceNumberCounter, - ) -> crate::Result<()> { - use crate::tree::ingest::Ingestion; - use std::time::Instant; - - let seqno = seqno_generator.next(); - - // TODO: allow ingestion always, by flushing memtable - - let mut writer = Ingestion::new(self)?.with_seqno(seqno); - - let start = Instant::now(); - let mut count = 0; - let mut last_key = None; - - for (key, value) in iter { - if let Some(last_key) = &last_key { - assert!( - key > last_key, - "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}", - ); - } - last_key = Some(key.clone()); - - writer.write(key, value)?; - - count += 1; - } - - writer.finish()?; - - visible_seqno.fetch_max(seqno + 1); - - log::info!("Ingested {count} items in {:?}", start.elapsed()); - - Ok(()) - } - fn drop_range, R: RangeBounds>(&self, range: R) -> crate::Result<()> { let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range); @@ -383,6 +341,7 @@ impl AbstractTree for Tree { Table::recover( folder.join(table_id.to_string()), checksum, + 0, self.id, self.config.cache.clone(), self.config.descriptor_table.clone(), @@ -537,7 +496,6 @@ impl AbstractTree for Tree { 
self.current_version().level(idx).map(|x| x.table_count()) } - #[expect(clippy::significant_drop_tightening)] fn approximate_len(&self) -> usize { let super_version = self .version_history @@ -570,7 +528,6 @@ impl AbstractTree for Tree { .sum() } - #[expect(clippy::significant_drop_tightening)] fn get_highest_memtable_seqno(&self) -> Option { let version = self .version_history @@ -798,6 +755,39 @@ impl Tree { Ok(tree) } + pub(crate) fn consume_writer( + &self, + writer: crate::table::Writer, + ) -> crate::Result> { + let table_file_path = writer.path.clone(); + + let Some((_, checksum)) = writer.finish()? else { + return Ok(None); + }; + + log::debug!("Finalized table write at {}", table_file_path.display()); + + let pin_filter = self.config.filter_block_pinning_policy.get(0); + let pin_index = self.config.index_block_pinning_policy.get(0); + + let created_table = Table::recover( + table_file_path, + checksum, + 0, + self.id, + self.config.cache.clone(), + self.config.descriptor_table.clone(), + pin_filter, + pin_index, + #[cfg(feature = "metrics")] + self.metrics.clone(), + )?; + + log::debug!("Flushed table to {:?}", created_table.path); + + Ok(Some(created_table)) + } + /// Returns `true` if there are some tables that are being compacted. #[doc(hidden)] #[must_use] @@ -909,8 +899,7 @@ impl Tree { let version = Self::recover_levels( &config.path, tree_id, - &config.cache, - &config.descriptor_table, + &config, #[cfg(feature = "metrics")] &metrics, )?; @@ -957,9 +946,7 @@ impl Tree { config, major_compaction_lock: RwLock::default(), flush_lock: Mutex::default(), - compaction_state: Arc::new(Mutex::new( - crate::compaction::state::CompactionState::default(), - )), + compaction_state: Arc::new(Mutex::new(CompactionState::default())), #[cfg(feature = "metrics")] metrics, @@ -1011,8 +998,7 @@ impl Tree { fn recover_levels>( tree_path: P, tree_id: TreeId, - cache: &Arc, - descriptor_table: &Arc, + config: &Config, #[cfg(feature = "metrics")] metrics: &Arc, ) -> crate::Result { use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId}; @@ -1022,23 +1008,24 @@ impl Tree { let recovery = recover(tree_path)?; let table_map = { - let mut result: crate::HashMap = + let mut result: crate::HashMap = crate::HashMap::default(); for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() { for run in table_ids { - for &(table_id, checksum) in run { + for table in run { #[expect( clippy::expect_used, reason = "there are always less than 256 levels" )] result.insert( - table_id, + table.id, ( level_idx .try_into() .expect("there are less than 256 levels"), - checksum, + table.checksum, + table.global_seqno, ), ); } @@ -1098,15 +1085,19 @@ impl Tree { crate::Error::Unrecoverable })?; - if let Some(&(level_idx, checksum)) = table_map.get(&table_id) { + if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) { + let pin_filter = config.filter_block_pinning_policy.get(level_idx.into()); + let pin_index = config.index_block_pinning_policy.get(level_idx.into()); + let table = Table::recover( table_file_path, checksum, + global_seqno, tree_id, - cache.clone(), - descriptor_table.clone(), - level_idx <= 1, // TODO: look at configuration - level_idx <= 2, // TODO: look at configuration + config.cache.clone(), + config.descriptor_table.clone(), + pin_filter, + pin_index, #[cfg(feature = "metrics")] metrics.clone(), )?; diff --git a/src/version/mod.rs b/src/version/mod.rs index 2d82e92f..f04917f2 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -18,11 +18,11 @@ use 
crate::blob_tree::{FragmentationEntry, FragmentationMap};
 use crate::coding::Encode;
 use crate::compaction::state::hidden_set::HiddenSet;
 use crate::version::recovery::Recovery;
+use crate::TreeType;
 use crate::{
     vlog::{BlobFile, BlobFileId},
     HashSet, KeyRange, Table, TableId,
 };
-use crate::{Tree, TreeType};
 use optimize::optimize_runs;
 use run::Ranged;
 use std::fs::File;
@@ -249,10 +249,10 @@ impl Version {
             .map(|run| {
                 let run_tables = run
                     .iter()
-                    .map(|&(table_id, _)| {
+                    .map(|table| {
                         tables
                             .iter()
-                            .find(|x| x.id() == table_id)
+                            .find(|x| x.id() == table.id)
                             .cloned()
                             .ok_or(crate::Error::Unrecoverable)
                     })
@@ -674,6 +674,7 @@ impl Version {
                     writer.write_u64::<LittleEndian>(table.id())?;
                     writer.write_u8(0)?; // Checksum type, 0 = XXH3
                     writer.write_u128::<LittleEndian>(table.checksum().into_u128())?;
+                    writer.write_u64::<LittleEndian>(table.global_seqno())?;
                 }
             }
         }
diff --git a/src/version/recovery.rs b/src/version/recovery.rs
index b7a6d7a7..d8d7bf69 100644
--- a/src/version/recovery.rs
+++ b/src/version/recovery.rs
@@ -4,7 +4,7 @@ use crate::{
     coding::Decode, file::CURRENT_VERSION_FILE, version::VersionId, vlog::BlobFileId, Checksum,
-    TableId, TreeType,
+    SeqNo, TableId, TreeType,
 };
 use byteorder::{LittleEndian, ReadBytesExt};
 use std::path::Path;
@@ -17,10 +17,16 @@ pub fn get_current_version(folder: &std::path::Path) -> crate::Result<VersionId> {
         .map_err(Into::into)
 }
 
+pub struct RecoveredTable {
+    pub id: TableId,
+    pub checksum: Checksum,
+    pub global_seqno: SeqNo,
+}
+
 pub struct Recovery {
     pub tree_type: TreeType,
     pub curr_version_id: VersionId,
-    pub table_ids: Vec<Vec<Vec<(TableId, Checksum)>>>,
+    pub table_ids: Vec<Vec<Vec<RecoveredTable>>>,
     pub blob_file_ids: Vec<(BlobFileId, Checksum)>,
     pub gc_stats: crate::blob_tree::FragmentationMap,
 }
@@ -70,7 +76,13 @@ pub fn recover(folder: &Path) -> crate::Result<Recovery> {
             let checksum = reader.read_u128::<LittleEndian>()?;
             let checksum = Checksum::from_raw(checksum);
 
-            run.push((id, checksum));
+            let global_seqno = reader.read_u64::<LittleEndian>()?;
+
+            run.push(RecoveredTable {
+                id,
+                checksum,
+                global_seqno,
+            });
         }
 
         level.push(run);
diff --git a/src/version/super_version.rs b/src/version/super_version.rs
index 5d64bf8b..d6ffa0f6 100644
--- a/src/version/super_version.rs
+++ b/src/version/super_version.rs
@@ -111,14 +111,23 @@ impl SuperVersions {
         f: F,
         seqno: &SequenceNumberCounter,
     ) -> crate::Result<()> {
-        // NOTE: Copy-on-write...
-        //
-        // Create a copy of the levels we can operate on
-        // without mutating the current level manifest
-        // If persisting to disk fails, this way the level manifest
-        // is unchanged
+        self.upgrade_version_with_seqno(tree_path, f, seqno.next())
+    }
+
+    /// Like `upgrade_version`, but takes an already-allocated sequence number.
+    ///
+    /// This is useful when the seqno must be coordinated with other operations
+    /// (e.g., bulk ingestion where tables are recovered with the same seqno).
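+    ///
+    /// The sequence number counter is not advanced here; the caller is
+    /// responsible for having allocated `seqno` already.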
+ pub(crate) fn upgrade_version_with_seqno< + F: FnOnce(&SuperVersion) -> crate::Result, + >( + &mut self, + tree_path: &Path, + f: F, + seqno: SeqNo, + ) -> crate::Result<()> { let mut next_version = f(&self.latest_version())?; - next_version.seqno = seqno.next(); + next_version.seqno = seqno; log::trace!("Next version seqno={}", next_version.seqno); persist_version(tree_path, &next_version.version)?; diff --git a/src/vlog/blob_file/multi_writer.rs b/src/vlog/blob_file/multi_writer.rs index 5dfe2270..03f924f3 100644 --- a/src/vlog/blob_file/multi_writer.rs +++ b/src/vlog/blob_file/multi_writer.rs @@ -39,7 +39,6 @@ impl MultiWriter { #[doc(hidden)] pub fn new>( id_generator: SequenceNumberCounter, - target_size: u64, folder: P, ) -> crate::Result { let folder = folder.as_ref(); @@ -50,7 +49,7 @@ impl MultiWriter { Ok(Self { id_generator, folder: folder.into(), - target_size, + target_size: 64 * 1_024 * 1_024, active_writer: Writer::new(blob_file_path, blob_file_id)?, diff --git a/src/vlog/blob_file/reader.rs b/src/vlog/blob_file/reader.rs index 21582508..4ae02fae 100644 --- a/src/vlog/blob_file/reader.rs +++ b/src/vlog/blob_file/reader.rs @@ -112,8 +112,8 @@ mod tests { let id_generator = SequenceNumberCounter::default(); let folder = tempfile::tempdir()?; - let mut writer = - crate::vlog::BlobFileWriter::new(id_generator, u64::MAX, folder.path()).unwrap(); + let mut writer = crate::vlog::BlobFileWriter::new(id_generator, folder.path())? + .use_target_size(u64::MAX); let offset = writer.offset(); let on_disk_size = writer.write(b"a", 0, b"abcdef")?; @@ -140,8 +140,8 @@ mod tests { let id_generator = SequenceNumberCounter::default(); let folder = tempfile::tempdir()?; - let mut writer = crate::vlog::BlobFileWriter::new(id_generator, u64::MAX, folder.path()) - .unwrap() + let mut writer = crate::vlog::BlobFileWriter::new(id_generator, folder.path())? 
+ .use_target_size(u64::MAX) .use_compression(CompressionType::Lz4); let offset = writer.offset(); diff --git a/tests/blob_ingest.rs b/tests/blob_ingest.rs new file mode 100644 index 00000000..0aa7811d --- /dev/null +++ b/tests/blob_ingest.rs @@ -0,0 +1,56 @@ +use lsm_tree::{ + blob_tree::FragmentationEntry, AbstractTree, KvSeparationOptions, SeqNo, SequenceNumberCounter, +}; +use test_log::test; + +#[test] +fn blob_ingest_gc_stats() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let path = folder.path(); + + let big_value = b"neptune!".repeat(128_000); + let new_big_value = b"winter!".repeat(128_000); + + { + let tree = lsm_tree::Config::new(path, SequenceNumberCounter::default()) + .with_kv_separation(Some( + KvSeparationOptions::default().compression(lsm_tree::CompressionType::None), + )) + .open()?; + + let mut ingestion = tree.ingestion()?; + ingestion.write("big", &big_value)?; + ingestion.write("smol", "small value")?; + ingestion.finish()?; + + let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); + assert_eq!(&*value, big_value); + assert_eq!(1, tree.table_count()); + assert_eq!(1, tree.blob_file_count()); + + let mut ingestion = tree.ingestion()?; + ingestion.write("big", &new_big_value)?; + ingestion.finish()?; + + // Blob file has no fragmentation before compaction (in stats) + // so it is not rewritten + tree.major_compact(64_000_000, 1_000)?; + assert_eq!(1, tree.table_count()); + assert_eq!(2, tree.blob_file_count()); + + let gc_stats = tree.current_version().gc_stats().clone(); + + // "big":0 is expired + assert_eq!( + &{ + let mut map = lsm_tree::HashMap::default(); + let size = big_value.len() as u64; + map.insert(0, FragmentationEntry::new(1, size, size)); + map + }, + &*gc_stats, + ); + } + + Ok(()) +} diff --git a/tests/blob_ingest_relink.rs b/tests/blob_ingest_relink.rs new file mode 100644 index 00000000..cc5d317c --- /dev/null +++ b/tests/blob_ingest_relink.rs @@ -0,0 +1,62 @@ +use lsm_tree::{AbstractTree, KvSeparationOptions, SeqNo, SequenceNumberCounter}; +use test_log::test; + +#[test] +fn blob_tree_ingest_relink() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let path = folder.path(); + + let big_value = b"neptune!".repeat(128_000); + + { + let tree = lsm_tree::Config::new(path, SequenceNumberCounter::default()) + .with_kv_separation(Some( + KvSeparationOptions::default().compression(lsm_tree::CompressionType::None), + )) + .open()?; + + let mut ingestion = tree.ingestion()?; + ingestion.write("big", &big_value)?; + ingestion.write("smol", "small value")?; + ingestion.finish()?; + + let value = tree.get("big", SeqNo::MAX)?.expect("should exist"); + assert_eq!(&*value, big_value); + assert_eq!(1, tree.table_count()); + assert_eq!(1, tree.blob_file_count()); + + assert_eq!( + Some(vec![lsm_tree::table::writer::LinkedFile { + blob_file_id: 0, + bytes: big_value.len() as u64, + on_disk_bytes: big_value.len() as u64, + len: 1, + }]), + tree.current_version() + .iter_tables() + .next() + .unwrap() + .list_blob_file_references()?, + ); + + tree.major_compact(64_000_000, 1_000)?; + assert_eq!(1, tree.table_count()); + assert_eq!(1, tree.blob_file_count()); + + assert_eq!( + Some(vec![lsm_tree::table::writer::LinkedFile { + blob_file_id: 0, + bytes: big_value.len() as u64, + on_disk_bytes: big_value.len() as u64, + len: 1, + }]), + tree.current_version() + .iter_tables() + .next() + .unwrap() + .list_blob_file_references()?, + ); + } + + Ok(()) +} diff --git a/tests/ingest_dirty_snapshot.rs b/tests/ingest_dirty_snapshot.rs 
new file mode 100644 index 00000000..043b3650 --- /dev/null +++ b/tests/ingest_dirty_snapshot.rs @@ -0,0 +1,23 @@ +use lsm_tree::{AbstractTree, Config, SequenceNumberCounter}; + +#[test] +fn ingestion_dirty_snapshot() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let seqno = SequenceNumberCounter::default(); + let tree = Config::new(&folder, seqno.clone()).open()?; + + tree.insert("a", "a", seqno.next()); + tree.insert("a", "b", seqno.next()); + + let snapshot_seqno = 1; + assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap()); + + let mut ingest = tree.ingestion()?; + ingest.write("b", "b")?; + ingest.finish()?; + + assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap()); + + Ok(()) +} diff --git a/tests/ingestion_api.rs b/tests/ingestion_api.rs new file mode 100644 index 00000000..75551bea --- /dev/null +++ b/tests/ingestion_api.rs @@ -0,0 +1,301 @@ +use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; + +#[test] +fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()).open()?; + + for i in 0..10u32 { + let key = format!("k{:03}", i); + tree.insert(key.as_bytes(), b"v", 0); + } + + let mut ingest = tree.ingestion()?; + for i in 0..10u32 { + let key = format!("k{:03}", i); + ingest.write_tombstone(key)?; + } + ingest.finish()?; + + for i in 0..10u32 { + let key = format!("k{:03}", i); + assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none()); + } + assert_eq!(tree.tombstone_count(), 10); + + Ok(()) +} + +#[test] +fn sealed_memtable_value_overrides_table_value() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Older table value via ingestion (seqno 1) + { + let mut ingest = tree.ingestion()?; + ingest.write(b"k", b"old")?; + ingest.finish()?; + } + + // Newer value in memtable (seqno 2), then seal it + tree.insert(b"k", b"new", 2); + let _ = tree.rotate_memtable(); // move active -> sealed + + // Read should return the sealed memtable value + assert_eq!( + tree.get(b"k", lsm_tree::SeqNo::MAX)?, + Some(b"new".as_slice().into()) + ); + + Ok(()) +} + +#[test] +fn sealed_memtable_tombstone_overrides_table_value() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Older table value via ingestion (seqno 1) + { + let mut ingest = tree.ingestion()?; + ingest.write(b"k", b"old")?; + ingest.finish()?; + } + + // Newer tombstone in memtable (seqno 2), then seal it + tree.remove(b"k", 2); + let _ = tree.rotate_memtable(); + + // Read should see the delete from sealed memtable + assert!(tree.get(b"k", lsm_tree::SeqNo::MAX)?.is_none()); + + Ok(()) +} + +#[test] +fn tables_newest_first_returns_highest_seqno() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Two separate ingestions create two tables containing the same key at different seqnos + { + let mut ingest = tree.ingestion()?; + ingest.write(b"k", b"v1")?; + ingest.finish()?; + } + { + let mut ingest = tree.ingestion()?; + ingest.write(b"k", b"v2")?; + ingest.finish()?; + } + + // With memtables empty, read should return the value from the newest table run (seqno 2) + assert_eq!( + tree.get(b"k", lsm_tree::SeqNo::MAX)?, + 
Some(b"v2".as_slice().into()) + ); + Ok(()) +} + +#[test] +#[should_panic(expected = "next key in ingestion must be greater than last key")] +fn ingestion_enforces_order_standard_panics() { + let folder = tempfile::tempdir().unwrap(); + let tree = lsm_tree::Config::new(folder, Default::default()) + .open() + .unwrap(); + + let mut ingest = tree.ingestion().unwrap(); + + // First write higher key, then lower to trigger ordering assertion + ingest.write(b"k2", b"v").unwrap(); + + // Panics here + let _ = ingest.write(b"k1", b"v"); +} + +#[test] +fn blob_ingestion_out_of_order_panics_without_blob_write() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8))) + .open()?; + + let before = tree.blob_file_count(); + + // Use a small value for the first write to avoid blob I/O + let result = std::panic::catch_unwind(|| { + let mut ingest = tree.ingestion().unwrap(); + ingest.write(b"k2", b"x").unwrap(); + + // Second write would require blob I/O, but ordering check should fire before any blob write + let _ = ingest.write(b"k1", [1u8; 16]); + }); + assert!(result.is_err()); + + let after = tree.blob_file_count(); + assert_eq!(before, after); + + Ok(()) +} + +#[test] +fn memtable_put_overrides_table_tombstone() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Older put written via ingestion to tables (seqno 1) + { + let mut ingest = tree.ingestion()?; + ingest.write(b"k", b"v1")?; + ingest.finish()?; + } + + // Newer tombstone written via ingestion to tables (seqno 2) + { + let mut ingest = tree.ingestion()?; + ingest.write_tombstone(b"k")?; + ingest.finish()?; + } + + // Newest put in active memtable (seqno 3) should override table tombstone + tree.insert(b"k", b"v3", 3); + assert_eq!( + tree.get(b"k", lsm_tree::SeqNo::MAX)?, + Some(b"v3".as_slice().into()) + ); + Ok(()) +} + +#[test] +fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for i in 0..8u32 { + let key = format!("b{:03}", i); + tree.insert(key.as_bytes(), b"x", 0); + } + + let mut ingest = tree.ingestion()?; + for i in 0..8u32 { + let key = format!("b{:03}", i); + ingest.write_tombstone(key)?; + } + ingest.finish()?; + + for i in 0..8u32 { + let key = format!("b{:03}", i); + assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none()); + } + assert_eq!(tree.tombstone_count(), 8); + + Ok(()) +} + +#[test] +fn tree_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()).open()?; + + let before_tables = tree.table_count(); + tree.ingestion()?.finish()?; + let after_tables = tree.table_count(); + + assert_eq!(before_tables, after_tables); + assert!(tree.is_empty(SeqNo::MAX, None)?); + + Ok(()) +} + +#[test] +fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for i in 0..5u32 { + let key = format!("d{:03}", i); + 
tree.insert(key.as_bytes(), b"value", 0); + } + + let before_blobs = tree.blob_file_count(); + + let mut ingest = tree.ingestion()?; + for i in 0..5u32 { + let key = format!("d{:03}", i); + ingest.write_tombstone(key)?; + } + ingest.finish()?; + + let after_blobs = tree.blob_file_count(); + assert_eq!(before_blobs, after_blobs); + + for i in 0..5u32 { + let key = format!("d{:03}", i); + assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none()); + } + + Ok(()) +} + +#[test] +fn blob_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + let before_tables = tree.table_count(); + let before_blobs = tree.blob_file_count(); + + tree.ingestion()?.finish()?; + + let after_tables = tree.table_count(); + let after_blobs = tree.blob_file_count(); + + assert_eq!(before_tables, after_tables); + assert_eq!(before_blobs, after_blobs); + assert!(tree.is_empty(SeqNo::MAX, None)?); + + Ok(()) +} + +#[test] +fn blob_ingestion_separates_large_values_and_reads_ok() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8))) + .open()?; + + let mut ingest = tree.ingestion()?; + ingest.write("k_big1", [1u8; 16])?; + ingest.write("k_big2", [2u8; 32])?; + ingest.write("k_small", "abc")?; + ingest.finish()?; + + assert!(tree.blob_file_count() >= 1); + + assert_eq!( + tree.get("k_small", SeqNo::MAX)?, + Some(b"abc".as_slice().into()) + ); + assert_eq!( + tree.get("k_big1", SeqNo::MAX)?.as_deref().map(|s| s.len()), + Some(16) + ); + assert_eq!( + tree.get("k_big2", SeqNo::MAX)?.as_deref().map(|s| s.len()), + Some(32) + ); + + Ok(()) +} diff --git a/tests/ingestion_invariants.rs b/tests/ingestion_invariants.rs new file mode 100644 index 00000000..ce9dec65 --- /dev/null +++ b/tests/ingestion_invariants.rs @@ -0,0 +1,152 @@ +use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo}; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +#[test] +fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder, Default::default()).open()?; + + // Write to active memtable + for i in 0..10u32 { + let k = format!("a{:03}", i); + tree.insert(k.as_bytes(), b"v", 1); + } + + let tables_before = tree.table_count(); + let sealed_before = tree.sealed_memtable_count(); + assert_eq!(sealed_before, 0); + + // Start ingestion (should auto-flush active) + tree.ingestion()?.finish()?; + + // After ingestion, data is in tables; no sealed memtables + assert_eq!(tree.sealed_memtable_count(), 0); + assert!(tree.table_count() > tables_before); + + // Reads must succeed from tables + for i in 0..10u32 { + let k = format!("a{:03}", i); + assert_eq!( + tree.get(k.as_bytes(), SeqNo::MAX)?, + Some(b"v".as_slice().into()) + ); + } + + Ok(()) +} + +#[test] +fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(&folder, Default::default()).open()?; + + // Put items into active and seal them + for i in 0..8u32 { + let k = format!("s{:03}", i); + tree.insert(k.as_bytes(), b"x", 1); + } + assert!(tree.rotate_memtable().is_some()); + assert!(tree.sealed_memtable_count() > 0); + + let tables_before = tree.table_count(); + + // Ingestion should flush sealed 
diff --git a/tests/ingestion_invariants.rs b/tests/ingestion_invariants.rs
new file mode 100644
index 00000000..ce9dec65
--- /dev/null
+++ b/tests/ingestion_invariants.rs
@@ -0,0 +1,152 @@
+use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo};
+use std::sync::mpsc;
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default()).open()?;
+
+    // Write to active memtable
+    for i in 0..10u32 {
+        let k = format!("a{:03}", i);
+        tree.insert(k.as_bytes(), b"v", 1);
+    }
+
+    let tables_before = tree.table_count();
+    let sealed_before = tree.sealed_memtable_count();
+    assert_eq!(sealed_before, 0);
+
+    // Start ingestion (should auto-flush active)
+    tree.ingestion()?.finish()?;
+
+    // After ingestion, data is in tables; no sealed memtables
+    assert_eq!(tree.sealed_memtable_count(), 0);
+    assert!(tree.table_count() > tables_before);
+
+    // Reads must succeed from tables
+    for i in 0..10u32 {
+        let k = format!("a{:03}", i);
+        assert_eq!(
+            tree.get(k.as_bytes(), SeqNo::MAX)?,
+            Some(b"v".as_slice().into())
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default()).open()?;
+
+    // Put items into active and seal them
+    for i in 0..8u32 {
+        let k = format!("s{:03}", i);
+        tree.insert(k.as_bytes(), b"x", 1);
+    }
+    assert!(tree.rotate_memtable().is_some());
+    assert!(tree.sealed_memtable_count() > 0);
+
+    let tables_before = tree.table_count();
+
+    // Ingestion should flush sealed memtables and register resulting tables
+    tree.ingestion()?.finish()?;
+
+    assert_eq!(tree.sealed_memtable_count(), 0);
+    assert!(tree.table_count() > tables_before);
+
+    for i in 0..8u32 {
+        let k = format!("s{:03}", i);
+        assert_eq!(
+            tree.get(k.as_bytes(), SeqNo::MAX)?,
+            Some(b"x".as_slice().into())
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn ingestion_does_not_block_memtable_writes() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default()).open()?;
+
+    // Acquire ingestion and keep it active while another thread performs writes
+    let ingest = tree.ingestion()?;
+
+    let (started_tx, started_rx) = mpsc::channel();
+    let (done_tx, done_rx) = mpsc::channel();
+    let tree2 = tree.clone();
+
+    let handle = thread::spawn(move || {
+        started_tx.send(()).ok();
+        tree2.insert(b"k_block", b"v", 6);
+        done_tx.send(()).ok();
+    });
+
+    // Wait for the writer thread to start the attempt
+    started_rx.recv().unwrap();
+
+    // Give it a moment; the insert should complete and not be blocked by ingestion
+    thread::sleep(Duration::from_millis(100));
+    assert!(done_rx.try_recv().is_ok(), "insert should not be blocked");
+
+    handle.join().unwrap();
+    ingest.finish()?;
+
+    // Verify the write landed and is visible
+    assert_eq!(
+        tree.get(b"k_block", SeqNo::MAX)?,
+        Some(b"v".as_slice().into())
+    );
+
+    Ok(())
+}
+
+#[test]
+fn blob_ingestion_honors_invariants_and_does_not_block_writes() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default())
+        .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+        .open()?;
+
+    // Write small values into memtable and then start blob ingestion
+    for i in 0..4u32 {
+        let k = format!("b{:03}", i);
+        tree.insert(k.as_bytes(), b"y", 1);
+    }
+
+    let (started_tx, started_rx) = mpsc::channel();
+    let (done_tx, done_rx) = mpsc::channel();
+    let tree2 = tree.clone();
+
+    let ingest = tree.ingestion()?;
+
+    let handle = thread::spawn(move || {
+        started_tx.send(()).ok();
+        tree2.insert(b"b999", b"z", 31);
+        done_tx.send(()).ok();
+    });
+
+    started_rx.recv().unwrap();
+    thread::sleep(Duration::from_millis(100));
+    assert!(done_rx.try_recv().is_ok());
+
+    handle.join().unwrap();
+    ingest.finish()?;
+
+    // Data visible after ingestion, including concurrent write
+    for i in 0..4u32 {
+        let k = format!("b{:03}", i);
+        assert_eq!(
+            tree.get(k.as_bytes(), SeqNo::MAX)?,
+            Some(b"y".as_slice().into())
+        );
+    }
+    assert_eq!(tree.get(b"b999", SeqNo::MAX)?, Some(b"z".as_slice().into()));
+
+    Ok(())
+}
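The hunks below migrate tests/tree_bulk_ingest.rs from the removed iterator-based `ingest` to the builder API, and every call site repeats the same sequence: write the sorted stream, `finish()`, then publish the returned sequence number via `visible_seqno.fetch_max(seq + 1)`. A condensed sketch of that handoff follows; it is not part of the diff, and the `SequenceNumberCounter::default()` construction is an assumption standing in for the counters the real tests build in setup code outside these hunks.

// Sketch of the finish()/visible_seqno handoff used repeatedly below.
fn ingest_and_publish_sketch() -> lsm_tree::Result<()> {
    use lsm_tree::AbstractTree; // assumption: brings tree methods into scope, as in the tests

    // Assumption: counters constructed like the (unshown) test setup does
    let seqno = lsm_tree::SequenceNumberCounter::default();
    let visible_seqno = lsm_tree::SequenceNumberCounter::default();

    let folder = tempfile::tempdir()?;
    let tree = lsm_tree::Config::new(folder, seqno.clone()).open()?;

    let mut ingestion = tree.ingestion()?;
    for x in 0..100u64 {
        // Big-endian keys keep numeric order and byte order aligned
        ingestion.write(x.to_be_bytes(), b"value")?;
    }

    // finish() returns the sequence number assigned to the whole batch
    let seq = ingestion.finish()?;

    // Publish it so readers snapshotting at the visible counter see the batch
    visible_seqno.fetch_max(seq + 1);

    assert_eq!(tree.len(lsm_tree::SeqNo::MAX, None)?, 100);
    Ok(())
}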
diff --git a/tests/tree_bulk_ingest.rs b/tests/tree_bulk_ingest.rs
index 0712ed73..c5c2e575 100644
--- a/tests/tree_bulk_ingest.rs
+++ b/tests/tree_bulk_ingest.rs
@@ -12,15 +12,14 @@ fn tree_bulk_ingest() -> lsm_tree::Result<()> {
 
     let tree = Config::new(folder, seqno.clone()).open()?;
 
-    tree.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let mut ingestion = tree.ingestion()?;
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k, v)?;
+    }
+    let seq = ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);
 
     assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -47,15 +46,14 @@ fn tree_copy() -> lsm_tree::Result<()> {
 
     let src = Config::new(folder, seqno.clone()).open()?;
 
-    src.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let mut ingestion = src.ingestion()?;
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k, v)?;
+    }
+    let seq = ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);
 
     assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -73,13 +71,13 @@ fn tree_copy() -> lsm_tree::Result<()> {
     let folder = tempfile::tempdir()?;
     let dest = Config::new(folder, seqno.clone()).open()?;
 
-    dest.ingest(
-        src.iter(SeqNo::MAX, None)
-            .map(|x| x.into_inner())
-            .map(|x| x.unwrap()),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let mut ingestion = dest.ingestion()?;
+    for item in src.iter(SeqNo::MAX, None) {
+        let (k, v) = item.into_inner().unwrap();
+        ingestion.write(k, v)?;
+    }
+    let seq = ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);
 
     assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -108,15 +106,14 @@ fn blob_tree_bulk_ingest() -> lsm_tree::Result<()> {
         .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
         .open()?;
 
-    tree.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let mut ingestion = tree.ingestion()?;
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k, v)?;
+    }
+    let seq = ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);
 
     assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -146,15 +143,14 @@ fn blob_tree_copy() -> lsm_tree::Result<()> {
         .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
         .open()?;
 
-    src.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let mut ingestion = src.ingestion()?;
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k, v)?;
+    }
+    let seq = ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);
 
     assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -175,13 +171,13 @@ fn blob_tree_copy() -> lsm_tree::Result<()> {
         .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
         .open()?;
 
-    dest.ingest(
-        src.iter(SeqNo::MAX, None)
-            .map(|x| x.into_inner())
-            .map(|x| x.unwrap()),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let mut ingestion = dest.ingestion()?;
+    for item in src.iter(SeqNo::MAX, None) {
+        let (k, v) = item.into_inner().unwrap();
+        ingestion.write(k, v)?;
+    }
+    let seq = ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);
 
     assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(