Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ reqwest = { version = "0.12.23", features = ["json"] }
sentry = { version = "0.45.0", features = ["tracing"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_yaml = "0.9.34"
serde_json = "1.0.145"
tempfile = "3.23.0"
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["full"] }
Expand Down
16 changes: 13 additions & 3 deletions example_config_ingest_router.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,27 @@ ingest_router:
port: 3001
locales:
us:
us1:
- name: us1
sentry_url: "http://10.0.0.1:8080"
relay_url: "http://10.0.0.1:8090"
us2:
- name: us2
sentry_url: "http://10.0.0.2:8080"
relay_url: "http://10.0.0.2:8090"
de:
de:
- name: de
sentry_url: "http://10.0.0.3:8080"
relay_url: "http://10.0.0.3:8090"

# Timeout configuration for relay handlers (optional, defaults shown)
# Two-layer timeout strategy:
# - HTTP timeout: Per-request timeout for individual HTTP calls (fixed)
# - Task timeout: Global cutoff - once first succeeds, ALL remaining tasks have
# task_subsequent_timeout_secs total to complete (prevents slow cells from blocking)
# relay_timeouts:
# http_timeout_secs: 15 # HTTP request timeout
# task_initial_timeout_secs: 20 # Wait for first upstream (must be >= http_timeout)
# task_subsequent_timeout_secs: 5 # Global deadline after first success

routes:
- match:
host: us.sentry.io
Expand Down
1 change: 1 addition & 0 deletions ingest-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
shared = { path = "../shared" }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
162 changes: 131 additions & 31 deletions ingest-router/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub enum ValidationError {

#[error("Locale '{0}' has no valid cells (none of its cells match any upstream)")]
LocaleHasNoValidCells(String),

#[error("Invalid timeout configuration: {0}")]
InvalidTimeouts(String),
}

/// HTTP methods supported for route matching
Expand Down Expand Up @@ -59,10 +62,63 @@ pub struct RelayProjectConfigsArgs {
pub locale: String,
}

/// Timeout configuration for relay project configs handler
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(default)]
pub struct RelayTimeouts {
/// HTTP request timeout for individual upstream calls (seconds).
/// This is the maximum time a single HTTP request can take.
/// Default: 15 seconds
pub http_timeout_secs: u16,

/// Task timeout when waiting for the first upstream to respond (seconds).
/// Must be >= http_timeout_secs to allow HTTP requests to complete.
/// Default: 20 seconds
pub task_initial_timeout_secs: u16,

/// Deadline for all remaining tasks after first success (seconds).
/// Aggressively cuts off slow upstreams once we have good data.
/// Default: 5 seconds
pub task_subsequent_timeout_secs: u16,
}

impl Default for RelayTimeouts {
fn default() -> Self {
Self {
http_timeout_secs: 15,
task_initial_timeout_secs: 20,
task_subsequent_timeout_secs: 5,
}
}
}

impl RelayTimeouts {
/// Validates the timeout configuration
pub fn validate(&self) -> Result<(), ValidationError> {
// Initial task timeout must be >= HTTP timeout to allow requests to complete
if self.task_initial_timeout_secs < self.http_timeout_secs {
return Err(ValidationError::InvalidTimeouts(
"task_initial_timeout_secs must be >= http_timeout_secs".to_string(),
));
}

// Subsequent task timeout should be > 0
if self.task_subsequent_timeout_secs == 0 {
return Err(ValidationError::InvalidTimeouts(
"task_subsequent_timeout_secs must be > 0".to_string(),
));
}

Ok(())
}
}

/// Cell/upstream configuration
/// Note: The cell name is the HashMap key in Config.locales
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct CellConfig {
/// Name/identifier of the cell
pub name: String,
/// URL of the Sentry upstream server
pub sentry_url: Url,
/// URL of the Relay upstream server
Expand All @@ -80,12 +136,14 @@ pub struct Config {
pub admin_listener: AdminListener,
/// Maps locale identifiers to their cells (cell name -> cell config)
///
/// Note: Uses String keys instead of an enum to allow flexible,
/// deployment-specific locale configuration without code changes.
/// Different deployments may use different locale identifiers.
pub locales: HashMap<String, HashMap<String, CellConfig>>,
/// Cells are stored as a Vec to maintain priority order - the first cell
/// in the list has highest priority for global config responses.
pub locales: HashMap<String, Vec<CellConfig>>,
/// Request routing rules
pub routes: Vec<Route>,
/// Timeout configuration for relay handlers
#[serde(default)]
pub relay_timeouts: RelayTimeouts,
}

impl Config {
Expand All @@ -95,18 +153,25 @@ impl Config {
self.listener.validate()?;
self.admin_listener.validate()?;

// Validate timeouts
self.relay_timeouts.validate()?;

// Validate locales and cells
for (locale, cells) in &self.locales {
// Check that locale has at least one cell
if cells.is_empty() {
return Err(ValidationError::LocaleHasNoValidCells(locale.clone()));
}

// Check for empty cell names (HashMap keys)
for cell_name in cells.keys() {
if cell_name.is_empty() {
// Check for empty cell names and collect for duplicate checking
let mut seen_names = HashSet::new();
for cell in cells {
if cell.name.is_empty() {
return Err(ValidationError::EmptyUpstreamName);
}
if !seen_names.insert(&cell.name) {
return Err(ValidationError::DuplicateUpstream(cell.name.clone()));
}
}
}

Expand Down Expand Up @@ -226,16 +291,16 @@ admin_listener:
port: 3001
locales:
us:
us1:
sentry_url: "http://127.0.0.1:8080"
relay_url: "http://127.0.0.1:8090"
us2:
sentry_url: "http://10.0.0.2:8080"
relay_url: "http://10.0.0.2:8090"
- name: us1
sentry_url: "http://127.0.0.1:8080"
relay_url: "http://127.0.0.1:8090"
- name: us2
sentry_url: "http://10.0.0.2:8080"
relay_url: "http://10.0.0.2:8090"
de:
de1:
sentry_url: "http://10.0.0.3:8080"
relay_url: "http://10.0.0.3:8090"
- name: de1
sentry_url: "http://10.0.0.3:8080"
relay_url: "http://10.0.0.3:8090"
routes:
- match:
host: us.sentry.io
Expand All @@ -261,6 +326,8 @@ routes:
assert_eq!(config.locales.len(), 2);
assert_eq!(config.locales.get("us").unwrap().len(), 2);
assert_eq!(config.locales.get("de").unwrap().len(), 1);
assert_eq!(config.locales.get("us").unwrap()[0].name, "us1");
assert_eq!(config.locales.get("us").unwrap()[1].name, "us2");
assert_eq!(config.routes.len(), 2);
assert_eq!(config.routes[0].r#match.method, Some(HttpMethod::Post));
assert_eq!(config.routes[1].r#match.host, None);
Expand All @@ -285,14 +352,13 @@ routes:
},
locales: HashMap::from([(
"us".to_string(),
HashMap::from([(
"us1".to_string(),
CellConfig {
sentry_url: Url::parse("http://127.0.0.1:8080").unwrap(),
relay_url: Url::parse("http://127.0.0.1:8090").unwrap(),
},
)]),
vec![CellConfig {
name: "us1".to_string(),
sentry_url: Url::parse("http://127.0.0.1:8080").unwrap(),
relay_url: Url::parse("http://127.0.0.1:8090").unwrap(),
}],
)]),
relay_timeouts: RelayTimeouts::default(),
routes: vec![Route {
r#match: Match {
path: Some("/api/".to_string()),
Expand All @@ -315,18 +381,28 @@ routes:

// Test empty cell name
let mut config = base_config.clone();
config.locales.get_mut("us").unwrap().insert(
"".to_string(),
CellConfig {
sentry_url: Url::parse("http://10.0.0.2:8080").unwrap(),
relay_url: Url::parse("http://10.0.0.2:8090").unwrap(),
},
);
config.locales.get_mut("us").unwrap().push(CellConfig {
name: "".to_string(),
sentry_url: Url::parse("http://10.0.0.2:8080").unwrap(),
relay_url: Url::parse("http://10.0.0.2:8090").unwrap(),
});
assert!(matches!(
config.validate().unwrap_err(),
ValidationError::EmptyUpstreamName
));

// Test duplicate cell name
let mut config = base_config.clone();
config.locales.get_mut("us").unwrap().push(CellConfig {
name: "us1".to_string(),
sentry_url: Url::parse("http://10.0.0.2:8080").unwrap(),
relay_url: Url::parse("http://10.0.0.2:8090").unwrap(),
});
assert!(matches!(
config.validate().unwrap_err(),
ValidationError::DuplicateUpstream(_)
));

// Test unknown locale in action
let mut config = base_config.clone();
config.routes[0].action = HandlerAction::RelayProjectConfigs(RelayProjectConfigsArgs {
Expand All @@ -341,11 +417,35 @@ routes:
let mut config = base_config.clone();
config
.locales
.insert("invalid_locale".to_string(), HashMap::new());
.insert("invalid_locale".to_string(), Vec::new());
assert!(matches!(
config.validate().unwrap_err(),
ValidationError::LocaleHasNoValidCells(_)
));

// Test invalid timeouts: task_initial < http
let mut config = base_config.clone();
config.relay_timeouts = RelayTimeouts {
http_timeout_secs: 20,
task_initial_timeout_secs: 15, // Less than HTTP timeout
task_subsequent_timeout_secs: 5,
};
assert!(matches!(
config.validate().unwrap_err(),
ValidationError::InvalidTimeouts(_)
));

// Test invalid timeouts: task_subsequent = 0
let mut config = base_config.clone();
config.relay_timeouts = RelayTimeouts {
http_timeout_secs: 15,
task_initial_timeout_secs: 20,
task_subsequent_timeout_secs: 0, // Zero timeout
};
assert!(matches!(
config.validate().unwrap_err(),
ValidationError::InvalidTimeouts(_)
));
}

#[test]
Expand Down
Loading
Loading