Skip to content

Commit f152765

Browse files
authored
feat(router): added support for label overrides with @override (#518)
1 parent b8f583c commit f152765

File tree

11 files changed

+246
-15
lines changed

11 files changed

+246
-15
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,4 @@ reqwest = "0.12.23"
5858
retry-policies = "0.4.0"
5959
reqwest-retry = "0.7.0"
6060
reqwest-middleware = "0.4.2"
61+
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }

bin/router/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ jsonwebtoken = { workspace = true }
4343
retry-policies = { workspace = true}
4444
reqwest-retry = { workspace = true }
4545
reqwest-middleware = { workspace = true }
46+
vrl = { workspace = true }
4647

4748
mimalloc = { version = "0.1.48", features = ["v3"] }
4849
moka = { version = "0.12.10", features = ["future"] }

bin/router/src/pipeline/error.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ use serde::{Deserialize, Serialize};
1717

1818
use crate::{
1919
jwt::errors::JwtForwardingError,
20-
pipeline::header::{RequestAccepts, APPLICATION_GRAPHQL_RESPONSE_JSON_STR},
20+
pipeline::{
21+
header::{RequestAccepts, APPLICATION_GRAPHQL_RESPONSE_JSON_STR},
22+
progressive_override::LabelEvaluationError,
23+
},
2124
};
2225

2326
#[derive(Debug)]
@@ -79,6 +82,8 @@ pub enum PipelineErrorVariant {
7982
PlanExecutionError(PlanExecutionError),
8083
#[error("Failed to produce a plan: {0}")]
8184
PlannerError(Arc<PlannerError>),
85+
#[error(transparent)]
86+
LabelEvaluationError(LabelEvaluationError),
8287

8388
// HTTP Security-related errors
8489
#[error("Required CSRF header(s) not present")]
@@ -95,6 +100,7 @@ impl PipelineErrorVariant {
95100
Self::UnsupportedHttpMethod(_) => "METHOD_NOT_ALLOWED",
96101
Self::PlannerError(_) => "QUERY_PLAN_BUILD_FAILED",
97102
Self::PlanExecutionError(_) => "QUERY_PLAN_EXECUTION_FAILED",
103+
Self::LabelEvaluationError(_) => "OVERRIDE_LABEL_EVALUATION_FAILED",
98104
Self::FailedToParseOperation(_) => "GRAPHQL_PARSE_FAILED",
99105
Self::ValidationErrors(_) => "GRAPHQL_VALIDATION_FAILED",
100106
Self::VariablesCoercionError(_) => "BAD_USER_INPUT",
@@ -122,6 +128,7 @@ impl PipelineErrorVariant {
122128
match (self, prefer_ok) {
123129
(Self::PlannerError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
124130
(Self::PlanExecutionError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
131+
(Self::LabelEvaluationError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
125132
(Self::JwtForwardingError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
126133
(Self::UnsupportedHttpMethod(_), _) => StatusCode::METHOD_NOT_ALLOWED,
127134
(Self::InvalidHeaderValue(_), _) => StatusCode::BAD_REQUEST,

bin/router/src/pipeline/mod.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use std::{borrow::Cow, sync::Arc};
22

3-
use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
4-
use hive_router_query_planner::utils::cancellation::CancellationToken;
3+
use hive_router_plan_executor::execution::plan::{
4+
ClientRequestDetails, OperationDetails, PlanExecutionOutput,
5+
};
6+
use hive_router_query_planner::{
7+
state::supergraph_state::OperationKind, utils::cancellation::CancellationToken,
8+
};
59
use http::{header::CONTENT_TYPE, HeaderValue, Method};
610
use ntex::{
711
util::Bytes,
@@ -12,7 +16,7 @@ use crate::{
1216
pipeline::{
1317
coerce_variables::coerce_request_variables,
1418
csrf_prevention::perform_csrf_prevention,
15-
error::PipelineError,
19+
error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant},
1620
execution::execute_plan,
1721
execution_request::get_execution_request,
1822
header::{
@@ -111,7 +115,6 @@ pub async fn execute_pipeline(
111115
validate_operation_with_cache(req, supergraph, schema_state, shared_state, &parser_payload)
112116
.await?;
113117

114-
let progressive_override_ctx = request_override_context()?;
115118
let normalize_payload = normalize_request_with_cache(
116119
req,
117120
supergraph,
@@ -120,13 +123,35 @@ pub async fn execute_pipeline(
120123
&parser_payload,
121124
)
122125
.await?;
123-
let query = Cow::Owned(execution_request.query.clone());
126+
let query: Cow<'_, str> = Cow::Owned(execution_request.query.clone());
124127
let variable_payload =
125128
coerce_request_variables(req, supergraph, execution_request, &normalize_payload)?;
126129

127130
let query_plan_cancellation_token =
128131
CancellationToken::with_timeout(shared_state.router_config.query_planner.timeout);
129132

133+
let progressive_override_ctx =
134+
request_override_context(&shared_state.override_labels_evaluator, || {
135+
ClientRequestDetails {
136+
method: req.method().clone(),
137+
url: req.uri().clone(),
138+
headers: req.headers(),
139+
operation: OperationDetails {
140+
name: normalize_payload.operation_for_plan.name.clone(),
141+
kind: match normalize_payload.operation_for_plan.operation_kind {
142+
Some(OperationKind::Query) => "query",
143+
Some(OperationKind::Mutation) => "mutation",
144+
Some(OperationKind::Subscription) => "subscription",
145+
None => "query",
146+
},
147+
query: query.clone(),
148+
},
149+
}
150+
})
151+
.map_err(|error| {
152+
req.new_pipeline_error(PipelineErrorVariant::LabelEvaluationError(error))
153+
})?;
154+
130155
let query_plan_payload = plan_operation_with_cache(
131156
req,
132157
supergraph,

bin/router/src/pipeline/progressive_override.rs

Lines changed: 144 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,44 @@
1-
use std::collections::{BTreeMap, HashSet};
1+
use std::collections::{BTreeMap, HashMap, HashSet};
22

3+
use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig};
4+
use hive_router_plan_executor::execution::plan::ClientRequestDetails;
35
use hive_router_query_planner::{
46
graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR},
57
state::supergraph_state::SupergraphState,
68
};
79
use rand::Rng;
10+
use vrl::{
11+
compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue},
12+
core::Value as VrlValue,
13+
prelude::{
14+
state::RuntimeState as VrlState, Context as VrlContext, ExpressionError,
15+
TimeZone as VrlTimeZone,
16+
},
17+
stdlib::all as vrl_build_functions,
18+
value::Secrets as VrlSecrets,
19+
};
20+
21+
#[derive(thiserror::Error, Debug)]
22+
#[error("Failed to compile override label expression for label '{label}': {error}")]
23+
pub struct OverrideLabelsCompileError {
24+
pub label: String,
25+
pub error: String,
26+
}
827

9-
use super::error::PipelineError;
28+
#[derive(thiserror::Error, Debug)]
29+
pub enum LabelEvaluationError {
30+
#[error(
31+
"Failed to resolve VRL expression for override label '{label}'. Runtime error: {source}"
32+
)]
33+
ExpressionResolutionFailure {
34+
label: String,
35+
source: ExpressionError,
36+
},
37+
#[error(
38+
"VRL expression for override label '{label}' did not evaluate to a boolean. Got: {got}"
39+
)]
40+
ExpressionWrongType { label: String, got: String },
41+
}
1042

1143
/// Contains the request-specific context for progressive overrides.
1244
/// This is stored in the request extensions
@@ -19,9 +51,14 @@ pub struct RequestOverrideContext {
1951
}
2052

2153
#[inline]
22-
pub fn request_override_context() -> Result<RequestOverrideContext, PipelineError> {
23-
// No active flags by default - until we implement it
24-
let active_flags = HashSet::new();
54+
pub fn request_override_context<'req, F>(
55+
override_labels_evaluator: &OverrideLabelsEvaluator,
56+
get_client_request: F,
57+
) -> Result<RequestOverrideContext, LabelEvaluationError>
58+
where
59+
F: FnOnce() -> ClientRequestDetails<'req>,
60+
{
61+
let active_flags = override_labels_evaluator.evaluate(get_client_request)?;
2562

2663
// Generate the random percentage value for this request.
2764
// Percentage is 0 - 100_000_000_000 (100*PERCENTAGE_SCALE_FACTOR)
@@ -77,3 +114,105 @@ impl StableOverrideContext {
77114
}
78115
}
79116
}
117+
118+
/// Evaluator for override labels based on configuration.
119+
/// This struct compiles and evaluates the override label expressions.
120+
/// It's intended to be used as a shared state in the router.
121+
pub struct OverrideLabelsEvaluator {
122+
static_enabled_labels: HashSet<String>,
123+
expressions: HashMap<String, VrlProgram>,
124+
}
125+
126+
impl OverrideLabelsEvaluator {
127+
pub(crate) fn from_config(
128+
override_labels_config: &OverrideLabelsConfig,
129+
) -> Result<Self, OverrideLabelsCompileError> {
130+
let mut static_enabled_labels = HashSet::new();
131+
let mut expressions = HashMap::new();
132+
let vrl_functions = vrl_build_functions();
133+
134+
for (label, value) in override_labels_config.iter() {
135+
match value {
136+
LabelOverrideValue::Boolean(true) => {
137+
static_enabled_labels.insert(label.clone());
138+
}
139+
LabelOverrideValue::Expression { expression } => {
140+
let compilation_result =
141+
vrl_compile(expression, &vrl_functions).map_err(|diagnostics| {
142+
OverrideLabelsCompileError {
143+
label: label.clone(),
144+
error: diagnostics
145+
.errors()
146+
.into_iter()
147+
.map(|d| d.code.to_string() + ": " + &d.message)
148+
.collect::<Vec<_>>()
149+
.join(", "),
150+
}
151+
})?;
152+
expressions.insert(label.clone(), compilation_result.program);
153+
}
154+
_ => {} // Skip false booleans
155+
}
156+
}
157+
158+
Ok(Self {
159+
static_enabled_labels,
160+
expressions,
161+
})
162+
}
163+
164+
pub(crate) fn evaluate<'req, F>(
165+
&self,
166+
get_client_request: F,
167+
) -> Result<HashSet<String>, LabelEvaluationError>
168+
where
169+
F: FnOnce() -> ClientRequestDetails<'req>,
170+
{
171+
let mut active_flags = self.static_enabled_labels.clone();
172+
173+
if self.expressions.is_empty() {
174+
return Ok(active_flags);
175+
}
176+
177+
let client_request = get_client_request();
178+
let mut target = VrlTargetValue {
179+
value: VrlValue::Object(BTreeMap::from([(
180+
"request".into(),
181+
(&client_request).into(),
182+
)])),
183+
metadata: VrlValue::Object(BTreeMap::new()),
184+
secrets: VrlSecrets::default(),
185+
};
186+
187+
let mut state = VrlState::default();
188+
let timezone = VrlTimeZone::default();
189+
let mut ctx = VrlContext::new(&mut target, &mut state, &timezone);
190+
191+
for (label, expression) in &self.expressions {
192+
match expression.resolve(&mut ctx) {
193+
Ok(evaluated_value) => match evaluated_value {
194+
VrlValue::Boolean(true) => {
195+
active_flags.insert(label.clone());
196+
}
197+
VrlValue::Boolean(false) => {
198+
// Do nothing for false
199+
}
200+
invalid_value => {
201+
return Err(LabelEvaluationError::ExpressionWrongType {
202+
label: label.clone(),
203+
got: format!("{:?}", invalid_value),
204+
});
205+
}
206+
},
207+
Err(err) => {
208+
return Err(LabelEvaluationError::ExpressionResolutionFailure {
209+
label: label.clone(),
210+
source: err,
211+
});
212+
}
213+
}
214+
}
215+
216+
Ok(active_flags)
217+
}
218+
}

bin/router/src/shared_state.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ use std::sync::Arc;
88

99
use crate::jwt::JwtAuthRuntime;
1010
use crate::pipeline::cors::{CORSConfigError, Cors};
11+
use crate::pipeline::progressive_override::{OverrideLabelsCompileError, OverrideLabelsEvaluator};
1112

1213
pub struct RouterSharedState {
1314
pub validation_plan: ValidationPlan,
1415
pub parse_cache: Cache<u64, Arc<graphql_parser::query::Document<'static, String>>>,
1516
pub router_config: Arc<HiveRouterConfig>,
1617
pub headers_plan: HeaderRulesPlan,
18+
pub override_labels_evaluator: OverrideLabelsEvaluator,
1719
pub cors_runtime: Option<Cors>,
1820
pub jwt_auth_runtime: Option<JwtAuthRuntime>,
1921
}
@@ -29,6 +31,10 @@ impl RouterSharedState {
2931
parse_cache: moka::future::Cache::new(1000),
3032
cors_runtime: Cors::from_config(&router_config.cors).map_err(Box::new)?,
3133
router_config: router_config.clone(),
34+
override_labels_evaluator: OverrideLabelsEvaluator::from_config(
35+
&router_config.override_labels,
36+
)
37+
.map_err(Box::new)?,
3238
jwt_auth_runtime,
3339
})
3440
}
@@ -37,7 +43,9 @@ impl RouterSharedState {
3743
#[derive(thiserror::Error, Debug)]
3844
pub enum SharedStateError {
3945
#[error("invalid headers config: {0}")]
40-
HeaderRuleCompileError(#[from] Box<HeaderRuleCompileError>),
46+
HeaderRuleCompile(#[from] Box<HeaderRuleCompileError>),
4147
#[error("invalid regex in CORS config: {0}")]
42-
CORSConfigError(#[from] Box<CORSConfigError>),
48+
CORSConfig(#[from] Box<CORSConfigError>),
49+
#[error("invalid override labels config: {0}")]
50+
OverrideLabelsCompile(#[from] Box<OverrideLabelsCompileError>),
4351
}

docs/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
|[**http**](#http)|`object`|Configuration for the HTTP server/listener.<br/>Default: `{"host":"0.0.0.0","port":4000}`<br/>||
1212
|[**jwt**](#jwt)|`object`, `null`|Configuration for JWT authentication plugin.<br/>|yes|
1313
|[**log**](#log)|`object`|The router logger configuration.<br/>Default: `{"filter":null,"format":"json","level":"info"}`<br/>||
14+
|[**override\_labels**](#override_labels)|`object`|Configuration for overriding labels.<br/>||
1415
|[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.<br/>Default: `{}`<br/>||
1516
|[**query\_planner**](#query_planner)|`object`|Query planning configuration.<br/>Default: `{"allow_expose":false,"timeout":"10s"}`<br/>||
1617
|[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).<br/>||
@@ -63,6 +64,7 @@ log:
6364
filter: null
6465
format: json
6566
level: info
67+
override_labels: {}
6668
override_subgraph_urls:
6769
accounts:
6870
url: https://accounts.example.com/graphql
@@ -1536,6 +1538,18 @@ level: info
15361538
15371539
```
15381540

1541+
<a name="override_labels"></a>
1542+
## override\_labels: object
1543+
1544+
Configuration for overriding labels.
1545+
1546+
1547+
**Additional Properties**
1548+
1549+
|Name|Type|Description|Required|
1550+
|----|----|-----------|--------|
1551+
|**Additional Properties**||Defines the value for a label override.<br/><br/>It can be a simple boolean,<br/>or an object containing the expression that evaluates to a boolean.<br/>||
1552+
15391553
<a name="override_subgraph_urls"></a>
15401554
## override\_subgraph\_urls: object
15411555

lib/executor/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ thiserror = { workspace = true }
2929
xxhash-rust = { workspace = true }
3030
tokio = { workspace = true, features = ["sync"] }
3131
dashmap = { workspace = true }
32+
vrl = { workspace = true }
33+
3234
ahash = "0.8.12"
3335
regex-automata = "0.4.10"
34-
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }
3536
strum = { version = "0.27.2", features = ["derive"] }
36-
3737
ntex-http = "0.1.15"
3838
hyper-tls = { version = "0.6.0", features = ["vendored"] }
3939
hyper-util = { version = "0.1.16", features = [

0 commit comments

Comments
 (0)