diff --git a/Cargo.lock b/Cargo.lock index 0cdba13..0214a8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1681,6 +1681,7 @@ dependencies = [ "hyper", "hyper-util", "serde", + "serde_json", "serde_yaml", "shared", "thiserror 2.0.17", @@ -3138,6 +3139,7 @@ dependencies = [ "hyper-util", "tokio", "tracing", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4accfdb..d48b46b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ reqwest = { version = "0.12.23", features = ["json"] } sentry = { version = "0.45.0", features = ["tracing"] } serde = { version = "1.0.228", features = ["derive"] } serde_yaml = "0.9.34" +serde_json = "1.0.145" tempfile = "3.23.0" thiserror = "2.0.17" tokio = { version = "1.48.0", features = ["full"] } diff --git a/example_config_ingest_router.yaml b/example_config_ingest_router.yaml index dabe4f6..7f44928 100644 --- a/example_config_ingest_router.yaml +++ b/example_config_ingest_router.yaml @@ -7,17 +7,27 @@ ingest_router: port: 3001 locales: us: - us1: + - name: us1 sentry_url: "http://10.0.0.1:8080" relay_url: "http://10.0.0.1:8090" - us2: + - name: us2 sentry_url: "http://10.0.0.2:8080" relay_url: "http://10.0.0.2:8090" de: - de: + - name: de sentry_url: "http://10.0.0.3:8080" relay_url: "http://10.0.0.3:8090" + # Timeout configuration for relay handlers (optional, defaults shown) + # Two-layer timeout strategy: + # - HTTP timeout: Per-request timeout for individual HTTP calls (fixed) + # - Task timeout: Global cutoff - once first succeeds, ALL remaining tasks have + # task_subsequent_timeout_secs total to complete (prevents slow cells from blocking) + # relay_timeouts: + # http_timeout_secs: 15 # HTTP request timeout + # task_initial_timeout_secs: 20 # Wait for first upstream (must be >= http_timeout) + # task_subsequent_timeout_secs: 5 # Global deadline after first success + routes: - match: host: us.sentry.io diff --git a/ingest-router/Cargo.toml b/ingest-router/Cargo.toml index a47f115..edecc50 100644 --- a/ingest-router/Cargo.toml +++ b/ingest-router/Cargo.toml @@ -9,6 +9,7 @@ http-body-util = { workspace = true } hyper = { workspace = true } hyper-util = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } shared = { path = "../shared" } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/ingest-router/src/config.rs b/ingest-router/src/config.rs index 14b85c3..a43c5d3 100644 --- a/ingest-router/src/config.rs +++ b/ingest-router/src/config.rs @@ -22,6 +22,9 @@ pub enum ValidationError { #[error("Locale '{0}' has no valid cells (none of its cells match any upstream)")] LocaleHasNoValidCells(String), + + #[error("Invalid timeout configuration: {0}")] + InvalidTimeouts(String), } /// HTTP methods supported for route matching @@ -59,10 +62,63 @@ pub struct RelayProjectConfigsArgs { pub locale: String, } +/// Timeout configuration for relay project configs handler +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(default)] +pub struct RelayTimeouts { + /// HTTP request timeout for individual upstream calls (seconds). + /// This is the maximum time a single HTTP request can take. + /// Default: 15 seconds + pub http_timeout_secs: u16, + + /// Task timeout when waiting for the first upstream to respond (seconds). + /// Must be >= http_timeout_secs to allow HTTP requests to complete. + /// Default: 20 seconds + pub task_initial_timeout_secs: u16, + + /// Deadline for all remaining tasks after first success (seconds). + /// Aggressively cuts off slow upstreams once we have good data. 
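+    /// For example, with the defaults here, once the first cell answers,
+    /// all remaining cells share a 5-second window before being cut off.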
+    /// Default: 5 seconds
+    pub task_subsequent_timeout_secs: u16,
+}
+
+impl Default for RelayTimeouts {
+    fn default() -> Self {
+        Self {
+            http_timeout_secs: 15,
+            task_initial_timeout_secs: 20,
+            task_subsequent_timeout_secs: 5,
+        }
+    }
+}
+
+impl RelayTimeouts {
+    /// Validates the timeout configuration
+    pub fn validate(&self) -> Result<(), ValidationError> {
+        // Initial task timeout must be >= HTTP timeout to allow requests to complete
+        if self.task_initial_timeout_secs < self.http_timeout_secs {
+            return Err(ValidationError::InvalidTimeouts(
+                "task_initial_timeout_secs must be >= http_timeout_secs".to_string(),
+            ));
+        }
+
+        // Subsequent task timeout should be > 0
+        if self.task_subsequent_timeout_secs == 0 {
+            return Err(ValidationError::InvalidTimeouts(
+                "task_subsequent_timeout_secs must be > 0".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
+}
+
 /// Cell/upstream configuration
 /// Note: The cell name is the HashMap key in Config.locales
 #[derive(Clone, Debug, Deserialize, PartialEq)]
 pub struct CellConfig {
+    /// Name/identifier of the cell
+    pub name: String,
     /// URL of the Sentry upstream server
     pub sentry_url: Url,
     /// URL of the Relay upstream server
@@ -80,12 +136,14 @@ pub struct Config {
     pub admin_listener: AdminListener,
     /// Maps locale identifiers to their cells (cell name -> cell config)
     ///
-    /// Note: Uses String keys instead of an enum to allow flexible,
-    /// deployment-specific locale configuration without code changes.
-    /// Different deployments may use different locale identifiers.
-    pub locales: HashMap<String, HashMap<String, CellConfig>>,
+    /// Cells are stored as a Vec to maintain priority order - the first cell
+    /// in the list has highest priority for global config responses.
+    pub locales: HashMap<String, Vec<CellConfig>>,
     /// Request routing rules
     pub routes: Vec<Route>,
+    /// Timeout configuration for relay handlers
+    #[serde(default)]
+    pub relay_timeouts: RelayTimeouts,
 }
 
 impl Config {
@@ -95,6 +153,9 @@ impl Config {
         self.listener.validate()?;
         self.admin_listener.validate()?;
 
+        // Validate timeouts
+        self.relay_timeouts.validate()?;
+
         // Validate locales and cells
         for (locale, cells) in &self.locales {
             // Check that locale has at least one cell
@@ -102,11 +163,15 @@
                 return Err(ValidationError::LocaleHasNoValidCells(locale.clone()));
             }
 
-            // Check for empty cell names (HashMap keys)
-            for cell_name in cells.keys() {
-                if cell_name.is_empty() {
+            // Check for empty cell names and collect for duplicate checking
+            let mut seen_names = HashSet::new();
+            for cell in cells {
+                if cell.name.is_empty() {
                     return Err(ValidationError::EmptyUpstreamName);
                 }
+                if !seen_names.insert(&cell.name) {
+                    return Err(ValidationError::DuplicateUpstream(cell.name.clone()));
+                }
             }
         }
 
@@ -226,16 +291,16 @@ admin_listener:
   port: 3001
 locales:
   us:
-    us1:
-      sentry_url: "http://127.0.0.1:8080"
-      relay_url: "http://127.0.0.1:8090"
-    us2:
-      sentry_url: "http://10.0.0.2:8080"
-      relay_url: "http://10.0.0.2:8090"
+    - name: us1
+      sentry_url: "http://127.0.0.1:8080"
+      relay_url: "http://127.0.0.1:8090"
+    - name: us2
+      sentry_url: "http://10.0.0.2:8080"
+      relay_url: "http://10.0.0.2:8090"
   de:
-    de1:
-      sentry_url: "http://10.0.0.3:8080"
-      relay_url: "http://10.0.0.3:8090"
+    - name: de1
+      sentry_url: "http://10.0.0.3:8080"
+      relay_url: "http://10.0.0.3:8090"
 routes:
   - match:
       host: us.sentry.io
@@ -261,6 +326,8 @@ routes:
         assert_eq!(config.locales.len(), 2);
         assert_eq!(config.locales.get("us").unwrap().len(), 2);
         assert_eq!(config.locales.get("de").unwrap().len(), 1);
+        assert_eq!(config.locales.get("us").unwrap()[0].name, "us1");
+        assert_eq!(config.locales.get("us").unwrap()[1].name, "us2");
         assert_eq!(config.routes.len(), 2);
         assert_eq!(config.routes[0].r#match.method, Some(HttpMethod::Post));
         assert_eq!(config.routes[1].r#match.host, None);
@@ -285,14 +352,13 @@
             },
             locales: HashMap::from([(
                 "us".to_string(),
-                HashMap::from([(
-                    "us1".to_string(),
-                    CellConfig {
-                        sentry_url: Url::parse("http://127.0.0.1:8080").unwrap(),
-                        relay_url: Url::parse("http://127.0.0.1:8090").unwrap(),
-                    },
-                )]),
+                vec![CellConfig {
+                    name: "us1".to_string(),
+                    sentry_url: Url::parse("http://127.0.0.1:8080").unwrap(),
+                    relay_url: Url::parse("http://127.0.0.1:8090").unwrap(),
+                }],
             )]),
+            relay_timeouts: RelayTimeouts::default(),
             routes: vec![Route {
                 r#match: Match {
                     path: Some("/api/".to_string()),
@@ -315,18 +381,28 @@
 
         // Test empty cell name
         let mut config = base_config.clone();
-        config.locales.get_mut("us").unwrap().insert(
-            "".to_string(),
-            CellConfig {
-                sentry_url: Url::parse("http://10.0.0.2:8080").unwrap(),
-                relay_url: Url::parse("http://10.0.0.2:8090").unwrap(),
-            },
-        );
+        config.locales.get_mut("us").unwrap().push(CellConfig {
+            name: "".to_string(),
+            sentry_url: Url::parse("http://10.0.0.2:8080").unwrap(),
+            relay_url: Url::parse("http://10.0.0.2:8090").unwrap(),
+        });
         assert!(matches!(
             config.validate().unwrap_err(),
             ValidationError::EmptyUpstreamName
         ));
 
+        // Test duplicate cell name
+        let mut config = base_config.clone();
+        config.locales.get_mut("us").unwrap().push(CellConfig {
+            name: "us1".to_string(),
+            sentry_url: Url::parse("http://10.0.0.2:8080").unwrap(),
+            relay_url: Url::parse("http://10.0.0.2:8090").unwrap(),
+        });
+        assert!(matches!(
+            config.validate().unwrap_err(),
+            ValidationError::DuplicateUpstream(_)
+        ));
+
         // Test unknown locale in action
         let mut config = base_config.clone();
         config.routes[0].action = HandlerAction::RelayProjectConfigs(RelayProjectConfigsArgs {
@@ -341,11 +417,35 @@
         let mut config = base_config.clone();
         config
             .locales
-            .insert("invalid_locale".to_string(), HashMap::new());
+            .insert("invalid_locale".to_string(), Vec::new());
         assert!(matches!(
             config.validate().unwrap_err(),
             ValidationError::LocaleHasNoValidCells(_)
         ));
+
+        // Test invalid timeouts: task_initial < http
+        let mut config = base_config.clone();
+        config.relay_timeouts = RelayTimeouts {
+            http_timeout_secs: 20,
+            task_initial_timeout_secs: 15, // Less than HTTP timeout
+            task_subsequent_timeout_secs: 5,
+        };
+        assert!(matches!(
+            config.validate().unwrap_err(),
+            ValidationError::InvalidTimeouts(_)
+        ));
+
+        // Test invalid timeouts: task_subsequent = 0
+        let mut config = base_config.clone();
+        config.relay_timeouts = RelayTimeouts {
+            http_timeout_secs: 15,
+            task_initial_timeout_secs: 20,
+            task_subsequent_timeout_secs: 0, // Zero timeout
+        };
+        assert!(matches!(
+            config.validate().unwrap_err(),
+            ValidationError::InvalidTimeouts(_)
+        ));
     }
 
     #[test]
diff --git a/ingest-router/src/errors.rs b/ingest-router/src/errors.rs
index 13b2a40..fb9dda7 100644
--- a/ingest-router/src/errors.rs
+++ b/ingest-router/src/errors.rs
@@ -1,3 +1,6 @@
+use http_body_util::{BodyExt, Full, combinators::BoxBody};
+use hyper::body::Bytes;
+use hyper::{Response, StatusCode};
 use thiserror::Error;
 
 /// Errors that can occur during ingest-router operations
@@ -9,30 +12,83 @@ pub enum IngestRouterError {
 
     #[error("Failed to read response body: {0}")]
     ResponseBodyError(String),
 
-    #[error("No route matched for request")]
-    NoRouteMatched,
-
-    #[error("Upstream not found: {0}")]
-    UpstreamNotFound(String),
-
#[error("Upstream request failed for {0}: {1}")] UpstreamRequestFailed(String, String), #[error("Upstream timeout for {0}")] UpstreamTimeout(String), - #[error("Response serialization error: {0}")] + #[error("Failed to serialize response: {0}")] ResponseSerializationError(String), - #[error("Hyper error: {0}")] - HyperError(String), - - #[error("HTTP client error: {0}")] - HttpClientError(String), + #[error("Failed to build response: {0}")] + ResponseBuildError(String), #[error("Internal error: {0}")] InternalError(String), + #[error("Service unavailable: {0}")] + ServiceUnavailable(String), + #[error("IO error: {0}")] Io(#[from] std::io::Error), } + +impl IngestRouterError { + /// Returns the appropriate HTTP status code for this error + pub fn status_code(&self) -> StatusCode { + match self { + IngestRouterError::RequestBodyError(_) => StatusCode::BAD_REQUEST, + IngestRouterError::ResponseBodyError(_) => StatusCode::BAD_GATEWAY, + IngestRouterError::UpstreamRequestFailed(_, _) => StatusCode::BAD_GATEWAY, + IngestRouterError::UpstreamTimeout(_) => StatusCode::GATEWAY_TIMEOUT, + IngestRouterError::ResponseSerializationError(_) => StatusCode::INTERNAL_SERVER_ERROR, + IngestRouterError::ResponseBuildError(_) => StatusCode::INTERNAL_SERVER_ERROR, + IngestRouterError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, + IngestRouterError::ServiceUnavailable(_) => StatusCode::SERVICE_UNAVAILABLE, + IngestRouterError::Io(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + /// Converts this error into an HTTP response + pub fn into_response(self) -> Response> { + let status = self.status_code(); + let body = format!("{}\n", self); + + Response::builder() + .status(status) + .body(Full::new(Bytes::from(body)).map_err(|e| match e {}).boxed()) + .unwrap_or_else(|_| { + // Fallback if response building fails + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body( + Full::new(Bytes::from("Internal server error\n")) + .map_err(|e| match e {}) + .boxed(), + ) + .unwrap() + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_error_into_response() { + use http_body_util::BodyExt; + + // Test that errors convert to proper HTTP responses + let error = IngestRouterError::ServiceUnavailable("Test unavailable".to_string()); + let response = error.into_response(); + + assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE); + + // Verify body contains error message + let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); + let body_str = String::from_utf8(body_bytes.to_vec()).unwrap(); + assert!(body_str.contains("Test unavailable")); + } +} diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs index 84fb094..3dc644f 100644 --- a/ingest-router/src/lib.rs +++ b/ingest-router/src/lib.rs @@ -2,43 +2,59 @@ pub mod config; pub mod errors; pub mod http; pub mod locale; +pub mod relay_project_config_handler; pub mod router; -use http_body_util::{BodyExt, Full, combinators::BoxBody}; +use crate::config::{CellConfig, Config, RelayTimeouts, Route}; +use crate::errors::IngestRouterError; +use crate::relay_project_config_handler::RelayProjectConfigsHandler; +use crate::router::Router; +use http_body_util::combinators::BoxBody; use hyper::body::Bytes; use hyper::body::Incoming; use hyper::service::Service; use hyper::{Request, Response}; use shared::http::run_http_service; +use std::collections::HashMap; use std::pin::Pin; -#[derive(thiserror::Error, Debug)] -pub enum IngestRouterError { - #[error("io error: {0}")] - 
-    Io(#[from] std::io::Error),
-}
-
-pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {
-    let router_service = IngestRouterService {};
+pub async fn run(config: Config) -> Result<(), errors::IngestRouterError> {
+    let router_service = IngestRouterService::new(
+        config.routes.clone(),
+        config.locales.clone(),
+        config.relay_timeouts.clone(),
+    );
 
     let router_task = run_http_service(&config.listener.host, config.listener.port, router_service);
 
     router_task.await?;
 
     Ok(())
 }
 
-struct IngestRouterService {}
+#[derive(Clone)]
+struct IngestRouterService {
+    router: Router,
+}
+
+impl IngestRouterService {
+    fn new(
+        routes: Vec<Route>,
+        locales: HashMap<String, Vec<CellConfig>>,
+        relay_timeouts: RelayTimeouts,
+    ) -> Self {
+        let handler = RelayProjectConfigsHandler::new(locales, relay_timeouts);
+        Self {
+            router: Router::new(routes, handler),
+        }
+    }
+}
 
 impl Service<Request<Incoming>> for IngestRouterService {
-    type Response = Response<BoxBody<Bytes, std::convert::Infallible>>;
+    type Response = Response<BoxBody<Bytes, hyper::Error>>;
     type Error = IngestRouterError;
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
 
     fn call(&self, req: Request<Incoming>) -> Self::Future {
-        println!("Received request: {req:?}");
-
-        Box::pin(async move {
-            Ok(Response::new(
-                Full::new("ok\n".into()).map_err(|e| match e {}).boxed(),
-            ))
-        })
+        let router = self.router.clone();
+        Box::pin(async move { router.route(req).await })
     }
 }
diff --git a/ingest-router/src/locale.rs b/ingest-router/src/locale.rs
index c50130f..6f16f7b 100644
--- a/ingest-router/src/locale.rs
+++ b/ingest-router/src/locale.rs
@@ -49,17 +49,28 @@ impl From<CellConfig> for Upstream {
 /// Collection of upstreams grouped by cell name
 #[derive(Clone, Debug)]
 pub struct Cells {
+    /// Prioritized list of cell names (first = highest priority)
+    pub cell_list: Vec<String>,
     pub cell_to_upstreams: HashMap<String, Upstream>,
 }
 
 impl Cells {
     /// Build cells from cell configurations
-    fn from_config(cell_configs: HashMap<String, CellConfig>) -> Self {
+    fn from_config(cell_configs: Vec<CellConfig>) -> Self {
+        let mut cell_list = Vec::new();
+        let mut cell_to_upstreams = HashMap::new();
+
+        for config in cell_configs {
+            let name = config.name.clone();
+            let upstream = Upstream::from(config);
+
+            cell_list.push(name.clone());
+            cell_to_upstreams.insert(name, upstream);
+        }
+
         Self {
-            cell_to_upstreams: cell_configs
-                .into_iter()
-                .map(|(name, config)| (name, config.into()))
-                .collect(),
+            cell_list,
+            cell_to_upstreams,
         }
     }
 }
@@ -73,7 +84,7 @@ pub struct Locales {
 
 impl Locales {
     /// Build locale mappings from configuration
-    pub fn new(locales: HashMap<String, HashMap<String, CellConfig>>) -> Self {
+    pub fn new(locales: HashMap<String, Vec<CellConfig>>) -> Self {
         // Build locale -> cells mapping
         let locale_to_cells = locales
             .into_iter()
@@ -98,6 +109,7 @@ mod tests {
 
     fn cell_config(sentry_url: &str, relay_url: &str) -> CellConfig {
         CellConfig {
+            name: String::new(),
             sentry_url: Url::parse(sentry_url).unwrap(),
             relay_url: Url::parse(relay_url).unwrap(),
         }
@@ -108,32 +120,35 @@
         let mut locales_config = HashMap::new();
         locales_config.insert(
             "us".to_string(),
-            HashMap::from([
-                (
-                    "us1".to_string(),
-                    cell_config(
+            vec![
+                {
+                    let mut config = cell_config(
                         "http://us1-sentry.example.com",
                         "http://us1-relay.example.com",
-                    ),
-                ),
-                (
-                    "us2".to_string(),
-                    cell_config(
+                    );
+                    config.name = "us1".to_string();
+                    config
+                },
+                {
+                    let mut config = cell_config(
                         "http://us2-sentry.example.com",
                         "http://us2-relay.example.com",
-                    ),
-                ),
-            ]),
+                    );
+                    config.name = "us2".to_string();
+                    config
+                },
+            ],
         );
         locales_config.insert(
             "de".to_string(),
-            HashMap::from([(
-                "de".to_string(),
-                cell_config(
+            vec![{
+                let mut config = cell_config(
                     "http://de-sentry.example.com",
                     "http://de-relay.example.com",
"http://de-relay.example.com", - ), - )]), + ); + config.name = "de".to_string(); + config + }], ); let locales = Locales::new(locales_config); @@ -141,13 +156,19 @@ mod tests { // Verify US locale has 2 cells let us_cells = locales.get_cells("us").unwrap(); assert_eq!(us_cells.cell_to_upstreams.len(), 2); + assert_eq!(us_cells.cell_list.len(), 2); assert!(us_cells.cell_to_upstreams.contains_key("us1")); assert!(us_cells.cell_to_upstreams.contains_key("us2")); + // Verify priority order (us1 is first) + assert_eq!(us_cells.cell_list[0], "us1"); + assert_eq!(us_cells.cell_list[1], "us2"); // Verify DE locale has 1 cell let de_cells = locales.get_cells("de").unwrap(); assert_eq!(de_cells.cell_to_upstreams.len(), 1); + assert_eq!(de_cells.cell_list.len(), 1); assert!(de_cells.cell_to_upstreams.contains_key("de")); + assert_eq!(de_cells.cell_list[0], "de"); // Verify unknown locale returns None assert!(locales.get_cells("unknown").is_none()); diff --git a/ingest-router/src/relay_project_config_handler/merger.rs b/ingest-router/src/relay_project_config_handler/merger.rs new file mode 100644 index 0000000..0550685 --- /dev/null +++ b/ingest-router/src/relay_project_config_handler/merger.rs @@ -0,0 +1,249 @@ +//! Response merging logic for combining upstream responses. +//! +//! This module handles merging responses from multiple upstream Sentry instances +//! following the v3 protocol merge strategy: +//! - Configs: HashMap merge (all configs from all upstreams) +//! - Pending: Array concatenation (includes failed keys) +//! - Extra fields: Priority-based selection (highest priority cell wins) + +use crate::errors::IngestRouterError; +use http_body_util::{BodyExt, Full, combinators::BoxBody}; +use hyper::body::Bytes; +use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE, HeaderMap}; +use hyper::{Response, StatusCode}; +use serde_json::Value as JsonValue; +use std::collections::HashMap; + +use super::protocol::ProjectConfigsResponse; + +/// Merged results from all upstream tasks. +pub struct MergedResults { + /// All configs merged from successful upstreams. + pub project_configs: HashMap, + /// All pending keys (from failed upstreams or upstream pending arrays). + pub pending_keys: Vec, + /// Extra fields (global config, status, etc.). + pub extra_fields: HashMap, + /// Headers from the highest priority upstream + pub http_headers: HeaderMap, +} + +impl MergedResults { + /// Creates a new empty MergedResults. + pub fn new() -> Self { + Self { + project_configs: HashMap::new(), + pending_keys: Vec::new(), + extra_fields: HashMap::new(), + http_headers: HeaderMap::new(), + } + } + + /// Merges configs from a successful upstream response. + pub fn merge_project_configs(&mut self, configs: HashMap) { + self.project_configs.extend(configs); + } + + /// Adds keys to the pending array (for failed upstreams or retry). + pub fn add_pending_keys(&mut self, keys: Vec) { + self.pending_keys.extend(keys); + } + + /// Checks if results are empty (no configs, no pending, had keys to request). + pub fn is_empty(&self, had_keys_to_request: bool) -> bool { + self.project_configs.is_empty() && self.pending_keys.is_empty() && had_keys_to_request + } + + /// Builds an HTTP response from the merged results. + /// + /// Uses headers from the highest priority cell (same cell used for global config). + /// Filters out hop-by-hop headers and Content-Length (which is recalculated for the new body). 
+    pub fn into_response(
+        mut self,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, IngestRouterError> {
+        let response = ProjectConfigsResponse {
+            project_configs: self.project_configs,
+            pending_keys: if self.pending_keys.is_empty() {
+                None
+            } else {
+                Some(self.pending_keys)
+            },
+            extra_fields: self.extra_fields,
+        };
+
+        let merged_json = serde_json::to_vec(&response)
+            .map_err(|e| IngestRouterError::ResponseSerializationError(e.to_string()))?;
+
+        // Filter hop-by-hop headers (connection-specific, not forwarded)
+        shared::http::filter_hop_by_hop(&mut self.http_headers, hyper::Version::HTTP_11);
+
+        // Remove Content-Length since body size changed after merging
+        self.http_headers.remove(CONTENT_LENGTH);
+
+        let mut builder = Response::builder().status(StatusCode::OK);
+
+        for (name, value) in self.http_headers.iter() {
+            builder = builder.header(name, value);
+        }
+
+        builder = builder.header(CONTENT_TYPE, "application/json");
+
+        builder
+            .body(
+                Full::new(Bytes::from(merged_json))
+                    .map_err(|e| match e {})
+                    .boxed(),
+            )
+            .map_err(|e| IngestRouterError::ResponseBuildError(e.to_string()))
+    }
+}
+
+impl Default for MergedResults {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use http_body_util::BodyExt;
+    use hyper::header::{CACHE_CONTROL, HeaderValue};
+
+    #[tokio::test]
+    async fn test_empty_response() {
+        let results = MergedResults::new();
+        let response = results.into_response().unwrap();
+
+        assert_eq!(response.status(), StatusCode::OK);
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap();
+        assert_eq!(parsed, serde_json::json!({"configs": {}}));
+    }
+
+    #[tokio::test]
+    async fn test_configs_merge() {
+        let mut results = MergedResults::new();
+
+        let mut configs1 = HashMap::new();
+        configs1.insert(
+            "key1".to_string(),
+            serde_json::json!({"disabled": false, "slug": "project1"}),
+        );
+
+        let mut configs2 = HashMap::new();
+        configs2.insert(
+            "key2".to_string(),
+            serde_json::json!({"disabled": false, "slug": "project2"}),
+        );
+
+        results.merge_project_configs(configs1);
+        results.merge_project_configs(configs2);
+
+        let response = results.into_response().unwrap();
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap();
+
+        assert_eq!(parsed["configs"].as_object().unwrap().len(), 2);
+        assert!(parsed["configs"].get("key1").is_some());
+        assert!(parsed["configs"].get("key2").is_some());
+    }
+
+    #[tokio::test]
+    async fn test_pending_handling() {
+        let mut results = MergedResults::new();
+
+        results.add_pending_keys(vec!["key1".to_string(), "key2".to_string()]);
+        results.add_pending_keys(vec!["key3".to_string()]);
+
+        let response = results.into_response().unwrap();
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap();
+
+        assert_eq!(parsed["pending"].as_array().unwrap().len(), 3);
+    }
+
+    #[tokio::test]
+    async fn test_empty_pending_omitted() {
+        let results = MergedResults::new();
+        let response = results.into_response().unwrap();
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap();
+
+        assert!(parsed.get("pending").is_none());
+    }
+
+    #[tokio::test]
+    async fn test_headers_forwarding() {
+        let mut results = MergedResults::new();
+
+        // Add upstream headers
+        results
.http_headers + .insert(CACHE_CONTROL, HeaderValue::from_static("max-age=300")); + results.http_headers.insert( + "X-Sentry-Rate-Limit-Remaining", + HeaderValue::from_static("100"), + ); + // Add hop-by-hop header that should be filtered + results.http_headers.insert( + hyper::header::CONNECTION, + HeaderValue::from_static("keep-alive"), + ); + + let response = results.into_response().unwrap(); + + // Verify headers are copied + assert_eq!( + response.headers().get(CACHE_CONTROL), + Some(&HeaderValue::from_static("max-age=300")) + ); + assert_eq!( + response.headers().get("X-Sentry-Rate-Limit-Remaining"), + Some(&HeaderValue::from_static("100")) + ); + + // Verify hop-by-hop headers are filtered out + assert!(response.headers().get(hyper::header::CONNECTION).is_none()); + + // Verify Content-Type is always set + assert_eq!( + response.headers().get(CONTENT_TYPE), + Some(&HeaderValue::from_static("application/json")) + ); + } + + #[tokio::test] + async fn test_extra_fields() { + let mut results = MergedResults::new(); + + results.extra_fields.insert( + "global".to_string(), + serde_json::json!({"measurements": {"maxCustomMeasurements": 10}}), + ); + results + .extra_fields + .insert("global_status".to_string(), serde_json::json!("ready")); + + let response = results.into_response().unwrap(); + let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); + let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); + + assert_eq!( + parsed["global"]["measurements"]["maxCustomMeasurements"], + 10 + ); + assert_eq!(parsed["global_status"], "ready"); + } + + #[test] + fn test_is_empty() { + let results = MergedResults::new(); + + // Empty with keys requested = empty + assert!(results.is_empty(true)); + + // Empty without keys requested = not empty (valid state) + assert!(!results.is_empty(false)); + } +} diff --git a/ingest-router/src/relay_project_config_handler/mod.rs b/ingest-router/src/relay_project_config_handler/mod.rs new file mode 100644 index 0000000..1a7bb1c --- /dev/null +++ b/ingest-router/src/relay_project_config_handler/mod.rs @@ -0,0 +1,489 @@ +//! Relay Project Configuration Handler +//! +//! This module implements the split-merge strategy for handling Sentry's +//! `/api/0/relays/projectconfigs/` endpoint across multiple upstream Sentry instances +//! in a multi-cell architecture. +//! +//! # Protocol Overview +//! +//! The Relay Project Configs endpoint (version 3) is used by Relay instances to fetch +//! project configurations needed to process events. This implementation acts as an +//! aggregation layer that: +//! 1. Splits requests across multiple upstream Sentry cells based on project ownership +//! 2. Fans out requests in parallel to multiple upstreams +//! 3. Merges responses back into a single v3-compliant response +//! 4. Passes through all config data unchanged +//! +//! ## Endpoint Details +//! The endpoint implementation is at +//! +//! - **Path**: `/api/0/relays/projectconfigs/` +//! - **Method**: `POST` +//! - **Protocol Version**: 3 +//! - **Authentication**: RelayAuthentication (X-Sentry-Relay-Id, X-Sentry-Relay-Signature) +//! +//! # Request Format (Version 3) +//! +//! ```json +//! { +//! "publicKeys": ["key1", "key2", "key3"], +//! "noCache": false, +//! "global": true +//! } +//! ``` +//! +//! ## Request Fields +//! +//! - **`publicKeys`** (required): Array of project public keys (DSN keys) to fetch configs for +//! - **`noCache`** (optional): If `true`, bypass caching and compute fresh configs (downgrades to v2 behavior) +//! 
- **`global`** (optional): If `true`, include global relay configuration in response +//! +//! # Response Format (Version 3) +//! +//! ```json +//! { +//! "configs": { +//! "key1": { +//! "disabled": false, +//! "slug": "project-slug", +//! "publicKeys": [...], +//! "config": {...}, +//! "organizationId": 1, +//! "projectId": 42 +//! } +//! }, +//! "pending": ["key2", "key3"], +//! "global": { +//! "measurements": {...}, +//! "options": {...} +//! }, +//! "global_status": "ready" +//! } +//! ``` +//! +//! ## Response Fields +//! +//! - **`configs`**: Map of public keys to their project configurations +//! - Configs are passed through unchanged from upstream Sentry instances +//! - They will add the relevant processing relay URL in the response +//! - **`pending`**: Array of public keys for which configs are being computed asynchronously +//! - Relay should retry the request later to fetch these configs +//! - Also used when upstreams fail/timeout (graceful degradation) +//! - **`global`**: Global relay configuration (only if `global: true` in request) +//! - **`global_status`**: Status of global config (always "ready" when present) +//! +//! # Implementation Details +//! +//! ## Request Splitting Strategy +//! +//! When a request arrives with multiple public keys: +//! +//! 1. **Route each key to its owning cell** +//! - Currently: Round-robin distribution (TODO: replace with control plane lookup) +//! - TODO: Query locator service for each public key to get cell name +//! +//! 2. **Group keys by target upstream** +//! - Keys routed to the same cell are batched into one request +//! +//! 3. **Handle global config flag** +//! - All upstreams receive the same `global` flag value as the original request +//! - Global config is selected from the highest priority cell that returns it +//! - Priority is determined by cell order in configuration (first = highest priority) +//! - Enables failover: if highest priority cell fails, next cell's global config is used +//! +//! ## Response Merging Strategy +//! +//! Responses from multiple upstreams are merged as follows: +//! +//! ### Configs (HashMap merge) +//! - Merge all `configs` HashMaps from all upstreams +//! - Configs are passed through unchanged (no modifications) +//! - relay_url is expected to be added in the upstream response +//! +//! ### Pending (Array concatenation) +//! - Concatenate all `pending` arrays from all upstream responses +//! - Include keys from failed/timed-out upstreams +//! - Relay will retry these keys in a subsequent request +//! +//! ### Extra fields (Priority-based selection) +//! - Select `extra` fields from highest priority cell that responds successfully +//! - Priority determined by cell order in configuration (first = highest) +//! - Forward compatibility: new fields are automatically preserved +//! +//! ## Error Handling +//! +//! ### Partial Failures (Graceful Degradation) +//! - If some upstreams succeed: Return successful configs + pending for failed keys +//! - Failed keys are added to `pending` array (v3 protocol) +//! - Logged but does not block response +//! +//! ### Total Failure +//! - If all upstreams fail: Check if any keys were added to pending +//! - If pending is not empty: Return 200 OK with pending array (relay will retry) +//! - If pending is empty: Return 503 error (no recoverable state) +//! +//! ### Upstream Failure Scenarios +//! - **Timeout**: All keys from that upstream → pending +//! - **Connection error**: All keys from that upstream → pending +//! 
- **Parse error**: All keys from that upstream → pending +//! - **Task panic**: Logged error (extreme edge case, keys lost) +//! +//! ## Request Flow +//! +//! ### Success Scenario +//! +//! ```text +//! ┌─────────────┐ +//! │ Relay │ +//! └──────┬──────┘ +//! │ +//! │ POST /api/0/relays/projectconfigs/ +//! │ {publicKeys: [A,B,C,D,E,F]} +//! │ +//! ▼ +//! ┌──────────────────────────────────────┐ +//! │ Ingest Router (this handler) │ +//! │ │ +//! │ 1. Parse request │ +//! │ 2. Split keys by cell: │ +//! │ • US1 → [A,C,E] │ +//! │ • US2 → [B,D,F] │ +//! └───┬──────────────────────────┬───────┘ +//! │ │ +//! │ {publicKeys: [A,C,E], │ {publicKeys: [B,D,F], +//! │ global: true} │ global: true} +//! │ │ +//! ▼ ▼ +//! ┌──────────┐ ┌──────────┐ +//! │Cell US1 │ │Cell US2 │ +//! │(Sentry) │ │(Sentry) │ +//! └────┬─────┘ └─────┬────┘ +//! │ │ +//! │ {configs: {A,C,E}} │ {configs: {B,D,F}} +//! │ │ +//! └──────────┬───────────────┘ +//! ▼ +//! ┌──────────────────────────────────────┐ +//! │ Ingest Router (this handler) │ +//! │ │ +//! │ 3. Merge responses: │ +//! │ • Combine all configs │ +//! │ • Merge pending arrays │ +//! │ • Select others from priority │ +//! └──────────────┬───────────────────────┘ +//! │ +//! │ {configs: {A,B,C,D,E,F}, +//! │ global: {...}} +//! │ +//! ▼ +//! ┌─────────────┐ +//! │ Relay │ +//! └─────────────┘ +//! ``` +//! +//! ### Failure Scenario (Graceful Degradation) +//! +//! When an upstream fails or times out, keys are added to the `pending` array: +//! +//! ```text +//! ┌─────────────┐ +//! │ Relay │ +//! └──────┬──────┘ +//! │ +//! │ POST {publicKeys: [A,B,C,D,E,F]} +//! │ +//! ▼ +//! ┌──────────────────────────────────────┐ +//! │ Ingest Router (this handler) │ +//! │ Split: US1→[A,C,E], US2→[B,D,F] │ +//! └───┬──────────────────────────┬───────┘ +//! │ │ +//! ▼ ▼ +//! ┌──────────┐ ┌──────────┐ +//! │Cell US1 │ │Cell US2 │ +//! └────┬─────┘ └─────┬────┘ +//! │ │ +//! │ ✓ Success │ ✗ Timeout/Error +//! │ {configs: {A,C,E}} │ +//! │ │ +//! └──────────┬───────────────┘ +//! ▼ +//! ┌──────────────────────────────────────┐ +//! │ Ingest Router (this handler) │ +//! │ │ +//! │ • Collect successful: {A,C,E} │ +//! │ • Add failed to pending: [B,D,F] │ +//! └──────────────┬───────────────────────┘ +//! │ +//! │ {configs: {A,C,E}, +//! │ pending: [B,D,F]} +//! │ +//! ▼ +//! ┌─────────────┐ +//! │ Relay │ (will retry pending) +//! └─────────────┘ +//! ``` +//! +//! # Examples +//! +//! ## Example 1: All upstreams succeed +//! +//! **Request**: +//! ```json +//! {"publicKeys": ["key1", "key2"]} +//! ``` +//! +//! **Response**: +//! ```json +//! { +//! "configs": { +//! "key1": {"disabled": false, "slug": "project-us1", ...}, +//! "key2": {"disabled": false, "slug": "project-us2", ...} +//! } +//! } +//! ``` +//! +//! ## Example 2: One upstream fails +//! +//! **Request**: +//! ```json +//! {"publicKeys": ["key1", "key2", "key3"]} +//! ``` +//! +//! **Response** (if upstream with key2,key3 times out): +//! ```json +//! { +//! "configs": { +//! "key1": {"disabled": false, "slug": "project-us1", ...} +//! }, +//! "pending": ["key2", "key3"] +//! } +//! ``` +//! +//! ## Example 3: Request with global config +//! +//! **Request**: +//! ```json +//! {"publicKeys": ["key1", "key2"], "global": true} +//! ``` +//! +//! **Splitting**: +//! - Request to US1: `{"publicKeys": ["key1"], "global": true}` +//! - Request to US2: `{"publicKeys": ["key2"], "global": true}` +//! +//! (US1 has higher priority, so its global config will be used in the final response) +//! +//! **Response**: +//! 
```json +//! { +//! "configs": {...}, +//! "global": {"measurements": {...}}, +//! "global_status": "ready" +//! } +//! ``` +//! +//! # Timeout Configuration +//! +//! The handler uses a two-layer timeout strategy: +//! +//! ## HTTP Timeout (per-request) +//! - Applied to individual HTTP requests to upstream Sentry instances +//! - Fixed timeout for each request +//! - Default: 15 seconds +//! - Configured via `relay_timeouts.http_timeout_secs` +//! +//! ## Task Timeout (adaptive) +//! - **Initial timeout**: Wait for first upstream to respond +//! - Default: 20 seconds +//! - Must be >= HTTP timeout to allow at least one request to complete +//! - Configured via `relay_timeouts.task_initial_timeout_secs` +//! +//! - **Subsequent timeout**: Global deadline after first success +//! - Once first upstream succeeds, ALL remaining tasks have this much time total +//! - Prevents slow/down cells from blocking progress +//! - Default: 5 seconds +//! - Configured via `relay_timeouts.task_subsequent_timeout_secs` +//! +//! ### Rationale +//! +//! The adaptive timeout strategy ensures: +//! 1. We wait long enough for at least one cell to respond +//! 2. Fast cells aren't blocked by slow cells +//! 3. We don't wait indefinitely for stragglers once we have good data +//! 4. Failed/timed-out keys are added to `pending` for retry +//! +//! # See Also +//! +//! - [`RelayProjectConfigsHandler`] - Main handler struct for processing requests + +mod merger; +mod protocol; +mod splitter; +mod task_executor; + +use crate::config::{CellConfig, RelayTimeouts}; +use crate::errors::IngestRouterError; +use crate::locale::Locales; +use http_body_util::combinators::BoxBody; +use hyper::body::{Body, Bytes}; +use hyper::{Request, Response}; +use hyper_util::client::legacy::Client; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::rt::TokioExecutor; +use std::collections::HashMap; + +use protocol::ProjectConfigsRequest; +use splitter::PublicKeyRouter; +use task_executor::UpstreamTaskExecutor; + +/// Handler for Relay Project Configs endpoint. 
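+///
+/// # Example
+///
+/// A minimal usage sketch (not a doctest; `locales_config` and `request`
+/// are assumed to be built elsewhere):
+///
+/// ```rust,ignore
+/// let handler = RelayProjectConfigsHandler::new(locales_config, RelayTimeouts::default());
+/// let response = handler.handle("us", request).await?;
+/// assert_eq!(response.status(), hyper::StatusCode::OK);
+/// ```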
+pub struct RelayProjectConfigsHandler {
+    /// Executes parallel upstream requests with adaptive timeout strategy
+    executor: UpstreamTaskExecutor,
+
+    /// Maps locale names to their configured upstream Sentry cells
+    locales: Locales,
+
+    /// Routes public keys to their owning cells
+    router: PublicKeyRouter,
+}
+
+impl RelayProjectConfigsHandler {
+    pub fn new(locales_config: HashMap<String, Vec<CellConfig>>, timeouts: RelayTimeouts) -> Self {
+        let connector = HttpConnector::new();
+        let client = Client::builder(TokioExecutor::new()).build(connector);
+
+        Self {
+            executor: UpstreamTaskExecutor::new(client, timeouts),
+            locales: Locales::new(locales_config),
+            router: PublicKeyRouter::new(),
+        }
+    }
+
+    pub async fn handle<B>(
+        &self,
+        locale: &str,
+        request: Request<B>,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, IngestRouterError>
+    where
+        B: Body + Send + 'static,
+        B::Data: Send,
+        B::Error: std::error::Error + Send + Sync + 'static,
+    {
+        let cells = self.locales.get_cells(locale).ok_or_else(|| {
+            IngestRouterError::InternalError(format!("No cells configured for locale: {}", locale))
+        })?;
+
+        let (parts, body) = request.into_parts();
+        let body_bytes = http_body_util::BodyExt::collect(body)
+            .await
+            .map(|collected| collected.to_bytes())
+            .map_err(|e| IngestRouterError::RequestBodyError(e.to_string()))?;
+
+        let base_request = Request::from_parts(parts, ());
+
+        let request_data = ProjectConfigsRequest::from_bytes(&body_bytes).map_err(|e| {
+            IngestRouterError::RequestBodyError(format!("Failed to parse request: {e}"))
+        })?;
+
+        let split_requests = self.router.split(&request_data, cells);
+
+        let results = self
+            .executor
+            .execute(split_requests, &base_request, cells)
+            .await?;
+
+        if results.is_empty(!request_data.public_keys.is_empty()) {
+            return Err(IngestRouterError::ServiceUnavailable(
+                "All upstream cells are unavailable".to_string(),
+            ));
+        }
+
+        results.into_response()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use http_body_util::BodyExt;
+    use url::Url;
+
+    #[tokio::test]
+    async fn test_no_cells_returns_error() {
+        let mut locales = HashMap::new();
+        locales.insert("us".to_string(), vec![]);
+
+        let handler = RelayProjectConfigsHandler::new(locales, RelayTimeouts::default());
+
+        let request_body = serde_json::json!({
+            "publicKeys": ["test-key-1", "test-key-2"]
+        });
+        let body_bytes = Bytes::from(serde_json::to_vec(&request_body).unwrap());
+
+        let request = Request::builder()
+            .method("POST")
+            .uri("/")
+            .body(
+                http_body_util::Full::new(body_bytes)
+                    .map_err(|e| match e {})
+                    .boxed(),
+            )
+            .unwrap();
+
+        let result = handler.handle("us", request).await;
+
+        // With no cells, we get no split requests, no results, and should error
+        assert!(result.is_err());
+        match result {
+            Err(IngestRouterError::ServiceUnavailable(msg)) => {
+                assert_eq!(msg, "All upstream cells are unavailable");
+            }
+            _ => panic!("Expected ServiceUnavailable error"),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_v3_upstream_failure_adds_keys_to_pending() {
+        let mut locales = HashMap::new();
+        locales.insert(
+            "us".to_string(),
+            vec![CellConfig {
+                name: "us-cell-1".to_string(),
+                relay_url: Url::parse("http://localhost:1").unwrap(),
+                sentry_url: Url::parse("http://localhost:1").unwrap(),
+            }],
+        );
+
+        let handler = RelayProjectConfigsHandler::new(locales, RelayTimeouts::default());
+
+        let request_body = serde_json::json!({
+            "publicKeys": ["test-key-1", "test-key-2", "test-key-3"]
+        });
+        let body_bytes = Bytes::from(serde_json::to_vec(&request_body).unwrap());
+
+        let request = Request::builder()
+            .method("POST")
+            .uri("/")
+            .body(
+                http_body_util::Full::new(body_bytes)
+                    .map_err(|e| match e {})
+                    .boxed(),
+            )
+            .unwrap();
+
+        let result = handler.handle("us", request).await;
+
+        assert!(result.is_ok());
+        let response = result.unwrap();
+        assert_eq!(response.status(), hyper::StatusCode::OK);
+
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let parsed: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
+
+        assert!(parsed["configs"].as_object().unwrap().is_empty());
+        assert!(parsed.get("pending").is_some());
+        let pending = parsed["pending"].as_array().unwrap();
+        assert_eq!(pending.len(), 3);
+    }
+}
diff --git a/ingest-router/src/relay_project_config_handler/protocol.rs b/ingest-router/src/relay_project_config_handler/protocol.rs
new file mode 100644
index 0000000..9f7b553
--- /dev/null
+++ b/ingest-router/src/relay_project_config_handler/protocol.rs
@@ -0,0 +1,136 @@
+//! Protocol types for the Relay Project Configs endpoint (v3).
+//!
+//! This module defines the request and response structures for Sentry's
+//! `/api/0/relays/projectconfigs/` endpoint.
+//!
+//! See the module-level documentation in `mod.rs` for complete protocol details.
+
+use hyper::body::Bytes;
+use serde::{Deserialize, Serialize};
+use serde_json::Value as JsonValue;
+use std::collections::HashMap;
+
+/// Request format for the relay project configs endpoint.
+///
+/// # Example
+/// ```json
+/// {
+///     "publicKeys": ["key1", "key2", "key3"],
+///     "noCache": false,
+///     "global": true
+/// }
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ProjectConfigsRequest {
+    /// DSN public keys to fetch configs for.
+    #[serde(rename = "publicKeys")]
+    pub public_keys: Vec<String>,
+
+    /// Other fields (`global`, `noCache`, future fields) for forward compatibility.
+    /// All fields are passed through as-is to upstreams.
+    #[serde(flatten)]
+    pub extra_fields: HashMap<String, JsonValue>,
+}
+
+impl ProjectConfigsRequest {
+    pub fn from_bytes(bytes: &Bytes) -> Result<Self, serde_json::Error> {
+        serde_json::from_slice(bytes)
+    }
+
+    pub fn to_bytes(&self) -> Result<Bytes, serde_json::Error> {
+        let json = serde_json::to_vec(self)?;
+        Ok(Bytes::from(json))
+    }
+}
+
+/// Response format for the relay project configs endpoint.
+///
+/// # Example
+/// ```json
+/// {
+///     "configs": {
+///         "key1": {
+///             "disabled": false,
+///             "slug": "project-slug",
+///             "publicKeys": [...],
+///             "config": {...},
+///             "organizationId": 1,
+///             "projectId": 42
+///         }
+///     },
+///     "pending": ["key2", "key3"],
+///     "global": {...},
+///     "global_status": "ready"
+/// }
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ProjectConfigsResponse {
+    /// Project configs (HashMap merged from all upstreams).
+    #[serde(rename = "configs")]
+    pub project_configs: HashMap<String, JsonValue>,
+
+    /// Keys being computed async or from failed upstreams (concatenated).
+    #[serde(rename = "pending", skip_serializing_if = "Option::is_none")]
+    pub pending_keys: Option<Vec<String>>,
+
+    /// Other fields (`global`, `global_status`, future fields).
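+    /// For example, an upstream's `"global": {...}` object and
+    /// `"global_status": "ready"` land here and are re-serialized verbatim.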
+    #[serde(flatten)]
+    pub extra_fields: HashMap<String, JsonValue>,
+}
+
+impl ProjectConfigsResponse {
+    pub fn from_bytes(bytes: &Bytes) -> Result<Self, serde_json::Error> {
+        serde_json::from_slice(bytes)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_request_serialization() {
+        let mut extra = HashMap::new();
+        extra.insert("global".to_string(), serde_json::json!(true));
+        extra.insert("noCache".to_string(), serde_json::json!(false));
+
+        let request = ProjectConfigsRequest {
+            public_keys: vec!["key1".to_string(), "key2".to_string()],
+            extra_fields: extra,
+        };
+
+        let bytes = request.to_bytes().unwrap();
+        let parsed = ProjectConfigsRequest::from_bytes(&bytes).unwrap();
+
+        assert_eq!(parsed.public_keys.len(), 2);
+        assert_eq!(
+            parsed.extra_fields.get("global"),
+            Some(&serde_json::json!(true))
+        );
+    }
+
+    #[test]
+    fn test_response_serialization() {
+        let mut configs = HashMap::new();
+        configs.insert(
+            "key1".to_string(),
+            serde_json::json!({
+                "disabled": false,
+                "slug": "test-project"
+            }),
+        );
+
+        let response = ProjectConfigsResponse {
+            project_configs: configs,
+            pending_keys: Some(vec!["key2".to_string()]),
+            extra_fields: HashMap::new(),
+        };
+
+        let bytes = Bytes::from(serde_json::to_vec(&response).unwrap());
+        let parsed = ProjectConfigsResponse::from_bytes(&bytes).unwrap();
+
+        assert_eq!(parsed.project_configs.len(), 1);
+        assert!(parsed.pending_keys.is_some());
+        assert_eq!(parsed.pending_keys.unwrap().len(), 1);
+    }
+}
diff --git a/ingest-router/src/relay_project_config_handler/splitter.rs b/ingest-router/src/relay_project_config_handler/splitter.rs
new file mode 100644
index 0000000..d4c28b3
--- /dev/null
+++ b/ingest-router/src/relay_project_config_handler/splitter.rs
@@ -0,0 +1,111 @@
+//! Key routing logic for distributing requests across upstream cells.
+//!
+//! Current: Round-robin distribution (placeholder)
+//! TODO: Replace with control plane locator service lookup
+
+use crate::locale::{Cells, Upstream};
+use std::collections::HashMap;
+
+use super::protocol::ProjectConfigsRequest;
+
+/// A request split for a specific upstream cell.
+#[derive(Clone)]
+pub struct SplitRequest {
+    pub cell_name: String,
+    pub upstream: Upstream,
+    pub request: ProjectConfigsRequest,
+}
+
+/// Routes public keys to their owning cells.
+pub struct PublicKeyRouter {
+    // TODO: Add locator service in future PR
+}
+
+impl PublicKeyRouter {
+    pub fn new() -> Self {
+        Self {}
+    }
+
+    /// Splits request across cells using round-robin distribution.
+    ///
+    /// TODO: Replace with locator service lookup per key.
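+    ///
+    /// For example (illustrative values), keys `["A", "B", "C"]` over cells
+    /// `["us1", "us2"]` split as `us1 -> ["A", "C"]` and `us2 -> ["B"]`,
+    /// since key `i` is assigned to cell `i % cell_count`.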
+    pub fn split(&self, request: &ProjectConfigsRequest, cells: &Cells) -> Vec<SplitRequest> {
+        if cells.cell_list.is_empty() {
+            return Vec::new();
+        }
+
+        let mut split: HashMap<String, Vec<String>> = HashMap::new();
+
+        // TODO: Replace with locator service lookup
+        for (index, public_key) in request.public_keys.iter().enumerate() {
+            let cell_name = &cells.cell_list[index % cells.cell_list.len()];
+            split
+                .entry(cell_name.clone())
+                .or_default()
+                .push(public_key.clone());
+        }
+
+        split
+            .into_iter()
+            .map(|(cell_name, public_keys)| {
+                let upstream = cells
+                    .cell_to_upstreams
+                    .get(&cell_name)
+                    .expect("Cell name in list must exist in HashMap");
+
+                SplitRequest {
+                    cell_name,
+                    upstream: upstream.clone(),
+                    request: ProjectConfigsRequest {
+                        public_keys,
+                        extra_fields: request.extra_fields.clone(),
+                    },
+                }
+            })
+            .collect()
+    }
+}
+
+impl Default for PublicKeyRouter {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::CellConfig;
+    use crate::locale::Locales;
+    use std::collections::HashMap;
+    use url::Url;
+
+    #[test]
+    fn test_extra_fields_passthrough() {
+        let splitter = PublicKeyRouter::new();
+
+        let locales = HashMap::from([(
+            "us".to_string(),
+            vec![CellConfig {
+                name: "us1".to_string(),
+                sentry_url: Url::parse("http://us1:8080").unwrap(),
+                relay_url: Url::parse("http://us1:8090").unwrap(),
+            }],
+        )]);
+
+        let locales_obj = Locales::new(locales);
+        let cells = locales_obj.get_cells("us").unwrap();
+
+        let mut extra = HashMap::new();
+        extra.insert("global".to_string(), serde_json::json!(true));
+
+        let request = ProjectConfigsRequest {
+            public_keys: vec!["key1".to_string()],
+            extra_fields: extra.clone(),
+        };
+
+        let splits = splitter.split(&request, cells);
+
+        assert_eq!(splits[0].request.extra_fields, extra);
+    }
+}
diff --git a/ingest-router/src/relay_project_config_handler/task_executor.rs b/ingest-router/src/relay_project_config_handler/task_executor.rs
new file mode 100644
index 0000000..6624f61
--- /dev/null
+++ b/ingest-router/src/relay_project_config_handler/task_executor.rs
@@ -0,0 +1,606 @@
+//! Task execution for parallel upstream requests.
+
+use crate::config::RelayTimeouts;
+use crate::errors::IngestRouterError;
+use crate::http::send_to_upstream;
+use crate::locale::Cells;
+use http_body_util::Full;
+use hyper::body::Bytes;
+use hyper::{Request, Response};
+use hyper_util::client::legacy::Client;
+use hyper_util::client::legacy::connect::HttpConnector;
+use std::collections::HashMap;
+use std::time::Duration;
+use tokio::task::JoinSet;
+use tokio::time::Instant;
+
+use super::merger::MergedResults;
+use super::protocol::{ProjectConfigsRequest, ProjectConfigsResponse};
+use super::splitter::SplitRequest;
+
+/// Result from a single upstream task execution.
+struct UpstreamTaskResult {
+    /// The cell name this result came from
+    cell_name: String,
+
+    /// The public keys that were requested from this upstream.
+    /// If the request fails, these keys will be added to the pending array
+    public_keys: Vec<String>,
+
+    /// The HTTP response from the upstream
+    result: Result<Response<Bytes>, IngestRouterError>,
+}
+
+/// Collection of spawned upstream tasks with tracking metadata.
+///
+/// Used to track which public keys were sent to which tasks, so that
+/// if a task fails or is aborted, we can add its keys to the pending array.
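+///
+/// For example, if the task that was sent `["key2", "key3"]` panics or is
+/// aborted at the deadline, its entry in `task_keys` (looked up by task ID)
+/// yields exactly those keys, which are then appended to `pending`.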
+struct SpawnedTasks {
+    /// The set of spawned async tasks making requests to upstreams
+    join_set: JoinSet<UpstreamTaskResult>,
+
+    /// Maps task IDs to their public keys for failure handling
+    task_keys: HashMap<tokio::task::Id, Vec<String>>,
+}
+
+/// State machine for adaptive timeout collection strategy.
+///
+/// The collection process has two phases:
+/// 1. **WaitingForFirst**: Initial phase waiting for first successful response
+/// 2. **CollectingRemaining**: After first success, collecting remaining responses with shorter deadline
+enum CollectionState {
+    /// Initial phase: waiting for first successful response with long timeout.
+    WaitingForFirst,
+
+    /// Subsequent phase: first success received, collecting remaining with shorter timeout.
+    /// Tracks when the first success occurred to calculate the subsequent deadline.
+    CollectingRemaining { first_success_at: Instant },
+}
+
+impl CollectionState {
+    /// Calculates the current deadline based on state and timeout configuration.
+    fn current_deadline(&self, timeouts: &RelayTimeouts) -> Instant {
+        match self {
+            CollectionState::WaitingForFirst => {
+                Instant::now() + Duration::from_secs(timeouts.task_initial_timeout_secs as u64)
+            }
+            CollectionState::CollectingRemaining { first_success_at } => {
+                *first_success_at
+                    + Duration::from_secs(timeouts.task_subsequent_timeout_secs as u64)
+            }
+        }
+    }
+
+    /// Transitions to the subsequent collection phase after first success.
+    /// Only transitions if currently in WaitingForFirst state.
+    fn transition_to_subsequent(&mut self) {
+        if matches!(self, CollectionState::WaitingForFirst) {
+            *self = CollectionState::CollectingRemaining {
+                first_success_at: Instant::now(),
+            };
+        }
+    }
+}
+
+/// Orchestrates parallel execution of upstream requests with adaptive timeout strategy.
+///
+/// This executor fans out requests to multiple upstream Sentry instances simultaneously,
+/// collects their responses, and merges them. It implements a two-phase adaptive timeout
+/// strategy to balance responsiveness with resilience:
+///
+/// 1. **Initial phase**: Wait up to `task_initial_timeout_secs` for the first upstream to respond
+/// 2. **Subsequent phase**: Once first success occurs, give all remaining tasks `task_subsequent_timeout_secs` total to complete
+///
+/// This ensures fast cells aren't blocked by slow/failing cells, while still allowing
+/// sufficient time for healthy upstreams to respond.
+pub struct UpstreamTaskExecutor {
+    /// HTTP client for making requests to upstream Sentry instances
+    client: Client<HttpConnector, Full<Bytes>>,
+
+    /// Timeout configuration for HTTP requests and task-level deadlines
+    timeouts: RelayTimeouts,
+}
+
+impl UpstreamTaskExecutor {
+    pub fn new(client: Client<HttpConnector, Full<Bytes>>, timeouts: RelayTimeouts) -> Self {
+        Self { client, timeouts }
+    }
+
+    /// Spawns and collects results from all upstream tasks.
+    pub async fn execute(
+        &self,
+        split_requests: Vec<SplitRequest>,
+        base_request: &Request<()>,
+        cells: &Cells,
+    ) -> Result<MergedResults, IngestRouterError> {
+        let spawned_tasks = self.spawn_upstream_tasks(split_requests, base_request)?;
+        let merged = self.collect_results(spawned_tasks, cells).await;
+        Ok(merged)
+    }
+
+    /// Spawns parallel tasks for all upstream requests.
+    ///
+    /// Returns spawned tasks with tracking metadata for failure handling.
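+    ///
+    /// Bookkeeping sketch (hypothetical task IDs): splits for
+    /// `us1 -> ["A", "C"]` and `us2 -> ["B"]` yield two tasks plus
+    /// `task_keys = {id1: ["A", "C"], id2: ["B"]}`, so any task that later
+    /// fails or is aborted can have its keys moved to `pending`.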
+    fn spawn_upstream_tasks(
+        &self,
+        split_requests: Vec<SplitRequest>,
+        base_request: &Request<()>,
+    ) -> Result<SpawnedTasks, IngestRouterError> {
+        let mut join_set = JoinSet::new();
+        let mut task_keys = HashMap::new();
+
+        for split in split_requests {
+            let request = self.build_upstream_request(&split.request, base_request)?;
+            let public_keys_for_tracking = split.request.public_keys.clone();
+
+            let client = self.client.clone();
+            let sentry_url = split.upstream.sentry_url;
+            let http_timeout = self.timeouts.http_timeout_secs as u64;
+            let cell_name = split.cell_name;
+            let public_keys = split.request.public_keys;
+
+            let abort_handle = join_set.spawn(async move {
+                let result = send_to_upstream(&client, &sentry_url, request, http_timeout).await;
+
+                UpstreamTaskResult {
+                    cell_name,
+                    public_keys,
+                    result,
+                }
+            });
+
+            task_keys.insert(abort_handle.id(), public_keys_for_tracking);
+        }
+
+        Ok(SpawnedTasks {
+            join_set,
+            task_keys,
+        })
+    }
+
+    /// Builds an HTTP request to send to an upstream Sentry instance.
+    ///
+    /// Copies method, URI, version, and headers from the original request,
+    /// but replaces the body with the split request data.
+    fn build_upstream_request(
+        &self,
+        split_request: &ProjectConfigsRequest,
+        base_request: &Request<()>,
+    ) -> Result<Request<Full<Bytes>>, IngestRouterError> {
+        let request_body = split_request.to_bytes().map_err(|e| {
+            IngestRouterError::InternalError(format!("Failed to serialize request: {e}"))
+        })?;
+
+        let mut req_builder = Request::builder()
+            .method(base_request.method())
+            .uri(base_request.uri())
+            .version(base_request.version());
+
+        for (name, value) in base_request.headers() {
+            req_builder = req_builder.header(name, value);
+        }
+
+        req_builder.body(Full::new(request_body)).map_err(|e| {
+            IngestRouterError::InternalError(format!("Failed to build HTTP request: {e}"))
+        })
+    }
+
+    /// Collects results with adaptive timeout strategy using state machine.
+    async fn collect_results(&self, spawned_tasks: SpawnedTasks, cells: &Cells) -> MergedResults {
+        let SpawnedTasks {
+            mut join_set,
+            mut task_keys,
+        } = spawned_tasks;
+        let mut results = MergedResults::new();
+        let mut extra_by_cell = HashMap::new();
+        let mut headers_by_cell = HashMap::new();
+
+        let mut state = CollectionState::WaitingForFirst;
+        let deadline = tokio::time::sleep_until(state.current_deadline(&self.timeouts));
+        tokio::pin!(deadline);
+
+        loop {
+            tokio::select! {
+                Some(join_result) = join_set.join_next_with_id() => {
+                    if let Some((cell_name, extra, headers, request_succeeded)) =
+                        self.handle_task_completion(join_result, &mut task_keys, &mut results)
+                    {
+                        extra_by_cell.insert(cell_name.clone(), extra);
+                        headers_by_cell.insert(cell_name, headers);
+
+                        if request_succeeded && matches!(state, CollectionState::WaitingForFirst) {
+                            state.transition_to_subsequent();
+                            deadline.as_mut().reset(state.current_deadline(&self.timeouts));
+                        }
+                    }
+                }
+                _ = &mut deadline => {
+                    let remaining = join_set.len();
+                    match state {
+                        CollectionState::WaitingForFirst => {
+                            tracing::error!(
+                                "Initial timeout reached with no successful responses, aborting {} tasks",
+                                remaining
+                            );
+                        }
+                        CollectionState::CollectingRemaining { .. } => {
+                            tracing::debug!(
+                                "Subsequent deadline reached after first success, aborting {} remaining tasks",
+                                remaining
+                            );
+                        }
+                    }
+                    join_set.abort_all();
+                    break;
+                }
+                else => {
+                    tracing::debug!("All tasks completed");
+                    break;
+                }
+            }
+        }
+
+        while let Some(result) = join_set.join_next_with_id().await {
+            let task_id = match result {
+                Ok((id, _)) => id,
+                Err(e) => e.id(),
+            };
+            if let Some(keys) = task_keys.remove(&task_id) {
+                tracing::debug!("Adding {} keys from aborted task to pending", keys.len());
+                results.add_pending_keys(keys);
+            }
+        }
+
+        if let Some((extra, headers)) = cells.cell_list.iter().find_map(|name| {
+            extra_by_cell
+                .remove(name)
+                .map(|e| (e, headers_by_cell.remove(name).unwrap_or_default()))
+        }) {
+            results.extra_fields = extra;
+            results.http_headers = headers;
+        }
+
+        results
+    }
+
+    /// Handles the completion of a single upstream task.
+    ///
+    /// Processes task join results, handles failures, and extracts response metadata.
+    /// Returns cell name, extra fields, headers, and success status for successful requests.
+    /// Failed tasks have their keys added to pending.
+    fn handle_task_completion(
+        &self,
+        join_result: Result<(tokio::task::Id, UpstreamTaskResult), tokio::task::JoinError>,
+        task_keys: &mut HashMap<tokio::task::Id, Vec<String>>,
+        results: &mut MergedResults,
+    ) -> Option<(
+        String,
+        HashMap<String, serde_json::Value>,
+        hyper::header::HeaderMap,
+        bool,
+    )> {
+        let (task_id, upstream_result) = match join_result {
+            Ok((id, result)) => (id, result),
+            Err(e) => {
+                tracing::error!("Task failed: {e}");
+                if let Some(keys) = task_keys.remove(&e.id()) {
+                    results.add_pending_keys(keys);
+                }
+                return None;
+            }
+        };
+
+        task_keys.remove(&task_id);
+
+        let request_succeeded = upstream_result.result.is_ok();
+        let (cell_name, extra, headers) = self.process_result(upstream_result, results)?;
+        Some((cell_name, extra, headers, request_succeeded))
+    }
+
+    fn process_result(
+        &self,
+        upstream_result: UpstreamTaskResult,
+        results: &mut MergedResults,
+    ) -> Option<(
+        String,
+        HashMap<String, serde_json::Value>,
+        hyper::header::HeaderMap,
+    )> {
+        let cell_name = upstream_result.cell_name;
+
+        let Ok(response) = upstream_result.result else {
+            results.add_pending_keys(upstream_result.public_keys);
+            return None;
+        };
+
+        let (parts, body) = response.into_parts();
+
+        match ProjectConfigsResponse::from_bytes(&body) {
+            Ok(data) => {
+                results.merge_project_configs(data.project_configs);
+                if let Some(pending) = data.pending_keys {
+                    results.add_pending_keys(pending);
+                }
+                Some((cell_name, data.extra_fields, parts.headers))
+            }
+            Err(e) => {
+                tracing::error!(error = %e, "Failed to parse response");
+                results.add_pending_keys(upstream_result.public_keys);
+                None
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::CellConfig;
+    use crate::locale::Locales;
+    use hyper::Method;
+    use hyper::service::service_fn;
+    use hyper_util::rt::TokioExecutor;
+    use std::convert::Infallible;
+    use std::sync::Arc;
+    use tokio::net::TcpListener;
+    use tokio::sync::Mutex;
+    use url::Url;
+
+    /// Start a mock HTTP server that responds with custom data
+    async fn start_mock_server<F>(response_fn: F) -> u16
+    where
+        F: Fn() -> ProjectConfigsResponse + Send + Sync + 'static,
+    {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let port = listener.local_addr().unwrap().port();
+        let response_fn = Arc::new(Mutex::new(response_fn));
+
+        tokio::spawn(async move {
+            loop {
+                let (stream, _) = listener.accept().await.unwrap();
+                let io = hyper_util::rt::TokioIo::new(stream);
+                let response_fn = response_fn.clone();
+
+                tokio::spawn(async move {
+                    let service = service_fn(move |_req: Request<hyper::body::Incoming>| {
+                        let response_fn = response_fn.clone();
+                        async move {
+                            let response = (response_fn.lock().await)();
+                            let json = serde_json::to_vec(&response).unwrap();
+                            Ok::<_, Infallible>(Response::new(Full::new(Bytes::from(json))))
+                        }
+                    });
+
+                    let _ = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
+                        .serve_connection(io, service)
+                        .await;
+                });
+            }
+        });
+
+        // Give the listener task a moment to start accepting connections
+        tokio::time::sleep(Duration::from_millis(50)).await;
+        port
+    }
+
+    fn test_cells(ports: Vec<(&str, u16)>) -> Cells {
+        let cell_configs: Vec<CellConfig> = ports
+            .into_iter()
+            .map(|(name, port)| CellConfig {
+                name: name.to_string(),
+                sentry_url: Url::parse(&format!("http://127.0.0.1:{}", port)).unwrap(),
+                relay_url: Url::parse(&format!("http://127.0.0.1:{}", port)).unwrap(),
+            })
+            .collect();
+
+        let mut locales_map = HashMap::new();
+        locales_map.insert("test".to_string(), cell_configs);
+        Locales::new(locales_map).get_cells("test").unwrap().clone()
+    }
+
+    fn test_executor() -> UpstreamTaskExecutor {
+        let connector = HttpConnector::new();
+        let client = Client::builder(TokioExecutor::new()).build(connector);
+        let timeouts = RelayTimeouts {
+            http_timeout_secs: 5,
+            task_initial_timeout_secs: 10,
+            task_subsequent_timeout_secs: 2,
+        };
+        UpstreamTaskExecutor::new(client, timeouts)
+    }
+
+    fn test_split_request(cells: &Cells, cell_name: &str, keys: Vec<String>) -> SplitRequest {
+        let upstream = cells.cell_to_upstreams.get(cell_name).unwrap().clone();
+        SplitRequest {
+            cell_name: cell_name.to_string(),
+            upstream,
+            request: ProjectConfigsRequest {
+                public_keys: keys,
+                extra_fields: HashMap::new(),
+            },
+        }
+    }
+
+    #[tokio::test]
+    async fn test_multiple_upstreams_all_succeed() {
+        // Mock server 1 returns config for key1
+        let port1 = start_mock_server(|| {
+            let mut resp = ProjectConfigsResponse {
+                project_configs: HashMap::new(),
+                pending_keys: None,
+                extra_fields: HashMap::new(),
+            };
+            resp.project_configs.insert(
+                "key1".to_string(),
+                serde_json::json!({"disabled": false, "slug": "project1"}),
+            );
+            resp.extra_fields
+                .insert("global".to_string(), serde_json::json!({"from": "cell1"}));
+            resp
+        })
+        .await;
+
+        // Mock server 2 returns config for key2
+        let port2 = start_mock_server(|| {
+            let mut resp = ProjectConfigsResponse {
+                project_configs: HashMap::new(),
+                pending_keys: None,
+                extra_fields: HashMap::new(),
+            };
+            resp.project_configs.insert(
+                "key2".to_string(),
+                serde_json::json!({"disabled": false, "slug": "project2"}),
+            );
+            resp.extra_fields
+                .insert("global".to_string(), serde_json::json!({"from": "cell2"}));
+            resp
+        })
+        .await;
+
+        let cells = test_cells(vec![("cell1", port1), ("cell2", port2)]);
+        let executor = test_executor();
+
+        let split_requests = vec![
+            test_split_request(&cells, "cell1", vec!["key1".to_string()]),
+            test_split_request(&cells, "cell2", vec!["key2".to_string()]),
+        ];
+
+        let base_request = Request::builder()
+            .method(Method::POST)
+            .uri("/test")
+            .body(())
+            .unwrap();
+
+        let results = executor
+            .execute(split_requests, &base_request, &cells)
+            .await
+            .unwrap();
+
+        // Both configs should be merged
+        assert_eq!(results.project_configs.len(), 2);
+        assert!(results.project_configs.contains_key("key1"));
+        assert!(results.project_configs.contains_key("key2"));
+
+        // Should select extra fields from cell1 (highest priority)
+        assert_eq!(
+            results.extra_fields.get("global"),
+            Some(&serde_json::json!({"from": "cell1"}))
+        );
+
+        assert!(results.pending_keys.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_partial_failure() {
+        // Mock server 1 succeeds
+        let port1 = start_mock_server(|| {
+            let mut resp = ProjectConfigsResponse {
+                project_configs: HashMap::new(),
+                pending_keys: None,
+                extra_fields: HashMap::new(),
+            };
+            resp.project_configs
+                .insert("key1".to_string(), serde_json::json!({"disabled": false}));
+            resp
+        })
+        .await;
+
+        // cell2 will fail (invalid port)
+        let cells = test_cells(vec![("cell1", port1), ("cell2", 1)]);
+        let executor = test_executor();
+
+        let split_requests = vec![
+            test_split_request(&cells, "cell1", vec!["key1".to_string()]),
+            test_split_request(
+                &cells,
+                "cell2",
+                vec!["key2".to_string(), "key3".to_string()],
+            ),
+        ];
+
+        let base_request = Request::builder()
+            .method(Method::POST)
+            .uri("/test")
+            .body(())
+            .unwrap();
+
+        let results = executor
+            .execute(split_requests, &base_request, &cells)
+            .await
+            .unwrap();
+
+        // Successful config from cell1
+        assert_eq!(results.project_configs.len(), 1);
+        assert!(results.project_configs.contains_key("key1"));
+
+        // Failed keys from cell2 should be in pending
+        assert_eq!(results.pending_keys.len(), 2);
+        assert!(results.pending_keys.contains(&"key2".to_string()));
+        assert!(results.pending_keys.contains(&"key3".to_string()));
+    }
+
+    #[tokio::test]
+    async fn test_upstream_returns_pending_keys() {
+        let port = start_mock_server(|| {
+            let mut resp = ProjectConfigsResponse {
+                project_configs: HashMap::new(),
+                pending_keys: None,
+                extra_fields: HashMap::new(),
+            };
+            resp.project_configs
+                .insert("key1".to_string(), serde_json::json!({"disabled": false}));
+            // Upstream says key2 is pending
+            resp.pending_keys = Some(vec!["key2".to_string()]);
+            resp
+        })
+        .await;
+
+        let cells = test_cells(vec![("cell1", port)]);
+        let executor = test_executor();
+
+        let split_requests = vec![test_split_request(
+            &cells,
+            "cell1",
+            vec!["key1".to_string(), "key2".to_string()],
+        )];
+
+        let base_request = Request::builder()
+            .method(Method::POST)
+            .uri("/test")
+            .body(())
+            .unwrap();
+
+        let results = executor
+            .execute(split_requests, &base_request, &cells)
+            .await
+            .unwrap();
+
+        // key1 in configs
+        assert!(results.project_configs.contains_key("key1"));
+
+        // key2 in pending (from upstream)
+        assert_eq!(results.pending_keys.len(), 1);
+        assert!(results.pending_keys.contains(&"key2".to_string()));
+    }
+
+    #[tokio::test]
+    async fn test_empty_split_requests() {
+        let cells = test_cells(vec![("cell1", 8080)]);
+        let executor = test_executor();
+
+        let split_requests = vec![];
+        let base_request = Request::builder()
+            .method(Method::POST)
+            .uri("/test")
+            .body(())
+            .unwrap();
+
+        let results = executor
+            .execute(split_requests, &base_request, &cells)
+            .await
+            .unwrap();
+
+        assert!(results.project_configs.is_empty());
+        assert!(results.pending_keys.is_empty());
+        assert!(results.extra_fields.is_empty());
+    }
+}
diff --git a/ingest-router/src/router.rs b/ingest-router/src/router.rs
index 015c4a5..ed13a9d 100644
--- a/ingest-router/src/router.rs
+++ b/ingest-router/src/router.rs
@@ -1,5 +1,6 @@
 use crate::config::{HandlerAction, Route};
 use crate::errors::IngestRouterError;
+use crate::relay_project_config_handler::RelayProjectConfigsHandler;
 use http_body_util::{BodyExt, Full, combinators::BoxBody};
 use hyper::body::Bytes;
 use hyper::{Request, Response, StatusCode};
@@ -9,13 +10,15 @@ use std::sync::Arc;
 #[derive(Clone)]
 pub struct Router {
     routes: Arc<Vec<Route>>,
+    handler: Arc<RelayProjectConfigsHandler>,
 }
 
 impl Router {
-    /// Creates a new router with the given routes
-    pub fn new(routes: Vec<Route>) -> Self {
+    /// Creates a new router with the given routes and handler
+    pub fn new(routes: Vec<Route>, handler: RelayProjectConfigsHandler) -> Self {
         Self {
             routes: Arc::new(routes),
+            handler: Arc::new(handler),
         }
     }
 
@@ -26,12 +29,21 @@ impl Router {
     ) -> Result<Response<BoxBody<Bytes, IngestRouterError>>, IngestRouterError>
     where
         B: hyper::body::Body + Send + 'static,
+        B::Data: Send,
+        B::Error: std::error::Error + Send + Sync + 'static,
     {
         // Find a matching route
         match self.find_matching_route(&req) {
             Some(action) => {
                 tracing::debug!(action = ?action, "Matched route");
-                self.handle_action(req, action).await
+                // Convert errors to HTTP responses with proper status codes
+                match self.handle_action(req, action).await {
+                    Ok(response) => Ok(response),
+                    Err(e) => {
+                        tracing::error!(error = %e, "Handler error");
+                        Ok(e.into_response())
+                    }
+                }
             }
             None => {
                 tracing::warn!(
@@ -90,37 +102,22 @@ impl Router {
         true
     }
 
-    /// Handles a matched action (placeholder for now)
+    /// Handles a matched action by calling the appropriate handler
    async fn handle_action<B>(
         &self,
-        _req: Request<B>,
+        req: Request<B>,
         action: &HandlerAction,
     ) -> Result<Response<BoxBody<Bytes, IngestRouterError>>, IngestRouterError>
     where
         B: hyper::body::Body + Send + 'static,
+        B::Data: Send,
+        B::Error: std::error::Error + Send + Sync + 'static,
     {
-        // TODO: Implement actual handler logic
-        // This will need to be passed to a separate Handler interface that has access to
-        // locale_to_cells mapping and upstreams
-
-        // For now, just return a debug response showing which handler would be called
-        let response_body = match action {
+        match action {
             HandlerAction::RelayProjectConfigs(args) => {
-                format!(
-                    "Route matched!\nHandler: RelayProjectConfigs\nLocale: {}\n",
-                    args.locale
-                )
+                self.handler.handle(&args.locale, req).await
             }
-        };
-
-        Response::builder()
-            .status(StatusCode::OK)
-            .body(
-                Full::new(response_body.into())
-                    .map_err(|e| match e {})
-                    .boxed(),
-            )
-            .map_err(|e| IngestRouterError::InternalError(format!("Failed to build response: {e}")))
+        }
     }
 
     /// Handles an unmatched request
@@ -142,13 +139,44 @@ mod tests {
     use super::*;
     use crate::config::{HttpMethod, Match, RelayProjectConfigsArgs, Route};
-    use http_body_util::Empty;
     use hyper::body::Bytes;
     use hyper::header::HOST;
     use hyper::{Method, Request};
 
     fn test_router(routes: Vec<Route>) -> Router {
-        Router::new(routes)
+        use crate::config::{CellConfig, RelayTimeouts};
+        use std::collections::HashMap;
+        use url::Url;
+
+        // Create test locales
+        let mut locales = HashMap::new();
+        locales.insert(
+            "us".to_string(),
+            vec![CellConfig {
+                name: "us-cell-1".to_string(),
+                relay_url: Url::parse("http://us-relay.example.com:8080").unwrap(),
+                sentry_url: Url::parse("http://us-sentry.example.com:8080").unwrap(),
+            }],
+        );
+        locales.insert(
+            "de".to_string(),
+            vec![CellConfig {
+                name: "de-cell-1".to_string(),
+                relay_url: Url::parse("http://de-relay.example.com:8080").unwrap(),
+                sentry_url: Url::parse("http://de-sentry.example.com:8080").unwrap(),
+            }],
+        );
+        locales.insert(
+            "local".to_string(),
+            vec![CellConfig {
+                name: "local-cell".to_string(),
+                relay_url: Url::parse("http://local-relay.example.com:8080").unwrap(),
+                sentry_url: Url::parse("http://local-sentry.example.com:8080").unwrap(),
+            }],
+        );
+
+        let handler = RelayProjectConfigsHandler::new(locales, RelayTimeouts::default());
+        Router::new(routes, handler)
     }
 
     fn test_request(
@@ -156,13 +184,20 @@
         method: Method,
         path: &str,
         host: Option<&str>,
     ) -> Request<BoxBody<Bytes, IngestRouterError>> {
+        // Create a valid relay project configs request body with empty publicKeys
+        // so we don't need to contact upstreams in routing tests
+        let request_body = serde_json::json!({
+            "publicKeys": []
+        });
+        let body_str = serde_json::to_string(&request_body).unwrap();
+
         let mut builder = Request::builder().method(method).uri(path);
         if let Some(h) = host {
             builder = builder.header(HOST, h);
         }
         builder
             .body(
-                Empty::<Bytes>::new()
+                http_body_util::Full::new(Bytes::from(body_str))
                     .map_err(|never| match never {})
                     .boxed(),
             )
@@ -254,4 +289,29 @@
         let response = router.route(req).await.unwrap();
         assert_eq!(response.status(), StatusCode::NOT_FOUND);
     }
+
+    #[tokio::test]
+    async fn test_error_converted_to_response() {
+        use std::collections::HashMap;
+
+        // Create a router with a route that references a non-existent locale
+        let handler = RelayProjectConfigsHandler::new(
+            HashMap::new(), // Empty locales
+            crate::config::RelayTimeouts::default(),
+        );
+        let routes = vec![test_route(
+            None,
+            Some("/test".to_string()),
+            None,
+            "nonexistent", // This locale doesn't exist
+        )];
+        let router = Router::new(routes, handler);
+
+        // This should trigger an InternalError (locale not found) and return 500
+        let req = test_request(Method::POST, "/test", None);
+        let response = router.route(req).await.unwrap();
+
+        // Verify the error was converted to a proper HTTP response
+        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
+    }
 }
diff --git a/locator/Cargo.toml b/locator/Cargo.toml
index b0bd4c7..7c8be1a 100644
--- a/locator/Cargo.toml
+++ b/locator/Cargo.toml
@@ -13,7 +13,7 @@ google-cloud-storage = "1.4.0"
 metrics = { workspace = true }
 moka = { version = "0.12.11", features = ["sync"] }
 serde = { workspace = true }
-serde_json = "1.0.145"
+serde_json = { workspace = true }
 reqwest = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
diff --git a/shared/Cargo.toml b/shared/Cargo.toml
index 7f786ac..406a197 100644
--- a/shared/Cargo.toml
+++ b/shared/Cargo.toml
@@ -10,3 +10,4 @@ hyper = { workspace = true }
 hyper-util = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+url = { workspace = true }
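---

Note on the collection strategy (illustrative, not part of the patch): the two-layer timeout in `collect_results` is easier to see outside diff context. The sketch below is a minimal, self-contained reproduction of the same pattern — a `JoinSet` drained under a resettable deadline that tightens once the first task succeeds. Only tokio is assumed; the sleeps, durations, and printouts are invented stand-ins for the upstream HTTP calls.

// Minimal sketch of the resettable-deadline pattern used by collect_results.
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::{Instant, sleep, sleep_until};

#[tokio::main]
async fn main() {
    // Stand-ins for the configured timeouts (mirroring the RelayTimeouts defaults).
    let initial = Duration::from_secs(20);
    let subsequent = Duration::from_secs(5);

    let mut join_set = JoinSet::new();
    for ms in [100u64, 300, 30_000] {
        // The 30s task plays the role of a slow cell.
        join_set.spawn(async move {
            sleep(Duration::from_millis(ms)).await;
            ms
        });
    }

    // Generous deadline while waiting for the first success.
    let deadline = sleep_until(Instant::now() + initial);
    tokio::pin!(deadline);
    let mut got_first = false;

    while !join_set.is_empty() {
        tokio::select! {
            Some(res) = join_set.join_next() => {
                if let Ok(ms) = res {
                    println!("task finished after {ms}ms");
                    if !got_first {
                        // First success: tighten the shared deadline so a slow
                        // cell cannot hold the merged response hostage.
                        got_first = true;
                        deadline.as_mut().reset(Instant::now() + subsequent);
                    }
                }
            }
            _ = &mut deadline => {
                println!("deadline hit, aborting {} remaining tasks", join_set.len());
                join_set.abort_all();
                break;
            }
        }
    }
}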
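The `find_map` over `cells.cell_list` at the end of `collect_results` is the whole "priority" story enabled by the Vec-backed locale config: whichever responding cell comes first in config order wins the extra fields. A tiny sketch with invented cell names and payloads:

use std::collections::HashMap;

fn main() {
    // Priority order comes from the config (first cell listed wins).
    let cell_list = vec!["us1".to_string(), "us2".to_string()];

    // Responses arrive in arbitrary order; by collection time both are present.
    let mut extra_by_cell: HashMap<String, &str> = HashMap::new();
    extra_by_cell.insert("us2".to_string(), "global config from us2");
    extra_by_cell.insert("us1".to_string(), "global config from us1");

    // Same find_map pattern as collect_results: the first cell in priority
    // order that actually produced a response supplies the extra fields.
    let chosen = cell_list.iter().find_map(|name| extra_by_cell.remove(name));

    assert_eq!(chosen, Some("global config from us1"));
}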
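Finally, the router now relies on `IngestRouterError::into_response` (defined in ingest-router/src/errors.rs, which this diff does not touch). The patch only shows that it exists and that `InternalError` maps to a 500 in the new test; the standalone sketch below is a hypothetical shape for that conversion, with the enum reduced to one variant and the body simplified to `Full<Bytes>`.

use http_body_util::Full;
use hyper::body::Bytes;
use hyper::{Response, StatusCode};

// Hypothetical stand-in; the real enum has more variants.
#[derive(Debug)]
enum IngestRouterError {
    InternalError(String),
}

impl IngestRouterError {
    // Map each error variant to a status code and a plain-text body, so the
    // router can always answer instead of dropping the connection.
    fn into_response(self) -> Response<Full<Bytes>> {
        let (status, msg) = match self {
            IngestRouterError::InternalError(m) => (StatusCode::INTERNAL_SERVER_ERROR, m),
        };
        Response::builder()
            .status(status)
            .body(Full::new(Bytes::from(msg)))
            .expect("static response parts are valid")
    }
}

fn main() {
    let resp = IngestRouterError::InternalError("locale not found".into()).into_response();
    assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
}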