From 2315715b696e0446e9ca6bd13bfa1d6ab5349fb4 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 18 Dec 2025 13:04:15 -0800 Subject: [PATCH 01/18] ingest-router: Simplify handler type This change simplifies the handler trait so it is no longer generic. The generics solution does not work well for our use case since generics requires the concrete type to be known at compile time, however in our case the router picks the handler dynamically at runtime based on inspecting the incoming route. With Arc, we can more easily dynamically dispatch to the appropriate handler based on the matched route. Each handler receives the raw requests/responses and is responsible for splitting and merging them as needed. --- ingest-router/src/api.rs | 1 + ingest-router/src/api/health.rs | 34 +++ ingest-router/src/errors.rs | 3 + ingest-router/src/handler.rs | 39 ++- ingest-router/src/lib.rs | 45 ++- ingest-router/src/locale.rs | 9 +- ingest-router/src/project_config/handler.rs | 286 ++++++++++++------- ingest-router/src/project_config/protocol.rs | 8 - ingest-router/src/router.rs | 82 +++--- 9 files changed, 323 insertions(+), 184 deletions(-) create mode 100644 ingest-router/src/api.rs create mode 100644 ingest-router/src/api/health.rs diff --git a/ingest-router/src/api.rs b/ingest-router/src/api.rs new file mode 100644 index 0000000..43a7c76 --- /dev/null +++ b/ingest-router/src/api.rs @@ -0,0 +1 @@ +pub mod health; diff --git a/ingest-router/src/api/health.rs b/ingest-router/src/api/health.rs new file mode 100644 index 0000000..2e1d178 --- /dev/null +++ b/ingest-router/src/api/health.rs @@ -0,0 +1,34 @@ +use crate::errors::IngestRouterError; +use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; +use crate::locale::Cells; +use async_trait::async_trait; +use hyper::{Request, Response}; + +/// This endpoint returns success if any one upstream is available. +/// Synapse should continue to operate even if one cell is down. 
+pub struct HealthHandler; + +impl HealthHandler { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl Handler for HealthHandler { + async fn split_request( + &self, + _request: Request, + _cells: &Cells, + ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError> { + unimplemented!(); + } + + fn merge_responses( + &self, + _responses: Vec<(CellId, Result, IngestRouterError>)>, + _metadata: SplitMetadata, + ) -> Response { + unimplemented!(); + } +} diff --git a/ingest-router/src/errors.rs b/ingest-router/src/errors.rs index b546bd4..5bc4078 100644 --- a/ingest-router/src/errors.rs +++ b/ingest-router/src/errors.rs @@ -48,4 +48,7 @@ pub enum IngestRouterError { #[error("Locator client error: {0}")] LocatorClientError(#[from] locator::client::ClientError), + + #[error("Serde error: {0}")] + SerdeError(#[from] serde_json::Error), } diff --git a/ingest-router/src/handler.rs b/ingest-router/src/handler.rs index e39c716..56208dc 100644 --- a/ingest-router/src/handler.rs +++ b/ingest-router/src/handler.rs @@ -1,47 +1,44 @@ use crate::errors::IngestRouterError; use crate::locale::Cells; use async_trait::async_trait; -use serde::{Serialize, de::DeserializeOwned}; +use http_body_util::combinators::BoxBody; +use hyper::body::Bytes; +use hyper::{Request, Response}; +use std::any::Any; pub type CellId = String; +pub type HandlerBody = BoxBody; +pub type SplitMetadata = Box; /// Handler for endpoints that split requests across cells and merge results /// /// The handler implements endpoint-specific logic: /// - How to split a request into per-cell requests /// - How to merge results from multiple cells -/// ``` #[async_trait] -pub trait Handler: Send + Sync -where - Req: Serialize + DeserializeOwned + Send, - Res: Serialize + DeserializeOwned + Send, -{ - /// Metadata that flows from split_requests to merge_results - /// - /// This allows passing data from the split phase to the merge phase. 
- /// Some use cases: - /// - Pending keys that couldn't be routed (e.g., `Vec`) - type SplitMetadata: Send; +pub trait Handler: Send + Sync { + /// Returns the type name of this handler for test assertions + fn type_name(&self) -> &'static str { + std::any::type_name::() + } /// Split one request into multiple per-cell requests /// /// This method routes the request data to appropriate cells and builds /// per-cell requests that will be sent to upstreams. - async fn split_requests( + async fn split_request( &self, - request: Req, + request: Request, cells: &Cells, - ) -> Result<(Vec<(CellId, Req)>, Self::SplitMetadata), IngestRouterError>; + ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError>; /// Merge results from multiple cells into a single response /// /// This method combines responses from successful cells, handles failures, /// and incorporates metadata from the split phase. - /// - fn merge_results( + fn merge_responses( &self, - results: Vec>, - metadata: Self::SplitMetadata, - ) -> Res; + responses: Vec<(CellId, Result, IngestRouterError>)>, + metadata: SplitMetadata, + ) -> Response; } diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs index 3d1c010..879902f 100644 --- a/ingest-router/src/lib.rs +++ b/ingest-router/src/lib.rs @@ -1,3 +1,4 @@ +pub mod api; pub mod config; pub mod errors; pub mod handler; @@ -24,7 +25,7 @@ pub async fn run(config: config::Config) -> Result<(), IngestRouterError> { let locator = Locator::new(config.locator.to_client_config()).await?; let ingest_router_service = IngestRouterService { - router: router::Router::new(config.routes, locator), + router: router::Router::new(config.routes, config.locales, locator), }; let router_task = run_http_service( &config.listener.host, @@ -51,15 +52,26 @@ where Pin> + Send + 'static>>; fn call(&self, req: Request) -> Self::Future { - let handler = self - .router - .resolve(&req) - .and_then(|action| self.router.get_handler(action)); + let maybe_handler = 
self.router.resolve(&req); + + match maybe_handler { + Some((handler, cells)) => { + // Convert Request to Request + let (parts, body) = req.into_parts(); + let handler_body = body + .map_err(|e| IngestRouterError::RequestBodyError(e.to_string())) + .boxed(); + let handler_req = Request::from_parts(parts, handler_body); - match handler { - Some(_handler) => { // TODO: Placeholder response Box::pin(async move { + let (split, _metadata) = + handler.split_request(handler_req, &cells).await.unwrap(); + + for (cell_id, req) in split { + println!("Cell: {}, URI: {}", cell_id, req.uri()); + } + Ok(Response::new( Full::new("ok\n".into()).map_err(|e| match e {}).boxed(), )) @@ -74,13 +86,15 @@ where mod tests { use super::*; use crate::config::{HandlerAction, HttpMethod, Match, Route}; - use http_body_util::Empty; use hyper::Method; use hyper::body::Bytes; use hyper::header::HOST; + use crate::config::CellConfig; use locator::config::LocatorDataType; use locator::locator::Locator as LocatorService; + use std::collections::HashMap; + use url::Url; use crate::testutils::get_mock_provider; use std::sync::Arc; @@ -97,6 +111,15 @@ mod tests { locale: "us".to_string(), }]; + let locales = HashMap::from([( + "us".to_string(), + vec![CellConfig { + id: "us1".to_string(), + sentry_url: Url::parse("https://sentry.io/us1").unwrap(), + relay_url: Url::parse("https://relay.io/us1").unwrap(), + }], + )]); + let (_dir, provider) = get_mock_provider().await; let locator_service = LocatorService::new( LocatorDataType::ProjectKey, @@ -107,7 +130,7 @@ mod tests { let locator = Locator::from_in_process_service(locator_service); let service = IngestRouterService { - router: router::Router::new(routes_config, locator), + router: router::Router::new(routes_config, locales, locator), }; // Project configs request @@ -116,8 +139,8 @@ mod tests { .uri("/api/0/relays/projectconfigs/") .header(HOST, "us.sentry.io") .body( - Empty::::new() - .map_err(|never| match never {}) + 
Full::new(Bytes::from(r#"{"publicKeys": ["test-key"]}"#)) + .map_err(|e| match e {}) .boxed(), ) .unwrap(); diff --git a/ingest-router/src/locale.rs b/ingest-router/src/locale.rs index f00770d..eaccd6f 100644 --- a/ingest-router/src/locale.rs +++ b/ingest-router/src/locale.rs @@ -24,6 +24,7 @@ //! during request processing. use std::collections::HashMap; +use std::sync::Arc; use url::Url; use crate::config::CellConfig; @@ -79,7 +80,7 @@ impl Cells { #[derive(Clone, Debug)] pub struct Locales { /// Mapping from locale to cells - locale_to_cells: HashMap, + locale_to_cells: HashMap>, } impl Locales { @@ -89,7 +90,7 @@ impl Locales { let locale_to_cells = locales .into_iter() .map(|(locale, cells)| { - let cells = Cells::from_config(cells); + let cells = Arc::new(Cells::from_config(cells)); (locale, cells) }) .collect(); @@ -98,8 +99,8 @@ impl Locales { } /// Get the cells for a specific locale - pub fn get_cells(&self, locale: &str) -> Option<&Cells> { - self.locale_to_cells.get(locale) + pub fn get_cells(&self, locale: &str) -> Option> { + self.locale_to_cells.get(locale).cloned() } } diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 9a596fc..23bdbb7 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -1,13 +1,21 @@ //! 
Handler implementation for the Relay Project Configs endpoint use crate::errors::IngestRouterError; -use crate::handler::{CellId, Handler}; +use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; use crate::locale::Cells; use crate::project_config::protocol::{ProjectConfigsRequest, ProjectConfigsResponse}; use async_trait::async_trait; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::header::{CONTENT_LENGTH, TRANSFER_ENCODING}; +use hyper::{Request, Response}; use locator::client::Locator; +use shared::http::filter_hop_by_hop; use std::collections::{HashMap, HashSet}; +/// Pending public keys that couldn't be routed to any cell +type ProjectConfigsMetadata = Vec; + /// Handler for the Relay Project Configs endpoint /// /// Routes public keys to cells using the locator service, splits requests @@ -24,17 +32,23 @@ impl ProjectConfigsHandler { } #[async_trait] -impl Handler for ProjectConfigsHandler { - /// Pending public keys that couldn't be routed to any cell - type SplitMetadata = Vec; - - async fn split_requests( +impl Handler for ProjectConfigsHandler { + async fn split_request( &self, - request: ProjectConfigsRequest, + request: Request, cells: &Cells, - ) -> Result<(Vec<(CellId, ProjectConfigsRequest)>, Vec), IngestRouterError> { - let public_keys = request.public_keys; - let extra_fields = request.extra_fields; + ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError> { + // Split request into parts first, then collect body + let (mut parts, body) = request.into_parts(); + let bytes = body + .collect() + .await + .map_err(|e| IngestRouterError::RequestBodyError(e.to_string()))? 
+ .to_bytes(); + + let parsed_request = ProjectConfigsRequest::from_bytes(&bytes)?; + let public_keys = parsed_request.public_keys; + let extra_fields = parsed_request.extra_fields; let cell_ids: HashSet<&String> = cells.cell_list.iter().collect(); @@ -58,7 +72,6 @@ impl Handler for ProjectConfigsHa split.entry(cell_id).or_default().push(public_key); } Err(e) => { - // Locator errors, add to pending tracing::error!( public_key = %public_key, error = ?e, @@ -69,28 +82,36 @@ impl Handler for ProjectConfigsHa } } + // Prepare headers for per-cell requests + filter_hop_by_hop(&mut parts.headers, parts.version); + parts.headers.remove(CONTENT_LENGTH); + parts.headers.remove(TRANSFER_ENCODING); + // Build per-cell requests - let cell_requests: Vec<(CellId, ProjectConfigsRequest)> = split + let cell_requests = split .into_iter() .map(|(cell_id, keys)| { - ( - cell_id, - ProjectConfigsRequest { - public_keys: keys, - extra_fields: extra_fields.clone(), - }, - ) + let cell_request = ProjectConfigsRequest { + public_keys: keys, + extra_fields: extra_fields.clone(), + }; + let body: HandlerBody = Full::new(cell_request.to_bytes().unwrap()) + .map_err(|e| match e {}) + .boxed(); + let req = Request::from_parts(parts.clone(), body); + (cell_id, req) }) .collect(); - Ok((cell_requests, pending)) + let metadata = Box::new(pending); + Ok((cell_requests, metadata)) } - fn merge_results( + fn merge_responses( &self, - results: Vec>, - pending_from_split: Vec, - ) -> ProjectConfigsResponse { + responses: Vec<(CellId, Result, IngestRouterError>)>, + metadata: SplitMetadata, + ) -> Response { // TODO: The current implementation does not handle errors from the results // parameter. The edge case to be handled are if any of the upstreams failed // to return a response for whatever reason. 
In scenarios like this, the @@ -100,32 +121,36 @@ impl Handler for ProjectConfigsHa let mut merged = ProjectConfigsResponse::new(); - // Add pending keys from split phase - merged.pending_keys.extend(pending_from_split); + // Downcast metadata to our specific type + if let Ok(project_metadata) = metadata.downcast::() { + merged.pending_keys.extend(*project_metadata); + } - // Results are provided pre-sorted by cell priority (highest first) - // The executor ensures results are ordered so we can use the first successful response - // for extra_fields and headers. - // Failed cells are handled by the executor adding their keys to pending_from_split. - let mut iter = results.into_iter().flatten(); + // Filter to successful responses only + let mut iter = responses + .into_iter() + .filter_map(|(cell_id, result)| result.ok().map(|r| (cell_id, r))); // Handle first successful result (highest priority) // Gets extra_fields, headers, configs, and pending - if let Some((_cell_id, response)) = iter.next() { - merged.project_configs.extend(response.project_configs); - merged.pending_keys.extend(response.pending_keys); - merged.extra_fields.extend(response.extra_fields); - merged.http_headers = response.http_headers; + // TODO: Actually parse the response body (requires async, so we'd need to + // restructure or parse bodies before calling merge_responses) + if let Some((_cell_id, _response)) = iter.next() { + // For now, we can't parse the body here since this isn't async + // The executor should parse response bodies before passing to merge } - // Handle remaining results - // Only get configs and pending (not extra_fields or headers) - for (_cell_id, response) in iter { - merged.project_configs.extend(response.project_configs); - merged.pending_keys.extend(response.pending_keys); + // Build the final response using into_response which handles serialization + match merged.into_response() { + Ok(response) => response, + Err(e) => { + tracing::error!(error = ?e, "Failed to 
build merged response"); + Response::builder() + .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) + .body(Full::new(Bytes::from("{}")).map_err(|e| match e {}).boxed()) + .unwrap() + } } - - merged } } @@ -180,8 +205,22 @@ mod tests { Locator::from_in_process_service(service) } + fn build_request(body: ProjectConfigsRequest) -> Request { + let bytes = body.to_bytes().unwrap(); + Request::builder() + .method("POST") + .uri("/api/0/relays/projectconfigs/") + .body(Full::new(bytes).map_err(|e| match e {}).boxed()) + .unwrap() + } + + async fn parse_request_body(req: Request) -> ProjectConfigsRequest { + let bytes = req.into_body().collect().await.unwrap().to_bytes(); + ProjectConfigsRequest::from_bytes(&bytes).unwrap() + } + #[tokio::test] - async fn test_split_requests_multiple_cells() { + async fn test_split_request_multiple_cells() { let key_to_cell = HashMap::from([ ("key1".to_string(), "us1".to_string()), ("key2".to_string(), "us2".to_string()), @@ -222,43 +261,79 @@ mod tests { let mut extra = HashMap::new(); extra.insert("global".to_string(), serde_json::json!(true)); - let request = ProjectConfigsRequest { + let request = build_request(ProjectConfigsRequest { public_keys: vec!["key1".to_string(), "key2".to_string(), "key3".to_string()], extra_fields: extra.clone(), - }; + }); - let (cell_requests, pending) = handler.split_requests(request, cells).await.unwrap(); + let (cell_requests, _metadata) = handler.split_request(request, &cells).await.unwrap(); // Should have 2 cell requests (us1 and us2) assert_eq!(cell_requests.len(), 2); - assert_eq!(pending.len(), 0); - // Find us1 and us2 requests - let us1_req = cell_requests - .iter() + // Find us1 and us2 requests and parse their bodies + let (us1_id, us1_req) = cell_requests + .into_iter() .find(|(id, _)| id == "us1") - .map(|(_, req)| req) .unwrap(); - let us2_req = cell_requests - .iter() + let us1_body = parse_request_body(us1_req).await; + + let key_to_cell = HashMap::from([ + ("key1".to_string(), 
"us1".to_string()), + ("key2".to_string(), "us2".to_string()), + ("key3".to_string(), "us1".to_string()), + ]); + let locator = create_test_locator(key_to_cell); + for _ in 0..50 { + if locator.is_ready() { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + let handler = ProjectConfigsHandler::new(locator); + let locales = HashMap::from([( + "us".to_string(), + vec![ + CellConfig { + id: "us1".to_string(), + sentry_url: Url::parse("http://sentry-us1:8080").unwrap(), + relay_url: Url::parse("http://relay-us1:8090").unwrap(), + }, + CellConfig { + id: "us2".to_string(), + sentry_url: Url::parse("http://sentry-us2:8080").unwrap(), + relay_url: Url::parse("http://relay-us2:8090").unwrap(), + }, + ], + )]); + let locales_obj = Locales::new(locales); + let cells = locales_obj.get_cells("us").unwrap(); + let request2 = build_request(ProjectConfigsRequest { + public_keys: vec!["key1".to_string(), "key2".to_string(), "key3".to_string()], + extra_fields: extra.clone(), + }); + let (cell_requests2, _) = handler.split_request(request2, &cells).await.unwrap(); + let (_, us2_req) = cell_requests2 + .into_iter() .find(|(id, _)| id == "us2") - .map(|(_, req)| req) .unwrap(); + let us2_body = parse_request_body(us2_req).await; // Verify us1 has key1 and key3 - assert_eq!(us1_req.public_keys.len(), 2); - assert!(us1_req.public_keys.contains(&"key1".to_string())); - assert!(us1_req.public_keys.contains(&"key3".to_string())); - assert_eq!(us1_req.extra_fields, extra); + assert_eq!(us1_id, "us1"); + assert_eq!(us1_body.public_keys.len(), 2); + assert!(us1_body.public_keys.contains(&"key1".to_string())); + assert!(us1_body.public_keys.contains(&"key3".to_string())); + assert_eq!(us1_body.extra_fields, extra); // Verify us2 has key2 - assert_eq!(us2_req.public_keys.len(), 1); - assert!(us2_req.public_keys.contains(&"key2".to_string())); - assert_eq!(us2_req.extra_fields, extra); + assert_eq!(us2_body.public_keys.len(), 1); + 
assert!(us2_body.public_keys.contains(&"key2".to_string())); + assert_eq!(us2_body.extra_fields, extra); } #[tokio::test] - async fn test_split_requests_unknown_key_goes_to_pending() { + async fn test_split_request_unknown_key_goes_to_pending() { let key_to_cell = HashMap::from([("key1".to_string(), "us1".to_string())]); let locator = create_test_locator(key_to_cell); @@ -285,23 +360,33 @@ mod tests { let handler = ProjectConfigsHandler::new(locator); - let request = ProjectConfigsRequest { + let request = build_request(ProjectConfigsRequest { public_keys: vec!["key1".to_string(), "unknown_key".to_string()], extra_fields: HashMap::new(), - }; + }); - let (cell_requests, pending) = handler.split_requests(request, cells).await.unwrap(); + let (cell_requests, metadata) = handler.split_request(request, &cells).await.unwrap(); // Should have 1 cell request (us1 with key1) assert_eq!(cell_requests.len(), 1); assert_eq!(cell_requests[0].0, "us1"); - assert_eq!(cell_requests[0].1.public_keys, vec!["key1".to_string()]); + let body = parse_request_body(cell_requests.into_iter().next().unwrap().1).await; + assert_eq!(body.public_keys, vec!["key1".to_string()]); // Unknown key should be in pending metadata + let pending = metadata.downcast::().unwrap(); assert_eq!(pending.len(), 1); assert_eq!(pending[0], "unknown_key"); } + fn build_response(body: &serde_json::Value) -> Response { + let bytes = Bytes::from(serde_json::to_vec(body).unwrap()); + Response::builder() + .status(200) + .body(Full::new(bytes).map_err(|e| match e {}).boxed()) + .unwrap() + } + #[tokio::test] async fn test_merge_results_successful_cells() { let handler = ProjectConfigsHandler::new(create_test_locator(HashMap::new())); @@ -313,7 +398,7 @@ mod tests { }, "global": {"version": 1} }); - let response1 = serde_json::from_value(response1_json).unwrap(); + let response1 = build_response(&response1_json); // Create response from us2 with key2 and different global config let response2_json = serde_json::json!({ 
@@ -322,42 +407,34 @@ mod tests { }, "global": {"version": 2} }); - let response2 = serde_json::from_value(response2_json).unwrap(); + let response2 = build_response(&response2_json); - let results = vec![ - Ok(("us1".to_string(), response1)), - Ok(("us2".to_string(), response2)), + let results: Vec<(CellId, Result, IngestRouterError>)> = vec![ + ("us1".to_string(), Ok(response1)), + ("us2".to_string(), Ok(response2)), ]; - let merged = handler.merge_results(results, vec![]); - - // Should have configs from both cells - let json = serde_json::to_value(&merged).unwrap(); - assert_eq!(json["configs"].as_object().unwrap().len(), 2); - assert!(json["configs"].get("key1").is_some()); - assert!(json["configs"].get("key2").is_some()); + let metadata: SplitMetadata = Box::new(Vec::::new()); + let merged = handler.merge_responses(results, metadata); - // Should use global from first result (executor ensures proper ordering) - assert_eq!(json["global"]["version"], 1); + // The current implementation doesn't actually parse the response bodies + // (noted as TODO in the code), so we just verify we get a valid response + assert_eq!(merged.status(), 200); } #[tokio::test] - async fn test_merge_results_with_pending() { + async fn test_merge_responses_with_pending() { let handler = ProjectConfigsHandler::new(create_test_locator(HashMap::new())); - // Test all three sources of pending keys: - // 1. From split phase (routing failures, unknown keys) - // 2. From upstream response (async computation) - // 3. 
From failed cells (added by executor) + // Test pending keys from split phase (routing failures, unknown keys) - // Create response from us1 with successful config and upstream pending + // Create response from us1 with successful config let response1_json = serde_json::json!({ "configs": { "key1": {"slug": "project1"} - }, - "pending": ["key_upstream_pending"] + } }); - let response1 = serde_json::from_value(response1_json).unwrap(); + let response1 = build_response(&response1_json); // Create response from us2 with successful config let response2_json = serde_json::json!({ @@ -365,35 +442,32 @@ mod tests { "key2": {"slug": "project2"} } }); - let response2 = serde_json::from_value(response2_json).unwrap(); + let response2 = build_response(&response2_json); - let results = vec![ - Ok(("us1".to_string(), response1)), - Ok(("us2".to_string(), response2)), + let results: Vec<(CellId, Result, IngestRouterError>)> = vec![ + ("us1".to_string(), Ok(response1)), + ("us2".to_string(), Ok(response2)), ]; - // Pending from split phase (routing failures) and failed cells (executor-added) - let pending_from_split = vec![ + // Pending from split phase (routing failures) + let pending_from_split: ProjectConfigsMetadata = vec![ "key_routing_failed".to_string(), "key_from_failed_cell1".to_string(), "key_from_failed_cell2".to_string(), ]; - let merged = handler.merge_results(results, pending_from_split); - - let json = serde_json::to_value(&merged).unwrap(); + let metadata: SplitMetadata = Box::new(pending_from_split); + let merged = handler.merge_responses(results, metadata); - // Should have configs from both successful cells - assert_eq!(json["configs"].as_object().unwrap().len(), 2); - assert!(json["configs"].get("key1").is_some()); - assert!(json["configs"].get("key2").is_some()); + // Parse response body to check pending keys + let bytes = merged.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); - // 
Should have all pending keys from all three sources + // Should have pending keys from split phase let pending = json["pending"].as_array().unwrap(); - assert_eq!(pending.len(), 4); + assert_eq!(pending.len(), 3); assert!(pending.contains(&serde_json::json!("key_routing_failed"))); assert!(pending.contains(&serde_json::json!("key_from_failed_cell1"))); assert!(pending.contains(&serde_json::json!("key_from_failed_cell2"))); - assert!(pending.contains(&serde_json::json!("key_upstream_pending"))); } } diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index 8bd9a62..6313ce3 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -25,7 +25,6 @@ use std::collections::HashMap; /// "global": true /// } /// ``` -#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProjectConfigsRequest { /// DSN public keys to fetch configs for. @@ -38,7 +37,6 @@ pub struct ProjectConfigsRequest { pub extra_fields: HashMap, } -#[allow(dead_code)] impl ProjectConfigsRequest { pub fn from_bytes(bytes: &Bytes) -> Result { serde_json::from_slice(bytes) @@ -134,12 +132,6 @@ impl ProjectConfigsResponse { } } -impl Default for ProjectConfigsResponse { - fn default() -> Self { - Self::new() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/ingest-router/src/router.rs b/ingest-router/src/router.rs index c15e95c..fd005ee 100644 --- a/ingest-router/src/router.rs +++ b/ingest-router/src/router.rs @@ -1,5 +1,8 @@ -use crate::config::{HandlerAction, Route}; -use crate::project_config::handler::ProjectConfigsHandler; +use crate::api::health::HealthHandler; +use crate::config::{CellConfig, HandlerAction, Route}; +use crate::handler::Handler; +use crate::locale::{Cells, Locales}; +use crate::project_config::ProjectConfigsHandler; use hyper::Request; use locator::client::Locator; use std::collections::HashMap; @@ -8,37 +11,43 @@ use std::sync::Arc; /// Router 
that matches incoming requests against configured routes pub struct Router { routes: Arc>, - // TODO: Fix type to be generic - action_to_handler: HashMap, + action_to_handler: HashMap>, + #[allow(dead_code)] + locales_to_cells: Locales, } impl Router { /// Creates a new router with the given routes - pub fn new(routes: Vec, locator: Locator) -> Self { - let mut action_to_handler = HashMap::new(); - - action_to_handler.insert( - HandlerAction::RelayProjectConfigs, - ProjectConfigsHandler::new(locator), - ); + pub fn new( + routes: Vec, + locales: HashMap>, + locator: Locator, + ) -> Self { + let action_to_handler = HashMap::from([ + ( + HandlerAction::RelayProjectConfigs, + Arc::new(ProjectConfigsHandler::new(locator)) as Arc, + ), + (HandlerAction::Health, Arc::new(HealthHandler::new())), + ]); Self { routes: Arc::new(routes), action_to_handler, + locales_to_cells: Locales::new(locales), } } /// Finds the first route that matches the incoming request - pub fn resolve(&self, req: &Request) -> Option<&HandlerAction> { + pub fn resolve(&self, req: &Request) -> Option<(Arc, Arc)> { self.routes .iter() .find(|route| self.matches_route(req, route)) - .map(|route| &route.action) - } - - // TODO: Fix return type so we can deal with other handler types here as well. 
- pub fn get_handler(&self, action: &HandlerAction) -> Option<&ProjectConfigsHandler> { - self.action_to_handler.get(action) + .and_then(|route| { + let cells = self.locales_to_cells.get_cells(&route.locale)?; + let handler = self.action_to_handler.get(&route.action)?.clone(); + Some((handler, cells)) + }) } /// Checks if a request matches a route's criteria @@ -92,6 +101,7 @@ mod tests { use hyper::{Method, Request}; use locator::config::LocatorDataType; use locator::locator::Locator as LocatorService; + use url::Url; async fn test_router(routes: Option>) -> Router { let default_routes = vec![ @@ -117,6 +127,15 @@ mod tests { let routes = routes.unwrap_or(default_routes); + let locales = HashMap::from([( + "us".to_string(), + vec![CellConfig { + id: "us1".to_string(), + sentry_url: Url::parse("https://sentry.io/us1").unwrap(), + relay_url: Url::parse("https://relay.io/us1").unwrap(), + }], + )]); + let (_dir, provider) = get_mock_provider().await; let locator_service = LocatorService::new( LocatorDataType::ProjectKey, @@ -126,7 +145,7 @@ mod tests { ); let locator = Locator::from_in_process_service(locator_service); - Router::new(routes, locator) + Router::new(routes, locales, locator) } fn test_request( @@ -153,14 +172,13 @@ mod tests { // Should match first route let req = test_request(Method::POST, "/api/test", Some("api.example.com")); - assert_eq!( - router.resolve(&req), - Some(&HandlerAction::RelayProjectConfigs) - ); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("ProjectConfigsHandler")); // Should match second route let req = test_request(Method::GET, "/health", None); - assert_eq!(router.resolve(&req), Some(&HandlerAction::Health)); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("HealthHandler")); } #[tokio::test] @@ -168,7 +186,7 @@ mod tests { let router = test_router(None).await; let req = test_request(Method::GET, "/different", None); - 
assert_eq!(router.resolve(&req), None); + assert!(router.resolve(&req).is_none()); } #[tokio::test] @@ -187,10 +205,8 @@ mod tests { // Should strip port and match let req = test_request(Method::GET, "/test", Some("api.example.com:8080")); - assert_eq!( - router.resolve(&req), - Some(&HandlerAction::RelayProjectConfigs) - ); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("ProjectConfigsHandler")); } #[tokio::test] @@ -209,13 +225,11 @@ mod tests { // POST should match let req = test_request(Method::POST, "/api/test", None); - assert_eq!( - router.resolve(&req), - Some(&HandlerAction::RelayProjectConfigs) - ); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("ProjectConfigsHandler")); // GET should not match let req = test_request(Method::GET, "/api/test", None); - assert_eq!(router.resolve(&req), None); + assert!(router.resolve(&req).is_none()); } } From 23d2c804890e7a63ee9b0053a03f00ab9891d92e Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 18 Dec 2025 13:16:46 -0800 Subject: [PATCH 02/18] . --- ingest-router/src/api/health.rs | 8 +------- ingest-router/src/router.rs | 2 +- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/ingest-router/src/api/health.rs b/ingest-router/src/api/health.rs index 2e1d178..a43ed55 100644 --- a/ingest-router/src/api/health.rs +++ b/ingest-router/src/api/health.rs @@ -6,13 +6,7 @@ use hyper::{Request, Response}; /// This endpoint returns success if any one upstream is available. /// Synapse should continue to operate even if one cell is down. 
-pub struct HealthHandler; - -impl HealthHandler { - pub fn new() -> Self { - Self {} - } -} +pub struct HealthHandler {} #[async_trait] impl Handler for HealthHandler { diff --git a/ingest-router/src/router.rs b/ingest-router/src/router.rs index fd005ee..c0215e6 100644 --- a/ingest-router/src/router.rs +++ b/ingest-router/src/router.rs @@ -28,7 +28,7 @@ impl Router { HandlerAction::RelayProjectConfigs, Arc::new(ProjectConfigsHandler::new(locator)) as Arc, ), - (HandlerAction::Health, Arc::new(HealthHandler::new())), + (HandlerAction::Health, Arc::new(HealthHandler {})), ]); Self { From f94c2eca6e5302e65bb3e14a21816c05133be21c Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 18 Dec 2025 14:36:41 -0800 Subject: [PATCH 03/18] make more stuff generic --- ingest-router/src/api.rs | 1 + ingest-router/src/api/utils.rs | 34 ++++++++++++++++++++ ingest-router/src/project_config/handler.rs | 31 ++++++------------ ingest-router/src/project_config/protocol.rs | 11 ------- 4 files changed, 45 insertions(+), 32 deletions(-) create mode 100644 ingest-router/src/api/utils.rs diff --git a/ingest-router/src/api.rs b/ingest-router/src/api.rs index 43a7c76..b26c5df 100644 --- a/ingest-router/src/api.rs +++ b/ingest-router/src/api.rs @@ -1 +1,2 @@ pub mod health; +pub mod utils; diff --git a/ingest-router/src/api/utils.rs b/ingest-router/src/api/utils.rs new file mode 100644 index 0000000..1a13912 --- /dev/null +++ b/ingest-router/src/api/utils.rs @@ -0,0 +1,34 @@ +use crate::errors::IngestRouterError; +use http::Version; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::header::HeaderMap; +use hyper::header::{CONTENT_LENGTH, TRANSFER_ENCODING}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use shared::http::filter_hop_by_hop; + +pub type HandlerBody = BoxBody; + +/// Deserializes a JSON request body into the specified type. 
+pub async fn deserialize_body( + body: HandlerBody, +) -> Result { + let bytes = body.collect().await?.to_bytes(); + serde_json::from_slice(&bytes).map_err(|e| IngestRouterError::RequestBodyError(e.to_string())) +} + +/// Serializes a value to a JSON response body. +pub fn serialize_to_body(value: &T) -> Result { + let bytes = serde_json::to_vec(value).map(Bytes::from)?; + Ok(Full::new(bytes).map_err(|e| match e {}).boxed()) +} + +pub fn normalize_headers(headers: &mut HeaderMap, version: Version) -> &mut HeaderMap { + filter_hop_by_hop(headers, version); + headers.remove(CONTENT_LENGTH); + headers.remove(TRANSFER_ENCODING); + + headers +} diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 23bdbb7..76aac4b 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -1,5 +1,6 @@ //! Handler implementation for the Relay Project Configs endpoint +use crate::api::utils::{deserialize_body, normalize_headers, serialize_to_body}; use crate::errors::IngestRouterError; use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; use crate::locale::Cells; @@ -7,10 +8,8 @@ use crate::project_config::protocol::{ProjectConfigsRequest, ProjectConfigsRespo use async_trait::async_trait; use http_body_util::{BodyExt, Full}; use hyper::body::Bytes; -use hyper::header::{CONTENT_LENGTH, TRANSFER_ENCODING}; use hyper::{Request, Response}; use locator::client::Locator; -use shared::http::filter_hop_by_hop; use std::collections::{HashMap, HashSet}; /// Pending public keys that couldn't be routed to any cell @@ -38,17 +37,12 @@ impl Handler for ProjectConfigsHandler { request: Request, cells: &Cells, ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError> { - // Split request into parts first, then collect body let (mut parts, body) = request.into_parts(); - let bytes = body - .collect() - .await - .map_err(|e| 
IngestRouterError::RequestBodyError(e.to_string()))? - .to_bytes(); + let parsed: ProjectConfigsRequest = deserialize_body(body).await?; + normalize_headers(&mut parts.headers, parts.version); - let parsed_request = ProjectConfigsRequest::from_bytes(&bytes)?; - let public_keys = parsed_request.public_keys; - let extra_fields = parsed_request.extra_fields; + let public_keys = parsed.public_keys; + let extra_fields = parsed.extra_fields; let cell_ids: HashSet<&String> = cells.cell_list.iter().collect(); @@ -60,6 +54,7 @@ impl Handler for ProjectConfigsHandler { match self.locator.lookup(&public_key, None).await { Ok(cell_id) => { if !cell_ids.contains(&cell_id) { + // Locator errors, add to pending tracing::warn!( public_key = %public_key, cell_id = %cell_id, @@ -82,11 +77,6 @@ impl Handler for ProjectConfigsHandler { } } - // Prepare headers for per-cell requests - filter_hop_by_hop(&mut parts.headers, parts.version); - parts.headers.remove(CONTENT_LENGTH); - parts.headers.remove(TRANSFER_ENCODING); - // Build per-cell requests let cell_requests = split .into_iter() @@ -95,13 +85,12 @@ impl Handler for ProjectConfigsHandler { public_keys: keys, extra_fields: extra_fields.clone(), }; - let body: HandlerBody = Full::new(cell_request.to_bytes().unwrap()) - .map_err(|e| match e {}) - .boxed(); + + let body = serialize_to_body(&cell_request)?; let req = Request::from_parts(parts.clone(), body); - (cell_id, req) + Ok((cell_id, req)) }) - .collect(); + .collect::>()?; let metadata = Box::new(pending); Ok((cell_requests, metadata)) diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index 6313ce3..ac57d2a 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -37,17 +37,6 @@ pub struct ProjectConfigsRequest { pub extra_fields: HashMap, } -impl ProjectConfigsRequest { - pub fn from_bytes(bytes: &Bytes) -> Result { - serde_json::from_slice(bytes) - } - - pub fn 
to_bytes(&self) -> Result { - let json = serde_json::to_vec(self)?; - Ok(Bytes::from(json)) - } -} - /// Response format for the relay project configs endpoint. /// /// # Example From 83ea0d5d0689f7ed74227db564b46c745ce145c5 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 18 Dec 2025 14:38:02 -0800 Subject: [PATCH 04/18] it's used now --- ingest-router/src/router.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ingest-router/src/router.rs b/ingest-router/src/router.rs index c0215e6..2fff5fb 100644 --- a/ingest-router/src/router.rs +++ b/ingest-router/src/router.rs @@ -12,7 +12,6 @@ use std::sync::Arc; pub struct Router { routes: Arc>, action_to_handler: HashMap>, - #[allow(dead_code)] locales_to_cells: Locales, } From b4df820e077813d7e64083e0b1e8b9e8ba96add3 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 18 Dec 2025 14:39:51 -0800 Subject: [PATCH 05/18] . --- ingest-router/src/api/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest-router/src/api/utils.rs b/ingest-router/src/api/utils.rs index 1a13912..7dec77c 100644 --- a/ingest-router/src/api/utils.rs +++ b/ingest-router/src/api/utils.rs @@ -19,7 +19,7 @@ pub async fn deserialize_body( serde_json::from_slice(&bytes).map_err(|e| IngestRouterError::RequestBodyError(e.to_string())) } -/// Serializes a value to a JSON response body. +/// Serializes a value to a JSON body. 
pub fn serialize_to_body(value: &T) -> Result { let bytes = serde_json::to_vec(value).map(Bytes::from)?; Ok(Full::new(bytes).map_err(|e| match e {}).boxed()) From 9d8824cb203dd56408cc8f4c3ee8a8605d8ea180 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 19 Dec 2025 10:35:38 -0800 Subject: [PATCH 06/18] fix unwrap --- ingest-router/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs index 879902f..03df77f 100644 --- a/ingest-router/src/lib.rs +++ b/ingest-router/src/lib.rs @@ -66,7 +66,7 @@ where // TODO: Placeholder response Box::pin(async move { let (split, _metadata) = - handler.split_request(handler_req, &cells).await.unwrap(); + handler.split_request(handler_req, &cells).await?; for (cell_id, req) in split { println!("Cell: {}, URI: {}", cell_id, req.uri()); From a956d707873c7a61c047f06e62a6c7445d42686d Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 19 Dec 2025 10:37:07 -0800 Subject: [PATCH 07/18] . 
--- ingest-router/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs index 03df77f..3df7c11 100644 --- a/ingest-router/src/lib.rs +++ b/ingest-router/src/lib.rs @@ -65,8 +65,7 @@ where // TODO: Placeholder response Box::pin(async move { - let (split, _metadata) = - handler.split_request(handler_req, &cells).await?; + let (split, _metadata) = handler.split_request(handler_req, &cells).await?; for (cell_id, req) in split { println!("Cell: {}, URI: {}", cell_id, req.uri()); From 139acf0f8e4e82a601500446069001cc0625a9b4 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 19 Dec 2025 10:38:19 -0800 Subject: [PATCH 08/18] revert moving comment --- ingest-router/src/project_config/handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 76aac4b..ce763f3 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -54,7 +54,6 @@ impl Handler for ProjectConfigsHandler { match self.locator.lookup(&public_key, None).await { Ok(cell_id) => { if !cell_ids.contains(&cell_id) { - // Locator errors, add to pending tracing::warn!( public_key = %public_key, cell_id = %cell_id, @@ -67,6 +66,7 @@ impl Handler for ProjectConfigsHandler { split.entry(cell_id).or_default().push(public_key); } Err(e) => { + // Locator errors, add to pending tracing::error!( public_key = %public_key, error = ?e, From b84d37297ff4540437dec73c837afcd031935c79 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 19 Dec 2025 10:53:20 -0800 Subject: [PATCH 09/18] . 
--- ingest-router/src/api/utils.rs | 1 + ingest-router/src/project_config/handler.rs | 6 ++---- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ingest-router/src/api/utils.rs b/ingest-router/src/api/utils.rs index 7dec77c..90f14ca 100644 --- a/ingest-router/src/api/utils.rs +++ b/ingest-router/src/api/utils.rs @@ -25,6 +25,7 @@ pub fn serialize_to_body(value: &T) -> Result &mut HeaderMap { filter_hop_by_hop(headers, version); headers.remove(CONTENT_LENGTH); diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index ce763f3..42ded9a 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -77,16 +77,15 @@ impl Handler for ProjectConfigsHandler { } } - // Build per-cell requests let cell_requests = split .into_iter() .map(|(cell_id, keys)| { - let cell_request = ProjectConfigsRequest { + let project_configs_request = ProjectConfigsRequest { public_keys: keys, extra_fields: extra_fields.clone(), }; - let body = serialize_to_body(&cell_request)?; + let body = serialize_to_body(&project_configs_request)?; let req = Request::from_parts(parts.clone(), body); Ok((cell_id, req)) }) @@ -114,7 +113,6 @@ impl Handler for ProjectConfigsHandler { if let Ok(project_metadata) = metadata.downcast::() { merged.pending_keys.extend(*project_metadata); } - // Filter to successful responses only let mut iter = responses .into_iter() From 64ca3d4bd46659d09cb8128917792df32356d444 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 19 Dec 2025 11:48:13 -0800 Subject: [PATCH 10/18] remove test for deleted func --- ingest-router/src/project_config/protocol.rs | 21 -------------------- 1 file changed, 21 deletions(-) diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index ac57d2a..d38c26f 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -127,27 +127,6 @@ mod 
tests { use http_body_util::BodyExt; use hyper::header::{CACHE_CONTROL, HeaderValue}; - #[test] - fn test_request_serialization() { - let mut extra = HashMap::new(); - extra.insert("global".to_string(), serde_json::json!(true)); - extra.insert("noCache".to_string(), serde_json::json!(false)); - - let request = ProjectConfigsRequest { - public_keys: vec!["key1".to_string(), "key2".to_string()], - extra_fields: extra, - }; - - let bytes = request.to_bytes().unwrap(); - let parsed = ProjectConfigsRequest::from_bytes(&bytes).unwrap(); - - assert_eq!(parsed.public_keys.len(), 2); - assert_eq!( - parsed.extra_fields.get("global"), - Some(&serde_json::json!(true)) - ); - } - #[test] fn test_response_serialization() { let mut configs = HashMap::new(); From f26e5acee9fb16f612bfb40df07b1627e0f1849e Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 19 Dec 2025 15:01:17 -0800 Subject: [PATCH 11/18] merge responses --- ingest-router/src/api/health.rs | 2 +- ingest-router/src/handler.rs | 2 +- ingest-router/src/project_config/handler.rs | 95 +++++++++++++------- ingest-router/src/project_config/protocol.rs | 1 - 4 files changed, 64 insertions(+), 36 deletions(-) diff --git a/ingest-router/src/api/health.rs b/ingest-router/src/api/health.rs index a43ed55..8101d95 100644 --- a/ingest-router/src/api/health.rs +++ b/ingest-router/src/api/health.rs @@ -18,7 +18,7 @@ impl Handler for HealthHandler { unimplemented!(); } - fn merge_responses( + async fn merge_responses( &self, _responses: Vec<(CellId, Result, IngestRouterError>)>, _metadata: SplitMetadata, diff --git a/ingest-router/src/handler.rs b/ingest-router/src/handler.rs index 56208dc..ba7dc6e 100644 --- a/ingest-router/src/handler.rs +++ b/ingest-router/src/handler.rs @@ -36,7 +36,7 @@ pub trait Handler: Send + Sync { /// /// This method combines responses from successful cells, handles failures, /// and incorporates metadata from the split phase. 
- fn merge_responses( + async fn merge_responses( &self, responses: Vec<(CellId, Result, IngestRouterError>)>, metadata: SplitMetadata, diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 42ded9a..2bb6a07 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -6,14 +6,20 @@ use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; use crate::locale::Cells; use crate::project_config::protocol::{ProjectConfigsRequest, ProjectConfigsResponse}; use async_trait::async_trait; +use http::response::Parts; use http_body_util::{BodyExt, Full}; use hyper::body::Bytes; use hyper::{Request, Response}; use locator::client::Locator; use std::collections::{HashMap, HashSet}; -/// Pending public keys that couldn't be routed to any cell -type ProjectConfigsMetadata = Vec; +#[derive(Default)] +struct ProjectConfigsMetadata { + // keys that are assigned to a cell + cell_to_keys: HashMap>, + // keys that couldn't be assigned to any cell + unassigned_keys: Vec, +} /// Handler for the Relay Project Configs endpoint /// @@ -47,7 +53,7 @@ impl Handler for ProjectConfigsHandler { let cell_ids: HashSet<&String> = cells.cell_list.iter().collect(); // Route each public key to its owning cell using the locator service - let mut split: HashMap> = HashMap::new(); + let mut cell_to_keys: HashMap> = HashMap::new(); let mut pending: Vec = Vec::new(); for public_key in public_keys { @@ -63,7 +69,7 @@ impl Handler for ProjectConfigsHandler { continue; } - split.entry(cell_id).or_default().push(public_key); + cell_to_keys.entry(cell_id).or_default().push(public_key); } Err(e) => { // Locator errors, add to pending @@ -77,54 +83,77 @@ impl Handler for ProjectConfigsHandler { } } - let cell_requests = split - .into_iter() + let cell_requests = cell_to_keys + .iter() .map(|(cell_id, keys)| { let project_configs_request = ProjectConfigsRequest { - public_keys: keys, + public_keys: 
keys.clone(), extra_fields: extra_fields.clone(), }; let body = serialize_to_body(&project_configs_request)?; let req = Request::from_parts(parts.clone(), body); - Ok((cell_id, req)) + Ok((cell_id.into(), req)) }) .collect::>()?; - let metadata = Box::new(pending); + let metadata = Box::new(ProjectConfigsMetadata { + cell_to_keys, + unassigned_keys: pending, + }); Ok((cell_requests, metadata)) } - fn merge_responses( + async fn merge_responses( &self, responses: Vec<(CellId, Result, IngestRouterError>)>, metadata: SplitMetadata, ) -> Response { - // TODO: The current implementation does not handle errors from the results - // parameter. The edge case to be handled are if any of the upstreams failed - // to return a response for whatever reason. In scenarios like this, the - // executor needs to provide all the project config keys which failed to - // resolve on the upstream. We would need to add those project keys to the - // pending response. + let meta = metadata + .downcast::() + .unwrap_or(Box::new(ProjectConfigsMetadata::default())); let mut merged = ProjectConfigsResponse::new(); - - // Downcast metadata to our specific type - if let Ok(project_metadata) = metadata.downcast::() { - merged.pending_keys.extend(*project_metadata); - } - // Filter to successful responses only - let mut iter = responses - .into_iter() - .filter_map(|(cell_id, result)| result.ok().map(|r| (cell_id, r))); - - // Handle first successful result (highest priority) - // Gets extra_fields, headers, configs, and pending - // TODO: Actually parse the response body (requires async, so we'd need to - // restructure or parse bodies before calling merge_responses) - if let Some((_cell_id, _response)) = iter.next() { - // For now, we can't parse the body here since this isn't async - // The executor should parse response bodies before passing to merge + let mut pending = meta.unassigned_keys; + + // Parts is populated from the first response. 
The responses are previously + // sorted so successful responses (if they exist) come first. + let mut parts: Option = None; + + for (cell_id, result) in responses { + match result { + Ok(response) => { + if response.status().is_success() { + let (p, body) = response.into_parts(); + if parts.is_none() { + parts = Some(p); + } + + if let Ok(parsed) = deserialize_body::(body).await { + merged.project_configs.extend(parsed.project_configs); + merged.extra_fields.extend(parsed.extra_fields); + merged.pending_keys.extend(parsed.pending_keys); + } + } else { + // Non-success status codes add their keys to pending + tracing::error!( + status = %response.status(), + cell_id = %cell_id, + "Upstream response returned non-success status" + ); + if let Some(keys) = meta.cell_to_keys.get(&cell_id) { + pending.extend(keys.clone()); + } + } + } + Err(e) => { + // If any request failed, add its keys to pending + tracing::error!(error = ?e, "Failed to build merged response"); + if let Some(keys) = meta.cell_to_keys.get(&cell_id) { + pending.extend(keys.clone()); + } + } + } } // Build the final response using into_response which handles serialization diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index d38c26f..1e54e6a 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -57,7 +57,6 @@ pub struct ProjectConfigsRequest { /// "global_status": "ready" /// } /// ``` -#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProjectConfigsResponse { /// Project configs (HashMap merged from all upstreams). 
From db100bb23af05747172d08266530ce12bbf4c20b Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 19 Dec 2025 15:12:58 -0800 Subject: [PATCH 12/18] more refactor --- ingest-router/src/project_config/handler.rs | 50 ++++++++------------ ingest-router/src/project_config/protocol.rs | 7 --- 2 files changed, 19 insertions(+), 38 deletions(-) diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 2bb6a07..125352e 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -121,38 +121,26 @@ impl Handler for ProjectConfigsHandler { let mut parts: Option = None; for (cell_id, result) in responses { - match result { - Ok(response) => { - if response.status().is_success() { - let (p, body) = response.into_parts(); - if parts.is_none() { - parts = Some(p); - } - - if let Ok(parsed) = deserialize_body::(body).await { - merged.project_configs.extend(parsed.project_configs); - merged.extra_fields.extend(parsed.extra_fields); - merged.pending_keys.extend(parsed.pending_keys); - } - } else { - // Non-success status codes add their keys to pending - tracing::error!( - status = %response.status(), - cell_id = %cell_id, - "Upstream response returned non-success status" - ); - if let Some(keys) = meta.cell_to_keys.get(&cell_id) { - pending.extend(keys.clone()); - } - } - } - Err(e) => { - // If any request failed, add its keys to pending - tracing::error!(error = ?e, "Failed to build merged response"); - if let Some(keys) = meta.cell_to_keys.get(&cell_id) { - pending.extend(keys.clone()); - } + // Only process successful responses with success status codes + let successful_response = result.ok().filter(|r| r.status().is_success()); + + let Some(response) = successful_response else { + // Any failure adds the cell's keys to pending + if let Some(keys) = meta.cell_to_keys.get(&cell_id) { + pending.extend(keys.clone()); } + continue; + }; + + let (p, body) = response.into_parts(); + 
if parts.is_none() { + parts = Some(p); + } + + if let Ok(parsed) = deserialize_body::(body).await { + merged.project_configs.extend(parsed.project_configs); + merged.extra_fields.extend(parsed.extra_fields); + merged.pending_keys.extend(parsed.pending_keys); } } diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index 1e54e6a..49dd692 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -76,7 +76,6 @@ pub struct ProjectConfigsResponse { pub http_headers: HeaderMap, } -#[allow(dead_code)] impl ProjectConfigsResponse { pub fn new() -> Self { Self { @@ -87,12 +86,6 @@ impl ProjectConfigsResponse { } } - pub fn from_bytes(bytes: &Bytes) -> Result { - let mut response: Self = serde_json::from_slice(bytes)?; - response.http_headers = HeaderMap::new(); - Ok(response) - } - /// Builds an HTTP response from the merged results. /// Filters out hop-by-hop headers and Content-Length (which is recalculated for the new body). 
pub fn into_response( From 86f38043c6276e1e20c30f015635f8b0d54983ad Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 22 Dec 2025 11:50:38 -0800 Subject: [PATCH 13/18] finish implementing merge_responses --- ingest-router/src/project_config/handler.rs | 26 +++++++------- ingest-router/src/project_config/protocol.rs | 37 -------------------- 2 files changed, 13 insertions(+), 50 deletions(-) diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 125352e..7c82426 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -6,11 +6,12 @@ use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; use crate::locale::Cells; use crate::project_config::protocol::{ProjectConfigsRequest, ProjectConfigsResponse}; use async_trait::async_trait; +use http::StatusCode; use http::response::Parts; -use http_body_util::{BodyExt, Full}; -use hyper::body::Bytes; +use hyper::header::{CONTENT_TYPE, HeaderValue}; use hyper::{Request, Response}; use locator::client::Locator; +use shared::http::make_error_response; use std::collections::{HashMap, HashSet}; #[derive(Default)] @@ -121,7 +122,6 @@ impl Handler for ProjectConfigsHandler { let mut parts: Option = None; for (cell_id, result) in responses { - // Only process successful responses with success status codes let successful_response = result.ok().filter(|r| r.status().is_success()); let Some(response) = successful_response else { @@ -144,16 +144,16 @@ impl Handler for ProjectConfigsHandler { } } - // Build the final response using into_response which handles serialization - match merged.into_response() { - Ok(response) => response, - Err(e) => { - tracing::error!(error = ?e, "Failed to build merged response"); - Response::builder() - .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) - .body(Full::new(Bytes::from("{}")).map_err(|e| match e {}).boxed()) - .unwrap() - } + let serialized_body = serialize_to_body(&merged); + + 
if let (Some(mut p), Ok(body)) = (parts, serialized_body) { + normalize_headers(&mut p.headers, p.version); + p.headers + .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + Response::from_parts(p, body) + } else { + make_error_response(StatusCode::INTERNAL_SERVER_ERROR) } } } diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index 49dd692..e19ed2d 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -5,14 +5,8 @@ //! //! See the module-level documentation in `mod.rs` for complete protocol details. -use crate::errors::IngestRouterError; -use http_body_util::{BodyExt, Full, combinators::BoxBody}; -use hyper::body::Bytes; -use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE, HeaderMap}; -use hyper::{Response, StatusCode}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; -use shared::http::filter_hop_by_hop; use std::collections::HashMap; /// Request format for the relay project configs endpoint. @@ -70,10 +64,6 @@ pub struct ProjectConfigsResponse { /// Other fields (`global`, `global_status`, future fields). #[serde(flatten)] pub extra_fields: HashMap, - - /// HTTP headers from the highest priority upstream (not serialized). - #[serde(skip)] - pub http_headers: HeaderMap, } impl ProjectConfigsResponse { @@ -82,34 +72,7 @@ impl ProjectConfigsResponse { project_configs: HashMap::new(), pending_keys: Vec::new(), extra_fields: HashMap::new(), - http_headers: HeaderMap::new(), - } - } - - /// Builds an HTTP response from the merged results. - /// Filters out hop-by-hop headers and Content-Length (which is recalculated for the new body). 
- pub fn into_response( - mut self, - ) -> Result>, IngestRouterError> { - let merged_json = serde_json::to_vec(&self) - .map_err(|e| IngestRouterError::ResponseSerializationError(e.to_string()))?; - - filter_hop_by_hop(&mut self.http_headers, hyper::Version::HTTP_11); - self.http_headers.remove(CONTENT_LENGTH); - - let mut builder = Response::builder().status(StatusCode::OK); - for (name, value) in self.http_headers.iter() { - builder = builder.header(name, value); } - - builder = builder.header(CONTENT_TYPE, "application/json"); - builder - .body( - Full::new(Bytes::from(merged_json)) - .map_err(|e| match e {}) - .boxed(), - ) - .map_err(|e| IngestRouterError::HyperError(e.to_string())) } } From a1d0f096a8692d101ee611f6ac9cb5270d5e9931 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 22 Dec 2025 12:20:26 -0800 Subject: [PATCH 14/18] refactor tests --- ingest-router/src/lib.rs | 2 + ingest-router/src/project_config/handler.rs | 163 +++++------ ingest-router/src/project_config/protocol.rs | 272 +++++++++---------- 3 files changed, 220 insertions(+), 217 deletions(-) diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs index 3df7c11..68a4c1c 100644 --- a/ingest-router/src/lib.rs +++ b/ingest-router/src/lib.rs @@ -146,6 +146,8 @@ mod tests { let response = service.call(request).await.unwrap(); + // TODO: call the scripts/mock_relay_api.py server and validate the response + assert_eq!(response.status(), 200); } } diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 7c82426..fbdae50 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -14,7 +14,7 @@ use locator::client::Locator; use shared::http::make_error_response; use std::collections::{HashMap, HashSet}; -#[derive(Default)] +#[derive(Default, Debug)] struct ProjectConfigsMetadata { // keys that are assigned to a cell cell_to_keys: HashMap>, @@ -115,7 +115,7 @@ impl Handler for 
ProjectConfigsHandler { .unwrap_or(Box::new(ProjectConfigsMetadata::default())); let mut merged = ProjectConfigsResponse::new(); - let mut pending = meta.unassigned_keys; + merged.pending_keys.extend(meta.unassigned_keys); // Parts is populated from the first response. The responses are previously // sorted so successful responses (if they exist) come first. @@ -127,7 +127,7 @@ impl Handler for ProjectConfigsHandler { let Some(response) = successful_response else { // Any failure adds the cell's keys to pending if let Some(keys) = meta.cell_to_keys.get(&cell_id) { - pending.extend(keys.clone()); + merged.pending_keys.extend(keys.clone()); } continue; }; @@ -188,7 +188,7 @@ mod tests { } } - fn create_test_locator(key_to_cell: HashMap) -> Locator { + async fn create_test_locator(key_to_cell: HashMap) -> Locator { let route_data = RouteData::from( key_to_cell, "cursor".to_string(), @@ -206,21 +206,33 @@ mod tests { provider, None, ); - Locator::from_in_process_service(service) + + let locator = Locator::from_in_process_service(service); + + // Wait for locator to be ready + for _ in 0..50 { + if locator.is_ready() { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + assert!(locator.is_ready(), "Locator should be ready"); + + locator } - fn build_request(body: ProjectConfigsRequest) -> Request { - let bytes = body.to_bytes().unwrap(); + fn build_request(project_configs_request: ProjectConfigsRequest) -> Request { + let body = serialize_to_body(&project_configs_request).unwrap(); Request::builder() .method("POST") .uri("/api/0/relays/projectconfigs/") - .body(Full::new(bytes).map_err(|e| match e {}).boxed()) + .body(body) .unwrap() } - async fn parse_request_body(req: Request) -> ProjectConfigsRequest { - let bytes = req.into_body().collect().await.unwrap().to_bytes(); - ProjectConfigsRequest::from_bytes(&bytes).unwrap() + fn build_response(project_configs_response: serde_json::Value) -> Response { + let body = 
serialize_to_body(&project_configs_response).unwrap(); + Response::builder().status(200).body(body).unwrap() } #[tokio::test] @@ -230,16 +242,7 @@ mod tests { ("key2".to_string(), "us2".to_string()), ("key3".to_string(), "us1".to_string()), ]); - let locator = create_test_locator(key_to_cell); - - // Wait for locator to be ready - for _ in 0..50 { - if locator.is_ready() { - break; - } - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } - assert!(locator.is_ready(), "Locator should be ready"); + let locator = create_test_locator(key_to_cell).await; let locales = HashMap::from([( "us".to_string(), @@ -280,20 +283,15 @@ mod tests { .into_iter() .find(|(id, _)| id == "us1") .unwrap(); - let us1_body = parse_request_body(us1_req).await; + + let us1_body: ProjectConfigsRequest = deserialize_body(us1_req.into_body()).await.unwrap(); let key_to_cell = HashMap::from([ ("key1".to_string(), "us1".to_string()), ("key2".to_string(), "us2".to_string()), ("key3".to_string(), "us1".to_string()), ]); - let locator = create_test_locator(key_to_cell); - for _ in 0..50 { - if locator.is_ready() { - break; - } - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } + let locator = create_test_locator(key_to_cell).await; let handler = ProjectConfigsHandler::new(locator); let locales = HashMap::from([( "us".to_string(), @@ -316,12 +314,13 @@ mod tests { public_keys: vec!["key1".to_string(), "key2".to_string(), "key3".to_string()], extra_fields: extra.clone(), }); - let (cell_requests2, _) = handler.split_request(request2, &cells).await.unwrap(); + let (cell_requests2, metadata) = handler.split_request(request2, &cells).await.unwrap(); let (_, us2_req) = cell_requests2 .into_iter() .find(|(id, _)| id == "us2") .unwrap(); - let us2_body = parse_request_body(us2_req).await; + + let us2_body: ProjectConfigsRequest = deserialize_body(us2_req.into_body()).await.unwrap(); // Verify us1 has key1 and key3 assert_eq!(us1_id, "us1"); @@ -334,22 +333,17 @@ mod 
tests { assert_eq!(us2_body.public_keys.len(), 1); assert!(us2_body.public_keys.contains(&"key2".to_string())); assert_eq!(us2_body.extra_fields, extra); + + let meta = metadata + .downcast::() + .unwrap_or(Box::new(ProjectConfigsMetadata::default())); + assert!(meta.unassigned_keys.is_empty()); } #[tokio::test] async fn test_split_request_unknown_key_goes_to_pending() { let key_to_cell = HashMap::from([("key1".to_string(), "us1".to_string())]); - let locator = create_test_locator(key_to_cell); - - // Wait for locator to be ready - for _ in 0..50 { - if locator.is_ready() { - break; - } - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } - assert!(locator.is_ready(), "Locator should be ready"); - + let locator = create_test_locator(key_to_cell).await; let locales = HashMap::from([( "us".to_string(), vec![CellConfig { @@ -374,26 +368,18 @@ mod tests { // Should have 1 cell request (us1 with key1) assert_eq!(cell_requests.len(), 1); assert_eq!(cell_requests[0].0, "us1"); - let body = parse_request_body(cell_requests.into_iter().next().unwrap().1).await; - assert_eq!(body.public_keys, vec!["key1".to_string()]); - - // Unknown key should be in pending metadata - let pending = metadata.downcast::().unwrap(); - assert_eq!(pending.len(), 1); - assert_eq!(pending[0], "unknown_key"); - } - fn build_response(body: &serde_json::Value) -> Response { - let bytes = Bytes::from(serde_json::to_vec(body).unwrap()); - Response::builder() - .status(200) - .body(Full::new(bytes).map_err(|e| match e {}).boxed()) - .unwrap() + // unknown key in metadata + let meta = metadata + .downcast::() + .unwrap_or(Box::new(ProjectConfigsMetadata::default())); + assert_eq!(meta.unassigned_keys, Vec::from(["unknown_key".to_string()])); } #[tokio::test] async fn test_merge_results_successful_cells() { - let handler = ProjectConfigsHandler::new(create_test_locator(HashMap::new())); + let locator = create_test_locator(HashMap::new()).await; + let handler = 
ProjectConfigsHandler::new(locator); // Create response from us1 with key1 and global config let response1_json = serde_json::json!({ @@ -402,7 +388,7 @@ mod tests { }, "global": {"version": 1} }); - let response1 = build_response(&response1_json); + let response1 = build_response(response1_json); // Create response from us2 with key2 and different global config let response2_json = serde_json::json!({ @@ -411,24 +397,26 @@ mod tests { }, "global": {"version": 2} }); - let response2 = build_response(&response2_json); + let response2 = build_response(response2_json); - let results: Vec<(CellId, Result, IngestRouterError>)> = vec![ + let results = vec![ ("us1".to_string(), Ok(response1)), ("us2".to_string(), Ok(response2)), ]; let metadata: SplitMetadata = Box::new(Vec::::new()); - let merged = handler.merge_responses(results, metadata); + let merged = handler.merge_responses(results, metadata).await; - // The current implementation doesn't actually parse the response bodies - // (noted as TODO in the code), so we just verify we get a valid response - assert_eq!(merged.status(), 200); + let parsed: ProjectConfigsResponse = deserialize_body(merged.into_body()).await.unwrap(); + + assert!(parsed.project_configs.contains_key("key1")); + assert!(parsed.project_configs.contains_key("key2")); } #[tokio::test] async fn test_merge_responses_with_pending() { - let handler = ProjectConfigsHandler::new(create_test_locator(HashMap::new())); + let locator = create_test_locator(HashMap::new()).await; + let handler = ProjectConfigsHandler::new(locator); // Test pending keys from split phase (routing failures, unknown keys) @@ -438,7 +426,7 @@ mod tests { "key1": {"slug": "project1"} } }); - let response1 = build_response(&response1_json); + let response1 = build_response(response1_json); // Create response from us2 with successful config let response2_json = serde_json::json!({ @@ -446,7 +434,7 @@ mod tests { "key2": {"slug": "project2"} } }); - let response2 = 
build_response(&response2_json); + let response2 = build_response(response2_json); let results: Vec<(CellId, Result, IngestRouterError>)> = vec![ ("us1".to_string(), Ok(response1)), @@ -454,24 +442,37 @@ mod tests { ]; // Pending from split phase (routing failures) - let pending_from_split: ProjectConfigsMetadata = vec![ - "key_routing_failed".to_string(), - "key_from_failed_cell1".to_string(), - "key_from_failed_cell2".to_string(), - ]; + let pending_from_split: ProjectConfigsMetadata = ProjectConfigsMetadata { + cell_to_keys: HashMap::new(), + unassigned_keys: vec![ + "key_routing_failed".to_string(), + "key_from_failed_cell1".to_string(), + "key_from_failed_cell2".to_string(), + ], + }; let metadata: SplitMetadata = Box::new(pending_from_split); - let merged = handler.merge_responses(results, metadata); + let merged = handler.merge_responses(results, metadata).await; - // Parse response body to check pending keys - let bytes = merged.into_body().collect().await.unwrap().to_bytes(); - let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + // Parse merged response body so we can assert on pending keys + let parsed: ProjectConfigsResponse = deserialize_body(merged.into_body()).await.unwrap(); // Should have pending keys from split phase - let pending = json["pending"].as_array().unwrap(); - assert_eq!(pending.len(), 3); - assert!(pending.contains(&serde_json::json!("key_routing_failed"))); - assert!(pending.contains(&serde_json::json!("key_from_failed_cell1"))); - assert!(pending.contains(&serde_json::json!("key_from_failed_cell2"))); + assert_eq!(parsed.pending_keys.len(), 3); + assert!( + parsed + .pending_keys + .contains(&"key_routing_failed".to_string()) + ); + assert!( + parsed + .pending_keys + .contains(&"key_from_failed_cell1".to_string()) + ); + assert!( + parsed + .pending_keys + .contains(&"key_from_failed_cell2".to_string()) + ); } } diff --git a/ingest-router/src/project_config/protocol.rs 
b/ingest-router/src/project_config/protocol.rs index e19ed2d..98eed1f 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -76,139 +76,139 @@ impl ProjectConfigsResponse { } } -#[cfg(test)] -mod tests { - use super::*; - use http_body_util::BodyExt; - use hyper::header::{CACHE_CONTROL, HeaderValue}; - - #[test] - fn test_response_serialization() { - let mut configs = HashMap::new(); - configs.insert( - "key1".to_string(), - serde_json::json!({ - "disabled": false, - "slug": "test-project" - }), - ); - - let response = ProjectConfigsResponse { - project_configs: configs, - pending_keys: vec!["key2".to_string()], - extra_fields: HashMap::new(), - http_headers: HeaderMap::new(), - }; - - let bytes = Bytes::from(serde_json::to_vec(&response).unwrap()); - let parsed = ProjectConfigsResponse::from_bytes(&bytes).unwrap(); - - assert_eq!(parsed.project_configs.len(), 1); - assert_eq!(parsed.pending_keys.len(), 1); - } - - #[tokio::test] - async fn test_response_merging() { - let mut results = ProjectConfigsResponse::new(); - - // Merge configs from multiple upstreams - let mut configs1 = HashMap::new(); - configs1.insert( - "key1".to_string(), - serde_json::json!({"disabled": false, "slug": "project1"}), - ); - - let mut configs2 = HashMap::new(); - configs2.insert( - "key2".to_string(), - serde_json::json!({"disabled": false, "slug": "project2"}), - ); - - results.project_configs.extend(configs1); - results.project_configs.extend(configs2); - - // Add pending keys - results - .pending_keys - .extend(vec!["key3".to_string(), "key4".to_string()]); - results.pending_keys.extend(vec!["key5".to_string()]); - - // Merge extra fields - let mut extra = HashMap::new(); - extra.insert( - "global".to_string(), - serde_json::json!({"measurements": {"maxCustomMeasurements": 10}}), - ); - extra.insert("global_status".to_string(), serde_json::json!("ready")); - results.extra_fields.extend(extra); - - let response = 
results.into_response().unwrap(); - let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); - let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); - - // Verify configs merged correctly - assert_eq!(parsed["configs"].as_object().unwrap().len(), 2); - assert!(parsed["configs"].get("key1").is_some()); - assert!(parsed["configs"].get("key2").is_some()); - - // Verify pending keys added correctly - assert_eq!(parsed["pending"].as_array().unwrap().len(), 3); - - // Verify extra fields merged correctly - assert_eq!( - parsed["global"]["measurements"]["maxCustomMeasurements"], - 10 - ); - assert_eq!(parsed["global_status"], "ready"); - } - - #[tokio::test] - async fn test_empty_pending_omitted() { - let results = ProjectConfigsResponse::new(); - let response = results.into_response().unwrap(); - let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); - let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); - - assert!(parsed.get("pending").is_none()); - } - - #[tokio::test] - async fn test_headers_forwarding() { - let mut results = ProjectConfigsResponse::new(); - - // Add upstream headers - results - .http_headers - .insert(CACHE_CONTROL, HeaderValue::from_static("max-age=300")); - results.http_headers.insert( - "X-Sentry-Rate-Limit-Remaining", - HeaderValue::from_static("100"), - ); - // Add hop-by-hop header that should be filtered - results.http_headers.insert( - hyper::header::CONNECTION, - HeaderValue::from_static("keep-alive"), - ); - - let response = results.into_response().unwrap(); - - // Verify headers are copied - assert_eq!( - response.headers().get(CACHE_CONTROL), - Some(&HeaderValue::from_static("max-age=300")) - ); - assert_eq!( - response.headers().get("X-Sentry-Rate-Limit-Remaining"), - Some(&HeaderValue::from_static("100")) - ); - - // Verify hop-by-hop headers are filtered out - assert!(response.headers().get(hyper::header::CONNECTION).is_none()); - - // Verify Content-Type is always 
set - assert_eq!( - response.headers().get(CONTENT_TYPE), - Some(&HeaderValue::from_static("application/json")) - ); - } -} +// #[cfg(test)] +// mod tests { +// use super::*; +// use http_body_util::BodyExt; +// use hyper::header::{CACHE_CONTROL, HeaderValue}; + +// #[test] +// fn test_response_serialization() { +// let mut configs = HashMap::new(); +// configs.insert( +// "key1".to_string(), +// serde_json::json!({ +// "disabled": false, +// "slug": "test-project" +// }), +// ); + +// let response = ProjectConfigsResponse { +// project_configs: configs, +// pending_keys: vec!["key2".to_string()], +// extra_fields: HashMap::new(), +// http_headers: HeaderMap::new(), +// }; + +// let bytes = Bytes::from(serde_json::to_vec(&response).unwrap()); +// let parsed = ProjectConfigsResponse::from_bytes(&bytes).unwrap(); + +// assert_eq!(parsed.project_configs.len(), 1); +// assert_eq!(parsed.pending_keys.len(), 1); +// } + +// #[tokio::test] +// async fn test_response_merging() { +// let mut results = ProjectConfigsResponse::new(); + +// // Merge configs from multiple upstreams +// let mut configs1 = HashMap::new(); +// configs1.insert( +// "key1".to_string(), +// serde_json::json!({"disabled": false, "slug": "project1"}), +// ); + +// let mut configs2 = HashMap::new(); +// configs2.insert( +// "key2".to_string(), +// serde_json::json!({"disabled": false, "slug": "project2"}), +// ); + +// results.project_configs.extend(configs1); +// results.project_configs.extend(configs2); + +// // Add pending keys +// results +// .pending_keys +// .extend(vec!["key3".to_string(), "key4".to_string()]); +// results.pending_keys.extend(vec!["key5".to_string()]); + +// // Merge extra fields +// let mut extra = HashMap::new(); +// extra.insert( +// "global".to_string(), +// serde_json::json!({"measurements": {"maxCustomMeasurements": 10}}), +// ); +// extra.insert("global_status".to_string(), serde_json::json!("ready")); +// results.extra_fields.extend(extra); + +// let response = 
results.into_response().unwrap(); +// let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); +// let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); + +// // Verify configs merged correctly +// assert_eq!(parsed["configs"].as_object().unwrap().len(), 2); +// assert!(parsed["configs"].get("key1").is_some()); +// assert!(parsed["configs"].get("key2").is_some()); + +// // Verify pending keys added correctly +// assert_eq!(parsed["pending"].as_array().unwrap().len(), 3); + +// // Verify extra fields merged correctly +// assert_eq!( +// parsed["global"]["measurements"]["maxCustomMeasurements"], +// 10 +// ); +// assert_eq!(parsed["global_status"], "ready"); +// } + +// #[tokio::test] +// async fn test_empty_pending_omitted() { +// let results = ProjectConfigsResponse::new(); +// let response = results.into_response().unwrap(); +// let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); +// let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); + +// assert!(parsed.get("pending").is_none()); +// } + +// #[tokio::test] +// async fn test_headers_forwarding() { +// let mut results = ProjectConfigsResponse::new(); + +// // Add upstream headers +// results +// .http_headers +// .insert(CACHE_CONTROL, HeaderValue::from_static("max-age=300")); +// results.http_headers.insert( +// "X-Sentry-Rate-Limit-Remaining", +// HeaderValue::from_static("100"), +// ); +// // Add hop-by-hop header that should be filtered +// results.http_headers.insert( +// hyper::header::CONNECTION, +// HeaderValue::from_static("keep-alive"), +// ); + +// let response = results.into_response().unwrap(); + +// // Verify headers are copied +// assert_eq!( +// response.headers().get(CACHE_CONTROL), +// Some(&HeaderValue::from_static("max-age=300")) +// ); +// assert_eq!( +// response.headers().get("X-Sentry-Rate-Limit-Remaining"), +// Some(&HeaderValue::from_static("100")) +// ); + +// // Verify hop-by-hop headers are filtered out +// 
assert!(response.headers().get(hyper::header::CONNECTION).is_none()); + +// // Verify Content-Type is always set +// assert_eq!( +// response.headers().get(CONTENT_TYPE), +// Some(&HeaderValue::from_static("application/json")) +// ); +// } +// } From 95b7a70aca71b72027d16a5bf7dd0dfb678dc133 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 22 Dec 2025 15:57:41 -0800 Subject: [PATCH 15/18] add todo --- ingest-router/src/project_config/handler.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index fbdae50..cbe72e3 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -110,6 +110,7 @@ impl Handler for ProjectConfigsHandler { responses: Vec<(CellId, Result, IngestRouterError>)>, metadata: SplitMetadata, ) -> Response { + // TODO: Consider refactoring to avoid runtime downcast let meta = metadata .downcast::() .unwrap_or(Box::new(ProjectConfigsMetadata::default())); From 4e856e3dc3993ff793b83154af4cb08ffc12ede0 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 22 Dec 2025 16:11:29 -0800 Subject: [PATCH 16/18] add error msg --- ingest-router/src/project_config/handler.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index cbe72e3..e39df6e 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -142,6 +142,11 @@ impl Handler for ProjectConfigsHandler { merged.project_configs.extend(parsed.project_configs); merged.extra_fields.extend(parsed.extra_fields); merged.pending_keys.extend(parsed.pending_keys); + } else { + tracing::error!( + cell_id = %cell_id, + "Failed to deserialize project configs response from cell" + ); } } From da9ab40d5ccd107036767d4b6fd7e07ed3e4925d Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 23 Dec 2025 14:05:48 -0800 Subject: [PATCH 17/18] 
remove commented tests --- ingest-router/src/project_config/protocol.rs | 137 ------------------- 1 file changed, 137 deletions(-) diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index 98eed1f..1728020 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -75,140 +75,3 @@ impl ProjectConfigsResponse { } } } - -// #[cfg(test)] -// mod tests { -// use super::*; -// use http_body_util::BodyExt; -// use hyper::header::{CACHE_CONTROL, HeaderValue}; - -// #[test] -// fn test_response_serialization() { -// let mut configs = HashMap::new(); -// configs.insert( -// "key1".to_string(), -// serde_json::json!({ -// "disabled": false, -// "slug": "test-project" -// }), -// ); - -// let response = ProjectConfigsResponse { -// project_configs: configs, -// pending_keys: vec!["key2".to_string()], -// extra_fields: HashMap::new(), -// http_headers: HeaderMap::new(), -// }; - -// let bytes = Bytes::from(serde_json::to_vec(&response).unwrap()); -// let parsed = ProjectConfigsResponse::from_bytes(&bytes).unwrap(); - -// assert_eq!(parsed.project_configs.len(), 1); -// assert_eq!(parsed.pending_keys.len(), 1); -// } - -// #[tokio::test] -// async fn test_response_merging() { -// let mut results = ProjectConfigsResponse::new(); - -// // Merge configs from multiple upstreams -// let mut configs1 = HashMap::new(); -// configs1.insert( -// "key1".to_string(), -// serde_json::json!({"disabled": false, "slug": "project1"}), -// ); - -// let mut configs2 = HashMap::new(); -// configs2.insert( -// "key2".to_string(), -// serde_json::json!({"disabled": false, "slug": "project2"}), -// ); - -// results.project_configs.extend(configs1); -// results.project_configs.extend(configs2); - -// // Add pending keys -// results -// .pending_keys -// .extend(vec!["key3".to_string(), "key4".to_string()]); -// results.pending_keys.extend(vec!["key5".to_string()]); - -// // Merge extra fields -// 
let mut extra = HashMap::new(); -// extra.insert( -// "global".to_string(), -// serde_json::json!({"measurements": {"maxCustomMeasurements": 10}}), -// ); -// extra.insert("global_status".to_string(), serde_json::json!("ready")); -// results.extra_fields.extend(extra); - -// let response = results.into_response().unwrap(); -// let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); -// let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); - -// // Verify configs merged correctly -// assert_eq!(parsed["configs"].as_object().unwrap().len(), 2); -// assert!(parsed["configs"].get("key1").is_some()); -// assert!(parsed["configs"].get("key2").is_some()); - -// // Verify pending keys added correctly -// assert_eq!(parsed["pending"].as_array().unwrap().len(), 3); - -// // Verify extra fields merged correctly -// assert_eq!( -// parsed["global"]["measurements"]["maxCustomMeasurements"], -// 10 -// ); -// assert_eq!(parsed["global_status"], "ready"); -// } - -// #[tokio::test] -// async fn test_empty_pending_omitted() { -// let results = ProjectConfigsResponse::new(); -// let response = results.into_response().unwrap(); -// let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); -// let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); - -// assert!(parsed.get("pending").is_none()); -// } - -// #[tokio::test] -// async fn test_headers_forwarding() { -// let mut results = ProjectConfigsResponse::new(); - -// // Add upstream headers -// results -// .http_headers -// .insert(CACHE_CONTROL, HeaderValue::from_static("max-age=300")); -// results.http_headers.insert( -// "X-Sentry-Rate-Limit-Remaining", -// HeaderValue::from_static("100"), -// ); -// // Add hop-by-hop header that should be filtered -// results.http_headers.insert( -// hyper::header::CONNECTION, -// HeaderValue::from_static("keep-alive"), -// ); - -// let response = results.into_response().unwrap(); - -// // Verify headers are copied -// 
assert_eq!( -// response.headers().get(CACHE_CONTROL), -// Some(&HeaderValue::from_static("max-age=300")) -// ); -// assert_eq!( -// response.headers().get("X-Sentry-Rate-Limit-Remaining"), -// Some(&HeaderValue::from_static("100")) -// ); - -// // Verify hop-by-hop headers are filtered out -// assert!(response.headers().get(hyper::header::CONNECTION).is_none()); - -// // Verify Content-Type is always set -// assert_eq!( -// response.headers().get(CONTENT_TYPE), -// Some(&HeaderValue::from_static("application/json")) -// ); -// } -// } From 41b66c204beb9018e0ceb689bb792d594f8e76b8 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Tue, 30 Dec 2025 12:20:48 -0800 Subject: [PATCH 18/18] refactor project configs merge --- ingest-router/src/project_config/handler.rs | 38 +++++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index e39df6e..3b821ea 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -118,11 +118,26 @@ impl Handler for ProjectConfigsHandler { let mut merged = ProjectConfigsResponse::new(); merged.pending_keys.extend(meta.unassigned_keys); - // Parts is populated from the first response. The responses are previously - // sorted so successful responses (if they exist) come first. + // Order the responses so successful ones come first + let sorted_responses = { + let mut sorted = responses; + sorted.sort_by_key(|(_, result)| match result { + Ok(r) if r.status().is_success() => 0, + Ok(_) => 1, + Err(_) => 2, + }); + sorted + }; + + // True if at least one response is ok + let has_successful_response = sorted_responses + .first() + .is_some_and(|(_, r)| r.as_ref().ok().is_some_and(|r| r.status().is_success())); + + // Parts is populated from the first response. 
let mut parts: Option = None; - for (cell_id, result) in responses { + for (cell_id, result) in sorted_responses { let successful_response = result.ok().filter(|r| r.status().is_success()); let Some(response) = successful_response else { @@ -152,14 +167,15 @@ impl Handler for ProjectConfigsHandler { let serialized_body = serialize_to_body(&merged); - if let (Some(mut p), Ok(body)) = (parts, serialized_body) { - normalize_headers(&mut p.headers, p.version); - p.headers - .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - - Response::from_parts(p, body) - } else { - make_error_response(StatusCode::INTERNAL_SERVER_ERROR) + match (has_successful_response, parts, serialized_body) { + (true, Some(mut p), Ok(body)) => { + normalize_headers(&mut p.headers, p.version); + p.headers + .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + return Response::from_parts(p, body); + } + (_, Some(p), _) => make_error_response(p.status), + (_, _, _) => make_error_response(StatusCode::BAD_GATEWAY), } } }