-
-
Notifications
You must be signed in to change notification settings - Fork 1
ingest-router: Simplify handler type #92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2315715
23d2c80
f94c2ec
83ea0d5
b4df820
9d8824c
a956d70
139acf0
b84d372
64ca3d4
f26e5ac
db100bb
86f3804
a1d0f09
95b7a70
4e856e3
da9ab40
41b66c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| pub mod health; | ||
| pub mod utils; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| use crate::errors::IngestRouterError; | ||
| use crate::handler::{CellId, Handler, HandlerBody, SplitMetadata}; | ||
| use crate::locale::Cells; | ||
| use async_trait::async_trait; | ||
| use hyper::{Request, Response}; | ||
|
|
||
| /// This endpoint returns success if any one upstream is available. | ||
| /// Synapse should continue to operate even if one cell is down. | ||
| pub struct HealthHandler {} | ||
|
|
||
| #[async_trait] | ||
| impl Handler for HealthHandler { | ||
| async fn split_request( | ||
| &self, | ||
| _request: Request<HandlerBody>, | ||
| _cells: &Cells, | ||
| ) -> Result<(Vec<(CellId, Request<HandlerBody>)>, SplitMetadata), IngestRouterError> { | ||
| unimplemented!(); | ||
| } | ||
|
|
||
| async fn merge_responses( | ||
| &self, | ||
| _responses: Vec<(CellId, Result<Response<HandlerBody>, IngestRouterError>)>, | ||
| _metadata: SplitMetadata, | ||
| ) -> Response<HandlerBody> { | ||
| unimplemented!(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| use crate::errors::IngestRouterError; | ||
| use http::Version; | ||
| use http_body_util::combinators::BoxBody; | ||
| use http_body_util::{BodyExt, Full}; | ||
| use hyper::body::Bytes; | ||
| use hyper::header::HeaderMap; | ||
| use hyper::header::{CONTENT_LENGTH, TRANSFER_ENCODING}; | ||
| use serde::Serialize; | ||
| use serde::de::DeserializeOwned; | ||
| use shared::http::filter_hop_by_hop; | ||
|
|
||
| pub type HandlerBody = BoxBody<Bytes, IngestRouterError>; | ||
|
|
||
| /// Deserializes a JSON request body into the specified type. | ||
| pub async fn deserialize_body<T: DeserializeOwned>( | ||
| body: HandlerBody, | ||
| ) -> Result<T, IngestRouterError> { | ||
| let bytes = body.collect().await?.to_bytes(); | ||
| serde_json::from_slice(&bytes).map_err(|e| IngestRouterError::RequestBodyError(e.to_string())) | ||
| } | ||
|
|
||
| /// Serializes a value to a JSON body. | ||
| pub fn serialize_to_body<T: Serialize>(value: &T) -> Result<HandlerBody, IngestRouterError> { | ||
| let bytes = serde_json::to_vec(value).map(Bytes::from)?; | ||
| Ok(Full::new(bytes).map_err(|e| match e {}).boxed()) | ||
| } | ||
|
Comment on lines
+15
to
+26
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved these functions out of the project config implementation into utils as we can reuse them for any kind of json endpoint |
||
|
|
||
| /// Common header normalization for all requests and responses. | ||
| pub fn normalize_headers(headers: &mut HeaderMap, version: Version) -> &mut HeaderMap { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pulled this out as well, as we can reuse it for any relay endpoint in both directions - requests sent upstream and responses back |
||
| filter_hop_by_hop(headers, version); | ||
| headers.remove(CONTENT_LENGTH); | ||
| headers.remove(TRANSFER_ENCODING); | ||
|
|
||
| headers | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,47 +1,44 @@ | ||
| use crate::errors::IngestRouterError; | ||
| use crate::locale::Cells; | ||
| use async_trait::async_trait; | ||
| use serde::{Serialize, de::DeserializeOwned}; | ||
| use http_body_util::combinators::BoxBody; | ||
| use hyper::body::Bytes; | ||
| use hyper::{Request, Response}; | ||
| use std::any::Any; | ||
|
|
||
| pub type CellId = String; | ||
| pub type HandlerBody = BoxBody<Bytes, IngestRouterError>; | ||
| pub type SplitMetadata = Box<dyn Any + Send>; | ||
|
|
||
| /// Handler for endpoints that split requests across cells and merge results | ||
| /// | ||
| /// The handler implements endpoint-specific logic: | ||
| /// - How to split a request into per-cell requests | ||
| /// - How to merge results from multiple cells | ||
| /// ``` | ||
| #[async_trait] | ||
| pub trait Handler<Req, Res>: Send + Sync | ||
| where | ||
| Req: Serialize + DeserializeOwned + Send, | ||
| Res: Serialize + DeserializeOwned + Send, | ||
| { | ||
| /// Metadata that flows from split_requests to merge_results | ||
| /// | ||
| /// This allows passing data from the split phase to the merge phase. | ||
| /// Some use cases: | ||
| /// - Pending keys that couldn't be routed (e.g., `Vec<PublicKey>`) | ||
| type SplitMetadata: Send; | ||
| pub trait Handler: Send + Sync { | ||
| /// Returns the type name of this handler for test assertions | ||
| fn type_name(&self) -> &'static str { | ||
| std::any::type_name::<Self>() | ||
| } | ||
|
|
||
| /// Split one request into multiple per-cell requests | ||
| /// | ||
| /// This method routes the request data to appropriate cells and builds | ||
| /// per-cell requests that will be sent to upstreams. | ||
| async fn split_requests( | ||
| async fn split_request( | ||
| &self, | ||
| request: Req, | ||
| request: Request<HandlerBody>, | ||
| cells: &Cells, | ||
| ) -> Result<(Vec<(CellId, Req)>, Self::SplitMetadata), IngestRouterError>; | ||
| ) -> Result<(Vec<(CellId, Request<HandlerBody>)>, SplitMetadata), IngestRouterError>; | ||
|
|
||
| /// Merge results from multiple cells into a single response | ||
| /// | ||
| /// This method combines responses from successful cells, handles failures, | ||
| /// and incorporates metadata from the split phase. | ||
| /// | ||
| fn merge_results( | ||
| async fn merge_responses( | ||
| &self, | ||
| results: Vec<Result<(CellId, Res), (CellId, IngestRouterError)>>, | ||
| metadata: Self::SplitMetadata, | ||
| ) -> Res; | ||
| responses: Vec<(CellId, Result<Response<HandlerBody>, IngestRouterError>)>, | ||
| metadata: SplitMetadata, | ||
| ) -> Response<HandlerBody>; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| pub mod api; | ||
| pub mod config; | ||
| pub mod errors; | ||
| pub mod handler; | ||
|
|
@@ -24,7 +25,7 @@ pub async fn run(config: config::Config) -> Result<(), IngestRouterError> { | |
| let locator = Locator::new(config.locator.to_client_config()).await?; | ||
|
|
||
| let ingest_router_service = IngestRouterService { | ||
| router: router::Router::new(config.routes, locator), | ||
| router: router::Router::new(config.routes, config.locales, locator), | ||
| }; | ||
| let router_task = run_http_service( | ||
| &config.listener.host, | ||
|
|
@@ -51,15 +52,25 @@ where | |
| Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; | ||
|
|
||
| fn call(&self, req: Request<B>) -> Self::Future { | ||
| let handler = self | ||
| .router | ||
| .resolve(&req) | ||
| .and_then(|action| self.router.get_handler(action)); | ||
| let maybe_handler = self.router.resolve(&req); | ||
|
|
||
| match maybe_handler { | ||
| Some((handler, cells)) => { | ||
| // Convert Request<B> to Request<HandlerBody> | ||
| let (parts, body) = req.into_parts(); | ||
| let handler_body = body | ||
| .map_err(|e| IngestRouterError::RequestBodyError(e.to_string())) | ||
| .boxed(); | ||
| let handler_req = Request::from_parts(parts, handler_body); | ||
|
|
||
| match handler { | ||
| Some(_handler) => { | ||
| // TODO: Placeholder response | ||
| Box::pin(async move { | ||
| let (split, _metadata) = handler.split_request(handler_req, &cells).await?; | ||
|
|
||
| for (cell_id, req) in split { | ||
| println!("Cell: {}, URI: {}", cell_id, req.uri()); | ||
| } | ||
|
|
||
| Ok(Response::new( | ||
| Full::new("ok\n".into()).map_err(|e| match e {}).boxed(), | ||
| )) | ||
|
|
@@ -74,13 +85,15 @@ where | |
| mod tests { | ||
| use super::*; | ||
| use crate::config::{HandlerAction, HttpMethod, Match, Route}; | ||
| use http_body_util::Empty; | ||
| use hyper::Method; | ||
| use hyper::body::Bytes; | ||
| use hyper::header::HOST; | ||
|
|
||
| use crate::config::CellConfig; | ||
| use locator::config::LocatorDataType; | ||
| use locator::locator::Locator as LocatorService; | ||
| use std::collections::HashMap; | ||
| use url::Url; | ||
|
|
||
| use crate::testutils::get_mock_provider; | ||
| use std::sync::Arc; | ||
|
|
@@ -97,6 +110,15 @@ mod tests { | |
| locale: "us".to_string(), | ||
| }]; | ||
|
|
||
| let locales = HashMap::from([( | ||
| "us".to_string(), | ||
| vec![CellConfig { | ||
| id: "us1".to_string(), | ||
| sentry_url: Url::parse("https://sentry.io/us1").unwrap(), | ||
| relay_url: Url::parse("https://relay.io/us1").unwrap(), | ||
| }], | ||
| )]); | ||
|
|
||
| let (_dir, provider) = get_mock_provider().await; | ||
| let locator_service = LocatorService::new( | ||
| LocatorDataType::ProjectKey, | ||
|
|
@@ -107,7 +129,7 @@ mod tests { | |
| let locator = Locator::from_in_process_service(locator_service); | ||
|
|
||
| let service = IngestRouterService { | ||
| router: router::Router::new(routes_config, locator), | ||
| router: router::Router::new(routes_config, locales, locator), | ||
| }; | ||
|
|
||
| // Project configs request | ||
|
|
@@ -116,14 +138,16 @@ mod tests { | |
| .uri("/api/0/relays/projectconfigs/") | ||
| .header(HOST, "us.sentry.io") | ||
| .body( | ||
| Empty::<Bytes>::new() | ||
| .map_err(|never| match never {}) | ||
| Full::new(Bytes::from(r#"{"publicKeys": ["test-key"]}"#)) | ||
| .map_err(|e| match e {}) | ||
| .boxed(), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let response = service.call(request).await.unwrap(); | ||
|
|
||
| // TODO: call the scripts/mock_relay_api.py server and validate the response | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added a note for later -- we should add a true end to end test here |
||
|
|
||
| assert_eq!(response.status(), 200); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| //! during request processing. | ||
|
|
||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
| use url::Url; | ||
|
|
||
| use crate::config::CellConfig; | ||
|
|
@@ -79,7 +80,7 @@ impl Cells { | |
| #[derive(Clone, Debug)] | ||
| pub struct Locales { | ||
| /// Mapping from locale to cells | ||
| locale_to_cells: HashMap<String, Cells>, | ||
| locale_to_cells: HashMap<String, Arc<Cells>>, | ||
| } | ||
|
|
||
| impl Locales { | ||
|
|
@@ -89,7 +90,7 @@ impl Locales { | |
| let locale_to_cells = locales | ||
| .into_iter() | ||
| .map(|(locale, cells)| { | ||
| let cells = Cells::from_config(cells); | ||
| let cells = Arc::new(Cells::from_config(cells)); | ||
| (locale, cells) | ||
| }) | ||
| .collect(); | ||
|
|
@@ -98,8 +99,8 @@ impl Locales { | |
| } | ||
|
|
||
| /// Get the cells for a specific locale | ||
| pub fn get_cells(&self, locale: &str) -> Option<&Cells> { | ||
| self.locale_to_cells.get(locale) | ||
| pub fn get_cells(&self, locale: &str) -> Option<Arc<Cells>> { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched to |
||
| self.locale_to_cells.get(locale).cloned() | ||
| } | ||
| } | ||
|
|
||
|
|
||
This comment was marked as outdated.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.