From 13832a21c9863baf24b8aaad0b41c6e90e9e82bd Mon Sep 17 00:00:00 2001 From: psteinroe Date: Fri, 19 Dec 2025 12:05:02 +0100 Subject: [PATCH] refactor: improve config loading --- .dockerignore | 1 + Cargo.lock | 33 ++- Cargo.toml | 5 +- Dockerfile | 3 +- configuration/base.yml | 7 +- src/concurrency/stream.rs | 2 +- src/config/load.rs | 447 +++++++++++++++++++++++++++++++++++++ src/config/mod.rs | 2 + src/config/pipeline.rs | 6 +- src/main.rs | 3 +- src/migrations.rs | 2 +- src/test_utils/database.rs | 2 +- 12 files changed, 496 insertions(+), 17 deletions(-) create mode 100644 src/config/load.rs diff --git a/.dockerignore b/.dockerignore index c8a29ef..876e997 100644 --- a/.dockerignore +++ b/.dockerignore @@ -39,3 +39,4 @@ test-results/ # Keep these for build !.cargo/config.toml !migrations/ +!configuration/ diff --git a/Cargo.lock b/Cargo.lock index b182baa..371b779 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -610,6 +610,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "filetime" version = "0.2.26" @@ -1691,9 +1697,9 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "config", "const-oid", "etl", - "etl-config", "etl-postgres", "futures", "metrics", @@ -1707,8 +1713,11 @@ dependencies = [ "serde_json", "serde_yaml", "sqlx", + "temp-env", + "tempfile", "testcontainers", "testcontainers-modules", + "thiserror", "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", @@ -2638,6 +2647,28 @@ dependencies = [ "syn", ] +[[package]] +name = "temp-env" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96374855068f47402c3121c6eed88d29cb1de8f3ab27090e273e420bdabcf050" +dependencies = [ + "parking_lot", +] + +[[package]] +name = "tempfile" +version = "3.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +dependencies = [ + "fastrand", + "getrandom 0.3.4", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "testcontainers" version = "0.23.3" diff --git a/Cargo.toml b/Cargo.toml index a100f7b..9a7fdbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ test-utils = ["dep:testcontainers", "dep:testcontainers-modules"] [dependencies] anyhow = { version = "1.0.98", default-features = false, features = ["std"] } chrono = { version = "0.4.41", default-features = false } +config = { version = "0.14", default-features = false, features = ["yaml", "json"] } const-oid = { version = "0.9.6", default-features = false } futures = { version = "0.3.31", default-features = false } metrics = { version = "0.24.2", default-features = false } @@ -28,6 +29,7 @@ sqlx = { version = "0.8.6", default-features = false, features = [ "chrono", "uuid", ] } +thiserror = { version = "2.0", default-features = false } tokio = { version = "1.47.0", default-features = false, features = ["rt-multi-thread", "macros", "signal"] } tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b", features = [ "runtime", @@ -41,7 +43,6 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["f x509-cert = { version = "0.2.2", default-features = false } etl = { git = "https://github.com/supabase/etl", rev = "483c7cfaf2b3730413acce8e0b4ad79a8cf28bbd" } -etl-config = { git = "https://github.com/supabase/etl", rev = "483c7cfaf2b3730413acce8e0b4ad79a8cf28bbd" } etl-postgres = { git = "https://github.com/supabase/etl", rev = "483c7cfaf2b3730413acce8e0b4ad79a8cf28bbd" } uuid = { version = "1.19.0", default-features = false, features = ["v4"] } @@ -57,6 +58,8 @@ testcontainers = { version = "0.23", optional = true } testcontainers-modules = { version = "0.11", optional = true, features = ["postgres"] } [dev-dependencies] +temp-env = "0.3" +tempfile = "3.13" [lints.clippy] fallible_impl_from = "deny" diff --git a/Dockerfile b/Dockerfile index ae79afd..c644619 100644 --- a/Dockerfile +++ b/Dockerfile @@ -47,8 +47,9 @@ WORKDIR /app # Create non-root user (distroless already has nonroot user) USER nonroot:nonroot -# Copy binary +# Copy binary and configuration files COPY --from=builder /app/target/release/postgres-stream ./postgres-stream +COPY configuration/ ./configuration/ # Use exec form for proper signal handling ENTRYPOINT ["./postgres-stream"] diff --git a/configuration/base.yml b/configuration/base.yml index c3f4a23..3bd7955 100644 --- a/configuration/base.yml +++ b/configuration/base.yml @@ -1,6 +1,3 @@ -# Base configuration for daemon -# This file contains default settings that apply to all environments - stream: id: 1 pg_connection: @@ -12,8 +9,8 @@ stream: enabled: false trusted_root_certs: "" batch: - max_size: 100 - max_fill_ms: 50 + max_size: 1000 + max_fill_ms: 1000 sink: type: memory diff --git a/src/concurrency/stream.rs b/src/concurrency/stream.rs index 439d921..92b2738 100644 --- a/src/concurrency/stream.rs +++ b/src/concurrency/stream.rs @@ -1,6 +1,6 @@ use core::pin::Pin; use core::task::{Context, Poll}; -use etl_config::shared::BatchConfig; +use etl::config::BatchConfig; use futures::{Future, Stream, ready}; use pin_project_lite::pin_project; use std::time::Duration; diff --git a/src/config/load.rs b/src/config/load.rs new file mode 100644 index 0000000..8cbde96 --- /dev/null +++ b/src/config/load.rs @@ -0,0 +1,447 @@ +use std::{ + borrow::Cow, + io, + path::{Path, PathBuf}, +}; + +use serde::de::DeserializeOwned; +use thiserror::Error; + +/// Directory containing configuration files relative to the application root. +const CONFIGURATION_DIR: &str = "configuration"; + +/// Environment variable for specifying an absolute path to the configuration directory. +const CONFIG_DIR_ENV_VAR: &str = "APP_CONFIG_DIR"; + +/// Supported extensions for base and environment configuration files. +const CONFIG_FILE_EXTENSIONS: &[&str] = &["yaml", "yml", "json"]; + +/// Prefix for environment variable configuration overrides. +const ENV_PREFIX: &str = "APP"; + +/// Separator between environment variable prefix and key segments. +const ENV_PREFIX_SEPARATOR: &str = "_"; + +/// Separator for nested configuration keys in environment variables. +const ENV_SEPARATOR: &str = "__"; + +/// Separator for list elements in environment variables. +const LIST_SEPARATOR: &str = ","; + +/// Environment variable name containing the environment identifier. +const APP_ENVIRONMENT_ENV_NAME: &str = "APP_ENVIRONMENT"; + +/// Trait implemented by configuration structures that require list parsing help. +pub trait Config { + /// Keys whose values should be parsed as lists when loading the configuration. + const LIST_PARSE_KEYS: &'static [&'static str]; +} + +/// Runtime environment for the application. +#[derive(Debug, Clone, Copy)] +enum Environment { + Prod, + Staging, + Dev, +} + +impl Environment { + /// Loads the environment from the `APP_ENVIRONMENT` environment variable. + /// + /// Defaults to [`Environment::Prod`] if the variable is not set. + fn load() -> Result { + std::env::var(APP_ENVIRONMENT_ENV_NAME) + .unwrap_or_else(|_| "prod".into()) + .try_into() + } +} + +impl std::fmt::Display for Environment { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Environment::Prod => write!(f, "prod"), + Environment::Staging => write!(f, "staging"), + Environment::Dev => write!(f, "dev"), + } + } +} + +impl TryFrom for Environment { + type Error = io::Error; + + fn try_from(s: String) -> Result { + match s.to_lowercase().as_str() { + "prod" => Ok(Self::Prod), + "staging" => Ok(Self::Staging), + "dev" => Ok(Self::Dev), + other => Err(io::Error::other(format!( + "{other} is not a supported environment. Use either `prod`/`staging`/`dev`.", + ))), + } + } +} + +/// Identifies which configuration file is currently being loaded. +#[derive(Debug, Clone, Copy)] +enum ConfigFileKind { + /// Always-present base configuration that every service loads. + Base, + /// Environment-specific overrides (dev/staging/prod). + Environment(Environment), +} + +impl ConfigFileKind { + fn stem(&self) -> Cow<'static, str> { + match self { + ConfigFileKind::Base => Cow::Borrowed("base"), + ConfigFileKind::Environment(env) => Cow::Owned(env.to_string()), + } + } + + /// Returns a static string describing this configuration file kind for error messages. + fn as_str(&self) -> &'static str { + match self { + ConfigFileKind::Base => "base", + ConfigFileKind::Environment(Environment::Dev) => "dev", + ConfigFileKind::Environment(Environment::Staging) => "staging", + ConfigFileKind::Environment(Environment::Prod) => "prod", + } + } +} + +/// Errors that can occur while loading configuration files and overrides. +#[derive(Debug, Error)] +pub enum LoadConfigError { + /// Failed to determine the current working directory. + #[error("failed to determine the current directory")] + CurrentDir(#[source] io::Error), + + /// The configured `configuration` directory does not exist. + #[error("configuration directory `{0}` does not exist")] + MissingConfigurationDirectory(PathBuf), + + /// Could not locate one of the required configuration files. + #[error("could not locate {kind} configuration in `{directory}`; attempted: {attempted}")] + ConfigurationFileMissing { + kind: &'static str, + directory: PathBuf, + attempted: String, + }, + + /// Environment variable overrides failed to merge into the configuration. + #[error("failed to load configuration from environment variables")] + EnvironmentVariables(#[source] config::ConfigError), + + /// The configuration files were parsed but deserialization failed. + #[error("failed to deserialize configuration")] + Deserialization(#[source] config::ConfigError), + + /// Failed to determine the runtime environment (`APP_ENVIRONMENT`). + #[error("failed to determine runtime environment")] + Environment(#[source] io::Error), + + /// Failed to initialize the configuration builder. + #[error("failed to initialize configuration builder")] + Builder(#[source] config::ConfigError), +} + +/// Loads hierarchical configuration from base, environment, and environment-variable sources. +/// +/// The configuration directory is determined by: +/// - First checking the `APP_CONFIG_DIR` environment variable for an absolute path +/// - If not set, using `/configuration` +/// +/// Loads files from `base.(yaml|yml|json)` (required) and `{environment}.(yaml|yml|json)` (optional) +/// before applying overrides from `APP_`-prefixed environment variables. +/// +/// Nested keys use double underscores (`APP_SERVICE__HOST`), and list values are comma-separated. +pub fn load_config() -> Result +where + T: Config + DeserializeOwned, +{ + let configuration_directory = if let Ok(config_dir) = std::env::var(CONFIG_DIR_ENV_VAR) { + // Use the absolute path provided by APP_CONFIG_DIR + PathBuf::from(config_dir) + } else { + // Fallback to /configuration + let base_path = std::env::current_dir().map_err(LoadConfigError::CurrentDir)?; + base_path.join(CONFIGURATION_DIR) + }; + + if !configuration_directory.is_dir() { + return Err(LoadConfigError::MissingConfigurationDirectory( + configuration_directory, + )); + } + + let environment = Environment::load().map_err(LoadConfigError::Environment)?; + + // Base file is required + let base_file = find_configuration_file(&configuration_directory, ConfigFileKind::Base) + .ok_or_else(|| { + let stem = ConfigFileKind::Base.stem(); + let attempted = CONFIG_FILE_EXTENSIONS + .iter() + .map(|ext| { + format!( + "`{}`", + configuration_directory + .join(format!("{stem}.{ext}")) + .display() + ) + }) + .collect::>() + .join(", "); + + LoadConfigError::ConfigurationFileMissing { + kind: ConfigFileKind::Base.as_str(), + directory: configuration_directory.clone(), + attempted, + } + })?; + + // Environment-specific file is optional + let environment_file = find_configuration_file( + &configuration_directory, + ConfigFileKind::Environment(environment), + ); + + let mut environment_source = config::Environment::with_prefix(ENV_PREFIX) + .prefix_separator(ENV_PREFIX_SEPARATOR) + .separator(ENV_SEPARATOR); + + if !T::LIST_PARSE_KEYS.is_empty() { + environment_source = environment_source + .try_parsing(true) + .list_separator(LIST_SEPARATOR); + + for key in ::LIST_PARSE_KEYS { + environment_source = environment_source.with_list_parse_key(key); + } + } + + let mut builder = config::Config::builder(); + + // Always add base file (required) + builder = builder.add_source(config::File::from(base_file)); + + // Optionally add environment file if it exists + if let Some(env_file) = environment_file { + builder = builder.add_source(config::File::from(env_file)); + } + + // Always add environment variables (highest priority) + builder = builder.add_source(environment_source); + + let settings = builder.build().map_err(LoadConfigError::Builder)?; + + settings + .try_deserialize::() + .map_err(LoadConfigError::Deserialization) +} + +/// Finds the configuration file that matches the requested kind and supported extensions. +/// Returns `Some(PathBuf)` if found, `None` otherwise. +fn find_configuration_file(directory: &Path, kind: ConfigFileKind) -> Option { + let stem = kind.stem(); + + for extension in CONFIG_FILE_EXTENSIONS { + let path = directory.join(format!("{stem}.{extension}")); + if path.is_file() { + return Some(path); + } + } + + None +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::{Deserialize, Serialize}; + use std::fs; + use temp_env::with_vars; + use tempfile::TempDir; + + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + struct TestConfig { + value: String, + number: i32, + } + + impl Config for TestConfig { + const LIST_PARSE_KEYS: &'static [&'static str] = &[]; + } + + #[test] + fn test_load_with_base_only() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + let base_content = "value: \"from_base\"\nnumber: 42\n"; + fs::write(config_dir.join("base.yml"), base_content).unwrap(); + + with_vars( + vec![ + ("APP_ENVIRONMENT", Some("prod")), + ("APP_CONFIG_DIR", Some(config_dir.to_str().unwrap())), + ("APP_VALUE", None), + ("APP_NUMBER", None), + ], + || { + let config: TestConfig = load_config().unwrap(); + assert_eq!(config.value, "from_base"); + assert_eq!(config.number, 42); + }, + ); + } + + #[test] + fn test_load_with_base_and_env_file() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + let base_content = "value: \"from_base\"\nnumber: 42\n"; + fs::write(config_dir.join("base.yml"), base_content).unwrap(); + + let dev_content = "value: \"from_dev\"\nnumber: 99\n"; + fs::write(config_dir.join("dev.yml"), dev_content).unwrap(); + + with_vars( + vec![ + ("APP_ENVIRONMENT", Some("dev")), + ("APP_CONFIG_DIR", Some(config_dir.to_str().unwrap())), + ("APP_VALUE", None), + ("APP_NUMBER", None), + ], + || { + let config: TestConfig = load_config().unwrap(); + assert_eq!(config.value, "from_dev"); + assert_eq!(config.number, 99); + }, + ); + } + + #[test] + fn test_env_vars_override_files() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + let base_content = "value: \"from_base\"\nnumber: 42\n"; + fs::write(config_dir.join("base.yml"), base_content).unwrap(); + + with_vars( + vec![ + ("APP_ENVIRONMENT", Some("prod")), + ("APP_CONFIG_DIR", Some(config_dir.to_str().unwrap())), + ("APP_VALUE", Some("from_env")), + ("APP_NUMBER", Some("123")), + ], + || { + let config: TestConfig = load_config().unwrap(); + assert_eq!(config.value, "from_env"); + assert_eq!(config.number, 123); + }, + ); + } + + #[test] + fn test_missing_base_file_fails() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + with_vars( + vec![ + ("APP_ENVIRONMENT", Some("prod")), + ("APP_CONFIG_DIR", Some(config_dir.to_str().unwrap())), + ], + || { + let result = load_config::(); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + LoadConfigError::ConfigurationFileMissing { .. } + )); + }, + ); + } + + #[test] + fn test_missing_env_file_succeeds() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + let base_content = "value: \"from_base\"\nnumber: 42\n"; + fs::write(config_dir.join("base.yml"), base_content).unwrap(); + + with_vars( + vec![ + ("APP_ENVIRONMENT", Some("staging")), + ("APP_CONFIG_DIR", Some(config_dir.to_str().unwrap())), + ("APP_VALUE", None), + ("APP_NUMBER", None), + ], + || { + let config: TestConfig = load_config().unwrap(); + assert_eq!(config.value, "from_base"); + assert_eq!(config.number, 42); + }, + ); + } + + #[test] + fn test_app_config_dir_env_var() { + let temp_dir = TempDir::new().unwrap(); + let custom_dir = temp_dir.path().join("my-config"); + fs::create_dir(&custom_dir).unwrap(); + + let base_content = "value: \"custom_dir\"\nnumber: 777\n"; + fs::write(custom_dir.join("base.yml"), base_content).unwrap(); + + with_vars( + vec![ + ("APP_ENVIRONMENT", Some("prod")), + ("APP_CONFIG_DIR", Some(custom_dir.to_str().unwrap())), + ("APP_VALUE", None), + ("APP_NUMBER", None), + ], + || { + let config: TestConfig = load_config().unwrap(); + assert_eq!(config.value, "custom_dir"); + assert_eq!(config.number, 777); + }, + ); + } + + #[test] + fn test_environment_defaults_to_prod() { + let temp_dir = TempDir::new().unwrap(); + let config_dir = temp_dir.path().join("configuration"); + fs::create_dir(&config_dir).unwrap(); + + let base_content = "value: \"base\"\nnumber: 1\n"; + fs::write(config_dir.join("base.yml"), base_content).unwrap(); + + let prod_content = "value: \"prod\"\nnumber: 2\n"; + fs::write(config_dir.join("prod.yml"), prod_content).unwrap(); + + with_vars( + [ + ("APP_ENVIRONMENT", None::<&str>), + ("APP_CONFIG_DIR", Some(config_dir.to_str().unwrap())), + ("APP_VALUE", None::<&str>), + ("APP_NUMBER", None::<&str>), + ], + || { + let config: TestConfig = load_config().unwrap(); + assert_eq!(config.value, "prod"); + assert_eq!(config.number, 2); + }, + ); + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs index 38e92dc..d41bef5 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,8 +1,10 @@ +mod load; mod pipeline; mod schema; mod sink; mod stream; +pub use load::*; pub use pipeline::*; pub use schema::*; pub use sink::*; diff --git a/src/config/pipeline.rs b/src/config/pipeline.rs index 65624d6..e23f12c 100644 --- a/src/config/pipeline.rs +++ b/src/config/pipeline.rs @@ -1,12 +1,10 @@ //! Configuration management for pgstream daemon. //! -//! Loads configuration from YAML files and environment variables, -//! following the etl-config pattern. +//! Loads configuration from YAML files and environment variables. -use etl_config::Config; use serde::Deserialize; -use crate::config::{SinkConfig, StreamConfig}; +use crate::config::{SinkConfig, StreamConfig, load::Config}; /// Configuration for the pipeline. /// diff --git a/src/main.rs b/src/main.rs index bef3048..571a982 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ use etl::error::EtlResult; -use etl_config::load_config; -use postgres_stream::config::PipelineConfig; +use postgres_stream::config::{PipelineConfig, load_config}; use postgres_stream::core::start_pipeline_with_config; use postgres_stream::metrics::init_metrics; use tracing::{error, info}; diff --git a/src/migrations.rs b/src/migrations.rs index ec7049b..df59c6b 100644 --- a/src/migrations.rs +++ b/src/migrations.rs @@ -1,4 +1,4 @@ -use etl_config::shared::{IntoConnectOptions, PgConnectionConfig}; +use etl::config::{IntoConnectOptions, PgConnectionConfig}; use sqlx::{ Executor, postgres::{PgConnectOptions, PgPoolOptions}, diff --git a/src/test_utils/database.rs b/src/test_utils/database.rs index 2059542..9d4d8ce 100644 --- a/src/test_utils/database.rs +++ b/src/test_utils/database.rs @@ -1,7 +1,7 @@ use std::time::Duration; use chrono::{Days, NaiveDate, Utc}; -use etl_config::shared::{IntoConnectOptions, PgConnectionConfig}; +use etl::config::{IntoConnectOptions, PgConnectionConfig}; use sqlx::{Connection, Executor, PgConnection, PgPool, postgres::PgPoolOptions}; use tokio::runtime::Handle;