From 3ffd10eb377be21f9e070376c03028c5410ce36b Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 25 Nov 2022 21:10:44 +0800 Subject: [PATCH] support running on wasi Signed-off-by: Runji Wang --- Cargo.lock | 30 ++++++++++------ Cargo.toml | 11 +++--- src/db.rs | 24 +++++++------ src/executor_v2/copy_from_file.rs | 7 ++++ src/executor_v2/copy_to_file.rs | 7 ++++ src/main.rs | 57 ++++++++++++++++++++----------- src/storage/mod.rs | 4 +++ src/utils/mod.rs | 2 ++ src/utils/rustyline_mock.rs | 50 +++++++++++++++++++++++++++ src/v1/executor/copy_from_file.rs | 7 ++++ src/v1/executor/copy_to_file.rs | 7 ++++ src/v1/executor/mod.rs | 5 +++ tests/sqlplannertest/src/lib.rs | 3 +- 13 files changed, 165 insertions(+), 49 deletions(-) create mode 100644 src/utils/rustyline_mock.rs diff --git a/Cargo.lock b/Cargo.lock index 7385a034b..56d1d2612 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,7 +212,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.5.4", "object", "rustc-demangle", ] @@ -974,12 +974,12 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" -version = "1.0.24" +version = "1.0.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.6.2", ] [[package]] @@ -1443,9 +1443,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb68f22743a3fb35785f1e7f844ca5a3de2dde5bd0c0ef5b372065814699b121" +checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f" [[package]] name = "lock_api" @@ -1554,6 +1554,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +dependencies = [ + "adler", +] + [[package]] name = "minstant" version = "0.1.2" @@ -2329,9 +2338,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.1" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812a2ec2043c4d6bc6482f5be2ab8244613cac2493d128d36c0759e52a626ab3" +checksum = "0b1fbb4dfc4eb1d390c02df47760bb19a84bb80b301ecc947ab5406394d8223e" dependencies = [ "bitflags", "errno", @@ -2822,7 +2831,6 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -3087,9 +3095,9 @@ checksum = "936e4b492acfd135421d8dca4b1aa80a7bfc26e702ef3af710e0752684df5372" [[package]] name = "uuid" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" +checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" dependencies = [ "getrandom", ] diff --git a/Cargo.toml b/Cargo.toml index 0c2815caa..112322a15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,10 @@ keywords = ["sql", "database", "embedded", "cli"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["jemalloc"] +default = ["jemalloc", "storage", "rustyline", "console-subscriber"] simd = [] jemalloc = ["tikv-jemallocator"] +storage = ["moka"] [dependencies] anyhow = "1" @@ -28,7 +29,7 @@ bytes = "1" chrono = "0.4" clap = { version = "4", features = ["derive"] } comfy-table = { version = "6", default-features = false } -console-subscriber = "0.1" +console-subscriber = { version = "0.1", optional = true } crc32fast = "1" csv = "1" dirs = "4" @@ -44,7 +45,7 @@ indoc = "1" iter-chunks = "0.1" itertools = "0.10" minitrace = "0.4.0" -moka = { version = "0.9", features = ["future"] } +moka = { version = "0.9", features = ["future"], optional = true } num-traits = "0.2" ordered-float = { version = "3", features = ["serde"] } parking_lot = "0.12" @@ -54,7 +55,7 @@ prost = "0.11.0" ref-cast = "1.0" risinglight_proto = "0.1" rust_decimal = "1" -rustyline = "10" +rustyline = { version = "10", optional = true } serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" smallvec = { version = "1", features = ["serde"] } @@ -62,7 +63,7 @@ sqllogictest = "0.6" sqlparser = { version = "0.26", features = ["serde"] } thiserror = "1" tikv-jemallocator = { version = "0.5", optional = true } -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "fs", "sync", "signal", "tracing", "macros"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot"] } diff --git a/src/db.rs b/src/db.rs index 17e34aad3..bd0aec41b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,7 +4,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use futures::TryStreamExt; -use risinglight_proto::rowset::block_statistics::BlockStatisticsType; use tracing::debug; use crate::array::{ @@ -12,10 +11,9 @@ use crate::array::{ }; use crate::catalog::RootCatalogRef; use crate::parser::{parse, ParserError}; -use crate::storage::{ - InMemoryStorage, SecondaryStorage, SecondaryStorageOptions, Storage, StorageColumnRef, - StorageImpl, Table, -}; +use crate::storage::{InMemoryStorage, Storage, StorageColumnRef, StorageImpl, Table}; +#[cfg(feature = "storage")] +use crate::storage::{SecondaryStorage, SecondaryStorageOptions}; use crate::v1::binder::Binder; use crate::v1::executor::ExecutorBuilder; use crate::v1::logical_planner::LogicalPlaner; @@ -42,6 +40,7 @@ impl Database { } /// Create a new database instance with merge-tree engine. + #[cfg(feature = "storage")] pub async fn new_on_disk(options: SecondaryStorageOptions) -> Self { let storage = Arc::new(SecondaryStorage::open(options).await.unwrap()); storage.spawn_compactor().await; @@ -53,6 +52,7 @@ impl Database { } pub async fn shutdown(&self) -> Result<(), Error> { + #[cfg(feature = "storage")] if let StorageImpl::SecondaryStorage(storage) = &self.storage { storage.shutdown().await?; } @@ -94,7 +94,9 @@ impl Database { pub async fn run_internal(&self, cmd: &str) -> Result, Error> { if let Some((cmd, arg)) = cmd.split_once(' ') { if cmd == "stat" { + #[cfg(feature = "storage")] if let StorageImpl::SecondaryStorage(ref storage) = self.storage { + use risinglight_proto::rowset::block_statistics::BlockStatisticsType; let (table, col) = arg.split_once(' ').expect("failed to parse command"); let table_id = self .catalog @@ -141,15 +143,14 @@ impl Database { .to_string() .as_str(), )); - Ok(vec![Chunk::new(vec![DataChunk::from_iter([ + return Ok(vec![Chunk::new(vec![DataChunk::from_iter([ ArrayBuilderImpl::from(stat_name), ArrayBuilderImpl::from(stat_value), - ])])]) - } else { - Err(Error::InternalError( - "this storage engine doesn't support statistics".to_string(), - )) + ])])]); } + Err(Error::InternalError( + "this storage engine doesn't support statistics".to_string(), + )) } else { Err(Error::InternalError("unsupported command".to_string())) } @@ -183,6 +184,7 @@ impl Database { StorageImpl::InMemoryStorage(s) => { crate::executor_v2::build(self.catalog.clone(), s, &optimized) } + #[cfg(feature = "storage")] StorageImpl::SecondaryStorage(s) => { crate::executor_v2::build(self.catalog.clone(), s, &optimized) } diff --git a/src/executor_v2/copy_from_file.rs b/src/executor_v2/copy_from_file.rs index 5a2519c15..44f8e30a4 100644 --- a/src/executor_v2/copy_from_file.rs +++ b/src/executor_v2/copy_from_file.rs @@ -21,6 +21,7 @@ pub struct CopyFromFileExecutor { const IMPORT_PROGRESS_BAR_LIMIT: u64 = 1024 * 1024; impl CopyFromFileExecutor { + #[cfg(not(target_os = "wasi"))] #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] pub async fn execute(self) { let (tx, mut rx) = tokio::sync::mpsc::channel(1); @@ -34,6 +35,12 @@ impl CopyFromFileExecutor { handle.await.unwrap()?; } + #[cfg(target_os = "wasi")] + #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] + pub async fn execute(self) { + panic!("not supported in WASI"); + } + /// Read records from file using blocking IO. /// /// The read data chunks will be sent through `tx`. diff --git a/src/executor_v2/copy_to_file.rs b/src/executor_v2/copy_to_file.rs index 59ec035fa..dd160c77d 100644 --- a/src/executor_v2/copy_to_file.rs +++ b/src/executor_v2/copy_to_file.rs @@ -14,6 +14,7 @@ pub struct CopyToFileExecutor { } impl CopyToFileExecutor { + #[cfg(not(target_os = "wasi"))] #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] pub async fn execute(self, child: BoxedExecutor) { let (sender, recver) = mpsc::channel(1); @@ -36,6 +37,12 @@ impl CopyToFileExecutor { yield DataChunk::single(rows as _); } + #[cfg(target_os = "wasi")] + #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] + pub async fn execute(self, child: BoxedExecutor) { + panic!("not supported in WASI"); + } + fn write_file_blocking( path: PathBuf, format: FileFormat, diff --git a/src/main.rs b/src/main.rs index 048c03638..953efb9cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,12 +14,13 @@ use clap::Parser; use humantime::format_duration; use minitrace::prelude::*; use risinglight::array::{datachunk_to_sqllogictest_string, Chunk}; -use risinglight::storage::SecondaryStorageOptions; +#[cfg(not(feature = "rustyline"))] +use risinglight::utils::rustyline_mock as rustyline; use risinglight::utils::time::RoundingDuration; use risinglight::Database; use rustyline::error::ReadlineError; use rustyline::Editor; -use tokio::{select, signal}; +use tokio::select; use tracing::{info, warn, Level}; use tracing_subscriber::prelude::*; @@ -116,22 +117,26 @@ async fn run_query_in_background( } }; - select! { - _ = signal::ctrl_c() => { + #[cfg(not(target_os = "wasi"))] + let ret = select! { + _ = tokio::signal::ctrl_c() => { // we simply drop the future `task` to cancel the query. println!("Interrupted"); + return; } - ret = task => { - match ret { - Ok(chunks) => { - for chunk in chunks { - print_chunk(&chunk, &output_format); - } - print_execution_time(start_time); - } - Err(err) => println!("{}", err), + ret = task => ret, + }; + #[cfg(target_os = "wasi")] + let ret = task.await; + + match ret { + Ok(chunks) => { + for chunk in chunks { + print_chunk(&chunk, &output_format); } + print_execution_time(start_time); } + Err(err) => println!("{}", err), } } @@ -302,11 +307,13 @@ async fn run_sqllogictest( Ok(()) } -#[tokio::main] +#[cfg_attr(not(target_os = "wasi"), tokio::main)] +#[cfg_attr(target_os = "wasi", tokio::main(flavor = "current_thread"))] async fn main() -> Result<()> { let args = Args::parse(); if args.tokio_console { + #[cfg(feature = "console-subscriber")] console_subscriber::init(); } else { let fmt_layer = tracing_subscriber::fmt::layer().compact(); @@ -321,12 +328,22 @@ async fn main() -> Result<()> { info!("using query engine v2. type '\\v1' to use the legacy engine"); - let db = if args.memory { - info!("using memory engine"); - Database::new_in_memory() - } else { - info!("using Secondary engine"); - Database::new_on_disk(SecondaryStorageOptions::default_for_cli()).await + let db = match args.memory { + true => { + info!("using memory engine"); + Database::new_in_memory() + } + #[cfg(not(feature = "storage"))] + _ => { + info!("using memory engine"); + Database::new_in_memory() + } + #[cfg(feature = "storage")] + false => { + info!("using Secondary engine"); + use risinglight::storage::SecondaryStorageOptions; + Database::new_on_disk(SecondaryStorageOptions::default_for_cli()).await + } }; if let Some(file) = args.file { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index eb965b1aa..02ce9bed1 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -5,7 +5,9 @@ mod memory; pub use memory::InMemoryStorage; +#[cfg(feature = "storage")] mod secondary; +#[cfg(feature = "storage")] pub use secondary::{SecondaryStorage, StorageOptions as SecondaryStorageOptions}; mod error; @@ -27,6 +29,7 @@ use crate::v1::binder::BoundExpr; #[derive(Clone)] pub enum StorageImpl { InMemoryStorage(Arc), + #[cfg(feature = "storage")] SecondaryStorage(Arc), } @@ -46,6 +49,7 @@ impl StorageImpl { impl StorageImpl { pub fn enable_filter_scan(&self) -> bool { match self { + #[cfg(feature = "storage")] Self::SecondaryStorage(_) => true, Self::InMemoryStorage(_) => false, } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 94060a3c4..bacc329df 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,5 @@ // Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0. +#[cfg(not(feature = "rustyline"))] +pub mod rustyline_mock; pub mod time; diff --git a/src/utils/rustyline_mock.rs b/src/utils/rustyline_mock.rs new file mode 100644 index 000000000..b6995aa02 --- /dev/null +++ b/src/utils/rustyline_mock.rs @@ -0,0 +1,50 @@ +use std::io::Write; + +pub struct Editor { + _helper: H, +} + +impl Editor<()> { + pub fn new() -> Result { + Ok(Self { _helper: () }) + } + + pub fn readline(&mut self, prompt: &str) -> Result { + print!("{prompt}"); + std::io::stdout().flush()?; + let mut line = String::new(); + std::io::stdin().read_line(&mut line)?; + line.pop(); + Ok(line) + } + + pub fn add_history_entry + Into>(&mut self, _line: S) -> bool { + true + } + + pub fn save_history + ?Sized>(&mut self, _path: &P) -> Result<()> { + Ok(()) + } + + pub fn load_history + ?Sized>(&mut self, _path: &P) -> Result<()> { + Ok(()) + } +} + +type Result = std::result::Result; + +use std::path::Path; + +use error::*; + +pub mod error { + #[derive(thiserror::Error, Debug)] + pub enum ReadlineError { + #[error("interrupted")] + Interrupted, + #[error("end of file")] + Eof, + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + } +} diff --git a/src/v1/executor/copy_from_file.rs b/src/v1/executor/copy_from_file.rs index 67cf0d751..003683a1a 100644 --- a/src/v1/executor/copy_from_file.rs +++ b/src/v1/executor/copy_from_file.rs @@ -21,6 +21,7 @@ pub struct CopyFromFileExecutor { const IMPORT_PROGRESS_BAR_LIMIT: u64 = 1024 * 1024; impl CopyFromFileExecutor { + #[cfg(not(target_os = "wasi"))] #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] pub async fn execute(self) { let (tx, mut rx) = tokio::sync::mpsc::channel(1); @@ -34,6 +35,12 @@ impl CopyFromFileExecutor { handle.await.unwrap()?; } + #[cfg(target_os = "wasi")] + #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] + pub async fn execute(self) { + panic!("not supported in WASI"); + } + /// Read records from file using blocking IO. /// /// The read data chunks will be sent through `tx`. diff --git a/src/v1/executor/copy_to_file.rs b/src/v1/executor/copy_to_file.rs index 390cdeedb..3edb907ed 100644 --- a/src/v1/executor/copy_to_file.rs +++ b/src/v1/executor/copy_to_file.rs @@ -16,6 +16,7 @@ pub struct CopyToFileExecutor { } impl CopyToFileExecutor { + #[cfg(not(target_os = "wasi"))] #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] pub async fn execute(self) { let Self { @@ -43,6 +44,12 @@ impl CopyToFileExecutor { yield DataChunk::single(rows as _); } + #[cfg(target_os = "wasi")] + #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] + pub async fn execute(self) { + panic!("not supported in WASI"); + } + fn write_file_blocking( path: PathBuf, format: FileFormat, diff --git a/src/v1/executor/mod.rs b/src/v1/executor/mod.rs index fb1f056d3..372daa8d1 100644 --- a/src/v1/executor/mod.rs +++ b/src/v1/executor/mod.rs @@ -188,6 +188,7 @@ impl PlanVisitor for ExecutorBuilder { storage: storage.clone(), } .execute(), + #[cfg(feature = "storage")] StorageImpl::SecondaryStorage(storage) => CreateTableExecutor { plan: plan.clone(), storage: storage.clone(), @@ -206,6 +207,7 @@ impl PlanVisitor for ExecutorBuilder { storage: storage.clone(), } .execute(), + #[cfg(feature = "storage")] StorageImpl::SecondaryStorage(storage) => DropExecutor { plan: plan.clone(), storage: storage.clone(), @@ -226,6 +228,7 @@ impl PlanVisitor for ExecutorBuilder { child: self.visit(plan.child()).unwrap(), } .execute(), + #[cfg(feature = "storage")] StorageImpl::SecondaryStorage(storage) => InsertExecutor { table_ref_id: plan.logical().table_ref_id(), column_ids: plan.logical().column_ids().to_vec(), @@ -267,6 +270,7 @@ impl PlanVisitor for ExecutorBuilder { storage: storage.clone(), } .execute(), + #[cfg(feature = "storage")] StorageImpl::SecondaryStorage(storage) => TableScanExecutor { plan: plan.clone(), expr: plan.logical().expr().cloned(), @@ -406,6 +410,7 @@ impl PlanVisitor for ExecutorBuilder { storage: storage.clone(), } .execute(), + #[cfg(feature = "storage")] StorageImpl::SecondaryStorage(storage) => DeleteExecutor { child, table_ref_id: plan.logical().table_ref_id(), diff --git a/tests/sqlplannertest/src/lib.rs b/tests/sqlplannertest/src/lib.rs index 9decde0f6..d58937963 100644 --- a/tests/sqlplannertest/src/lib.rs +++ b/tests/sqlplannertest/src/lib.rs @@ -2,7 +2,6 @@ use anyhow::Error; use risinglight::array::*; -use risinglight::storage::SecondaryStorageOptions; use risinglight::Database; use sqlplannertest::ParsedTestCase; @@ -13,7 +12,7 @@ pub struct DatabaseWrapper; impl sqlplannertest::PlannerTestRunner for DatabaseWrapper { async fn run(&mut self, test_case: &ParsedTestCase) -> Result { if !test_case.tasks.is_empty() { - let db = Database::new_on_disk(SecondaryStorageOptions::default_for_test()).await; + let db = Database::new_in_memory(); for sql in &test_case.before_sql { db.run(sql).await?; }