diff --git a/Cargo.lock b/Cargo.lock index 2b66f07dc8090..913dec54bac44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5320,6 +5320,7 @@ dependencies = [ "walkdir", "wiremock", "xorf", + "zip", ] [[package]] diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 2c77ad4ec6142..6a63bdb837bdd 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -176,6 +176,7 @@ url = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } xorf = { workspace = true } +zip = { workspace = true } [dev-dependencies] arrow-cast = { workspace = true } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 9fdaeab461dc6..63ce8910c00fe 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -14,6 +14,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; +use std::str; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::sync::LazyLock; @@ -35,6 +36,8 @@ use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::FunctionContext; use databend_common_expression::Value; +#[cfg_attr(not(feature = "python-udf"), allow(unused_imports))] +use databend_common_meta_app::principal::StageInfo; use databend_common_pipeline_transforms::processors::Transform; use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; @@ -88,21 +91,47 @@ impl ScriptRuntime { } #[cfg(feature = "python-udf")] UDFLanguage::Python => { - let code = String::from_utf8(code.to_vec())?; - let code = if let Some(temp_dir) = _temp_dir { - format!( - r#"import sys -sys._xoptions['databend_import_directory'] = '{}' -sys.path.append('{}') -{}"#, - temp_dir.path().display(), - temp_dir.path().display(), - code - ) + let user_code = String::from_utf8(code.to_vec())?; + let code = if let Some(temp_dir) = &_temp_dir { + let import_dir = temp_dir.path().display(); + let mut script = String::from("import sys\n"); + script.push_str(&format!( + r#"sys._xoptions['databend_import_directory'] = '{dir}' +if '{dir}' not in sys.path: + sys.path.append('{dir}') +"#, + dir = import_dir, + )); + + let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref()); + if !stage_paths.is_empty() { + script.push_str("for _databend_zip in ("); + for (idx, path) in stage_paths.iter().enumerate() { + if idx > 0 { + script.push_str(", "); + } + script.push_str(&format!("{path:?}")); + } + script.push_str("):\n if _databend_zip not in sys.path:\n sys.path.append(_databend_zip)\ndel _databend_zip\n"); + } + + script.push_str(&user_code); + script } else { - code + user_code }; + if let Some(temp_dir) = &_temp_dir { + let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref()); + if !stage_paths.is_empty() { + log::info!( + "Python UDF {:?} added stage artifacts to sys.path: {:?}", + func.name, + stage_paths + ); + } + } + let builder = PyRuntimeBuilder { name: func.name.clone(), handler: func.func_name.clone(), @@ -136,14 +165,23 @@ sys.path.append('{}') }) })?, #[cfg(feature = "python-udf")] - ScriptRuntime::Python(pool) => pool - .call(|runtime| runtime.call(&func.name, input_batch)) - .map_err(|err| { - ErrorCode::UDFRuntimeError(format!( - "Python UDF {:?} execution failed: {err}", - func.name - )) - })?, + ScriptRuntime::Python(pool) => { + let batch = pool + .call(|runtime| runtime.call(&func.name, input_batch)) + .map_err(|err| { + ErrorCode::UDFRuntimeError(format!( + "Python UDF {:?} execution failed: {err}", + func.name + )) + })?; + if let Some(message) = Self::python_error_from_batch(func, &batch)? { + return Err(ErrorCode::UDFRuntimeError(format!( + "Python UDF {:?} execution failed: {}", + func.name, message + ))); + } + batch + } ScriptRuntime::WebAssembly(runtime) => { let args_types: Vec<_> = input_batch .schema() @@ -174,6 +212,97 @@ sys.path.append('{}') }; Ok(result_batch) } + + #[cfg(feature = "python-udf")] + fn python_error_from_batch( + func: &UdfFunctionDesc, + batch: &RecordBatch, + ) -> Result> { + let schema = DataSchema::try_from(&(*batch.schema())).map_err(|err| { + ErrorCode::UDFDataError(format!( + "Failed to create schema from record batch for function '{}': {}", + func.name, err + )) + })?; + + if schema.fields().len() > 1 { + let error_field = &schema.fields()[1]; + if error_field.name().eq_ignore_ascii_case("error") { + let result_block = DataBlock::from_record_batch(&schema, batch).map_err(|err| { + ErrorCode::UDFDataError(format!( + "Failed to create data block from record batch for function '{}': {}", + func.name, err + )) + })?; + let error_entry = result_block.get_by_offset(1); + if let Some(message) = Self::first_non_null_error(error_entry) { + return Ok(Some(message)); + } + } + } + Ok(None) + } + + #[cfg(feature = "python-udf")] + fn first_non_null_error(entry: &BlockEntry) -> Option { + use databend_common_expression::ScalarRef; + + for row in 0..entry.len() { + if let Some(value) = entry.index(row) { + match value { + ScalarRef::Null => continue, + ScalarRef::String(message) => { + if !message.is_empty() { + return Some(message.to_string()); + } + } + ScalarRef::Binary(bytes) => { + if let Ok(text) = str::from_utf8(bytes) { + if !text.is_empty() { + return Some(text.to_string()); + } + } else { + return Some("Python UDF returned non UTF-8 error payload".to_string()); + } + } + other => { + return Some(format!("{other:?}")); + } + } + } + } + None + } + + #[cfg(feature = "python-udf")] + fn collect_stage_sys_paths(func: &UdfFunctionDesc, temp_dir: &TempDir) -> Vec { + match &func.udf_type { + UDFType::Script(box UDFScriptCode { + imports_stage_info, .. + }) => imports_stage_info + .iter() + .filter_map(|(_, stage_path)| { + let name = stage_path + .trim_end_matches('/') + .rsplit('/') + .next() + .unwrap_or(stage_path); + let local_path = temp_dir.path().join(name); + let ext = local_path + .extension() + .and_then(|s| s.to_str()) + .map(|s| s.to_ascii_lowercase()); + match ext.as_deref() { + Some("zip") | Some("whl") | Some("egg") => { + Some(local_path.display().to_string()) + } + _ => None, + } + }) + .collect(), + _ => Vec::new(), + } + } } pub struct JsRuntimeBuilder { @@ -253,32 +382,44 @@ mod python_pool { const RESTRICTED_PYTHON_CODE: &str = r#" import os import sys +import sysconfig from pathlib import Path import builtins, sys if "DATABEND_RESTRICTED_PYTHON" not in sys._xoptions: sys._xoptions['DATABEND_RESTRICTED_PYTHON'] = '1' - ALLOWED_BASE = Path("/tmp") + ALLOWED_BASES = {Path("/tmp")} + for key in ("stdlib", "platstdlib", "purelib", "platlib"): + path = sysconfig.get_path(key) + if path: + ALLOWED_BASES.add(Path(path)) + for prefix in (sys.prefix, sys.exec_prefix, sys.base_prefix, sys.base_exec_prefix): + if prefix: + base_path = Path(prefix) + ALLOWED_BASES.add(base_path) + ALLOWED_BASES.add(base_path / f"lib/python{sys.version_info.major}.{sys.version_info.minor}") + _original_open = open _original_os_open = os.open if hasattr(os, 'open') else None + def _ensure_allowed(file_path: Path, target: str): + for base in ALLOWED_BASES: + try: + file_path.relative_to(base) + return + except ValueError: + continue + raise PermissionError(f"Access denied: {target} is outside allowed directories") + def safe_open(file, mode='r', **kwargs): file_path = Path(file).resolve() - - try: - file_path.relative_to(ALLOWED_BASE) - except ValueError: - raise PermissionError(f"Access denied: {file} is outside allowed directory") - + _ensure_allowed(file_path, file) return _original_open(file, mode, **kwargs) def safe_os_open(path, flags, mode=0o777): file_path = Path(path).resolve() - try: - file_path.relative_to(ALLOWED_BASE) - except ValueError: - raise PermissionError(f"Access denied: {path} is outside allowed directory") + _ensure_allowed(file_path, path) return _original_os_open(path, flags, mode) builtins.open = safe_open @@ -289,6 +430,92 @@ if "DATABEND_RESTRICTED_PYTHON" not in sys._xoptions: for module in dangerous_modules: if module in sys.modules: del sys.modules[module] + + try: + import _contextvars # noqa: F401 + except ModuleNotFoundError: + import threading + import types + import weakref + + _MISSING = object() + _REGISTERED_VARS = weakref.WeakSet() + + class Token: + __slots__ = ("var", "old") + + def __init__(self, var, old): + self.var = var + self.old = old + + class ContextVar: + __slots__ = ("name", "default", "_local") + + def __init__(self, name, *, default=_MISSING): + self.name = name + self.default = default + self._local = threading.local() + _REGISTERED_VARS.add(self) + + def get(self, default=_MISSING): + value = getattr(self._local, "value", _MISSING) + if value is _MISSING: + if default is not _MISSING: + return default + if self.default is _MISSING: + raise LookupError(f"ContextVar {self.name} has no value") + return self.default + return value + + def set(self, value): + old = getattr(self._local, "value", _MISSING) + self._local.value = value + return Token(self, old) + + def reset(self, token): + if token.var is not self: + raise ValueError("Token does not belong to this ContextVar") + if token.old is _MISSING: + if hasattr(self._local, "value"): + del self._local.value + else: + self._local.value = token.old + + class Context: + def __init__(self, values=None): + self._values = values or {} + + def __setitem__(self, key, value): + self._values[key] = value + + def items(self): + return self._values.items() + + def run(self, callable, *args, **kwargs): + tokens = [] + try: + for var, value in self._values.items(): + tokens.append(var.set(value)) + return callable(*args, **kwargs) + finally: + for token in reversed(tokens): + token.var.reset(token) + + def copy_context(): + ctx = Context() + for var in list(_REGISTERED_VARS): + try: + ctx[var] = var.get() + except LookupError: + continue + return ctx + + module = types.ModuleType("_contextvars") + module.ContextVar = ContextVar + module.Context = Context + module.Token = Token + module.copy_context = copy_context + sys.modules["_contextvars"] = module "#; impl RuntimeBuilder for PyRuntimeBuilder { @@ -385,15 +612,14 @@ impl TransformUdfScript { let mut dependencies = Self::extract_deps(&code_str); dependencies.extend_from_slice(packages.as_slice()); - let temp_dir = if !dependencies.is_empty() || !imports_stage_info.is_empty() { - // try to find the temp dir from cache - let key = venv::PyVenvKeyEntry { - udf_desc: func.clone(), - }; + let stage_fingerprints = Self::collect_stage_fingerprints(imports_stage_info)?; + + let temp_dir = if !dependencies.is_empty() || !stage_fingerprints.is_empty() { + let key = + venv::PyVenvKeyEntry::new(&dependencies, stage_fingerprints.clone()); let mut w = venv::PY_VENV_CACHE.write(); - let entry = w.get(&key); - if let Some(entry) = entry { - Some(entry.temp_dir.clone()) + if let Some(entry) = w.get(&key) { + Some(entry.materialize().map_err(ErrorCode::from_string)?) } else { let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?); venv::install_deps(temp_dir.path(), &dependencies)?; @@ -425,9 +651,11 @@ impl TransformUdfScript { })?; } - w.insert(key, venv::PyVenvCacheEntry { - temp_dir: temp_dir.clone(), - }); + let archive_path = venv::archive_env(temp_dir.path()) + .map_err(ErrorCode::from_string)?; + let cache_entry = + venv::PyVenvCacheEntry::new(temp_dir.clone(), archive_path); + w.insert(key, cache_entry); Some(temp_dir) } @@ -520,6 +748,46 @@ impl TransformUdfScript { Ok(block_entries) } + fn collect_stage_fingerprints( + imports_stage_info: &[(StageInfo, String)], + ) -> Result> { + #[cfg(feature = "python-udf")] + { + if imports_stage_info.is_empty() { + return Ok(Vec::new()); + } + + let mut tasks = Vec::with_capacity(imports_stage_info.len()); + for (stage, path) in imports_stage_info.iter() { + let op = init_stage_operator(stage)?; + let stage_name = stage.stage_name.clone(); + let path = path.clone(); + tasks.push(async move { + let meta = op.stat(&path).await.map_err(|e| { + ErrorCode::StorageUnavailable(format!( + "Failed to stat stage file '{path}': {e}" + )) + })?; + Ok(venv::StageImportFingerprint { + stage_name, + path, + content_length: meta.content_length(), + etag: meta.etag().map(str::to_string), + }) + }); + } + + databend_common_base::runtime::block_on(async { + futures::future::try_join_all(tasks).await + }) + } + #[cfg(not(feature = "python-udf"))] + { + let _ = imports_stage_info; + Ok(Vec::new()) + } + } + fn create_input_batch( &self, block_entries: Vec, @@ -596,22 +864,46 @@ impl TransformUdfScript { } mod venv { + use std::fs; + use std::fs::File; + use std::io; use std::path::Path; + use std::path::PathBuf; use std::process::Command; use std::sync::Arc; use std::sync::LazyLock; + use std::sync::Weak; use databend_common_cache::LruCache; use databend_common_cache::MemSized; + use parking_lot::Mutex; use parking_lot::RwLock; use tempfile::TempDir; - - use crate::physical_plans::UdfFunctionDesc; + use uuid::Uuid; + use walkdir::WalkDir; + use zip::write::FileOptions; + use zip::ZipArchive; + use zip::ZipWriter; + + static PY_VENV_ARCHIVE_DIR: LazyLock = LazyLock::new(|| { + let base = std::env::var("DATABEND_PY_UDF_CACHE_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| { + std::env::temp_dir() + .join("databend") + .join("python_udf_cache") + }); + if let Err(e) = fs::create_dir_all(&base) { + panic!("Failed to create python udf cache dir {:?}: {}", base, e); + } + base + }); pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { if deps.is_empty() { return Ok(()); } + let target_path = temp_dir_path.display().to_string(); let status = Command::new("python") .args(["-m", "pip", "install"]) @@ -622,15 +914,68 @@ mod venv { .status() .map_err(|e| format!("Failed to install dependencies: {}", e))?; - log::info!("Dependency installation success {}", deps.join(", ")); - if status.success() { + log::info!("Dependency installation success {}", deps.join(", ")); Ok(()) } else { Err("Dependency installation failed".into()) } } + pub fn archive_env(temp_dir_path: &Path) -> Result { + let archive_path = PY_VENV_ARCHIVE_DIR.join(format!("{}.zip", Uuid::now_v7())); + let writer = File::create(&archive_path) + .map_err(|e| format!("Failed to create python deps archive: {}", e))?; + let mut zip = ZipWriter::new(writer); + let options = FileOptions::<'static, ()>::default() + .compression_method(zip::CompressionMethod::Deflated) + .unix_permissions(0o755); + + for entry in WalkDir::new(temp_dir_path).min_depth(1) { + let entry = entry.map_err(|e| format!("Failed to scan python deps: {}", e))?; + let path = entry.path(); + let rel_path = path + .strip_prefix(temp_dir_path) + .map_err(|e| format!("Failed to strip python deps prefix: {}", e))?; + let rel_path = rel_path + .to_str() + .ok_or_else(|| "Python dependency path is not valid UTF-8".to_string())?; + let rel_path = rel_path.replace('\\', "/"); + + if entry.file_type().is_dir() { + zip.add_directory(&rel_path, options) + .map_err(|e| format!("Failed to add directory to archive: {}", e))?; + continue; + } + + if entry.file_type().is_file() || entry.file_type().is_symlink() { + zip.start_file(&rel_path, options) + .map_err(|e| format!("Failed to add file to archive: {}", e))?; + let mut file = File::open(path) + .map_err(|e| format!("Failed to read file {:?}: {}", path, e))?; + io::copy(&mut file, &mut zip) + .map_err(|e| format!("Failed to write archive entry {:?}: {}", path, e))?; + } + } + + zip.finish() + .map_err(|e| format!("Failed to finalize python deps archive: {}", e))?; + Ok(archive_path) + } + + pub fn restore_env(archive_path: &Path) -> Result { + let temp_dir = + tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; + let reader = File::open(archive_path) + .map_err(|e| format!("Failed to read python deps archive: {}", e))?; + let mut archive = + ZipArchive::new(reader).map_err(|e| format!("Failed to open archive: {}", e))?; + archive + .extract(temp_dir.path()) + .map_err(|e| format!("Failed to extract python deps: {}", e))?; + Ok(temp_dir) + } + pub fn create_venv(_python_version: &str) -> Result { let temp_dir = tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; @@ -671,25 +1016,98 @@ mod venv { // cached temp dir for python udf // Add this after the PY_VERSION LazyLock declaration // A simple LRU cache for Python virtual environments - #[derive(Clone)] pub(crate) struct PyVenvCacheEntry { - pub(crate) temp_dir: Arc, + temp_dir: Mutex>, + archive_path: PathBuf, + } + + #[derive(Clone, Eq, Hash, PartialEq)] + pub(crate) struct StageImportFingerprint { + pub(crate) stage_name: String, + pub(crate) path: String, + pub(crate) content_length: u64, + pub(crate) etag: Option, } #[derive(Eq, Hash, PartialEq)] pub(crate) struct PyVenvKeyEntry { - pub(crate) udf_desc: UdfFunctionDesc, + pub(crate) dependencies: Vec, + pub(crate) stage_fingerprints: Vec, + } + + impl PyVenvKeyEntry { + pub fn new( + dependencies: &[String], + stage_fingerprints: Vec, + ) -> Self { + let mut deps = dependencies.to_vec(); + deps.sort(); + Self { + dependencies: deps, + stage_fingerprints, + } + } } impl MemSized for PyVenvKeyEntry { fn mem_bytes(&self) -> usize { - std::mem::size_of::() + self.dependencies + .iter() + .map(MemSized::mem_bytes) + .sum::() + + self + .stage_fingerprints + .iter() + .map(MemSized::mem_bytes) + .sum::() + } + } + + impl MemSized for StageImportFingerprint { + fn mem_bytes(&self) -> usize { + self.stage_name.mem_bytes() + + self.path.mem_bytes() + + std::mem::size_of::() + + self.etag.mem_bytes() } } impl MemSized for PyVenvCacheEntry { fn mem_bytes(&self) -> usize { - std::mem::size_of::() + std::mem::size_of::>>() + std::mem::size_of::() + } + } + + impl PyVenvCacheEntry { + pub fn new(temp_dir: Arc, archive_path: PathBuf) -> Self { + Self { + temp_dir: Mutex::new(Arc::downgrade(&temp_dir)), + archive_path, + } + } + + pub fn materialize(&self) -> Result, String> { + if let Some(existing) = self.temp_dir.lock().upgrade() { + return Ok(existing); + } + + let temp_dir = Arc::new(restore_env(&self.archive_path)?); + *self.temp_dir.lock() = Arc::downgrade(&temp_dir); + Ok(temp_dir) + } + } + + impl Drop for PyVenvCacheEntry { + fn drop(&mut self) { + if let Err(e) = fs::remove_file(&self.archive_path) { + if !matches!(e.kind(), io::ErrorKind::NotFound) { + log::warn!( + "Failed to remove python udf cache archive {:?}: {}", + self.archive_path, + e + ); + } + } } } diff --git a/src/query/sql/src/planner/binder/window.rs b/src/query/sql/src/planner/binder/window.rs index d1e1c355f7372..e3b9b6bc1248d 100644 --- a/src/query/sql/src/planner/binder/window.rs +++ b/src/query/sql/src/planner/binder/window.rs @@ -532,6 +532,12 @@ pub struct WindowAggregateRewriter<'a> { impl<'a> VisitorMut<'a> for WindowAggregateRewriter<'a> { fn visit(&mut self, expr: &'a mut ScalarExpr) -> Result<()> { + if matches!(expr, ScalarExpr::WindowFunction(_)) { + return Err(ErrorCode::SemanticError( + "Window function cannot contain another window function".to_string(), + )); + } + if let ScalarExpr::AggregateFunction(agg_func) = expr { let Some(agg) = self .bind_context diff --git a/tests/sqllogictests/suites/query/window_function/window_basic.test b/tests/sqllogictests/suites/query/window_function/window_basic.test index 94bc2ebd422c2..8b205038552b1 100644 --- a/tests/sqllogictests/suites/query/window_function/window_basic.test +++ b/tests/sqllogictests/suites/query/window_function/window_basic.test @@ -75,6 +75,10 @@ SELECT row_number() OVER (PARTITION BY depname ORDER BY salary) rn FROM empsalar 2 3 +## window can't be inside window +statement error +select avg(number) over(partition by number % 3 order by number %3) as a, a + 3 as b, avg(b) over (partition by number % 3 order by number %3) as c from numbers(10); + statement error select number %3 a, number %4 b, row_number() over(order by number % 11) from range(1, 10) t(number) group by a,b;