Skip to content
Merged
2 changes: 2 additions & 0 deletions ingest-router/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod health;
pub mod utils;
28 changes: 28 additions & 0 deletions ingest-router/src/api/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::errors::IngestRouterError;
use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata};
use crate::locale::Cells;
use async_trait::async_trait;
use hyper::{Request, Response};

/// This endpoint returns success if any one upstream is available.
/// Synapse should continue to operate even if one cell is down.
pub struct HealthHandler {}

#[async_trait]
impl Handler for HealthHandler {
async fn split_request(
&self,
_request: Request<HandlerBody>,
_cells: &Cells,
) -> Result<(Vec<(CellId, Request<HandlerBody>)>, SplitMetadata), IngestRouterError> {
Comment on lines +15 to +17

This comment was marked as outdated.

unimplemented!();
}

async fn merge_responses(
&self,
_responses: Vec<(CellId, Result<Response<HandlerBody>, IngestRouterError>)>,
_metadata: SplitMetadata,
) -> Response<HandlerBody> {
unimplemented!();
}
}
35 changes: 35 additions & 0 deletions ingest-router/src/api/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::errors::IngestRouterError;
use http::Version;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::header::HeaderMap;
use hyper::header::{CONTENT_LENGTH, TRANSFER_ENCODING};
use serde::Serialize;
use serde::de::DeserializeOwned;
use shared::http::filter_hop_by_hop;

pub type HandlerBody = BoxBody<Bytes, IngestRouterError>;

/// Deserializes a JSON request body into the specified type.
pub async fn deserialize_body<T: DeserializeOwned>(
body: HandlerBody,
) -> Result<T, IngestRouterError> {
let bytes = body.collect().await?.to_bytes();
serde_json::from_slice(&bytes).map_err(|e| IngestRouterError::RequestBodyError(e.to_string()))
}

/// Serializes a value to a JSON body.
pub fn serialize_to_body<T: Serialize>(value: &T) -> Result<HandlerBody, IngestRouterError> {
let bytes = serde_json::to_vec(value).map(Bytes::from)?;
Ok(Full::new(bytes).map_err(|e| match e {}).boxed())
}
Comment on lines +15 to +26
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved these functions out of the project config implementation into utils as we can reuse them for any kind of json endpoint


/// Common header normalization for all requests and responses.
pub fn normalize_headers(headers: &mut HeaderMap, version: Version) -> &mut HeaderMap {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pulled this out as well, as we can reuse it for any relay endpoint in both directions - requests sent upstream and responses back

filter_hop_by_hop(headers, version);
headers.remove(CONTENT_LENGTH);
headers.remove(TRANSFER_ENCODING);

headers
}
3 changes: 3 additions & 0 deletions ingest-router/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ pub enum IngestRouterError {

#[error("Locator client error: {0}")]
LocatorClientError(#[from] locator::client::ClientError),

#[error("Serde error: {0}")]
SerdeError(#[from] serde_json::Error),
}
39 changes: 18 additions & 21 deletions ingest-router/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,44 @@
use crate::errors::IngestRouterError;
use crate::locale::Cells;
use async_trait::async_trait;
use serde::{Serialize, de::DeserializeOwned};
use http_body_util::combinators::BoxBody;
use hyper::body::Bytes;
use hyper::{Request, Response};
use std::any::Any;

pub type CellId = String;
pub type HandlerBody = BoxBody<Bytes, IngestRouterError>;
pub type SplitMetadata = Box<dyn Any + Send>;

/// Handler for endpoints that split requests across cells and merge results
///
/// The handler implements endpoint-specific logic:
/// - How to split a request into per-cell requests
/// - How to merge results from multiple cells
/// ```
#[async_trait]
pub trait Handler<Req, Res>: Send + Sync
where
Req: Serialize + DeserializeOwned + Send,
Res: Serialize + DeserializeOwned + Send,
{
/// Metadata that flows from split_requests to merge_results
///
/// This allows passing data from the split phase to the merge phase.
/// Some use cases:
/// - Pending keys that couldn't be routed (e.g., `Vec<PublicKey>`)
type SplitMetadata: Send;
pub trait Handler: Send + Sync {
/// Returns the type name of this handler for test assertions
fn type_name(&self) -> &'static str {
std::any::type_name::<Self>()
}

/// Split one request into multiple per-cell requests
///
/// This method routes the request data to appropriate cells and builds
/// per-cell requests that will be sent to upstreams.
async fn split_requests(
async fn split_request(
&self,
request: Req,
request: Request<HandlerBody>,
cells: &Cells,
) -> Result<(Vec<(CellId, Req)>, Self::SplitMetadata), IngestRouterError>;
) -> Result<(Vec<(CellId, Request<HandlerBody>)>, SplitMetadata), IngestRouterError>;

/// Merge results from multiple cells into a single response
///
/// This method combines responses from successful cells, handles failures,
/// and incorporates metadata from the split phase.
///
fn merge_results(
async fn merge_responses(
&self,
results: Vec<Result<(CellId, Res), (CellId, IngestRouterError)>>,
metadata: Self::SplitMetadata,
) -> Res;
responses: Vec<(CellId, Result<Response<HandlerBody>, IngestRouterError>)>,
metadata: SplitMetadata,
) -> Response<HandlerBody>;
}
46 changes: 35 additions & 11 deletions ingest-router/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod api;
pub mod config;
pub mod errors;
pub mod handler;
Expand All @@ -24,7 +25,7 @@ pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {
let locator = Locator::new(config.locator.to_client_config()).await?;

let ingest_router_service = IngestRouterService {
router: router::Router::new(config.routes, locator),
router: router::Router::new(config.routes, config.locales, locator),
};
let router_task = run_http_service(
&config.listener.host,
Expand All @@ -51,15 +52,25 @@ where
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn call(&self, req: Request<B>) -> Self::Future {
let handler = self
.router
.resolve(&req)
.and_then(|action| self.router.get_handler(action));
let maybe_handler = self.router.resolve(&req);

match maybe_handler {
Some((handler, cells)) => {
// Convert Request<B> to Request<HandlerBody>
let (parts, body) = req.into_parts();
let handler_body = body
.map_err(|e| IngestRouterError::RequestBodyError(e.to_string()))
.boxed();
let handler_req = Request::from_parts(parts, handler_body);

match handler {
Some(_handler) => {
// TODO: Placeholder response
Box::pin(async move {
let (split, _metadata) = handler.split_request(handler_req, &cells).await?;

for (cell_id, req) in split {
println!("Cell: {}, URI: {}", cell_id, req.uri());
}

Ok(Response::new(
Full::new("ok\n".into()).map_err(|e| match e {}).boxed(),
))
Expand All @@ -74,13 +85,15 @@ where
mod tests {
use super::*;
use crate::config::{HandlerAction, HttpMethod, Match, Route};
use http_body_util::Empty;
use hyper::Method;
use hyper::body::Bytes;
use hyper::header::HOST;

use crate::config::CellConfig;
use locator::config::LocatorDataType;
use locator::locator::Locator as LocatorService;
use std::collections::HashMap;
use url::Url;

use crate::testutils::get_mock_provider;
use std::sync::Arc;
Expand All @@ -97,6 +110,15 @@ mod tests {
locale: "us".to_string(),
}];

let locales = HashMap::from([(
"us".to_string(),
vec![CellConfig {
id: "us1".to_string(),
sentry_url: Url::parse("https://sentry.io/us1").unwrap(),
relay_url: Url::parse("https://relay.io/us1").unwrap(),
}],
)]);

let (_dir, provider) = get_mock_provider().await;
let locator_service = LocatorService::new(
LocatorDataType::ProjectKey,
Expand All @@ -107,7 +129,7 @@ mod tests {
let locator = Locator::from_in_process_service(locator_service);

let service = IngestRouterService {
router: router::Router::new(routes_config, locator),
router: router::Router::new(routes_config, locales, locator),
};

// Project configs request
Expand All @@ -116,14 +138,16 @@ mod tests {
.uri("/api/0/relays/projectconfigs/")
.header(HOST, "us.sentry.io")
.body(
Empty::<Bytes>::new()
.map_err(|never| match never {})
Full::new(Bytes::from(r#"{"publicKeys": ["test-key"]}"#))
.map_err(|e| match e {})
.boxed(),
)
.unwrap();

let response = service.call(request).await.unwrap();

// TODO: call the scripts/mock_relay_api.py server and validate the response
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a note for later -- we should add a true end to end test here


assert_eq!(response.status(), 200);
}
}
9 changes: 5 additions & 4 deletions ingest-router/src/locale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! during request processing.

use std::collections::HashMap;
use std::sync::Arc;
use url::Url;

use crate::config::CellConfig;
Expand Down Expand Up @@ -79,7 +80,7 @@ impl Cells {
#[derive(Clone, Debug)]
pub struct Locales {
/// Mapping from locale to cells
locale_to_cells: HashMap<String, Cells>,
locale_to_cells: HashMap<String, Arc<Cells>>,
}

impl Locales {
Expand All @@ -89,7 +90,7 @@ impl Locales {
let locale_to_cells = locales
.into_iter()
.map(|(locale, cells)| {
let cells = Cells::from_config(cells);
let cells = Arc::new(Cells::from_config(cells));
(locale, cells)
})
.collect();
Expand All @@ -98,8 +99,8 @@ impl Locales {
}

/// Get the cells for a specific locale
pub fn get_cells(&self, locale: &str) -> Option<&Cells> {
self.locale_to_cells.get(locale)
pub fn get_cells(&self, locale: &str) -> Option<Arc<Cells>> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to Arc<Cells> since we pass Cells to a bunch of places now - and it's easier for the data to be owned

self.locale_to_cells.get(locale).cloned()
}
}

Expand Down
Loading