diff --git a/Makefile b/Makefile
index d9694fa..d9423d5 100644
--- a/Makefile
+++ b/Makefile
@@ -106,6 +106,10 @@ run-mock-control-api:
 	python scripts/mock_control_api.py
 .PHONY: run-mock-control-api
 
+run-mock-relay-api:
+	python scripts/mock_relay_api.py
+.PHONY: run-mock-relay-api
+
 # CI-like checks (what runs in GitHub Actions)
 ci: fmt-check lint test build
 	@echo "All CI checks passed!"
diff --git a/ingest-router/src/config.rs b/ingest-router/src/config.rs
index 2cebbdd..18da577 100644
--- a/ingest-router/src/config.rs
+++ b/ingest-router/src/config.rs
@@ -66,17 +66,17 @@ pub struct RelayTimeouts {
     /// HTTP request timeout for individual upstream calls (seconds).
     /// This is the maximum time a single HTTP request can take.
     /// Default: 15 seconds
-    pub http_timeout_secs: u16,
+    pub http_timeout_secs: u64,
 
     /// Task timeout when waiting for the first upstream to respond (seconds).
     /// Must be >= http_timeout_secs to allow HTTP requests to complete.
     /// Default: 20 seconds
-    pub task_initial_timeout_secs: u16,
+    pub task_initial_timeout_secs: u64,
 
     /// Deadline for all remaining tasks after first success (seconds).
     /// Aggressively cuts off slow upstreams once we have good data.
     /// Default: 5 seconds
-    pub task_subsequent_timeout_secs: u16,
+    pub task_subsequent_timeout_secs: u64,
 }
 
 impl Default for RelayTimeouts {
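Note on the `u16` to `u64` widening above: `tokio::time::Duration::from_secs` takes a `u64`, so storing seconds as `u64` lets `executor.rs` pass these fields straight through without lossy casts. For reference, a self-contained sketch of the `Default` impl these doc comments imply; the 15/20/5 values come from the comments, and the struct here is a stand-in mirror since the real impl body sits outside this hunk's context:

```rust
use std::time::Duration;

/// Stand-in mirror of the config struct; the 15/20/5 defaults are taken
/// from the doc comments in config.rs, not from code visible in this diff.
#[derive(Clone, Debug)]
pub struct RelayTimeouts {
    pub http_timeout_secs: u64,
    pub task_initial_timeout_secs: u64,
    pub task_subsequent_timeout_secs: u64,
}

impl Default for RelayTimeouts {
    fn default() -> Self {
        Self {
            http_timeout_secs: 15,
            task_initial_timeout_secs: 20,
            task_subsequent_timeout_secs: 5,
        }
    }
}

fn main() {
    let t = RelayTimeouts::default();
    // u64 feeds Duration::from_secs directly; u16 required a cast first.
    assert_eq!(Duration::from_secs(t.http_timeout_secs).as_secs(), 15);
    // The documented invariant: the initial task deadline must cover a full HTTP request.
    assert!(t.task_initial_timeout_secs >= t.http_timeout_secs);
}
```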
diff --git a/ingest-router/src/executor.rs b/ingest-router/src/executor.rs
new file mode 100644
index 0000000..680c3f2
--- /dev/null
+++ b/ingest-router/src/executor.rs
@@ -0,0 +1,162 @@
+use crate::config::RelayTimeouts;
+use crate::errors::IngestRouterError;
+use crate::handler::{CellId, Handler, HandlerBody};
+use crate::http::send_to_upstream;
+use crate::locale::Cells;
+use http::StatusCode;
+use http_body_util::{BodyExt, Full};
+use hyper::body::Bytes;
+use hyper::{Request, Response};
+use hyper_util::client::legacy::Client;
+use hyper_util::client::legacy::connect::HttpConnector;
+use hyper_util::rt::TokioExecutor;
+use shared::http::make_error_response;
+use std::collections::HashSet;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+use tokio::time::{Duration, sleep};
+
+#[derive(Clone)]
+pub struct Executor {
+    client: Client<HttpConnector, Full<Bytes>>,
+    timeouts: RelayTimeouts,
+}
+
+impl Executor {
+    pub fn new(timeouts: RelayTimeouts) -> Self {
+        let client = Client::builder(TokioExecutor::new()).build(HttpConnector::new());
+        Self { client, timeouts }
+    }
+
+    /// Splits the request, executes the split requests in parallel, and
+    /// merges the responses using the provided handler.
+    pub async fn execute(
+        &self,
+        handler: Arc<dyn Handler>,
+        request: Request<HandlerBody>,
+        cells: Cells,
+    ) -> Response<HandlerBody> {
+        let (split_requests, metadata) = match handler.split_request(request, &cells).await {
+            Ok(result) => result,
+            Err(_e) => return make_error_response(StatusCode::INTERNAL_SERVER_ERROR),
+        };
+
+        let results = self.execute_parallel(split_requests, cells).await;
+
+        handler.merge_responses(results, metadata).await
+    }
+
+    /// Execute split requests in parallel against their cell upstreams.
+    async fn execute_parallel(
+        &self,
+        requests: Vec<(CellId, Request<HandlerBody>)>,
+        cells: Cells,
+    ) -> Vec<(CellId, Result<Response<HandlerBody>, IngestRouterError>)> {
+        let mut join_set = JoinSet::new();
+        let mut pending_cells = HashSet::new();
+
+        // Spawn one task per cell request
+        for (cell_id, request) in requests {
+            let cells = cells.clone();
+            let client = self.client.clone();
+            let timeout_secs = self.timeouts.http_timeout_secs;
+
+            pending_cells.insert(cell_id.clone());
+            join_set.spawn(async move {
+                let result = send_to_cell(&client, &cell_id, request, &cells, timeout_secs).await;
+                (cell_id, result)
+            });
+        }
+
+        let mut results = Vec::new();
+
+        // Use the longer initial timeout while waiting for the first result
+        let initial_timeout = sleep(Duration::from_secs(self.timeouts.task_initial_timeout_secs));
+
+        tokio::select! {
+            _ = initial_timeout => {},
+            join_result = join_set.join_next() => {
+                match join_result {
+                    Some(Ok((cell_id, result))) => {
+                        pending_cells.remove(&cell_id);
+                        results.push((cell_id, result));
+                    }
+                    Some(Err(e)) => tracing::error!("Task panicked: {}", e),
+                    // The join set is empty -- this should never happen
+                    None => return results,
+                }
+            }
+        }
+
+        // Use the shorter subsequent timeout for any remaining results
+        let timeout = sleep(Duration::from_secs(
+            self.timeouts.task_subsequent_timeout_secs,
+        ));
+        tokio::pin!(timeout);
+
+        loop {
+            tokio::select! {
+                _ = &mut timeout => {
+                    break;
+                },
+                join_result = join_set.join_next() => {
+                    match join_result {
+                        Some(Ok((cell_id, result))) => {
+                            pending_cells.remove(&cell_id);
+                            results.push((cell_id, result));
+                        },
+                        Some(Err(e)) => tracing::error!("Task panicked: {}", e),
+                        // No more tasks
+                        None => break,
+                    }
+                }
+            }
+        }
+
+        // Any cell still pending missed its deadline; report it as a timeout
+        for cell_id in pending_cells.drain() {
+            results.push((
+                cell_id.clone(),
+                Err(IngestRouterError::UpstreamTimeout(cell_id)),
+            ));
+        }
+
+        results
+    }
+}
+
+/// Send a request to a specific cell's upstream.
+/// TODO: simplify body types so these conversions are not needed - consider converting to
+/// Bytes at the boundary and using bytes only throughout the handlers.
+async fn send_to_cell(
+    client: &Client<HttpConnector, Full<Bytes>>,
+    cell_id: &str,
+    request: Request<HandlerBody>,
+    cells: &Cells,
+    timeout_secs: u64,
+) -> Result<Response<HandlerBody>, IngestRouterError> {
+    // Look up the upstream for this cell
+    let upstream = cells
+        .cell_to_upstreams()
+        .get(cell_id)
+        .ok_or_else(|| IngestRouterError::InternalError(format!("Unknown cell: {}", cell_id)))?;
+
+    // Convert HandlerBody to Full<Bytes> for the HTTP client
+    let (parts, body) = request.into_parts();
+    let body_bytes = body
+        .collect()
+        .await
+        .map_err(|e| IngestRouterError::RequestBodyError(e.to_string()))?
+        .to_bytes();
+
+    let request = Request::from_parts(parts, Full::new(body_bytes));
+
+    // Send to upstream (using relay_url)
+    let response = send_to_upstream(client, &upstream.relay_url, request, timeout_secs).await?;
+
+    // Convert Response<Bytes> back to Response<HandlerBody>
+    let (parts, body_bytes) = response.into_parts();
+    let handler_body: HandlerBody = Full::new(body_bytes).map_err(|e| match e {}).boxed();
+
+    Ok(Response::from_parts(parts, handler_body))
+}
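The core of `execute_parallel` is a two-phase deadline: the first upstream response may take up to `task_initial_timeout_secs`, and once one result is in, the remaining tasks share the shorter `task_subsequent_timeout_secs` budget before being reported as `UpstreamTimeout`. A standalone, runnable sketch of that pattern (tokio only; the cell names and delays here are invented for illustration):

```rust
use tokio::task::JoinSet;
use tokio::time::{Duration, sleep};

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    for (id, delay_ms) in [("us1", 50u64), ("us2", 120), ("us3", 5_000)] {
        set.spawn(async move {
            sleep(Duration::from_millis(delay_ms)).await;
            id
        });
    }

    let mut done = Vec::new();

    // Phase 1: generous deadline while we wait for the first response.
    let initial = sleep(Duration::from_secs(2));
    tokio::select! {
        _ = initial => {}
        res = set.join_next() => {
            if let Some(Ok(id)) = res {
                done.push(id);
            }
        }
    }

    // Phase 2: once we have data, the stragglers share a short budget.
    let subsequent = sleep(Duration::from_millis(200));
    tokio::pin!(subsequent);
    loop {
        tokio::select! {
            _ = &mut subsequent => break,
            res = set.join_next() => match res {
                Some(Ok(id)) => done.push(id),
                Some(Err(_)) => {} // a task panicked; skip it
                None => break,     // every task has finished
            },
        }
    }

    // "us3" (5s) misses both deadlines; the real executor would report
    // its cell as an UpstreamTimeout.
    assert_eq!(done, vec!["us1", "us2"]);
}
```

Pinning only the second `sleep` mirrors the executor: the phase-1 timer is consumed by a single `select!`, while the phase-2 timer is polled by reference across loop iterations and so must be pinned.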
diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs
index 68a4c1c..b6bab9a 100644
--- a/ingest-router/src/lib.rs
+++ b/ingest-router/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod api;
 pub mod config;
 pub mod errors;
+mod executor;
 pub mod handler;
 pub mod http;
 pub mod locale;
@@ -11,7 +12,7 @@ pub mod router;
 mod testutils;
 
 use crate::errors::IngestRouterError;
-use http_body_util::{BodyExt, Full, combinators::BoxBody};
+use http_body_util::{BodyExt, combinators::BoxBody};
 use hyper::StatusCode;
 use hyper::body::Bytes;
 use hyper::service::Service;
@@ -24,9 +25,11 @@ use std::pin::Pin;
 
 pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {
     let locator = Locator::new(config.locator.to_client_config()).await?;
-    let ingest_router_service = IngestRouterService {
-        router: router::Router::new(config.routes, config.locales, locator),
-    };
+    let ingest_router_service = IngestRouterService::new(
+        router::Router::new(config.routes, config.locales, locator),
+        config.relay_timeouts,
+    );
+
     let router_task = run_http_service(
         &config.listener.host,
         config.listener.port,
@@ -38,6 +41,14 @@ pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {
 
 struct IngestRouterService {
     router: router::Router,
+    executor: executor::Executor,
+}
+
+impl IngestRouterService {
+    pub fn new(router: router::Router, timeouts: config::RelayTimeouts) -> Self {
+        let executor = executor::Executor::new(timeouts);
+        Self { router, executor }
+    }
 }
 
 impl Service<Request<Incoming>> for IngestRouterService
 where
@@ -63,18 +74,9 @@
                     .boxed();
                 let handler_req = Request::from_parts(parts, handler_body);
 
-                // TODO: Placeholder response
-                Box::pin(async move {
-                    let (split, _metadata) = handler.split_request(handler_req, &cells).await?;
+                let executor = self.executor.clone();
 
-                    for (cell_id, req) in split {
-                        println!("Cell: {}, URI: {}", cell_id, req.uri());
-                    }
-
-                    Ok(Response::new(
-                        Full::new("ok\n".into()).map_err(|e| match e {}).boxed(),
-                    ))
-                })
+                Box::pin(async move { Ok(executor.execute(handler, handler_req, cells).await) })
             }
             None => Box::pin(async move { Ok(make_error_response(StatusCode::BAD_REQUEST)) }),
         }
@@ -84,22 +86,46 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::api::utils::deserialize_body;
     use crate::config::{HandlerAction, HttpMethod, Match, Route};
     use hyper::Method;
    use hyper::body::Bytes;
     use hyper::header::HOST;
 
     use crate::config::CellConfig;
-    use locator::config::LocatorDataType;
-    use locator::locator::Locator as LocatorService;
     use std::collections::HashMap;
+    use std::process::{Child, Command};
     use url::Url;
 
-    use crate::testutils::get_mock_provider;
-    use std::sync::Arc;
+    use crate::project_config::protocol::ProjectConfigsResponse;
+    use crate::testutils::create_test_locator;
+    use http_body_util::{BodyExt, Full};
+
+    struct TestServer {
+        child: Child,
+    }
+
+    impl TestServer {
+        fn spawn() -> std::io::Result<Self> {
+            let child = Command::new("python")
+                .arg("../scripts/mock_relay_api.py")
+                .spawn()?;
+
+            Ok(Self { child })
+        }
+    }
+
+    impl Drop for TestServer {
+        fn drop(&mut self) {
+            let _ = self.child.kill();
+            let _ = self.child.wait();
+        }
+    }
 
     #[tokio::test]
     async fn test_ingest_router() {
+        let _relay_server = TestServer::spawn().expect("Failed to spawn test server");
+
         let routes_config = vec![Route {
             r#match: Match {
                 host: Some("us.sentry.io".to_string()),
@@ -115,22 +141,24 @@ mod tests {
             vec![CellConfig {
                 id: "us1".to_string(),
                 sentry_url: Url::parse("https://sentry.io/us1").unwrap(),
-                relay_url: Url::parse("https://relay.io/us1").unwrap(),
+                relay_url: Url::parse("http://localhost:8000").unwrap(),
             }],
         )]);
 
-        let (_dir, provider) = get_mock_provider().await;
-        let locator_service = LocatorService::new(
-            LocatorDataType::ProjectKey,
-            "http://control-plane-url".to_string(),
-            Arc::new(provider),
-            None,
+        let locator = create_test_locator(HashMap::from([(
+            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
+            "us1".to_string(),
+        )]))
+        .await;
+
+        let service = IngestRouterService::new(
+            router::Router::new(routes_config, locales, locator),
+            config::RelayTimeouts {
+                http_timeout_secs: 5000,
+                task_initial_timeout_secs: 10000,
+                task_subsequent_timeout_secs: 10000,
+            },
         );
-        let locator = Locator::from_in_process_service(locator_service);
-
-        let service = IngestRouterService {
-            router: router::Router::new(routes_config, locales, locator),
-        };
 
         // Project configs request
         let request = Request::builder()
@@ -138,16 +166,23 @@ mod tests {
            .method(Method::POST)
            .uri("/api/0/relays/projectconfigs/")
            .header(HOST, "us.sentry.io")
            .body(
-                Full::new(Bytes::from(r#"{"publicKeys": ["test-key"]}"#))
-                    .map_err(|e| match e {})
-                    .boxed(),
+                Full::new(Bytes::from(
+                    r#"{"publicKeys": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"], "global": 1}"#,
+                ))
+                .map_err(|e| match e {})
+                .boxed(),
             )
             .unwrap();
 
         let response = service.call(request).await.unwrap();
 
-        // TODO: call the scripts/mock_relay_api.py server and validate the response
+        let (parts, body) = response.into_parts();
+
+        assert_eq!(parts.status, 200);
 
-        assert_eq!(response.status(), 200);
+        let parsed: ProjectConfigsResponse = deserialize_body(body).await.unwrap();
+        assert_eq!(parsed.project_configs.len(), 1);
+        assert_eq!(parsed.pending_keys.len(), 0);
+        assert_eq!(parsed.extra_fields.len(), 2);
     }
 }
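One caveat with the `TestServer` guard above: `Command::spawn` returns as soon as the Python process starts, not when its socket is bound, so the first `service.call` can race the mock server on a slow machine (the generous 5000/10000-second test timeouts do not help with a connection refused). A hedged sketch of a readiness poll that could run right after `spawn()`; the helper name is hypothetical, and `127.0.0.1:8000` matches the `relay_url` the test points at:

```rust
use std::net::TcpStream;
use std::time::Duration;

/// Hypothetical helper: block until `addr` accepts TCP connections or
/// `attempts` polls are exhausted. In the test this would run between
/// Command::spawn() and the first service.call(...).
fn wait_until_listening(addr: &str, attempts: u32) -> std::io::Result<()> {
    for _ in 0..attempts {
        if TcpStream::connect(addr).is_ok() {
            return Ok(());
        }
        std::thread::sleep(Duration::from_millis(50));
    }
    Err(std::io::Error::new(
        std::io::ErrorKind::TimedOut,
        format!("{addr} never started listening"),
    ))
}

fn main() -> std::io::Result<()> {
    // Assumes the mock relay server is (about to be) running locally.
    wait_until_listening("127.0.0.1:8000", 100)
}
```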
diff --git a/ingest-router/src/locale.rs b/ingest-router/src/locale.rs
index eaccd6f..31b7edd 100644
--- a/ingest-router/src/locale.rs
+++ b/ingest-router/src/locale.rs
@@ -48,11 +48,16 @@ impl From<CellConfig> for Upstream {
 }
 
 /// Collection of upstreams grouped by cell name
+#[derive(Debug)]
+struct CellsInner {
+    /// Prioritized list of cell names (first = highest priority)
+    cell_list: Vec<String>,
+    cell_to_upstreams: HashMap<String, Upstream>,
+}
+
 #[derive(Clone, Debug)]
 pub struct Cells {
-    /// Prioritized list of cell names (first = highest priority)
-    pub cell_list: Vec<String>,
-    pub cell_to_upstreams: HashMap<String, Upstream>,
+    inner: Arc<CellsInner>,
 }
 
 impl Cells {
@@ -70,17 +75,26 @@ impl Cells {
         }
 
         Self {
-            cell_list,
-            cell_to_upstreams,
+            inner: Arc::new(CellsInner {
+                cell_list,
+                cell_to_upstreams,
+            }),
         }
     }
+
+    pub fn cell_list(&self) -> &[String] {
+        &self.inner.cell_list
+    }
+
+    pub fn cell_to_upstreams(&self) -> &HashMap<String, Upstream> {
+        &self.inner.cell_to_upstreams
+    }
 }
 
 /// Maps locales to their cells (which map to upstreams)
-#[derive(Clone, Debug)]
 pub struct Locales {
     /// Mapping from locale to cells
-    locale_to_cells: HashMap<String, Arc<Cells>>,
+    locale_to_cells: HashMap<String, Cells>,
 }
 
 impl Locales {
@@ -90,7 +104,7 @@ impl Locales {
         let locale_to_cells = locales
             .into_iter()
             .map(|(locale, cells)| {
-                let cells = Arc::new(Cells::from_config(cells));
+                let cells = Cells::from_config(cells);
                 (locale, cells)
             })
             .collect();
@@ -99,7 +113,7 @@ impl Locales {
     }
 
     /// Get the cells for a specific locale
-    pub fn get_cells(&self, locale: &str) -> Option<Arc<Cells>> {
+    pub fn get_cells(&self, locale: &str) -> Option<Cells> {
        self.locale_to_cells.get(locale).cloned()
     }
 }
@@ -147,20 +161,20 @@ mod tests {
 
         // Verify US locale has 2 cells
         let us_cells = locales.get_cells("us").unwrap();
-        assert_eq!(us_cells.cell_to_upstreams.len(), 2);
-        assert_eq!(us_cells.cell_list.len(), 2);
-        assert!(us_cells.cell_to_upstreams.contains_key("us1"));
-        assert!(us_cells.cell_to_upstreams.contains_key("us2"));
+        assert_eq!(us_cells.cell_to_upstreams().len(), 2);
+        assert_eq!(us_cells.cell_list().len(), 2);
+        assert!(us_cells.cell_to_upstreams().contains_key("us1"));
+        assert!(us_cells.cell_to_upstreams().contains_key("us2"));
 
         // Verify priority order
-        assert_eq!(us_cells.cell_list[0], "us1");
-        assert_eq!(us_cells.cell_list[1], "us2");
+        assert_eq!(us_cells.cell_list()[0], "us1");
+        assert_eq!(us_cells.cell_list()[1], "us2");
 
         // Verify DE locale has 1 cell
         let de_cells = locales.get_cells("de").unwrap();
-        assert_eq!(de_cells.cell_to_upstreams.len(), 1);
-        assert_eq!(de_cells.cell_list.len(), 1);
-        assert!(de_cells.cell_to_upstreams.contains_key("de1"));
-        assert_eq!(de_cells.cell_list[0], "de1");
+        assert_eq!(de_cells.cell_to_upstreams().len(), 1);
+        assert_eq!(de_cells.cell_list().len(), 1);
+        assert!(de_cells.cell_to_upstreams().contains_key("de1"));
+        assert_eq!(de_cells.cell_list()[0], "de1");
 
         // Verify unknown locale returns None
         assert!(locales.get_cells("unknown").is_none());
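The `CellsInner`/`Arc` refactor makes `Cells::clone` an O(1) refcount bump instead of a deep copy of the upstream map, which is what lets the executor hand an owned `Cells` to every spawned task. A minimal self-contained illustration (`Upstream` is reduced to a `String` here purely for brevity):

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
struct CellsInner {
    cell_list: Vec<String>,
    cell_to_upstreams: HashMap<String, String>, // Upstream reduced to a String
}

#[derive(Clone, Debug)]
struct Cells {
    inner: Arc<CellsInner>,
}

impl Cells {
    fn cell_list(&self) -> &[String] {
        &self.inner.cell_list
    }
}

fn main() {
    let cells = Cells {
        inner: Arc::new(CellsInner {
            cell_list: vec!["us1".into(), "us2".into()],
            cell_to_upstreams: HashMap::new(),
        }),
    };

    let for_task = cells.clone(); // refcount bump, no map copy
    assert_eq!(Arc::strong_count(&cells.inner), 2);
    assert_eq!(for_task.cell_list()[0], "us1");
    assert!(cells.inner.cell_to_upstreams.is_empty());
}
```

The accessor methods also restore encapsulation: callers previously reached into `pub` fields, which is why the tests and handler below switch to `cell_list()` and `cell_to_upstreams()`.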
diff --git a/ingest-router/src/project_config/handler.rs b/ingest-router/src/project_config/handler.rs
index 3b821ea..aa9297c 100644
--- a/ingest-router/src/project_config/handler.rs
+++ b/ingest-router/src/project_config/handler.rs
@@ -51,13 +51,14 @@ impl Handler for ProjectConfigsHandler {
         let public_keys = parsed.public_keys;
         let extra_fields = parsed.extra_fields;
 
-        let cell_ids: HashSet<&String> = cells.cell_list.iter().collect();
+        let cell_ids: HashSet<&String> = cells.cell_list().iter().collect();
 
         // Route each public key to its owning cell using the locator service
         let mut cell_to_keys: HashMap<CellId, Vec<String>> = HashMap::new();
         let mut pending: Vec<String> = Vec::new();
 
         for public_key in public_keys {
+            // TODO: Enforce locality here?
             match self.locator.lookup(&public_key, None).await {
                 Ok(cell_id) => {
                     if !cell_ids.contains(&cell_id) {
@@ -185,64 +186,10 @@ mod tests {
     use super::*;
     use crate::config::CellConfig;
     use crate::locale::Locales;
-    use locator::backup_routes::BackupRouteProvider;
-    use locator::types::RouteData;
+    use crate::testutils::create_test_locator;
     use std::collections::HashMap;
-    use std::sync::Arc;
     use url::Url;
 
-    // Mock backup provider for testing
-    struct MockBackupProvider {
-        data: RouteData,
-    }
-
-    #[async_trait::async_trait]
-    impl BackupRouteProvider for MockBackupProvider {
-        async fn load(&self) -> Result<RouteData, locator::backup_routes::BackupError> {
-            Ok(self.data.clone())
-        }
-
-        async fn store(
-            &self,
-            _data: &RouteData,
-        ) -> Result<(), locator::backup_routes::BackupError> {
-            Ok(())
-        }
-    }
-
-    async fn create_test_locator(key_to_cell: HashMap<String, String>) -> Locator {
-        let route_data = RouteData::from(
-            key_to_cell,
-            "cursor".to_string(),
-            HashMap::from([
-                ("us1".to_string(), "us".to_string()),
-                ("us2".to_string(), "us".to_string()),
-            ]),
-        );
-
-        let provider = Arc::new(MockBackupProvider { data: route_data });
-
-        let service = locator::locator::Locator::new(
-            locator::config::LocatorDataType::ProjectKey,
-            "http://invalid-control-plane:9000".to_string(),
-            provider,
-            None,
-        );
-
-        let locator = Locator::from_in_process_service(service);
-
-        // Wait for locator to be ready
-        for _ in 0..50 {
-            if locator.is_ready() {
-                break;
-            }
-            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
-        }
-        assert!(locator.is_ready(), "Locator should be ready");
-
-        locator
-    }
-
     fn build_request(project_configs_request: ProjectConfigsRequest) -> Request<HandlerBody> {
         let body = serialize_to_body(&project_configs_request).unwrap();
         Request::builder()
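For context on the routing loop in `split_request` above: each public key resolves to a cell via the locator, and keys that resolve to a cell outside this locale's `cell_list` (or that fail to resolve) are collected as pending rather than routed. The exact pending branch is truncated in this hunk, so treat the following self-contained sketch as an approximation of the decision, with a plain closure standing in for the async `Locator::lookup`:

```rust
use std::collections::{HashMap, HashSet};

fn route_keys(
    keys: Vec<String>,
    lookup: impl Fn(&str) -> Option<String>, // stand-in for Locator::lookup
    cell_ids: &HashSet<String>,
) -> (HashMap<String, Vec<String>>, Vec<String>) {
    let mut cell_to_keys: HashMap<String, Vec<String>> = HashMap::new();
    let mut pending = Vec::new();
    for key in keys {
        match lookup(&key) {
            // Key owned by a cell reachable in this locale: route it
            Some(cell) if cell_ids.contains(&cell) => {
                cell_to_keys.entry(cell).or_default().push(key);
            }
            // Locator miss, or the owning cell is outside this locale
            _ => pending.push(key),
        }
    }
    (cell_to_keys, pending)
}

fn main() {
    let cells: HashSet<String> = ["us1".to_string(), "us2".to_string()].into();
    let (routed, pending) = route_keys(
        vec!["k1".into(), "k2".into(), "k3".into()],
        |k| match k {
            "k1" => Some("us1".to_string()),
            "k2" => Some("de1".to_string()), // owned by another locale's cell
            _ => None,                       // unknown key
        },
        &cells,
    );
    assert_eq!(routed["us1"], vec!["k1".to_string()]);
    assert_eq!(pending, vec!["k2".to_string(), "k3".to_string()]);
}
```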
diff --git a/ingest-router/src/project_config/mod.rs b/ingest-router/src/project_config/mod.rs
index 4ad967e..408674c 100644
--- a/ingest-router/src/project_config/mod.rs
+++ b/ingest-router/src/project_config/mod.rs
@@ -282,6 +282,6 @@
 //! ```
 
 pub mod handler;
-mod protocol;
+pub mod protocol;
 
 pub use handler::ProjectConfigsHandler;
diff --git a/ingest-router/src/project_config/protocol.rs b/ingest-router/src/project_config/protocol.rs
index 1728020..6fd2735 100644
--- a/ingest-router/src/project_config/protocol.rs
+++ b/ingest-router/src/project_config/protocol.rs
@@ -75,3 +75,9 @@ impl ProjectConfigsResponse {
         }
     }
 }
+
+impl Default for ProjectConfigsResponse {
+    fn default() -> Self {
+        Self::new()
+    }
+}
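The new `Default` impl is the conventional answer to clippy's `new_without_default` lint and keeps `Default`-based construction (for example `..Default::default()`) available to callers now that the module is `pub`. A small stand-in illustration; the real fields live in `protocol.rs` and the ones shown here are assumptions:

```rust
#[derive(Debug, PartialEq)]
struct ProjectConfigsResponse {
    project_configs: Vec<String>, // stand-in field types, not the real ones
    pending_keys: Vec<String>,
}

impl ProjectConfigsResponse {
    fn new() -> Self {
        Self {
            project_configs: Vec::new(),
            pending_keys: Vec::new(),
        }
    }
}

// The clippy-friendly bridge: Default just delegates to new().
impl Default for ProjectConfigsResponse {
    fn default() -> Self {
        Self::new()
    }
}

fn main() {
    assert_eq!(ProjectConfigsResponse::default(), ProjectConfigsResponse::new());
}
```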
diff --git a/ingest-router/src/router.rs b/ingest-router/src/router.rs
index 2fff5fb..8a6259c 100644
--- a/ingest-router/src/router.rs
+++ b/ingest-router/src/router.rs
@@ -38,7 +38,7 @@ impl Router {
     }
 
     /// Finds the first route that matches the incoming request
-    pub fn resolve(&self, req: &Request<Incoming>) -> Option<(Arc<dyn Handler>, Arc<Cells>)> {
+    pub fn resolve(&self, req: &Request<Incoming>) -> Option<(Arc<dyn Handler>, Cells)> {
         self.routes
             .iter()
             .find(|route| self.matches_route(req, route))
diff --git a/ingest-router/src/testutils.rs b/ingest-router/src/testutils.rs
index a60b71d..4f281d5 100644
--- a/ingest-router/src/testutils.rs
+++ b/ingest-router/src/testutils.rs
@@ -1,7 +1,9 @@
 use locator::backup_routes::{BackupRouteProvider, FilesystemRouteProvider};
+use locator::client::Locator;
 use locator::config::Compression;
 use locator::types::RouteData;
 use std::collections::HashMap;
+use std::sync::Arc;
 
 pub async fn get_mock_provider() -> (tempfile::TempDir, FilesystemRouteProvider) {
     let route_data = RouteData::from(
@@ -23,3 +25,52 @@ pub async fn get_mock_provider() -> (tempfile::TempDir, FilesystemRouteProvider)
     provider.store(&route_data).await.unwrap();
     (dir, provider)
 }
+
+// Mock backup provider for testing
+struct MockBackupProvider {
+    data: RouteData,
+}
+
+#[async_trait::async_trait]
+impl BackupRouteProvider for MockBackupProvider {
+    async fn load(&self) -> Result<RouteData, locator::backup_routes::BackupError> {
+        Ok(self.data.clone())
+    }
+
+    async fn store(&self, _data: &RouteData) -> Result<(), locator::backup_routes::BackupError> {
+        Ok(())
+    }
+}
+
+pub async fn create_test_locator(key_to_cell: HashMap<String, String>) -> Locator {
+    let route_data = RouteData::from(
+        key_to_cell,
+        "cursor".to_string(),
+        HashMap::from([
+            ("us1".to_string(), "us".to_string()),
+            ("us2".to_string(), "us".to_string()),
+        ]),
+    );
+
+    let provider = Arc::new(MockBackupProvider { data: route_data });
+
+    let service = locator::locator::Locator::new(
+        locator::config::LocatorDataType::ProjectKey,
+        "http://invalid-control-plane:9000".to_string(),
+        provider,
+        None,
+    );
+
+    let locator = Locator::from_in_process_service(service);
+
+    // Wait for the locator to become ready
+    for _ in 0..50 {
+        if locator.is_ready() {
+            break;
+        }
+        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+    }
+    assert!(locator.is_ready(), "Locator should be ready");
+
+    locator
+}
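With the helper promoted to `testutils`, both test modules build locators the same way instead of each carrying a private copy. Shown below as it would appear in-tree; this mirrors the `lib.rs` call site and assumes the crate context, so it is not standalone:

```rust
// In-tree usage sketch (ingest-router crate context assumed).
use crate::testutils::create_test_locator;
use std::collections::HashMap;

#[tokio::test]
async fn resolves_keys_via_shared_locator() {
    // Map one 32-character public key to the "us1" cell, exactly as
    // test_ingest_router does.
    let locator = create_test_locator(HashMap::from([(
        "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
        "us1".to_string(),
    )]))
    .await;

    // create_test_locator already polls readiness internally (50 x 10ms),
    // so this is guaranteed by construction.
    assert!(locator.is_ready());
}
```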
diff --git a/scripts/mock_relay_api.py b/scripts/mock_relay_api.py
index 523b89e..8570137 100755
--- a/scripts/mock_relay_api.py
+++ b/scripts/mock_relay_api.py
@@ -17,6 +17,7 @@
 DEFAULT_MOCK_DATA = {
     "valid_keys": {
         "abc123def456": {"project_id": 100, "project_slug": "test-project", "org_id": 1},
+        "a" * 32: {"project_id": 100, "project_slug": "test-project", "org_id": 1},
         "xyz789uvw012": {"project_id": 200, "project_slug": "another-project", "org_id": 2},
     },
     "inactive_keys": {
@@ -113,11 +114,12 @@ def process_relay_request(public_keys: List[str], mock_data: MockRelayData) -> Dict:
 
     return configs
 
+
 class MockRelayHandler(BaseHTTPRequestHandler):
     """HTTP request handler for mock relay endpoint"""
 
     # Class variable to hold mock data (can be set before creating server)
-    mock_data: MockRelayData = None
+    mock_data: Optional[MockRelayData] = None
 
     def _read_json_body(self) -> dict:
         """Read and parse JSON body from request"""
@@ -145,7 +147,7 @@ def do_POST(self):
         try:
             request_data = self._read_json_body()
             query_params = parse_qs(parsed_path.query)
-            version = query_params.get("version", ["2"])[0]
+            version = query_params.get("version", ["3"])[0]
 
             public_keys = request_data.get("publicKeys", [])
             # Process the request
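Cross-reference: the `"a" * 32` key added to `DEFAULT_MOCK_DATA` is the same 32-character key the Rust integration test maps to `us1`, which is what lets `test_ingest_router` assert exactly one returned config and zero pending keys. A trivial runnable check of that equivalence:

```rust
fn main() {
    // Python side writes the key as "a" * 32; the Rust test spells it out.
    let key = "a".repeat(32);
    assert_eq!(key, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
    assert_eq!(key.len(), 32);
}
```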