1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -58,3 +58,4 @@ reqwest = "0.12.23"
retry-policies = "0.4.0"
reqwest-retry = "0.7.0"
reqwest-middleware = "0.4.2"
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }
1 change: 1 addition & 0 deletions bin/router/Cargo.toml
@@ -43,6 +43,7 @@ jsonwebtoken = { workspace = true }
retry-policies = { workspace = true}
reqwest-retry = { workspace = true }
reqwest-middleware = { workspace = true }
vrl = { workspace = true }

mimalloc = { version = "0.1.48", features = ["v3"] }
moka = { version = "0.12.10", features = ["future"] }
9 changes: 8 additions & 1 deletion bin/router/src/pipeline/error.rs
@@ -17,7 +17,10 @@ use serde::{Deserialize, Serialize};

use crate::{
jwt::errors::JwtForwardingError,
pipeline::header::{RequestAccepts, APPLICATION_GRAPHQL_RESPONSE_JSON_STR},
pipeline::{
header::{RequestAccepts, APPLICATION_GRAPHQL_RESPONSE_JSON_STR},
progressive_override::LabelEvaluationError,
},
};

#[derive(Debug)]
@@ -79,6 +82,8 @@ pub enum PipelineErrorVariant {
PlanExecutionError(PlanExecutionError),
#[error("Failed to produce a plan: {0}")]
PlannerError(Arc<PlannerError>),
#[error(transparent)]
LabelEvaluationError(LabelEvaluationError),

// HTTP Security-related errors
#[error("Required CSRF header(s) not present")]
@@ -95,6 +100,7 @@ impl PipelineErrorVariant {
Self::UnsupportedHttpMethod(_) => "METHOD_NOT_ALLOWED",
Self::PlannerError(_) => "QUERY_PLAN_BUILD_FAILED",
Self::PlanExecutionError(_) => "QUERY_PLAN_EXECUTION_FAILED",
Self::LabelEvaluationError(_) => "OVERRIDE_LABEL_EVALUATION_FAILED",
Self::FailedToParseOperation(_) => "GRAPHQL_PARSE_FAILED",
Self::ValidationErrors(_) => "GRAPHQL_VALIDATION_FAILED",
Self::VariablesCoercionError(_) => "BAD_USER_INPUT",
@@ -122,6 +128,7 @@ impl PipelineErrorVariant {
match (self, prefer_ok) {
(Self::PlannerError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
(Self::PlanExecutionError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
(Self::LabelEvaluationError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
(Self::JwtForwardingError(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
(Self::UnsupportedHttpMethod(_), _) => StatusCode::METHOD_NOT_ALLOWED,
(Self::InvalidHeaderValue(_), _) => StatusCode::BAD_REQUEST,
35 changes: 30 additions & 5 deletions bin/router/src/pipeline/mod.rs
@@ -1,7 +1,11 @@
use std::{borrow::Cow, sync::Arc};

use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
use hive_router_query_planner::utils::cancellation::CancellationToken;
use hive_router_plan_executor::execution::plan::{
ClientRequestDetails, OperationDetails, PlanExecutionOutput,
};
use hive_router_query_planner::{
state::supergraph_state::OperationKind, utils::cancellation::CancellationToken,
};
use http::{header::CONTENT_TYPE, HeaderValue, Method};
use ntex::{
util::Bytes,
@@ -12,7 +16,7 @@ use crate::{
pipeline::{
coerce_variables::coerce_request_variables,
csrf_prevention::perform_csrf_prevention,
error::PipelineError,
error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant},
execution::execute_plan,
execution_request::get_execution_request,
header::{
@@ -111,7 +115,6 @@ pub async fn execute_pipeline(
validate_operation_with_cache(req, supergraph, schema_state, shared_state, &parser_payload)
.await?;

let progressive_override_ctx = request_override_context()?;
let normalize_payload = normalize_request_with_cache(
req,
supergraph,
@@ -120,13 +123,35 @@
&parser_payload,
)
.await?;
let query = Cow::Owned(execution_request.query.clone());
let query: Cow<'_, str> = Cow::Owned(execution_request.query.clone());
let variable_payload =
coerce_request_variables(req, supergraph, execution_request, &normalize_payload)?;

let query_plan_cancellation_token =
CancellationToken::with_timeout(shared_state.router_config.query_planner.timeout);

let progressive_override_ctx =
request_override_context(&shared_state.override_labels_evaluator, || {
ClientRequestDetails {
method: req.method().clone(),
url: req.uri().clone(),
headers: req.headers(),
operation: OperationDetails {
name: normalize_payload.operation_for_plan.name.clone(),
kind: match normalize_payload.operation_for_plan.operation_kind {
Some(OperationKind::Query) => "query",
Some(OperationKind::Mutation) => "mutation",
Some(OperationKind::Subscription) => "subscription",
None => "query",
},
query: query.clone(),
},
}
})
.map_err(|error| {
req.new_pipeline_error(PipelineErrorVariant::LabelEvaluationError(error))
})?;

let query_plan_payload = plan_operation_with_cache(
req,
supergraph,
149 changes: 144 additions & 5 deletions bin/router/src/pipeline/progressive_override.rs
@@ -1,12 +1,44 @@
use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};

use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig};
use hive_router_plan_executor::execution::plan::ClientRequestDetails;
use hive_router_query_planner::{
graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR},
state::supergraph_state::SupergraphState,
};
use rand::Rng;
use vrl::{
compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue},
core::Value as VrlValue,
prelude::{
state::RuntimeState as VrlState, Context as VrlContext, ExpressionError,
TimeZone as VrlTimeZone,
},
stdlib::all as vrl_build_functions,
value::Secrets as VrlSecrets,
};

#[derive(thiserror::Error, Debug)]
#[error("Failed to compile override label expression for label '{label}': {error}")]
pub struct OverrideLabelsCompileError {
pub label: String,
pub error: String,
}

use super::error::PipelineError;
#[derive(thiserror::Error, Debug)]
pub enum LabelEvaluationError {
#[error(
"Failed to resolve VRL expression for override label '{label}'. Runtime error: {source}"
)]
ExpressionResolutionFailure {
label: String,
source: ExpressionError,
},
#[error(
"VRL expression for override label '{label}' did not evaluate to a boolean. Got: {got}"
)]
ExpressionWrongType { label: String, got: String },
}

/// Contains the request-specific context for progressive overrides.
/// This is stored in the request extensions
@@ -19,9 +51,14 @@ pub struct RequestOverrideContext {
}

#[inline]
pub fn request_override_context() -> Result<RequestOverrideContext, PipelineError> {
// No active flags by default - until we implement it
let active_flags = HashSet::new();
pub fn request_override_context<'req, F>(
override_labels_evaluator: &OverrideLabelsEvaluator,
get_client_request: F,
) -> Result<RequestOverrideContext, LabelEvaluationError>
where
F: FnOnce() -> ClientRequestDetails<'req>,
{
let active_flags = override_labels_evaluator.evaluate(get_client_request)?;

// Generate the random percentage value for this request.
// Percentage is 0 - 100_000_000_000 (100*PERCENTAGE_SCALE_FACTOR)
@@ -77,3 +114,105 @@ impl StableOverrideContext {
}
}
}

/// Evaluator for override labels based on configuration.
/// This struct compiles and evaluates the override label expressions.
/// It's intended to be used as a shared state in the router.
pub struct OverrideLabelsEvaluator {
static_enabled_labels: HashSet<String>,
expressions: HashMap<String, VrlProgram>,
}

impl OverrideLabelsEvaluator {
pub(crate) fn from_config(
override_labels_config: &OverrideLabelsConfig,
) -> Result<Self, OverrideLabelsCompileError> {
let mut static_enabled_labels = HashSet::new();
let mut expressions = HashMap::new();
let vrl_functions = vrl_build_functions();

for (label, value) in override_labels_config.iter() {
match value {
LabelOverrideValue::Boolean(true) => {
static_enabled_labels.insert(label.clone());
}
LabelOverrideValue::Expression { expression } => {
let compilation_result =
vrl_compile(expression, &vrl_functions).map_err(|diagnostics| {
OverrideLabelsCompileError {
label: label.clone(),
error: diagnostics
.errors()
.into_iter()
.map(|d| d.code.to_string() + ": " + &d.message)
.collect::<Vec<_>>()
.join(", "),
}
})?;
expressions.insert(label.clone(), compilation_result.program);
}
_ => {} // Skip false booleans
}
}

Ok(Self {
static_enabled_labels,
expressions,
})
}

pub(crate) fn evaluate<'req, F>(
&self,
get_client_request: F,
) -> Result<HashSet<String>, LabelEvaluationError>
where
F: FnOnce() -> ClientRequestDetails<'req>,
{
let mut active_flags = self.static_enabled_labels.clone();

if self.expressions.is_empty() {
return Ok(active_flags);
}

let client_request = get_client_request();
let mut target = VrlTargetValue {
value: VrlValue::Object(BTreeMap::from([(
"request".into(),
(&client_request).into(),
)])),
metadata: VrlValue::Object(BTreeMap::new()),
secrets: VrlSecrets::default(),
};

let mut state = VrlState::default();
let timezone = VrlTimeZone::default();
let mut ctx = VrlContext::new(&mut target, &mut state, &timezone);

for (label, expression) in &self.expressions {
match expression.resolve(&mut ctx) {
Ok(evaluated_value) => match evaluated_value {
VrlValue::Boolean(true) => {
active_flags.insert(label.clone());
}
VrlValue::Boolean(false) => {
// Do nothing for false
}
invalid_value => {
return Err(LabelEvaluationError::ExpressionWrongType {
label: label.clone(),
got: format!("{:?}", invalid_value),
});
}
},
Err(err) => {
return Err(LabelEvaluationError::ExpressionResolutionFailure {
label: label.clone(),
source: err,
});
}
}
}

Ok(active_flags)
}
}
12 changes: 10 additions & 2 deletions bin/router/src/shared_state.rs
@@ -8,12 +8,14 @@ use std::sync::Arc;

use crate::jwt::JwtAuthRuntime;
use crate::pipeline::cors::{CORSConfigError, Cors};
use crate::pipeline::progressive_override::{OverrideLabelsCompileError, OverrideLabelsEvaluator};

pub struct RouterSharedState {
pub validation_plan: ValidationPlan,
pub parse_cache: Cache<u64, Arc<graphql_parser::query::Document<'static, String>>>,
pub router_config: Arc<HiveRouterConfig>,
pub headers_plan: HeaderRulesPlan,
pub override_labels_evaluator: OverrideLabelsEvaluator,
pub cors_runtime: Option<Cors>,
pub jwt_auth_runtime: Option<JwtAuthRuntime>,
}
@@ -29,6 +31,10 @@ impl RouterSharedState {
parse_cache: moka::future::Cache::new(1000),
cors_runtime: Cors::from_config(&router_config.cors).map_err(Box::new)?,
router_config: router_config.clone(),
override_labels_evaluator: OverrideLabelsEvaluator::from_config(
&router_config.override_labels,
)
.map_err(Box::new)?,
jwt_auth_runtime,
})
}
@@ -37,7 +43,9 @@
#[derive(thiserror::Error, Debug)]
pub enum SharedStateError {
#[error("invalid headers config: {0}")]
HeaderRuleCompileError(#[from] Box<HeaderRuleCompileError>),
HeaderRuleCompile(#[from] Box<HeaderRuleCompileError>),
#[error("invalid regex in CORS config: {0}")]
CORSConfigError(#[from] Box<CORSConfigError>),
CORSConfig(#[from] Box<CORSConfigError>),
#[error("invalid override labels config: {0}")]
OverrideLabelsCompile(#[from] Box<OverrideLabelsCompileError>),
}
14 changes: 14 additions & 0 deletions docs/README.md
@@ -11,6 +11,7 @@
|[**http**](#http)|`object`|Configuration for the HTTP server/listener.<br/>Default: `{"host":"0.0.0.0","port":4000}`<br/>||
|[**jwt**](#jwt)|`object`, `null`|Configuration for JWT authentication plugin.<br/>|yes|
|[**log**](#log)|`object`|The router logger configuration.<br/>Default: `{"filter":null,"format":"json","level":"info"}`<br/>||
|[**override\_labels**](#override_labels)|`object`|Configuration for overriding labels.<br/>||
|[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.<br/>Default: `{}`<br/>||
|[**query\_planner**](#query_planner)|`object`|Query planning configuration.<br/>Default: `{"allow_expose":false,"timeout":"10s"}`<br/>||
|[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).<br/>||
@@ -63,6 +64,7 @@ log:
filter: null
format: json
level: info
override_labels: {}
override_subgraph_urls:
accounts:
url: https://accounts.example.com/graphql
@@ -1536,6 +1538,18 @@ level: info

```

<a name="override_labels"></a>
## override\_labels: object

Configuration for overriding labels.


**Additional Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**Additional Properties**||Defines the value for a label override.<br/><br/>It can be a simple boolean,<br/>or an object containing the expression that evaluates to a boolean.<br/>||
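
As an illustrative sketch (the label names and the expression below are hypothetical and not part of this change), an `override_labels` entry can either enable a label unconditionally or gate it behind a VRL expression that must evaluate to a boolean:

```yaml
override_labels:
  # Statically enables the label for every request.
  new_feature: true
  # Enables the label only when the VRL expression evaluates to `true`.
  # The exact fields available under `.request` depend on how the router maps
  # the client request into the VRL context.
  beta_rollout:
    expression: '.request.operation.kind == "mutation"'
```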

<a name="override_subgraph_urls"></a>
## override\_subgraph\_urls: object

4 changes: 2 additions & 2 deletions lib/executor/Cargo.toml
@@ -29,11 +29,11 @@ thiserror = { workspace = true }
xxhash-rust = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
dashmap = { workspace = true }
vrl = { workspace = true }

ahash = "0.8.12"
regex-automata = "0.4.10"
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }
strum = { version = "0.27.2", features = ["derive"] }

ntex-http = "0.1.15"
hyper-tls = { version = "0.6.0", features = ["vendored"] }
hyper-util = { version = "0.1.16", features = [