4 changes: 4 additions & 0 deletions Makefile
@@ -106,6 +106,10 @@ run-mock-control-api:
python scripts/mock_control_api.py
.PHONY: run-mock-control-api

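# Run the mock relay API (scripts/mock_relay_api.py) used by the ingest-router tests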
run-mock-relay-api:
python scripts/mock_relay_api.py
.PHONY: run-mock-relay-api

# CI-like checks (what runs in GitHub Actions)
ci: fmt-check lint test build
@echo "All CI checks passed!"
6 changes: 3 additions & 3 deletions ingest-router/src/config.rs
@@ -66,17 +66,17 @@ pub struct RelayTimeouts {
/// HTTP request timeout for individual upstream calls (seconds).
/// This is the maximum time a single HTTP request can take.
/// Default: 15 seconds
pub http_timeout_secs: u16,
pub http_timeout_secs: u64,

/// Task timeout when waiting for the first upstream to respond (seconds).
/// Must be >= http_timeout_secs to allow HTTP requests to complete.
/// Default: 20 seconds
pub task_initial_timeout_secs: u16,
pub task_initial_timeout_secs: u64,

/// Deadline for all remaining tasks after first success (seconds).
/// Aggressively cuts off slow upstreams once we have good data.
/// Default: 5 seconds
pub task_subsequent_timeout_secs: u16,
pub task_subsequent_timeout_secs: u64,
}

impl Default for RelayTimeouts {
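To make the relationship between the three timeouts concrete, here is a minimal sketch that starts from the documented defaults (15 / 20 / 5 seconds) and checks the stated `task_initial_timeout_secs >= http_timeout_secs` constraint. It is illustrative only and not part of this diff; the `ingest_router::config` import path and the idea of validating the invariant at startup are assumptions, not existing code.

```rust
// Illustrative sketch, not part of the PR. Assumes the crate is importable
// as `ingest_router` and that RelayTimeouts' fields are public, as shown above.
use ingest_router::config::RelayTimeouts;

fn main() {
    // Start from the documented defaults (15 / 20 / 5 seconds)...
    let mut timeouts = RelayTimeouts::default();

    // ...and tighten the per-request timeout for a latency-sensitive deployment.
    timeouts.http_timeout_secs = 10;

    // The doc comments require the initial task timeout to cover a full HTTP
    // request; a config loader could validate this invariant at startup.
    assert!(timeouts.task_initial_timeout_secs >= timeouts.http_timeout_secs);
}
```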
162 changes: 162 additions & 0 deletions ingest-router/src/executor.rs
@@ -0,0 +1,162 @@
use crate::config::RelayTimeouts;
use crate::errors::IngestRouterError;
use crate::handler::{CellId, Handler, HandlerBody};
use crate::http::send_to_upstream;
use crate::locale::Cells;
use http::StatusCode;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::{Request, Response};
use hyper_util::client::legacy::Client;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::rt::TokioExecutor;
use shared::http::make_error_response;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::task::JoinSet;
use tokio::time::{Duration, sleep};

#[derive(Clone)]
pub struct Executor {
client: Client<HttpConnector, Full<Bytes>>,
timeouts: RelayTimeouts,
}

impl Executor {
pub fn new(timeouts: RelayTimeouts) -> Self {
let client = Client::builder(TokioExecutor::new()).build(HttpConnector::new());
Self { client, timeouts }
}

/// Splits the request, executes the split requests in parallel, and merges
/// the responses using the provided handler.
pub async fn execute(
&self,
handler: Arc<dyn Handler>,
request: Request<HandlerBody>,
cells: Cells,
) -> Response<HandlerBody> {
let (split_requests, metadata) = match handler.split_request(request, &cells).await {
Ok(result) => result,
Err(_e) => return make_error_response(StatusCode::INTERNAL_SERVER_ERROR),
};

let results = self.execute_parallel(split_requests, cells).await;

handler.merge_responses(results, metadata).await
}

/// Execute split requests in parallel against their cell upstreams
async fn execute_parallel(
&self,
requests: Vec<(CellId, Request<HandlerBody>)>,
cells: Cells,
) -> Vec<(CellId, Result<Response<HandlerBody>, IngestRouterError>)> {
let mut join_set = JoinSet::new();

let mut pending_cells = HashSet::new();

// Spawn requests for each cell
for (cell_id, request) in requests {
let cells = cells.clone();
let client = self.client.clone();
let timeout_secs = self.timeouts.http_timeout_secs;

pending_cells.insert(cell_id.clone());
join_set.spawn(async move {
let result = send_to_cell(&client, &cell_id, request, &cells, timeout_secs).await;
(cell_id, result)
});
}

let mut results = Vec::new();

// Use the longer initial timeout for the first result
let initial_timeout = sleep(Duration::from_secs(self.timeouts.task_initial_timeout_secs));

tokio::select! {
_ = initial_timeout => {},
join_result = join_set.join_next() => {
match join_result {
Some(Ok((cell_id, result))) => {
pending_cells.remove(&cell_id);
results.push((cell_id, result));
}
Some(Err(e)) => tracing::error!("Task panicked: {}", e),
// The join set is empty -- this should never happen
None => return results,
}
}
}

// Use the shorter subsequent timeout for any remaining results
let timeout = sleep(Duration::from_secs(
self.timeouts.task_subsequent_timeout_secs,
));
tokio::pin!(timeout);

loop {
tokio::select! {
_ = &mut timeout => {
break;
},
join_result = join_set.join_next() => {
match join_result {
Some(Ok((cell_id, result))) => {
pending_cells.remove(&cell_id);
results.push((cell_id, result));
},
Some(Err(e)) => tracing::error!("Task panicked: {}", e),
// No more tasks
None => break,
}
}
}
}

// Add all remaining pending cells to results
for cell_id in pending_cells.drain() {
results.push((
cell_id.clone(),
Err(IngestRouterError::UpstreamTimeout(cell_id)),
));
}

results
}
}

/// Send a request to a specific cell's upstream
/// TODO: simplify body types so these conversions are not needed - consider converting to
/// Bytes at the boundary and using bytes only throughout the handlers.
async fn send_to_cell(
client: &Client<HttpConnector, Full<Bytes>>,
cell_id: &str,
request: Request<HandlerBody>,
cells: &Cells,
timeout_secs: u64,
) -> Result<Response<HandlerBody>, IngestRouterError> {
// Look up the upstream for this cell
let upstream = cells
.cell_to_upstreams()
.get(cell_id)
.ok_or_else(|| IngestRouterError::InternalError(format!("Unknown cell: {}", cell_id)))?;

// Convert HandlerBody to Full<Bytes> for the HTTP client
let (parts, body) = request.into_parts();
let body_bytes = body
.collect()
.await
.map_err(|e| IngestRouterError::RequestBodyError(e.to_string()))?
.to_bytes();

let request = Request::from_parts(parts, Full::new(body_bytes));

// Send to upstream (using relay_url)
let response = send_to_upstream(client, &upstream.relay_url, request, timeout_secs).await?;

// Convert Response<Bytes> back to Response<HandlerBody>
let (parts, body_bytes) = response.into_parts();
let handler_body: HandlerBody = Full::new(body_bytes).map_err(|e| match e {}).boxed();

Ok(Response::from_parts(parts, handler_body))
}
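The heart of `execute_parallel` is a two-phase deadline: wait up to `task_initial_timeout_secs` for the first upstream to answer, then give every remaining task only `task_subsequent_timeout_secs`. The following self-contained sketch shows the same `JoinSet` + `select!` pattern with dummy tasks standing in for the per-cell HTTP calls; the task bodies and durations are illustrative and not taken from this PR.

```rust
// Standalone sketch of the two-phase deadline used by `execute_parallel`.
// Dummy tasks stand in for upstream requests; durations are illustrative.
use tokio::task::JoinSet;
use tokio::time::{Duration, sleep};

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    for i in 0u64..3 {
        set.spawn(async move {
            sleep(Duration::from_millis(50 * (i + 1))).await;
            i
        });
    }

    let mut results = Vec::new();

    // Phase 1: wait generously (the "initial" timeout) for the first result.
    tokio::select! {
        _ = sleep(Duration::from_secs(20)) => {},
        joined = set.join_next() => {
            if let Some(Ok(v)) = joined {
                results.push(v);
            }
        }
    }

    // Phase 2: give all remaining tasks one shorter, shared deadline.
    let deadline = sleep(Duration::from_secs(5));
    tokio::pin!(deadline);
    loop {
        tokio::select! {
            _ = &mut deadline => break,
            joined = set.join_next() => match joined {
                Some(Ok(v)) => results.push(v),
                Some(Err(_)) => {} // a task panicked; skip it
                None => break,     // all tasks finished
            },
        }
    }

    // Anything still pending at this point would be reported as a timeout.
    println!("collected: {:?}", results);
}
```

In the real executor, cells whose tasks are still pending when the second deadline fires are the ones reported as `UpstreamTimeout`.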
107 changes: 71 additions & 36 deletions ingest-router/src/lib.rs
@@ -1,6 +1,7 @@
pub mod api;
pub mod config;
pub mod errors;
mod executor;
pub mod handler;
pub mod http;
pub mod locale;
@@ -11,7 +12,7 @@ pub mod router;
mod testutils;

use crate::errors::IngestRouterError;
use http_body_util::{BodyExt, Full, combinators::BoxBody};
use http_body_util::{BodyExt, combinators::BoxBody};
use hyper::StatusCode;
use hyper::body::Bytes;
use hyper::service::Service;
@@ -24,9 +25,11 @@ use std::pin::Pin;
pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {
let locator = Locator::new(config.locator.to_client_config()).await?;

let ingest_router_service = IngestRouterService {
router: router::Router::new(config.routes, config.locales, locator),
};
let ingest_router_service = IngestRouterService::new(
router::Router::new(config.routes, config.locales, locator),
config.relay_timeouts,
);

let router_task = run_http_service(
&config.listener.host,
config.listener.port,
@@ -38,6 +41,14 @@ pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {

struct IngestRouterService {
router: router::Router,
executor: executor::Executor,
}

impl IngestRouterService {
pub fn new(router: router::Router, timeouts: config::RelayTimeouts) -> Self {
let executor = executor::Executor::new(timeouts);
Self { router, executor }
}
}

impl<B> Service<Request<B>> for IngestRouterService
@@ -63,18 +74,9 @@ where
.boxed();
let handler_req = Request::from_parts(parts, handler_body);

// TODO: Placeholder response
Box::pin(async move {
let (split, _metadata) = handler.split_request(handler_req, &cells).await?;
let executor = self.executor.clone();

for (cell_id, req) in split {
println!("Cell: {}, URI: {}", cell_id, req.uri());
}

Ok(Response::new(
Full::new("ok\n".into()).map_err(|e| match e {}).boxed(),
))
})
Box::pin(async move { Ok(executor.execute(handler, handler_req, cells).await) })
}
None => Box::pin(async move { Ok(make_error_response(StatusCode::BAD_REQUEST)) }),
}
@@ -84,22 +86,46 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::api::utils::deserialize_body;
use crate::config::{HandlerAction, HttpMethod, Match, Route};
use hyper::Method;
use hyper::body::Bytes;
use hyper::header::HOST;

use crate::config::CellConfig;
use locator::config::LocatorDataType;
use locator::locator::Locator as LocatorService;
use std::collections::HashMap;
use std::process::{Child, Command};
use url::Url;

use crate::testutils::get_mock_provider;
use std::sync::Arc;
use crate::project_config::protocol::ProjectConfigsResponse;
use crate::testutils::create_test_locator;

use http_body_util::{BodyExt, Full};

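/// Spawns scripts/mock_relay_api.py for the duration of the test and kills
/// the child process when dropped.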
struct TestServer {
child: Child,
}

impl TestServer {
fn spawn() -> std::io::Result<Self> {
let child = Command::new("python")
.arg("../scripts/mock_relay_api.py")
.spawn()?;

Ok(Self { child })
}
}

impl Drop for TestServer {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}

#[tokio::test]
async fn test_ingest_router() {
let _relay_server = TestServer::spawn().expect("Failed to spawn test server");

let routes_config = vec![Route {
r#match: Match {
host: Some("us.sentry.io".to_string()),
@@ -115,39 +141,48 @@ mod tests {
vec![CellConfig {
id: "us1".to_string(),
sentry_url: Url::parse("https://sentry.io/us1").unwrap(),
relay_url: Url::parse("https://relay.io/us1").unwrap(),
relay_url: Url::parse("http://localhost:8000").unwrap(),
}],
)]);

let (_dir, provider) = get_mock_provider().await;
let locator_service = LocatorService::new(
LocatorDataType::ProjectKey,
"http://control-plane-url".to_string(),
Arc::new(provider),
None,
let locator = create_test_locator(HashMap::from([(
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
"us1".to_string(),
)]))
.await;

let service = IngestRouterService::new(
router::Router::new(routes_config, locales, locator),
config::RelayTimeouts {
http_timeout_secs: 5000,
task_initial_timeout_secs: 10000,
task_subsequent_timeout_secs: 10000,
},
);
let locator = Locator::from_in_process_service(locator_service);

let service = IngestRouterService {
router: router::Router::new(routes_config, locales, locator),
};

// Project configs request
let request = Request::builder()
.method(Method::POST)
.uri("/api/0/relays/projectconfigs/")
.header(HOST, "us.sentry.io")
.body(
Full::new(Bytes::from(r#"{"publicKeys": ["test-key"]}"#))
.map_err(|e| match e {})
.boxed(),
Full::new(Bytes::from(
r#"{"publicKeys": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"], "global": 1}"#,
))
.map_err(|e| match e {})
.boxed(),
)
.unwrap();

let response = service.call(request).await.unwrap();

// TODO: call the scripts/mock_relay_api.py server and validate the response
let (parts, body) = response.into_parts();

assert_eq!(parts.status, 200);

assert_eq!(response.status(), 200);
let parsed: ProjectConfigsResponse = deserialize_body(body).await.unwrap();
assert_eq!(parsed.project_configs.len(), 1);
assert_eq!(parsed.pending_keys.len(), 0);
assert_eq!(parsed.extra_fields.len(), 2);
}
}