From 95900aaefacf9ee68f65caf0f121372147e72977 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 12 Nov 2025 14:59:22 +0800 Subject: [PATCH 1/5] update --- Cargo.lock | 1 + src/query/service/Cargo.toml | 1 + .../transforms/transform_udf_script.rs | 278 ++++++++++++++++-- 3 files changed, 250 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b66f07dc8090..913dec54bac44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5320,6 +5320,7 @@ dependencies = [ "walkdir", "wiremock", "xorf", + "zip", ] [[package]] diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 2c77ad4ec6142..447fd89131d0e 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -175,6 +175,7 @@ typetag = { workspace = true } url = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } +zip = { workspace = true } xorf = { workspace = true } [dev-dependencies] diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 9fdaeab461dc6..89383e503bf37 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -90,14 +90,21 @@ impl ScriptRuntime { UDFLanguage::Python => { let code = String::from_utf8(code.to_vec())?; let code = if let Some(temp_dir) = _temp_dir { + let import_dir = temp_dir.path().display(); format!( - r#"import sys -sys._xoptions['databend_import_directory'] = '{}' -sys.path.append('{}') -{}"#, - temp_dir.path().display(), - temp_dir.path().display(), - code + r#"import sys, sysconfig +_databend_std_keys = ("stdlib", "platstdlib", "purelib", "platlib") +for _databend_key in _databend_std_keys: + _databend_path = sysconfig.get_path(_databend_key) + if _databend_path and _databend_path not in sys.path: + sys.path.append(_databend_path) +del _databend_std_keys +sys._xoptions['databend_import_directory'] = '{dir}' +if '{dir}' not in sys.path: + sys.path.append('{dir}') +{code}"#, + dir = import_dir, + code = code ) } else { code @@ -253,32 +260,44 @@ mod python_pool { const RESTRICTED_PYTHON_CODE: &str = r#" import os import sys +import sysconfig from pathlib import Path import builtins, sys if "DATABEND_RESTRICTED_PYTHON" not in sys._xoptions: sys._xoptions['DATABEND_RESTRICTED_PYTHON'] = '1' - ALLOWED_BASE = Path("/tmp") + ALLOWED_BASES = {Path("/tmp")} + for key in ("stdlib", "platstdlib", "purelib", "platlib"): + path = sysconfig.get_path(key) + if path: + ALLOWED_BASES.add(Path(path)) + for prefix in (sys.prefix, sys.exec_prefix, sys.base_prefix, sys.base_exec_prefix): + if prefix: + base_path = Path(prefix) + ALLOWED_BASES.add(base_path) + ALLOWED_BASES.add(base_path / f"lib/python{sys.version_info.major}.{sys.version_info.minor}") + _original_open = open _original_os_open = os.open if hasattr(os, 'open') else None + def _ensure_allowed(file_path: Path, target: str): + for base in ALLOWED_BASES: + try: + file_path.relative_to(base) + return + except ValueError: + continue + raise PermissionError(f"Access denied: {target} is outside allowed directories") + def safe_open(file, mode='r', **kwargs): file_path = Path(file).resolve() - - try: - file_path.relative_to(ALLOWED_BASE) - except ValueError: - raise PermissionError(f"Access denied: {file} is outside allowed directory") - + _ensure_allowed(file_path, file) return _original_open(file, mode, **kwargs) def safe_os_open(path, flags, mode=0o777): file_path = Path(path).resolve() - try: - file_path.relative_to(ALLOWED_BASE) - except ValueError: - raise PermissionError(f"Access denied: {path} is outside allowed directory") + _ensure_allowed(file_path, path) return _original_os_open(path, flags, mode) builtins.open = safe_open @@ -289,6 +308,92 @@ if "DATABEND_RESTRICTED_PYTHON" not in sys._xoptions: for module in dangerous_modules: if module in sys.modules: del sys.modules[module] + + try: + import _contextvars # noqa: F401 + except ModuleNotFoundError: + import threading + import types + import weakref + + _MISSING = object() + _REGISTERED_VARS = weakref.WeakSet() + + class Token: + __slots__ = ("var", "old") + + def __init__(self, var, old): + self.var = var + self.old = old + + class ContextVar: + __slots__ = ("name", "default", "_local") + + def __init__(self, name, *, default=_MISSING): + self.name = name + self.default = default + self._local = threading.local() + _REGISTERED_VARS.add(self) + + def get(self, default=_MISSING): + value = getattr(self._local, "value", _MISSING) + if value is _MISSING: + if default is not _MISSING: + return default + if self.default is _MISSING: + raise LookupError(f"ContextVar {self.name} has no value") + return self.default + return value + + def set(self, value): + old = getattr(self._local, "value", _MISSING) + self._local.value = value + return Token(self, old) + + def reset(self, token): + if token.var is not self: + raise ValueError("Token does not belong to this ContextVar") + if token.old is _MISSING: + if hasattr(self._local, "value"): + del self._local.value + else: + self._local.value = token.old + + class Context: + def __init__(self, values=None): + self._values = values or {} + + def __setitem__(self, key, value): + self._values[key] = value + + def items(self): + return self._values.items() + + def run(self, callable, *args, **kwargs): + tokens = [] + try: + for var, value in self._values.items(): + tokens.append(var.set(value)) + return callable(*args, **kwargs) + finally: + for token in reversed(tokens): + token.var.reset(token) + + def copy_context(): + ctx = Context() + for var in list(_REGISTERED_VARS): + try: + ctx[var] = var.get() + except LookupError: + continue + return ctx + + module = types.ModuleType("_contextvars") + module.ContextVar = ContextVar + module.Context = Context + module.Token = Token + module.copy_context = copy_context + sys.modules["_contextvars"] = module "#; impl RuntimeBuilder for PyRuntimeBuilder { @@ -386,14 +491,12 @@ impl TransformUdfScript { dependencies.extend_from_slice(packages.as_slice()); let temp_dir = if !dependencies.is_empty() || !imports_stage_info.is_empty() { - // try to find the temp dir from cache let key = venv::PyVenvKeyEntry { udf_desc: func.clone(), }; let mut w = venv::PY_VENV_CACHE.write(); - let entry = w.get(&key); - if let Some(entry) = entry { - Some(entry.temp_dir.clone()) + if let Some(entry) = w.get(&key) { + Some(entry.materialize().map_err(ErrorCode::from_string)?) } else { let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?); venv::install_deps(temp_dir.path(), &dependencies)?; @@ -425,9 +528,12 @@ impl TransformUdfScript { })?; } - w.insert(key, venv::PyVenvCacheEntry { - temp_dir: temp_dir.clone(), - }); + let archive_path = venv::archive_env(temp_dir.path()) + .map_err(ErrorCode::from_string)?; + w.insert( + key, + venv::PyVenvCacheEntry::new(temp_dir.clone(), archive_path), + ); Some(temp_dir) } @@ -596,22 +702,48 @@ impl TransformUdfScript { } mod venv { + use std::fs; + use std::fs::File; + use std::io; use std::path::Path; + use std::path::PathBuf; use std::process::Command; use std::sync::Arc; use std::sync::LazyLock; + use std::sync::Weak; use databend_common_cache::LruCache; use databend_common_cache::MemSized; + use parking_lot::Mutex; use parking_lot::RwLock; use tempfile::TempDir; + use uuid::Uuid; + use walkdir::WalkDir; + use zip::write::FileOptions; + use zip::ZipArchive; + use zip::ZipWriter; use crate::physical_plans::UdfFunctionDesc; + static PY_VENV_ARCHIVE_DIR: LazyLock = LazyLock::new(|| { + let base = std::env::var("DATABEND_PY_UDF_CACHE_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| { + std::env::temp_dir() + .join("databend") + .join("python_udf_cache") + }); + if let Err(e) = fs::create_dir_all(&base) { + panic!("Failed to create python udf cache dir {:?}: {}", base, e); + } + base + }); + pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { if deps.is_empty() { return Ok(()); } + let target_path = temp_dir_path.display().to_string(); let status = Command::new("python") .args(["-m", "pip", "install"]) @@ -622,15 +754,68 @@ mod venv { .status() .map_err(|e| format!("Failed to install dependencies: {}", e))?; - log::info!("Dependency installation success {}", deps.join(", ")); - if status.success() { + log::info!("Dependency installation success {}", deps.join(", ")); Ok(()) } else { Err("Dependency installation failed".into()) } } + pub fn archive_env(temp_dir_path: &Path) -> Result { + let archive_path = PY_VENV_ARCHIVE_DIR.join(format!("{}.zip", Uuid::now_v7())); + let writer = File::create(&archive_path) + .map_err(|e| format!("Failed to create python deps archive: {}", e))?; + let mut zip = ZipWriter::new(writer); + let options = FileOptions::<'static, ()>::default() + .compression_method(zip::CompressionMethod::Deflated) + .unix_permissions(0o755); + + for entry in WalkDir::new(temp_dir_path).min_depth(1) { + let entry = entry.map_err(|e| format!("Failed to scan python deps: {}", e))?; + let path = entry.path(); + let rel_path = path + .strip_prefix(temp_dir_path) + .map_err(|e| format!("Failed to strip python deps prefix: {}", e))?; + let rel_path = rel_path + .to_str() + .ok_or_else(|| "Python dependency path is not valid UTF-8".to_string())?; + let rel_path = rel_path.replace('\\', "/"); + + if entry.file_type().is_dir() { + zip.add_directory(&rel_path, options) + .map_err(|e| format!("Failed to add directory to archive: {}", e))?; + continue; + } + + if entry.file_type().is_file() || entry.file_type().is_symlink() { + zip.start_file(&rel_path, options) + .map_err(|e| format!("Failed to add file to archive: {}", e))?; + let mut file = File::open(path) + .map_err(|e| format!("Failed to read file {:?}: {}", path, e))?; + io::copy(&mut file, &mut zip) + .map_err(|e| format!("Failed to write archive entry {:?}: {}", path, e))?; + } + } + + zip.finish() + .map_err(|e| format!("Failed to finalize python deps archive: {}", e))?; + Ok(archive_path) + } + + pub fn restore_env(archive_path: &Path) -> Result { + let temp_dir = + tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; + let reader = File::open(archive_path) + .map_err(|e| format!("Failed to read python deps archive: {}", e))?; + let mut archive = + ZipArchive::new(reader).map_err(|e| format!("Failed to open archive: {}", e))?; + archive + .extract(temp_dir.path()) + .map_err(|e| format!("Failed to extract python deps: {}", e))?; + Ok(temp_dir) + } + pub fn create_venv(_python_version: &str) -> Result { let temp_dir = tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; @@ -671,9 +856,9 @@ mod venv { // cached temp dir for python udf // Add this after the PY_VERSION LazyLock declaration // A simple LRU cache for Python virtual environments - #[derive(Clone)] pub(crate) struct PyVenvCacheEntry { - pub(crate) temp_dir: Arc, + temp_dir: Mutex>, + archive_path: PathBuf, } #[derive(Eq, Hash, PartialEq)] @@ -689,7 +874,40 @@ mod venv { impl MemSized for PyVenvCacheEntry { fn mem_bytes(&self) -> usize { - std::mem::size_of::() + std::mem::size_of::>>() + std::mem::size_of::() + } + } + + impl PyVenvCacheEntry { + pub fn new(temp_dir: Arc, archive_path: PathBuf) -> Self { + Self { + temp_dir: Mutex::new(Arc::downgrade(&temp_dir)), + archive_path, + } + } + + pub fn materialize(&self) -> Result, String> { + if let Some(existing) = self.temp_dir.lock().upgrade() { + return Ok(existing); + } + + let temp_dir = Arc::new(restore_env(&self.archive_path)?); + *self.temp_dir.lock() = Arc::downgrade(&temp_dir); + Ok(temp_dir) + } + } + + impl Drop for PyVenvCacheEntry { + fn drop(&mut self) { + if let Err(e) = fs::remove_file(&self.archive_path) { + if !matches!(e.kind(), io::ErrorKind::NotFound) { + log::warn!( + "Failed to remove python udf cache archive {:?}: {}", + self.archive_path, + e + ); + } + } } } From 5f94a69f34b3942cfc9dd883c84873586c04fe27 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 12 Nov 2025 22:14:31 +0800 Subject: [PATCH 2/5] show error --- .../transforms/transform_udf_script.rs | 213 ++++++++++++++++-- 1 file changed, 197 insertions(+), 16 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 89383e503bf37..b2bca398afc9f 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -14,6 +14,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; +use std::str; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::sync::LazyLock; @@ -34,6 +35,7 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::FunctionContext; +use databend_common_expression::ScalarRef; use databend_common_expression::Value; use databend_common_pipeline_transforms::processors::Transform; use databend_common_sql::plans::UDFLanguage; @@ -56,6 +58,105 @@ pub enum ScriptRuntime { static PY_VERSION: LazyLock = LazyLock::new(|| venv::detect_python_version().unwrap_or("3.12".to_string())); +#[cfg(feature = "python-udf")] +const PYTHON_SCRIPT_BOOTSTRAP: &str = r#" +import sys +import sysconfig + +_databend_std_keys = ("stdlib", "platstdlib", "purelib", "platlib") +for _databend_key in _databend_std_keys: + _databend_path = sysconfig.get_path(_databend_key) + if _databend_path and _databend_path not in sys.path: + sys.path.append(_databend_path) +del _databend_std_keys + +try: + import _contextvars # noqa: F401 +except ModuleNotFoundError: + import threading + import types + import weakref + + _databend_missing = object() + _databend_registered = weakref.WeakSet() + + class Token: + __slots__ = ("var", "old") + + def __init__(self, var, old): + self.var = var + self.old = old + + class ContextVar: + __slots__ = ("name", "default", "_local") + + def __init__(self, name, *, default=_databend_missing): + self.name = name + self.default = default + self._local = threading.local() + _databend_registered.add(self) + + def get(self, default=_databend_missing): + value = getattr(self._local, "value", _databend_missing) + if value is _databend_missing: + if default is not _databend_missing: + return default + if self.default is _databend_missing: + raise LookupError(f"ContextVar {self.name} has no value") + return self.default + return value + + def set(self, value): + old = getattr(self._local, "value", _databend_missing) + self._local.value = value + return Token(self, old) + + def reset(self, token): + if token.var is not self: + raise ValueError("Token does not belong to this ContextVar") + if token.old is _databend_missing: + if hasattr(self._local, "value"): + del self._local.value + else: + self._local.value = token.old + + class Context: + def __init__(self, values=None): + self._values = values or {} + + def __setitem__(self, key, value): + self._values[key] = value + + def items(self): + return self._values.items() + + def run(self, callable, *args, **kwargs): + tokens = [] + try: + for var, value in self._values.items(): + tokens.append(var.set(value)) + return callable(*args, **kwargs) + finally: + for token in reversed(tokens): + token.var.reset(token) + + def copy_context(): + ctx = Context() + for var in list(_databend_registered): + try: + ctx[var] = var.get() + except LookupError: + continue + return ctx + + module = types.ModuleType("_contextvars") + module.ContextVar = ContextVar + module.Context = Context + module.Token = Token + module.copy_context = copy_context + sys.modules["_contextvars"] = module +"#; + impl ScriptRuntime { pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option>) -> Result { let UDFType::Script(box UDFScriptCode { language, code, .. }) = &func.udf_type else { @@ -88,27 +189,35 @@ impl ScriptRuntime { } #[cfg(feature = "python-udf")] UDFLanguage::Python => { - let code = String::from_utf8(code.to_vec())?; - let code = if let Some(temp_dir) = _temp_dir { + let user_code = String::from_utf8(code.to_vec())?; + let mut script = + String::with_capacity(PYTHON_SCRIPT_BOOTSTRAP.len() + user_code.len() + 256); + script.push_str(PYTHON_SCRIPT_BOOTSTRAP); + if let Some(temp_dir) = &_temp_dir { let import_dir = temp_dir.path().display(); - format!( - r#"import sys, sysconfig -_databend_std_keys = ("stdlib", "platstdlib", "purelib", "platlib") -for _databend_key in _databend_std_keys: - _databend_path = sysconfig.get_path(_databend_key) - if _databend_path and _databend_path not in sys.path: - sys.path.append(_databend_path) -del _databend_std_keys + script.push_str(&format!( + r#" sys._xoptions['databend_import_directory'] = '{dir}' if '{dir}' not in sys.path: sys.path.append('{dir}') -{code}"#, +"#, dir = import_dir, - code = code - ) - } else { - code - }; + )); + + let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref()); + if !stage_paths.is_empty() { + script.push_str("for _databend_zip in ("); + for (idx, path) in stage_paths.iter().enumerate() { + if idx > 0 { + script.push_str(", "); + } + script.push_str(&format!("{path:?}")); + } + script.push_str("):\n if _databend_zip not in sys.path:\n sys.path.append(_databend_zip)\ndel _databend_zip\n"); + } + } + script.push_str(&user_code); + let code = script; let builder = PyRuntimeBuilder { name: func.name.clone(), @@ -181,6 +290,36 @@ if '{dir}' not in sys.path: }; Ok(result_batch) } + + #[cfg(feature = "python-udf")] + fn collect_stage_sys_paths(func: &UdfFunctionDesc, temp_dir: &TempDir) -> Vec { + match &func.udf_type { + UDFType::Script(box UDFScriptCode { + imports_stage_info, .. + }) => imports_stage_info + .iter() + .filter_map(|(_, stage_path)| { + let name = stage_path + .trim_end_matches('/') + .rsplit('/') + .next() + .unwrap_or(stage_path); + let local_path = temp_dir.path().join(name); + let ext = local_path + .extension() + .and_then(|s| s.to_str()) + .map(|s| s.to_ascii_lowercase()); + match ext.as_deref() { + Some("zip") | Some("whl") | Some("egg") => { + Some(local_path.display().to_string()) + } + _ => None, + } + }) + .collect(), + _ => Vec::new(), + } + } } pub struct JsRuntimeBuilder { @@ -672,6 +811,20 @@ impl TransformUdfScript { )) })?; + if let Some(error_idx) = schema + .fields() + .iter() + .position(|field| field.name().eq_ignore_ascii_case("error")) + { + let error_entry = result_block.get_by_offset(error_idx); + if let Some(message) = Self::first_non_null_error(error_entry) { + return Err(ErrorCode::UDFRuntimeError(format!( + "Python UDF {:?} execution failed: {}", + func.name, message + ))); + } + } + let entry = if contains_variant(&func.data_type) { let value = transform_variant(&result_block.get_by_offset(0).value(), false).map_err( |err| { @@ -699,6 +852,34 @@ impl TransformUdfScript { data_block.add_entry(entry); Ok(()) } + + fn first_non_null_error(entry: &BlockEntry) -> Option { + for row in 0..entry.len() { + if let Some(value) = entry.index(row) { + match value { + ScalarRef::Null => continue, + ScalarRef::String(message) => { + if !message.is_empty() { + return Some(message.to_string()); + } + } + ScalarRef::Binary(bytes) => { + if let Ok(text) = str::from_utf8(bytes) { + if !text.is_empty() { + return Some(text.to_string()); + } + } else { + return Some("Python UDF returned non UTF-8 error payload".to_string()); + } + } + other => { + return Some(format!("{other:?}")); + } + } + } + } + None + } } mod venv { From fb2aae6e82bd976d2459518e9c0250c4db8d9be7 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Thu, 13 Nov 2025 08:41:00 +0800 Subject: [PATCH 3/5] show error --- src/query/service/Cargo.toml | 2 +- .../transforms/transform_udf_script.rs | 253 +++++++----------- 2 files changed, 96 insertions(+), 159 deletions(-) diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 447fd89131d0e..6a63bdb837bdd 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -175,8 +175,8 @@ typetag = { workspace = true } url = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } -zip = { workspace = true } xorf = { workspace = true } +zip = { workspace = true } [dev-dependencies] arrow-cast = { workspace = true } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index b2bca398afc9f..135a6c40bd3a5 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -35,7 +35,6 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::FunctionContext; -use databend_common_expression::ScalarRef; use databend_common_expression::Value; use databend_common_pipeline_transforms::processors::Transform; use databend_common_sql::plans::UDFLanguage; @@ -58,105 +57,6 @@ pub enum ScriptRuntime { static PY_VERSION: LazyLock = LazyLock::new(|| venv::detect_python_version().unwrap_or("3.12".to_string())); -#[cfg(feature = "python-udf")] -const PYTHON_SCRIPT_BOOTSTRAP: &str = r#" -import sys -import sysconfig - -_databend_std_keys = ("stdlib", "platstdlib", "purelib", "platlib") -for _databend_key in _databend_std_keys: - _databend_path = sysconfig.get_path(_databend_key) - if _databend_path and _databend_path not in sys.path: - sys.path.append(_databend_path) -del _databend_std_keys - -try: - import _contextvars # noqa: F401 -except ModuleNotFoundError: - import threading - import types - import weakref - - _databend_missing = object() - _databend_registered = weakref.WeakSet() - - class Token: - __slots__ = ("var", "old") - - def __init__(self, var, old): - self.var = var - self.old = old - - class ContextVar: - __slots__ = ("name", "default", "_local") - - def __init__(self, name, *, default=_databend_missing): - self.name = name - self.default = default - self._local = threading.local() - _databend_registered.add(self) - - def get(self, default=_databend_missing): - value = getattr(self._local, "value", _databend_missing) - if value is _databend_missing: - if default is not _databend_missing: - return default - if self.default is _databend_missing: - raise LookupError(f"ContextVar {self.name} has no value") - return self.default - return value - - def set(self, value): - old = getattr(self._local, "value", _databend_missing) - self._local.value = value - return Token(self, old) - - def reset(self, token): - if token.var is not self: - raise ValueError("Token does not belong to this ContextVar") - if token.old is _databend_missing: - if hasattr(self._local, "value"): - del self._local.value - else: - self._local.value = token.old - - class Context: - def __init__(self, values=None): - self._values = values or {} - - def __setitem__(self, key, value): - self._values[key] = value - - def items(self): - return self._values.items() - - def run(self, callable, *args, **kwargs): - tokens = [] - try: - for var, value in self._values.items(): - tokens.append(var.set(value)) - return callable(*args, **kwargs) - finally: - for token in reversed(tokens): - token.var.reset(token) - - def copy_context(): - ctx = Context() - for var in list(_databend_registered): - try: - ctx[var] = var.get() - except LookupError: - continue - return ctx - - module = types.ModuleType("_contextvars") - module.ContextVar = ContextVar - module.Context = Context - module.Token = Token - module.copy_context = copy_context - sys.modules["_contextvars"] = module -"#; - impl ScriptRuntime { pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option>) -> Result { let UDFType::Script(box UDFScriptCode { language, code, .. }) = &func.udf_type else { @@ -190,14 +90,11 @@ impl ScriptRuntime { #[cfg(feature = "python-udf")] UDFLanguage::Python => { let user_code = String::from_utf8(code.to_vec())?; - let mut script = - String::with_capacity(PYTHON_SCRIPT_BOOTSTRAP.len() + user_code.len() + 256); - script.push_str(PYTHON_SCRIPT_BOOTSTRAP); - if let Some(temp_dir) = &_temp_dir { + let code = if let Some(temp_dir) = &_temp_dir { let import_dir = temp_dir.path().display(); + let mut script = String::from("import sys\n"); script.push_str(&format!( - r#" -sys._xoptions['databend_import_directory'] = '{dir}' + r#"sys._xoptions['databend_import_directory'] = '{dir}' if '{dir}' not in sys.path: sys.path.append('{dir}') "#, @@ -215,9 +112,23 @@ if '{dir}' not in sys.path: } script.push_str("):\n if _databend_zip not in sys.path:\n sys.path.append(_databend_zip)\ndel _databend_zip\n"); } + + script.push_str(&user_code); + script + } else { + user_code + }; + + if let Some(temp_dir) = &_temp_dir { + let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref()); + if !stage_paths.is_empty() { + log::info!( + "Python UDF {:?} added stage artifacts to sys.path: {:?}", + func.name, + stage_paths + ); + } } - script.push_str(&user_code); - let code = script; let builder = PyRuntimeBuilder { name: func.name.clone(), @@ -252,14 +163,23 @@ if '{dir}' not in sys.path: }) })?, #[cfg(feature = "python-udf")] - ScriptRuntime::Python(pool) => pool - .call(|runtime| runtime.call(&func.name, input_batch)) - .map_err(|err| { - ErrorCode::UDFRuntimeError(format!( - "Python UDF {:?} execution failed: {err}", - func.name - )) - })?, + ScriptRuntime::Python(pool) => { + let batch = pool + .call(|runtime| runtime.call(&func.name, input_batch)) + .map_err(|err| { + ErrorCode::UDFRuntimeError(format!( + "Python UDF {:?} execution failed: {err}", + func.name + )) + })?; + if let Some(message) = Self::python_error_from_batch(func, &batch)? { + return Err(ErrorCode::UDFRuntimeError(format!( + "Python UDF {:?} execution failed: {}", + func.name, message + ))); + } + batch + } ScriptRuntime::WebAssembly(runtime) => { let args_types: Vec<_> = input_batch .schema() @@ -291,6 +211,65 @@ if '{dir}' not in sys.path: Ok(result_batch) } + #[cfg(feature = "python-udf")] + fn python_error_from_batch( + func: &UdfFunctionDesc, + batch: &RecordBatch, + ) -> Result> { + let schema = DataSchema::try_from(&(*batch.schema())).map_err(|err| { + ErrorCode::UDFDataError(format!( + "Failed to create schema from record batch for function '{}': {}", + func.name, err + )) + })?; + + if schema.fields().len() > 1 { + let error_field = &schema.fields()[1]; + if error_field.name().eq_ignore_ascii_case("error") { + let result_block = DataBlock::from_record_batch(&schema, batch).map_err(|err| { + ErrorCode::UDFDataError(format!( + "Failed to create data block from record batch for function '{}': {}", + func.name, err + )) + })?; + let error_entry = result_block.get_by_offset(1); + if let Some(message) = Self::first_non_null_error(error_entry) { + return Ok(Some(message)); + } + } + } + Ok(None) + } + + #[cfg(feature = "python-udf")] + fn first_non_null_error(entry: &BlockEntry) -> Option { + for row in 0..entry.len() { + if let Some(value) = entry.index(row) { + match value { + ScalarRef::Null => continue, + ScalarRef::String(message) => { + if !message.is_empty() { + return Some(message.to_string()); + } + } + ScalarRef::Binary(bytes) => { + if let Ok(text) = str::from_utf8(bytes) { + if !text.is_empty() { + return Some(text.to_string()); + } + } else { + return Some("Python UDF returned non UTF-8 error payload".to_string()); + } + } + other => { + return Some(format!("{other:?}")); + } + } + } + } + None + } + #[cfg(feature = "python-udf")] fn collect_stage_sys_paths(func: &UdfFunctionDesc, temp_dir: &TempDir) -> Vec { match &func.udf_type { @@ -811,20 +790,6 @@ impl TransformUdfScript { )) })?; - if let Some(error_idx) = schema - .fields() - .iter() - .position(|field| field.name().eq_ignore_ascii_case("error")) - { - let error_entry = result_block.get_by_offset(error_idx); - if let Some(message) = Self::first_non_null_error(error_entry) { - return Err(ErrorCode::UDFRuntimeError(format!( - "Python UDF {:?} execution failed: {}", - func.name, message - ))); - } - } - let entry = if contains_variant(&func.data_type) { let value = transform_variant(&result_block.get_by_offset(0).value(), false).map_err( |err| { @@ -852,34 +817,6 @@ impl TransformUdfScript { data_block.add_entry(entry); Ok(()) } - - fn first_non_null_error(entry: &BlockEntry) -> Option { - for row in 0..entry.len() { - if let Some(value) = entry.index(row) { - match value { - ScalarRef::Null => continue, - ScalarRef::String(message) => { - if !message.is_empty() { - return Some(message.to_string()); - } - } - ScalarRef::Binary(bytes) => { - if let Ok(text) = str::from_utf8(bytes) { - if !text.is_empty() { - return Some(text.to_string()); - } - } else { - return Some("Python UDF returned non UTF-8 error payload".to_string()); - } - } - other => { - return Some(format!("{other:?}")); - } - } - } - } - None - } } mod venv { From 96f2ac104a2040c07d418dc423ff8ac9ac709d2d Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Thu, 13 Nov 2025 09:39:51 +0800 Subject: [PATCH 4/5] show error --- .../transforms/transform_udf_script.rs | 98 ++++++++++++++++--- 1 file changed, 86 insertions(+), 12 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 135a6c40bd3a5..7ae44c81ccefb 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -36,6 +36,7 @@ use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::FunctionContext; use databend_common_expression::Value; +use databend_common_meta_app::principal::StageInfo; use databend_common_pipeline_transforms::processors::Transform; use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; @@ -243,6 +244,8 @@ if '{dir}' not in sys.path: #[cfg(feature = "python-udf")] fn first_non_null_error(entry: &BlockEntry) -> Option { + use databend_common_expression::ScalarRef; + for row in 0..entry.len() { if let Some(value) = entry.index(row) { match value { @@ -608,10 +611,11 @@ impl TransformUdfScript { let mut dependencies = Self::extract_deps(&code_str); dependencies.extend_from_slice(packages.as_slice()); - let temp_dir = if !dependencies.is_empty() || !imports_stage_info.is_empty() { - let key = venv::PyVenvKeyEntry { - udf_desc: func.clone(), - }; + let stage_fingerprints = Self::collect_stage_fingerprints(imports_stage_info)?; + + let temp_dir = if !dependencies.is_empty() || !stage_fingerprints.is_empty() { + let key = + venv::PyVenvKeyEntry::new(&dependencies, stage_fingerprints.clone()); let mut w = venv::PY_VENV_CACHE.write(); if let Some(entry) = w.get(&key) { Some(entry.materialize().map_err(ErrorCode::from_string)?) @@ -648,10 +652,9 @@ impl TransformUdfScript { let archive_path = venv::archive_env(temp_dir.path()) .map_err(ErrorCode::from_string)?; - w.insert( - key, - venv::PyVenvCacheEntry::new(temp_dir.clone(), archive_path), - ); + let cache_entry = + venv::PyVenvCacheEntry::new(temp_dir.clone(), archive_path); + w.insert(key, cache_entry); Some(temp_dir) } @@ -744,6 +747,39 @@ impl TransformUdfScript { Ok(block_entries) } + #[cfg(feature = "python-udf")] + fn collect_stage_fingerprints( + imports_stage_info: &[(StageInfo, String)], + ) -> Result> { + if imports_stage_info.is_empty() { + return Ok(Vec::new()); + } + + let mut tasks = Vec::with_capacity(imports_stage_info.len()); + for (stage, path) in imports_stage_info.iter() { + let op = init_stage_operator(stage)?; + let stage_name = stage.stage_name.clone(); + let path = path.clone(); + tasks.push(async move { + let meta = op.stat(&path).await.map_err(|e| { + ErrorCode::StorageUnavailable(format!( + "Failed to stat stage file '{path}': {e}" + )) + })?; + Ok(venv::StageImportFingerprint { + stage_name, + path, + content_length: meta.content_length(), + etag: meta.etag().map(str::to_string), + }) + }); + } + + databend_common_base::runtime::block_on(async { + futures::future::try_join_all(tasks).await + }) + } + fn create_input_batch( &self, block_entries: Vec, @@ -841,8 +877,6 @@ mod venv { use zip::ZipArchive; use zip::ZipWriter; - use crate::physical_plans::UdfFunctionDesc; - static PY_VENV_ARCHIVE_DIR: LazyLock = LazyLock::new(|| { let base = std::env::var("DATABEND_PY_UDF_CACHE_DIR") .map(PathBuf::from) @@ -979,14 +1013,54 @@ mod venv { archive_path: PathBuf, } + #[derive(Clone, Eq, Hash, PartialEq)] + pub(crate) struct StageImportFingerprint { + pub(crate) stage_name: String, + pub(crate) path: String, + pub(crate) content_length: u64, + pub(crate) etag: Option, + } + #[derive(Eq, Hash, PartialEq)] pub(crate) struct PyVenvKeyEntry { - pub(crate) udf_desc: UdfFunctionDesc, + pub(crate) dependencies: Vec, + pub(crate) stage_fingerprints: Vec, + } + + impl PyVenvKeyEntry { + pub fn new( + dependencies: &[String], + stage_fingerprints: Vec, + ) -> Self { + let mut deps = dependencies.to_vec(); + deps.sort(); + Self { + dependencies: deps, + stage_fingerprints, + } + } } impl MemSized for PyVenvKeyEntry { fn mem_bytes(&self) -> usize { - std::mem::size_of::() + self.dependencies + .iter() + .map(MemSized::mem_bytes) + .sum::() + + self + .stage_fingerprints + .iter() + .map(MemSized::mem_bytes) + .sum::() + } + } + + impl MemSized for StageImportFingerprint { + fn mem_bytes(&self) -> usize { + self.stage_name.mem_bytes() + + self.path.mem_bytes() + + std::mem::size_of::() + + self.etag.mem_bytes() } } From e92a1e62023160727e1fca264f49e58b11edcb57 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Thu, 13 Nov 2025 15:35:40 +0800 Subject: [PATCH 5/5] window expr can't be inside window expr --- .../transforms/transform_udf_script.rs | 60 +++++++++++-------- src/query/sql/src/planner/binder/window.rs | 6 ++ .../query/window_function/window_basic.test | 4 ++ 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 7ae44c81ccefb..63ce8910c00fe 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -36,6 +36,7 @@ use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::FunctionContext; use databend_common_expression::Value; +#[cfg_attr(not(feature = "python-udf"), allow(unused_imports))] use databend_common_meta_app::principal::StageInfo; use databend_common_pipeline_transforms::processors::Transform; use databend_common_sql::plans::UDFLanguage; @@ -747,37 +748,44 @@ impl TransformUdfScript { Ok(block_entries) } - #[cfg(feature = "python-udf")] fn collect_stage_fingerprints( imports_stage_info: &[(StageInfo, String)], ) -> Result> { - if imports_stage_info.is_empty() { - return Ok(Vec::new()); - } + #[cfg(feature = "python-udf")] + { + if imports_stage_info.is_empty() { + return Ok(Vec::new()); + } - let mut tasks = Vec::with_capacity(imports_stage_info.len()); - for (stage, path) in imports_stage_info.iter() { - let op = init_stage_operator(stage)?; - let stage_name = stage.stage_name.clone(); - let path = path.clone(); - tasks.push(async move { - let meta = op.stat(&path).await.map_err(|e| { - ErrorCode::StorageUnavailable(format!( - "Failed to stat stage file '{path}': {e}" - )) - })?; - Ok(venv::StageImportFingerprint { - stage_name, - path, - content_length: meta.content_length(), - etag: meta.etag().map(str::to_string), - }) - }); - } + let mut tasks = Vec::with_capacity(imports_stage_info.len()); + for (stage, path) in imports_stage_info.iter() { + let op = init_stage_operator(stage)?; + let stage_name = stage.stage_name.clone(); + let path = path.clone(); + tasks.push(async move { + let meta = op.stat(&path).await.map_err(|e| { + ErrorCode::StorageUnavailable(format!( + "Failed to stat stage file '{path}': {e}" + )) + })?; + Ok(venv::StageImportFingerprint { + stage_name, + path, + content_length: meta.content_length(), + etag: meta.etag().map(str::to_string), + }) + }); + } - databend_common_base::runtime::block_on(async { - futures::future::try_join_all(tasks).await - }) + databend_common_base::runtime::block_on(async { + futures::future::try_join_all(tasks).await + }) + } + #[cfg(not(feature = "python-udf"))] + { + let _ = imports_stage_info; + Ok(Vec::new()) + } } fn create_input_batch( diff --git a/src/query/sql/src/planner/binder/window.rs b/src/query/sql/src/planner/binder/window.rs index d1e1c355f7372..e3b9b6bc1248d 100644 --- a/src/query/sql/src/planner/binder/window.rs +++ b/src/query/sql/src/planner/binder/window.rs @@ -532,6 +532,12 @@ pub struct WindowAggregateRewriter<'a> { impl<'a> VisitorMut<'a> for WindowAggregateRewriter<'a> { fn visit(&mut self, expr: &'a mut ScalarExpr) -> Result<()> { + if matches!(expr, ScalarExpr::WindowFunction(_)) { + return Err(ErrorCode::SemanticError( + "Window function cannot contain another window function".to_string(), + )); + } + if let ScalarExpr::AggregateFunction(agg_func) = expr { let Some(agg) = self .bind_context diff --git a/tests/sqllogictests/suites/query/window_function/window_basic.test b/tests/sqllogictests/suites/query/window_function/window_basic.test index 94bc2ebd422c2..8b205038552b1 100644 --- a/tests/sqllogictests/suites/query/window_function/window_basic.test +++ b/tests/sqllogictests/suites/query/window_function/window_basic.test @@ -75,6 +75,10 @@ SELECT row_number() OVER (PARTITION BY depname ORDER BY salary) rn FROM empsalar 2 3 +## window can't be inside window +statement error +select avg(number) over(partition by number % 3 order by number %3) as a, a + 3 as b, avg(b) over (partition by number % 3 order by number %3) as c from numbers(10); + statement error select number %3 a, number %4 b, row_number() over(order by number % 11) from range(1, 10) t(number) group by a,b;