diff --git a/ingest-router/src/api.rs b/ingest-router/src/api.rs new file mode 100644 index 0000000..b26c5df --- /dev/null +++ b/ingest-router/src/api.rs @@ -0,0 +1,2 @@ +pub mod health; +pub mod utils; diff --git a/ingest-router/src/api/health.rs b/ingest-router/src/api/health.rs new file mode 100644 index 0000000..8101d95 --- /dev/null +++ b/ingest-router/src/api/health.rs @@ -0,0 +1,28 @@ +use crate::errors::IngestRouterError; +use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; +use crate::locale::Cells; +use async_trait::async_trait; +use hyper::{Request, Response}; + +/// This endpoint returns success if any one upstream is available. +/// Synapse should continue to operate even if one cell is down. +pub struct HealthHandler {} + +#[async_trait] +impl Handler for HealthHandler { + async fn split_request( + &self, + _request: Request, + _cells: &Cells, + ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError> { + unimplemented!(); + } + + async fn merge_responses( + &self, + _responses: Vec<(CellId, Result, IngestRouterError>)>, + _metadata: SplitMetadata, + ) -> Response { + unimplemented!(); + } +} diff --git a/ingest-router/src/api/utils.rs b/ingest-router/src/api/utils.rs new file mode 100644 index 0000000..90f14ca --- /dev/null +++ b/ingest-router/src/api/utils.rs @@ -0,0 +1,35 @@ +use crate::errors::IngestRouterError; +use http::Version; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::header::HeaderMap; +use hyper::header::{CONTENT_LENGTH, TRANSFER_ENCODING}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use shared::http::filter_hop_by_hop; + +pub type HandlerBody = BoxBody; + +/// Deserializes a JSON request body into the specified type. +pub async fn deserialize_body( + body: HandlerBody, +) -> Result { + let bytes = body.collect().await?.to_bytes(); + serde_json::from_slice(&bytes).map_err(|e| IngestRouterError::RequestBodyError(e.to_string())) +} + +/// Serializes a value to a JSON body. +pub fn serialize_to_body(value: &T) -> Result { + let bytes = serde_json::to_vec(value).map(Bytes::from)?; + Ok(Full::new(bytes).map_err(|e| match e {}).boxed()) +} + +/// Common header normalization for all requests and responses. +pub fn normalize_headers(headers: &mut HeaderMap, version: Version) -> &mut HeaderMap { + filter_hop_by_hop(headers, version); + headers.remove(CONTENT_LENGTH); + headers.remove(TRANSFER_ENCODING); + + headers +} diff --git a/ingest-router/src/errors.rs b/ingest-router/src/errors.rs index b546bd4..5bc4078 100644 --- a/ingest-router/src/errors.rs +++ b/ingest-router/src/errors.rs @@ -48,4 +48,7 @@ pub enum IngestRouterError { #[error("Locator client error: {0}")] LocatorClientError(#[from] locator::client::ClientError), + + #[error("Serde error: {0}")] + SerdeError(#[from] serde_json::Error), } diff --git a/ingest-router/src/handler.rs b/ingest-router/src/handler.rs index e39c716..ba7dc6e 100644 --- a/ingest-router/src/handler.rs +++ b/ingest-router/src/handler.rs @@ -1,47 +1,44 @@ use crate::errors::IngestRouterError; use crate::locale::Cells; use async_trait::async_trait; -use serde::{Serialize, de::DeserializeOwned}; +use http_body_util::combinators::BoxBody; +use hyper::body::Bytes; +use hyper::{Request, Response}; +use std::any::Any; pub type CellId = String; +pub type HandlerBody = BoxBody; +pub type SplitMetadata = Box; /// Handler for endpoints that split requests across cells and merge results /// /// The handler implements endpoint-specific logic: /// - How to split a request into per-cell requests /// - How to merge results from multiple cells -/// ``` #[async_trait] -pub trait Handler: Send + Sync -where - Req: Serialize + DeserializeOwned + Send, - Res: Serialize + DeserializeOwned + Send, -{ - /// Metadata that flows from split_requests to merge_results - /// - /// This allows passing data from the split phase to the merge phase. - /// Some use cases: - /// - Pending keys that couldn't be routed (e.g., `Vec`) - type SplitMetadata: Send; +pub trait Handler: Send + Sync { + /// Returns the type name of this handler for test assertions + fn type_name(&self) -> &'static str { + std::any::type_name::() + } /// Split one request into multiple per-cell requests /// /// This method routes the request data to appropriate cells and builds /// per-cell requests that will be sent to upstreams. - async fn split_requests( + async fn split_request( &self, - request: Req, + request: Request, cells: &Cells, - ) -> Result<(Vec<(CellId, Req)>, Self::SplitMetadata), IngestRouterError>; + ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError>; /// Merge results from multiple cells into a single response /// /// This method combines responses from successful cells, handles failures, /// and incorporates metadata from the split phase. - /// - fn merge_results( + async fn merge_responses( &self, - results: Vec>, - metadata: Self::SplitMetadata, - ) -> Res; + responses: Vec<(CellId, Result, IngestRouterError>)>, + metadata: SplitMetadata, + ) -> Response; } diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs index 3d1c010..68a4c1c 100644 --- a/ingest-router/src/lib.rs +++ b/ingest-router/src/lib.rs @@ -1,3 +1,4 @@ +pub mod api; pub mod config; pub mod errors; pub mod handler; @@ -24,7 +25,7 @@ pub async fn run(config: config::Config) -> Result<(), IngestRouterError> { let locator = Locator::new(config.locator.to_client_config()).await?; let ingest_router_service = IngestRouterService { - router: router::Router::new(config.routes, locator), + router: router::Router::new(config.routes, config.locales, locator), }; let router_task = run_http_service( &config.listener.host, @@ -51,15 +52,25 @@ where Pin> + Send + 'static>>; fn call(&self, req: Request) -> Self::Future { - let handler = self - .router - .resolve(&req) - .and_then(|action| self.router.get_handler(action)); + let maybe_handler = self.router.resolve(&req); + + match maybe_handler { + Some((handler, cells)) => { + // Convert Request to Request + let (parts, body) = req.into_parts(); + let handler_body = body + .map_err(|e| IngestRouterError::RequestBodyError(e.to_string())) + .boxed(); + let handler_req = Request::from_parts(parts, handler_body); - match handler { - Some(_handler) => { // TODO: Placeholder response Box::pin(async move { + let (split, _metadata) = handler.split_request(handler_req, &cells).await?; + + for (cell_id, req) in split { + println!("Cell: {}, URI: {}", cell_id, req.uri()); + } + Ok(Response::new( Full::new("ok\n".into()).map_err(|e| match e {}).boxed(), )) @@ -74,13 +85,15 @@ where mod tests { use super::*; use crate::config::{HandlerAction, HttpMethod, Match, Route}; - use http_body_util::Empty; use hyper::Method; use hyper::body::Bytes; use hyper::header::HOST; + use crate::config::CellConfig; use locator::config::LocatorDataType; use locator::locator::Locator as LocatorService; + use std::collections::HashMap; + use url::Url; use crate::testutils::get_mock_provider; use std::sync::Arc; @@ -97,6 +110,15 @@ mod tests { locale: "us".to_string(), }]; + let locales = HashMap::from([( + "us".to_string(), + vec![CellConfig { + id: "us1".to_string(), + sentry_url: Url::parse("https://sentry.io/us1").unwrap(), + relay_url: Url::parse("https://relay.io/us1").unwrap(), + }], + )]); + let (_dir, provider) = get_mock_provider().await; let locator_service = LocatorService::new( LocatorDataType::ProjectKey, @@ -107,7 +129,7 @@ mod tests { let locator = Locator::from_in_process_service(locator_service); let service = IngestRouterService { - router: router::Router::new(routes_config, locator), + router: router::Router::new(routes_config, locales, locator), }; // Project configs request @@ -116,14 +138,16 @@ mod tests { .uri("/api/0/relays/projectconfigs/") .header(HOST, "us.sentry.io") .body( - Empty::::new() - .map_err(|never| match never {}) + Full::new(Bytes::from(r#"{"publicKeys": ["test-key"]}"#)) + .map_err(|e| match e {}) .boxed(), ) .unwrap(); let response = service.call(request).await.unwrap(); + // TODO: call the scripts/mock_relay_api.py server and validate the response + assert_eq!(response.status(), 200); } } diff --git a/ingest-router/src/locale.rs b/ingest-router/src/locale.rs index f00770d..eaccd6f 100644 --- a/ingest-router/src/locale.rs +++ b/ingest-router/src/locale.rs @@ -24,6 +24,7 @@ //! during request processing. use std::collections::HashMap; +use std::sync::Arc; use url::Url; use crate::config::CellConfig; @@ -79,7 +80,7 @@ impl Cells { #[derive(Clone, Debug)] pub struct Locales { /// Mapping from locale to cells - locale_to_cells: HashMap, + locale_to_cells: HashMap>, } impl Locales { @@ -89,7 +90,7 @@ impl Locales { let locale_to_cells = locales .into_iter() .map(|(locale, cells)| { - let cells = Cells::from_config(cells); + let cells = Arc::new(Cells::from_config(cells)); (locale, cells) }) .collect(); @@ -98,8 +99,8 @@ impl Locales { } /// Get the cells for a specific locale - pub fn get_cells(&self, locale: &str) -> Option<&Cells> { - self.locale_to_cells.get(locale) + pub fn get_cells(&self, locale: &str) -> Option> { + self.locale_to_cells.get(locale).cloned() } } diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs index 9a596fc..3b821ea 100644 --- a/ingest-router/src/project_config/handler.rs +++ b/ingest-router/src/project_config/handler.rs @@ -1,13 +1,27 @@ //! Handler implementation for the Relay Project Configs endpoint +use crate::api::utils::{deserialize_body, normalize_headers, serialize_to_body}; use crate::errors::IngestRouterError; -use crate::handler::{CellId, Handler}; +use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; use crate::locale::Cells; use crate::project_config::protocol::{ProjectConfigsRequest, ProjectConfigsResponse}; use async_trait::async_trait; +use http::StatusCode; +use http::response::Parts; +use hyper::header::{CONTENT_TYPE, HeaderValue}; +use hyper::{Request, Response}; use locator::client::Locator; +use shared::http::make_error_response; use std::collections::{HashMap, HashSet}; +#[derive(Default, Debug)] +struct ProjectConfigsMetadata { + // keys that are assigned to a cell + cell_to_keys: HashMap>, + // keys that couldn't be assigned to any cell + unassigned_keys: Vec, +} + /// Handler for the Relay Project Configs endpoint /// /// Routes public keys to cells using the locator service, splits requests @@ -24,22 +38,23 @@ impl ProjectConfigsHandler { } #[async_trait] -impl Handler for ProjectConfigsHandler { - /// Pending public keys that couldn't be routed to any cell - type SplitMetadata = Vec; - - async fn split_requests( +impl Handler for ProjectConfigsHandler { + async fn split_request( &self, - request: ProjectConfigsRequest, + request: Request, cells: &Cells, - ) -> Result<(Vec<(CellId, ProjectConfigsRequest)>, Vec), IngestRouterError> { - let public_keys = request.public_keys; - let extra_fields = request.extra_fields; + ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError> { + let (mut parts, body) = request.into_parts(); + let parsed: ProjectConfigsRequest = deserialize_body(body).await?; + normalize_headers(&mut parts.headers, parts.version); + + let public_keys = parsed.public_keys; + let extra_fields = parsed.extra_fields; let cell_ids: HashSet<&String> = cells.cell_list.iter().collect(); // Route each public key to its owning cell using the locator service - let mut split: HashMap> = HashMap::new(); + let mut cell_to_keys: HashMap> = HashMap::new(); let mut pending: Vec = Vec::new(); for public_key in public_keys { @@ -55,7 +70,7 @@ impl Handler for ProjectConfigsHa continue; } - split.entry(cell_id).or_default().push(public_key); + cell_to_keys.entry(cell_id).or_default().push(public_key); } Err(e) => { // Locator errors, add to pending @@ -69,63 +84,99 @@ impl Handler for ProjectConfigsHa } } - // Build per-cell requests - let cell_requests: Vec<(CellId, ProjectConfigsRequest)> = split - .into_iter() + let cell_requests = cell_to_keys + .iter() .map(|(cell_id, keys)| { - ( - cell_id, - ProjectConfigsRequest { - public_keys: keys, - extra_fields: extra_fields.clone(), - }, - ) + let project_configs_request = ProjectConfigsRequest { + public_keys: keys.clone(), + extra_fields: extra_fields.clone(), + }; + + let body = serialize_to_body(&project_configs_request)?; + let req = Request::from_parts(parts.clone(), body); + Ok((cell_id.into(), req)) }) - .collect(); + .collect::>()?; - Ok((cell_requests, pending)) + let metadata = Box::new(ProjectConfigsMetadata { + cell_to_keys, + unassigned_keys: pending, + }); + Ok((cell_requests, metadata)) } - fn merge_results( + async fn merge_responses( &self, - results: Vec>, - pending_from_split: Vec, - ) -> ProjectConfigsResponse { - // TODO: The current implementation does not handle errors from the results - // parameter. The edge case to be handled are if any of the upstreams failed - // to return a response for whatever reason. In scenarios like this, the - // executor needs to provide all the project config keys which failed to - // resolve on the upstream. We would need to add those project keys to the - // pending response. + responses: Vec<(CellId, Result, IngestRouterError>)>, + metadata: SplitMetadata, + ) -> Response { + // TODO: Consider refactoring to avoid runtime downcast + let meta = metadata + .downcast::() + .unwrap_or(Box::new(ProjectConfigsMetadata::default())); let mut merged = ProjectConfigsResponse::new(); + merged.pending_keys.extend(meta.unassigned_keys); + + // Order the responses so successful ones come first + let sorted_responses = { + let mut sorted = responses; + sorted.sort_by_key(|(_, result)| match result { + Ok(r) if r.status().is_success() => 0, + Ok(_) => 1, + Err(_) => 2, + }); + sorted + }; - // Add pending keys from split phase - merged.pending_keys.extend(pending_from_split); - - // Results are provided pre-sorted by cell priority (highest first) - // The executor ensures results are ordered so we can use the first successful response - // for extra_fields and headers. - // Failed cells are handled by the executor adding their keys to pending_from_split. - let mut iter = results.into_iter().flatten(); - - // Handle first successful result (highest priority) - // Gets extra_fields, headers, configs, and pending - if let Some((_cell_id, response)) = iter.next() { - merged.project_configs.extend(response.project_configs); - merged.pending_keys.extend(response.pending_keys); - merged.extra_fields.extend(response.extra_fields); - merged.http_headers = response.http_headers; - } + // True if at least one response is ok + let has_successful_response = sorted_responses + .first() + .is_some_and(|(_, r)| r.as_ref().ok().is_some_and(|r| r.status().is_success())); + + // Parts is populated from the first response. + let mut parts: Option = None; + + for (cell_id, result) in sorted_responses { + let successful_response = result.ok().filter(|r| r.status().is_success()); + + let Some(response) = successful_response else { + // Any failure adds the cell's keys to pending + if let Some(keys) = meta.cell_to_keys.get(&cell_id) { + merged.pending_keys.extend(keys.clone()); + } + continue; + }; - // Handle remaining results - // Only get configs and pending (not extra_fields or headers) - for (_cell_id, response) in iter { - merged.project_configs.extend(response.project_configs); - merged.pending_keys.extend(response.pending_keys); + let (p, body) = response.into_parts(); + if parts.is_none() { + parts = Some(p); + } + + if let Ok(parsed) = deserialize_body::(body).await { + merged.project_configs.extend(parsed.project_configs); + merged.extra_fields.extend(parsed.extra_fields); + merged.pending_keys.extend(parsed.pending_keys); + } else { + tracing::error!( + cell_id = %cell_id, + "Failed to deserialize project configs response from cell" + ); + } } - merged + let serialized_body = serialize_to_body(&merged); + + match (has_successful_response, parts, serialized_body) { + (true, Some(mut p), Ok(body)) => { + normalize_headers(&mut p.headers, p.version); + p.headers + .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + return Response::from_parts(p, body); + } + (_, Some(p), _) => make_error_response(p.status), + (_, _, _) => make_error_response(StatusCode::BAD_GATEWAY), + } } } @@ -159,7 +210,7 @@ mod tests { } } - fn create_test_locator(key_to_cell: HashMap) -> Locator { + async fn create_test_locator(key_to_cell: HashMap) -> Locator { let route_data = RouteData::from( key_to_cell, "cursor".to_string(), @@ -177,17 +228,8 @@ mod tests { provider, None, ); - Locator::from_in_process_service(service) - } - #[tokio::test] - async fn test_split_requests_multiple_cells() { - let key_to_cell = HashMap::from([ - ("key1".to_string(), "us1".to_string()), - ("key2".to_string(), "us2".to_string()), - ("key3".to_string(), "us1".to_string()), - ]); - let locator = create_test_locator(key_to_cell); + let locator = Locator::from_in_process_service(service); // Wait for locator to be ready for _ in 0..50 { @@ -198,6 +240,32 @@ mod tests { } assert!(locator.is_ready(), "Locator should be ready"); + locator + } + + fn build_request(project_configs_request: ProjectConfigsRequest) -> Request { + let body = serialize_to_body(&project_configs_request).unwrap(); + Request::builder() + .method("POST") + .uri("/api/0/relays/projectconfigs/") + .body(body) + .unwrap() + } + + fn build_response(project_configs_response: serde_json::Value) -> Response { + let body = serialize_to_body(&project_configs_response).unwrap(); + Response::builder().status(200).body(body).unwrap() + } + + #[tokio::test] + async fn test_split_request_multiple_cells() { + let key_to_cell = HashMap::from([ + ("key1".to_string(), "us1".to_string()), + ("key2".to_string(), "us2".to_string()), + ("key3".to_string(), "us1".to_string()), + ]); + let locator = create_test_locator(key_to_cell).await; + let locales = HashMap::from([( "us".to_string(), vec![ @@ -222,55 +290,82 @@ mod tests { let mut extra = HashMap::new(); extra.insert("global".to_string(), serde_json::json!(true)); - let request = ProjectConfigsRequest { + let request = build_request(ProjectConfigsRequest { public_keys: vec!["key1".to_string(), "key2".to_string(), "key3".to_string()], extra_fields: extra.clone(), - }; + }); - let (cell_requests, pending) = handler.split_requests(request, cells).await.unwrap(); + let (cell_requests, _metadata) = handler.split_request(request, &cells).await.unwrap(); // Should have 2 cell requests (us1 and us2) assert_eq!(cell_requests.len(), 2); - assert_eq!(pending.len(), 0); - // Find us1 and us2 requests - let us1_req = cell_requests - .iter() + // Find us1 and us2 requests and parse their bodies + let (us1_id, us1_req) = cell_requests + .into_iter() .find(|(id, _)| id == "us1") - .map(|(_, req)| req) .unwrap(); - let us2_req = cell_requests - .iter() + + let us1_body: ProjectConfigsRequest = deserialize_body(us1_req.into_body()).await.unwrap(); + + let key_to_cell = HashMap::from([ + ("key1".to_string(), "us1".to_string()), + ("key2".to_string(), "us2".to_string()), + ("key3".to_string(), "us1".to_string()), + ]); + let locator = create_test_locator(key_to_cell).await; + let handler = ProjectConfigsHandler::new(locator); + let locales = HashMap::from([( + "us".to_string(), + vec![ + CellConfig { + id: "us1".to_string(), + sentry_url: Url::parse("http://sentry-us1:8080").unwrap(), + relay_url: Url::parse("http://relay-us1:8090").unwrap(), + }, + CellConfig { + id: "us2".to_string(), + sentry_url: Url::parse("http://sentry-us2:8080").unwrap(), + relay_url: Url::parse("http://relay-us2:8090").unwrap(), + }, + ], + )]); + let locales_obj = Locales::new(locales); + let cells = locales_obj.get_cells("us").unwrap(); + let request2 = build_request(ProjectConfigsRequest { + public_keys: vec!["key1".to_string(), "key2".to_string(), "key3".to_string()], + extra_fields: extra.clone(), + }); + let (cell_requests2, metadata) = handler.split_request(request2, &cells).await.unwrap(); + let (_, us2_req) = cell_requests2 + .into_iter() .find(|(id, _)| id == "us2") - .map(|(_, req)| req) .unwrap(); + let us2_body: ProjectConfigsRequest = deserialize_body(us2_req.into_body()).await.unwrap(); + // Verify us1 has key1 and key3 - assert_eq!(us1_req.public_keys.len(), 2); - assert!(us1_req.public_keys.contains(&"key1".to_string())); - assert!(us1_req.public_keys.contains(&"key3".to_string())); - assert_eq!(us1_req.extra_fields, extra); + assert_eq!(us1_id, "us1"); + assert_eq!(us1_body.public_keys.len(), 2); + assert!(us1_body.public_keys.contains(&"key1".to_string())); + assert!(us1_body.public_keys.contains(&"key3".to_string())); + assert_eq!(us1_body.extra_fields, extra); // Verify us2 has key2 - assert_eq!(us2_req.public_keys.len(), 1); - assert!(us2_req.public_keys.contains(&"key2".to_string())); - assert_eq!(us2_req.extra_fields, extra); + assert_eq!(us2_body.public_keys.len(), 1); + assert!(us2_body.public_keys.contains(&"key2".to_string())); + assert_eq!(us2_body.extra_fields, extra); + + let meta = metadata + .downcast::() + .unwrap_or(Box::new(ProjectConfigsMetadata::default())); + assert!(meta.unassigned_keys.is_empty()); } #[tokio::test] - async fn test_split_requests_unknown_key_goes_to_pending() { + async fn test_split_request_unknown_key_goes_to_pending() { let key_to_cell = HashMap::from([("key1".to_string(), "us1".to_string())]); - let locator = create_test_locator(key_to_cell); - - // Wait for locator to be ready - for _ in 0..50 { - if locator.is_ready() { - break; - } - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } - assert!(locator.is_ready(), "Locator should be ready"); - + let locator = create_test_locator(key_to_cell).await; let locales = HashMap::from([( "us".to_string(), vec![CellConfig { @@ -285,26 +380,28 @@ mod tests { let handler = ProjectConfigsHandler::new(locator); - let request = ProjectConfigsRequest { + let request = build_request(ProjectConfigsRequest { public_keys: vec!["key1".to_string(), "unknown_key".to_string()], extra_fields: HashMap::new(), - }; + }); - let (cell_requests, pending) = handler.split_requests(request, cells).await.unwrap(); + let (cell_requests, metadata) = handler.split_request(request, &cells).await.unwrap(); // Should have 1 cell request (us1 with key1) assert_eq!(cell_requests.len(), 1); assert_eq!(cell_requests[0].0, "us1"); - assert_eq!(cell_requests[0].1.public_keys, vec!["key1".to_string()]); - // Unknown key should be in pending metadata - assert_eq!(pending.len(), 1); - assert_eq!(pending[0], "unknown_key"); + // unknown key in metadata + let meta = metadata + .downcast::() + .unwrap_or(Box::new(ProjectConfigsMetadata::default())); + assert_eq!(meta.unassigned_keys, Vec::from(["unknown_key".to_string()])); } #[tokio::test] async fn test_merge_results_successful_cells() { - let handler = ProjectConfigsHandler::new(create_test_locator(HashMap::new())); + let locator = create_test_locator(HashMap::new()).await; + let handler = ProjectConfigsHandler::new(locator); // Create response from us1 with key1 and global config let response1_json = serde_json::json!({ @@ -313,7 +410,7 @@ mod tests { }, "global": {"version": 1} }); - let response1 = serde_json::from_value(response1_json).unwrap(); + let response1 = build_response(response1_json); // Create response from us2 with key2 and different global config let response2_json = serde_json::json!({ @@ -322,42 +419,36 @@ mod tests { }, "global": {"version": 2} }); - let response2 = serde_json::from_value(response2_json).unwrap(); + let response2 = build_response(response2_json); let results = vec![ - Ok(("us1".to_string(), response1)), - Ok(("us2".to_string(), response2)), + ("us1".to_string(), Ok(response1)), + ("us2".to_string(), Ok(response2)), ]; - let merged = handler.merge_results(results, vec![]); + let metadata: SplitMetadata = Box::new(Vec::::new()); + let merged = handler.merge_responses(results, metadata).await; - // Should have configs from both cells - let json = serde_json::to_value(&merged).unwrap(); - assert_eq!(json["configs"].as_object().unwrap().len(), 2); - assert!(json["configs"].get("key1").is_some()); - assert!(json["configs"].get("key2").is_some()); + let parsed: ProjectConfigsResponse = deserialize_body(merged.into_body()).await.unwrap(); - // Should use global from first result (executor ensures proper ordering) - assert_eq!(json["global"]["version"], 1); + assert!(parsed.project_configs.contains_key("key1")); + assert!(parsed.project_configs.contains_key("key2")); } #[tokio::test] - async fn test_merge_results_with_pending() { - let handler = ProjectConfigsHandler::new(create_test_locator(HashMap::new())); + async fn test_merge_responses_with_pending() { + let locator = create_test_locator(HashMap::new()).await; + let handler = ProjectConfigsHandler::new(locator); - // Test all three sources of pending keys: - // 1. From split phase (routing failures, unknown keys) - // 2. From upstream response (async computation) - // 3. From failed cells (added by executor) + // Test pending keys from split phase (routing failures, unknown keys) - // Create response from us1 with successful config and upstream pending + // Create response from us1 with successful config let response1_json = serde_json::json!({ "configs": { "key1": {"slug": "project1"} - }, - "pending": ["key_upstream_pending"] + } }); - let response1 = serde_json::from_value(response1_json).unwrap(); + let response1 = build_response(response1_json); // Create response from us2 with successful config let response2_json = serde_json::json!({ @@ -365,35 +456,45 @@ mod tests { "key2": {"slug": "project2"} } }); - let response2 = serde_json::from_value(response2_json).unwrap(); - - let results = vec![ - Ok(("us1".to_string(), response1)), - Ok(("us2".to_string(), response2)), - ]; + let response2 = build_response(response2_json); - // Pending from split phase (routing failures) and failed cells (executor-added) - let pending_from_split = vec![ - "key_routing_failed".to_string(), - "key_from_failed_cell1".to_string(), - "key_from_failed_cell2".to_string(), + let results: Vec<(CellId, Result, IngestRouterError>)> = vec![ + ("us1".to_string(), Ok(response1)), + ("us2".to_string(), Ok(response2)), ]; - let merged = handler.merge_results(results, pending_from_split); + // Pending from split phase (routing failures) + let pending_from_split: ProjectConfigsMetadata = ProjectConfigsMetadata { + cell_to_keys: HashMap::new(), + unassigned_keys: vec![ + "key_routing_failed".to_string(), + "key_from_failed_cell1".to_string(), + "key_from_failed_cell2".to_string(), + ], + }; - let json = serde_json::to_value(&merged).unwrap(); + let metadata: SplitMetadata = Box::new(pending_from_split); + let merged = handler.merge_responses(results, metadata).await; - // Should have configs from both successful cells - assert_eq!(json["configs"].as_object().unwrap().len(), 2); - assert!(json["configs"].get("key1").is_some()); - assert!(json["configs"].get("key2").is_some()); + // Parse merged response body so we can assert on pending keys + let parsed: ProjectConfigsResponse = deserialize_body(merged.into_body()).await.unwrap(); - // Should have all pending keys from all three sources - let pending = json["pending"].as_array().unwrap(); - assert_eq!(pending.len(), 4); - assert!(pending.contains(&serde_json::json!("key_routing_failed"))); - assert!(pending.contains(&serde_json::json!("key_from_failed_cell1"))); - assert!(pending.contains(&serde_json::json!("key_from_failed_cell2"))); - assert!(pending.contains(&serde_json::json!("key_upstream_pending"))); + // Should have pending keys from split phase + assert_eq!(parsed.pending_keys.len(), 3); + assert!( + parsed + .pending_keys + .contains(&"key_routing_failed".to_string()) + ); + assert!( + parsed + .pending_keys + .contains(&"key_from_failed_cell1".to_string()) + ); + assert!( + parsed + .pending_keys + .contains(&"key_from_failed_cell2".to_string()) + ); } } diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs index 8bd9a62..1728020 100644 --- a/ingest-router/src/project_config/protocol.rs +++ b/ingest-router/src/project_config/protocol.rs @@ -5,14 +5,8 @@ //! //! See the module-level documentation in `mod.rs` for complete protocol details. -use crate::errors::IngestRouterError; -use http_body_util::{BodyExt, Full, combinators::BoxBody}; -use hyper::body::Bytes; -use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE, HeaderMap}; -use hyper::{Response, StatusCode}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; -use shared::http::filter_hop_by_hop; use std::collections::HashMap; /// Request format for the relay project configs endpoint. @@ -25,7 +19,6 @@ use std::collections::HashMap; /// "global": true /// } /// ``` -#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProjectConfigsRequest { /// DSN public keys to fetch configs for. @@ -38,18 +31,6 @@ pub struct ProjectConfigsRequest { pub extra_fields: HashMap, } -#[allow(dead_code)] -impl ProjectConfigsRequest { - pub fn from_bytes(bytes: &Bytes) -> Result { - serde_json::from_slice(bytes) - } - - pub fn to_bytes(&self) -> Result { - let json = serde_json::to_vec(self)?; - Ok(Bytes::from(json)) - } -} - /// Response format for the relay project configs endpoint. /// /// # Example @@ -70,7 +51,6 @@ impl ProjectConfigsRequest { /// "global_status": "ready" /// } /// ``` -#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProjectConfigsResponse { /// Project configs (HashMap merged from all upstreams). @@ -84,216 +64,14 @@ pub struct ProjectConfigsResponse { /// Other fields (`global`, `global_status`, future fields). #[serde(flatten)] pub extra_fields: HashMap, - - /// HTTP headers from the highest priority upstream (not serialized). - #[serde(skip)] - pub http_headers: HeaderMap, } -#[allow(dead_code)] impl ProjectConfigsResponse { pub fn new() -> Self { Self { project_configs: HashMap::new(), pending_keys: Vec::new(), extra_fields: HashMap::new(), - http_headers: HeaderMap::new(), - } - } - - pub fn from_bytes(bytes: &Bytes) -> Result { - let mut response: Self = serde_json::from_slice(bytes)?; - response.http_headers = HeaderMap::new(); - Ok(response) - } - - /// Builds an HTTP response from the merged results. - /// Filters out hop-by-hop headers and Content-Length (which is recalculated for the new body). - pub fn into_response( - mut self, - ) -> Result>, IngestRouterError> { - let merged_json = serde_json::to_vec(&self) - .map_err(|e| IngestRouterError::ResponseSerializationError(e.to_string()))?; - - filter_hop_by_hop(&mut self.http_headers, hyper::Version::HTTP_11); - self.http_headers.remove(CONTENT_LENGTH); - - let mut builder = Response::builder().status(StatusCode::OK); - for (name, value) in self.http_headers.iter() { - builder = builder.header(name, value); } - - builder = builder.header(CONTENT_TYPE, "application/json"); - builder - .body( - Full::new(Bytes::from(merged_json)) - .map_err(|e| match e {}) - .boxed(), - ) - .map_err(|e| IngestRouterError::HyperError(e.to_string())) - } -} - -impl Default for ProjectConfigsResponse { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use http_body_util::BodyExt; - use hyper::header::{CACHE_CONTROL, HeaderValue}; - - #[test] - fn test_request_serialization() { - let mut extra = HashMap::new(); - extra.insert("global".to_string(), serde_json::json!(true)); - extra.insert("noCache".to_string(), serde_json::json!(false)); - - let request = ProjectConfigsRequest { - public_keys: vec!["key1".to_string(), "key2".to_string()], - extra_fields: extra, - }; - - let bytes = request.to_bytes().unwrap(); - let parsed = ProjectConfigsRequest::from_bytes(&bytes).unwrap(); - - assert_eq!(parsed.public_keys.len(), 2); - assert_eq!( - parsed.extra_fields.get("global"), - Some(&serde_json::json!(true)) - ); - } - - #[test] - fn test_response_serialization() { - let mut configs = HashMap::new(); - configs.insert( - "key1".to_string(), - serde_json::json!({ - "disabled": false, - "slug": "test-project" - }), - ); - - let response = ProjectConfigsResponse { - project_configs: configs, - pending_keys: vec!["key2".to_string()], - extra_fields: HashMap::new(), - http_headers: HeaderMap::new(), - }; - - let bytes = Bytes::from(serde_json::to_vec(&response).unwrap()); - let parsed = ProjectConfigsResponse::from_bytes(&bytes).unwrap(); - - assert_eq!(parsed.project_configs.len(), 1); - assert_eq!(parsed.pending_keys.len(), 1); - } - - #[tokio::test] - async fn test_response_merging() { - let mut results = ProjectConfigsResponse::new(); - - // Merge configs from multiple upstreams - let mut configs1 = HashMap::new(); - configs1.insert( - "key1".to_string(), - serde_json::json!({"disabled": false, "slug": "project1"}), - ); - - let mut configs2 = HashMap::new(); - configs2.insert( - "key2".to_string(), - serde_json::json!({"disabled": false, "slug": "project2"}), - ); - - results.project_configs.extend(configs1); - results.project_configs.extend(configs2); - - // Add pending keys - results - .pending_keys - .extend(vec!["key3".to_string(), "key4".to_string()]); - results.pending_keys.extend(vec!["key5".to_string()]); - - // Merge extra fields - let mut extra = HashMap::new(); - extra.insert( - "global".to_string(), - serde_json::json!({"measurements": {"maxCustomMeasurements": 10}}), - ); - extra.insert("global_status".to_string(), serde_json::json!("ready")); - results.extra_fields.extend(extra); - - let response = results.into_response().unwrap(); - let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); - let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); - - // Verify configs merged correctly - assert_eq!(parsed["configs"].as_object().unwrap().len(), 2); - assert!(parsed["configs"].get("key1").is_some()); - assert!(parsed["configs"].get("key2").is_some()); - - // Verify pending keys added correctly - assert_eq!(parsed["pending"].as_array().unwrap().len(), 3); - - // Verify extra fields merged correctly - assert_eq!( - parsed["global"]["measurements"]["maxCustomMeasurements"], - 10 - ); - assert_eq!(parsed["global_status"], "ready"); - } - - #[tokio::test] - async fn test_empty_pending_omitted() { - let results = ProjectConfigsResponse::new(); - let response = results.into_response().unwrap(); - let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); - let parsed: JsonValue = serde_json::from_slice(&body_bytes).unwrap(); - - assert!(parsed.get("pending").is_none()); - } - - #[tokio::test] - async fn test_headers_forwarding() { - let mut results = ProjectConfigsResponse::new(); - - // Add upstream headers - results - .http_headers - .insert(CACHE_CONTROL, HeaderValue::from_static("max-age=300")); - results.http_headers.insert( - "X-Sentry-Rate-Limit-Remaining", - HeaderValue::from_static("100"), - ); - // Add hop-by-hop header that should be filtered - results.http_headers.insert( - hyper::header::CONNECTION, - HeaderValue::from_static("keep-alive"), - ); - - let response = results.into_response().unwrap(); - - // Verify headers are copied - assert_eq!( - response.headers().get(CACHE_CONTROL), - Some(&HeaderValue::from_static("max-age=300")) - ); - assert_eq!( - response.headers().get("X-Sentry-Rate-Limit-Remaining"), - Some(&HeaderValue::from_static("100")) - ); - - // Verify hop-by-hop headers are filtered out - assert!(response.headers().get(hyper::header::CONNECTION).is_none()); - - // Verify Content-Type is always set - assert_eq!( - response.headers().get(CONTENT_TYPE), - Some(&HeaderValue::from_static("application/json")) - ); } } diff --git a/ingest-router/src/router.rs b/ingest-router/src/router.rs index c15e95c..2fff5fb 100644 --- a/ingest-router/src/router.rs +++ b/ingest-router/src/router.rs @@ -1,5 +1,8 @@ -use crate::config::{HandlerAction, Route}; -use crate::project_config::handler::ProjectConfigsHandler; +use crate::api::health::HealthHandler; +use crate::config::{CellConfig, HandlerAction, Route}; +use crate::handler::Handler; +use crate::locale::{Cells, Locales}; +use crate::project_config::ProjectConfigsHandler; use hyper::Request; use locator::client::Locator; use std::collections::HashMap; @@ -8,37 +11,42 @@ use std::sync::Arc; /// Router that matches incoming requests against configured routes pub struct Router { routes: Arc>, - // TODO: Fix type to be generic - action_to_handler: HashMap, + action_to_handler: HashMap>, + locales_to_cells: Locales, } impl Router { /// Creates a new router with the given routes - pub fn new(routes: Vec, locator: Locator) -> Self { - let mut action_to_handler = HashMap::new(); - - action_to_handler.insert( - HandlerAction::RelayProjectConfigs, - ProjectConfigsHandler::new(locator), - ); + pub fn new( + routes: Vec, + locales: HashMap>, + locator: Locator, + ) -> Self { + let action_to_handler = HashMap::from([ + ( + HandlerAction::RelayProjectConfigs, + Arc::new(ProjectConfigsHandler::new(locator)) as Arc, + ), + (HandlerAction::Health, Arc::new(HealthHandler {})), + ]); Self { routes: Arc::new(routes), action_to_handler, + locales_to_cells: Locales::new(locales), } } /// Finds the first route that matches the incoming request - pub fn resolve(&self, req: &Request) -> Option<&HandlerAction> { + pub fn resolve(&self, req: &Request) -> Option<(Arc, Arc)> { self.routes .iter() .find(|route| self.matches_route(req, route)) - .map(|route| &route.action) - } - - // TODO: Fix return type so we can deal with other handler types here as well. - pub fn get_handler(&self, action: &HandlerAction) -> Option<&ProjectConfigsHandler> { - self.action_to_handler.get(action) + .and_then(|route| { + let cells = self.locales_to_cells.get_cells(&route.locale)?; + let handler = self.action_to_handler.get(&route.action)?.clone(); + Some((handler, cells)) + }) } /// Checks if a request matches a route's criteria @@ -92,6 +100,7 @@ mod tests { use hyper::{Method, Request}; use locator::config::LocatorDataType; use locator::locator::Locator as LocatorService; + use url::Url; async fn test_router(routes: Option>) -> Router { let default_routes = vec![ @@ -117,6 +126,15 @@ mod tests { let routes = routes.unwrap_or(default_routes); + let locales = HashMap::from([( + "us".to_string(), + vec![CellConfig { + id: "us1".to_string(), + sentry_url: Url::parse("https://sentry.io/us1").unwrap(), + relay_url: Url::parse("https://relay.io/us1").unwrap(), + }], + )]); + let (_dir, provider) = get_mock_provider().await; let locator_service = LocatorService::new( LocatorDataType::ProjectKey, @@ -126,7 +144,7 @@ mod tests { ); let locator = Locator::from_in_process_service(locator_service); - Router::new(routes, locator) + Router::new(routes, locales, locator) } fn test_request( @@ -153,14 +171,13 @@ mod tests { // Should match first route let req = test_request(Method::POST, "/api/test", Some("api.example.com")); - assert_eq!( - router.resolve(&req), - Some(&HandlerAction::RelayProjectConfigs) - ); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("ProjectConfigsHandler")); // Should match second route let req = test_request(Method::GET, "/health", None); - assert_eq!(router.resolve(&req), Some(&HandlerAction::Health)); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("HealthHandler")); } #[tokio::test] @@ -168,7 +185,7 @@ mod tests { let router = test_router(None).await; let req = test_request(Method::GET, "/different", None); - assert_eq!(router.resolve(&req), None); + assert!(router.resolve(&req).is_none()); } #[tokio::test] @@ -187,10 +204,8 @@ mod tests { // Should strip port and match let req = test_request(Method::GET, "/test", Some("api.example.com:8080")); - assert_eq!( - router.resolve(&req), - Some(&HandlerAction::RelayProjectConfigs) - ); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("ProjectConfigsHandler")); } #[tokio::test] @@ -209,13 +224,11 @@ mod tests { // POST should match let req = test_request(Method::POST, "/api/test", None); - assert_eq!( - router.resolve(&req), - Some(&HandlerAction::RelayProjectConfigs) - ); + let (handler, _cells) = router.resolve(&req).unwrap(); + assert!(handler.type_name().contains("ProjectConfigsHandler")); // GET should not match let req = test_request(Method::GET, "/api/test", None); - assert_eq!(router.resolve(&req), None); + assert!(router.resolve(&req).is_none()); } }