Skip to content

Commit a33b2f3

Browse files
committed
Go
1 parent fd447be commit a33b2f3

File tree

5 files changed

+48
-60
lines changed

5 files changed

+48
-60
lines changed

docs/README.md

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
|[**log**](#log)|`object`|The router logger configuration.<br/>Default: `{"filter":null,"format":"json","level":"info"}`<br/>||
1212
|[**query\_planner**](#query_planner)|`object`|Query planning configuration.<br/>Default: `{"allow_expose":false,"timeout":"10s"}`<br/>||
1313
|[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).<br/>Default: `{"path":"supergraph.graphql","source":"file"}`<br/>||
14-
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.<br/>Default: `{"all":{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}}`<br/>||
14+
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.<br/>Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout_seconds":50},"max_connections_per_host":100}`<br/>||
1515

1616
**Additional Properties:** not allowed
1717
**Example**
@@ -66,8 +66,8 @@ supergraph:
6666
traffic_shaping:
6767
all:
6868
dedupe_enabled: true
69-
max_connections_per_host: 100
7069
pool_idle_timeout_seconds: 50
70+
max_connections_per_host: 100
7171

7272
```
7373

@@ -1367,16 +1367,17 @@ Configuration for the traffic-shaper executor. Use these configurations to contr
13671367

13681368
|Name|Type|Description|Required|
13691369
|----|----|-----------|--------|
1370-
|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.<br/>Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`<br/>||
1370+
|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.<br/>Default: `{"dedupe_enabled":true,"pool_idle_timeout_seconds":50}`<br/>||
1371+
|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.<br/>Default: `100`<br/>Format: `"uint"`<br/>Minimum: `0`<br/>||
13711372
|[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.<br/>||
13721373

13731374
**Example**
13741375

13751376
```yaml
13761377
all:
13771378
dedupe_enabled: true
1378-
max_connections_per_host: 100
13791379
pool_idle_timeout_seconds: 50
1380+
max_connections_per_host: 100
13801381
13811382
```
13821383

@@ -1391,15 +1392,13 @@ The default configuration that will be applied to all subgraphs, unless overridd
13911392
|Name|Type|Description|Required|
13921393
|----|----|-----------|--------|
13931394
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>Default: `true`<br/>||
1394-
|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.<br/>Default: `100`<br/>Format: `"uint"`<br/>Minimum: `0`<br/>||
13951395
|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.<br/>Default: `50`<br/>Format: `"uint64"`<br/>Minimum: `0`<br/>||
13961396
|**timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> 10000<br/> } else {<br/> 5000<br/> }<br/>```<br/>||
13971397

13981398
**Example**
13991399

14001400
```yaml
14011401
dedupe_enabled: true
1402-
max_connections_per_host: 100
14031402
pool_idle_timeout_seconds: 50
14041403
14051404
```
@@ -1424,15 +1423,13 @@ Optional per-subgraph configurations that will override the default configuratio
14241423
|Name|Type|Description|Required|
14251424
|----|----|-----------|--------|
14261425
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will<br/>be deduplicated by sharing the response of other in-flight requests.<br/>Default: `true`<br/>||
1427-
|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.<br/>Default: `100`<br/>Format: `"uint"`<br/>Minimum: `0`<br/>||
14281426
|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.<br/>Default: `50`<br/>Format: `"uint64"`<br/>Minimum: `0`<br/>||
14291427
|**timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> 10000<br/> } else {<br/> 5000<br/> }<br/>```<br/>||
14301428

14311429
**Example**
14321430

14331431
```yaml
14341432
dedupe_enabled: true
1435-
max_connections_per_host: 100
14361433
pool_idle_timeout_seconds: 50
14371434
14381435
```

lib/executor/src/executors/map.rs

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use bytes::{BufMut, Bytes, BytesMut};
44
use dashmap::DashMap;
5-
use hive_router_config::{traffic_shaping::TrafficShapingExecutorConfig, TrafficShapingConfig};
5+
use hive_router_config::{
6+
traffic_shaping::TrafficShapingConfig, traffic_shaping::TrafficShapingExecutorConfig,
7+
};
68
use http::Uri;
79
use http_body_util::Full;
810
use hyper_tls::HttpsConnector;
@@ -78,8 +80,10 @@ impl SubgraphExecutorMap {
7880
subgraph_endpoint_map: HashMap<String, String>,
7981
config: TrafficShapingConfig,
8082
) -> Result<Self, SubgraphExecutorError> {
81-
let global_client_arc = from_traffic_shaping_config_to_client(&config.all);
82-
let global_semaphores_by_origin: DashMap<String, Arc<Semaphore>> = DashMap::new();
83+
let max_connections_per_host = config.max_connections_per_host;
84+
let global_client_arc =
85+
from_traffic_shaping_config_to_client(&config.all, max_connections_per_host);
86+
let semaphores_by_origin: DashMap<String, Arc<Semaphore>> = DashMap::new();
8387
let global_config_arc = Arc::new(config.all);
8488
let global_in_flight_requests: Arc<
8589
DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>,
@@ -107,19 +111,11 @@ impl SubgraphExecutorMap {
107111

108112
let subgraph_config = config.subgraphs.get(&subgraph_name);
109113

110-
let semaphore = get_semaphore_for_subgraph(
111-
&origin,
112-
&global_semaphores_by_origin,
113-
subgraph_config
114-
.map(|cfg| cfg.max_connections_per_host)
115-
.unwrap_or(global_config_arc.max_connections_per_host),
116-
global_config_arc.max_connections_per_host,
117-
);
118-
119114
let http_client = get_http_client_for_subgraph(
120115
subgraph_config,
121116
&global_config_arc,
122117
&global_client_arc,
118+
max_connections_per_host,
123119
);
124120

125121
// TODO: Maybe reuse the in-flight requests map in some cases ???
@@ -131,6 +127,11 @@ impl SubgraphExecutorMap {
131127
.map(|cfg| Arc::new(cfg.clone()))
132128
.unwrap_or_else(|| global_config_arc.clone());
133129

130+
let semaphore = semaphores_by_origin
131+
.entry(origin.to_string())
132+
.or_insert_with(|| Arc::new(Semaphore::new(max_connections_per_host)))
133+
.clone();
134+
134135
let mut executor = HTTPSubgraphExecutor::new(
135136
endpoint_uri.clone(),
136137
http_client,
@@ -158,12 +159,13 @@ impl SubgraphExecutorMap {
158159
// Create a new hyper client based on the traffic shaping config
159160
pub fn from_traffic_shaping_config_to_client(
160161
config: &TrafficShapingExecutorConfig,
162+
max_connections_per_host: usize,
161163
) -> Arc<Client<HttpsConnector<HttpConnector>, Full<Bytes>>> {
162164
Arc::new(
163165
Client::builder(TokioExecutor::new())
164166
.pool_timer(TokioTimer::new())
165167
.pool_idle_timeout(Duration::from_secs(config.pool_idle_timeout_seconds))
166-
.pool_max_idle_per_host(config.max_connections_per_host)
168+
.pool_max_idle_per_host(max_connections_per_host)
167169
.build(HttpsConnector::new()),
168170
)
169171
}
@@ -174,35 +176,16 @@ fn get_http_client_for_subgraph(
174176
subgraph_config: Option<&TrafficShapingExecutorConfig>,
175177
global_config: &TrafficShapingExecutorConfig,
176178
global_client: &Arc<Client<HttpsConnector<HttpConnector>, Full<Bytes>>>,
179+
max_connections_per_host: usize,
177180
) -> Arc<Client<HttpsConnector<HttpConnector>, Full<Bytes>>> {
178181
match subgraph_config {
179182
Some(cfg) => {
180-
if global_config.max_connections_per_host == cfg.max_connections_per_host
181-
&& global_config.pool_idle_timeout_seconds == cfg.pool_idle_timeout_seconds
182-
{
183+
if global_config.pool_idle_timeout_seconds == cfg.pool_idle_timeout_seconds {
183184
global_client.clone()
184185
} else {
185-
from_traffic_shaping_config_to_client(cfg)
186+
from_traffic_shaping_config_to_client(cfg, max_connections_per_host)
186187
}
187188
}
188189
None => global_client.clone(),
189190
}
190191
}
191-
192-
// If the subgraph has a specific max_connections_per_host, create a new semaphore for it.
193-
// Otherwise, reuse the global semaphore for that origin.
194-
fn get_semaphore_for_subgraph(
195-
origin: &str,
196-
semaphores_by_origin: &DashMap<String, Arc<Semaphore>>,
197-
max_connections_per_host: usize,
198-
global_max_connections_per_host: usize,
199-
) -> Arc<Semaphore> {
200-
if max_connections_per_host == global_max_connections_per_host {
201-
semaphores_by_origin
202-
.entry(origin.to_string())
203-
.or_insert_with(|| Arc::new(Semaphore::new(global_max_connections_per_host)))
204-
.clone()
205-
} else {
206-
Arc::new(Semaphore::new(max_connections_per_host))
207-
}
208-
}

lib/executor/src/executors/timeout.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ mod tests {
364364
"#;
365365

366366
let config = hive_router_config::parse_yaml_config(config.to_string()).unwrap();
367-
let http_client = from_traffic_shaping_config_to_client(&config.traffic_shaping.all);
367+
let http_client = from_traffic_shaping_config_to_client(&config.traffic_shaping.all, 10);
368368
let http_executor = crate::executors::http::HTTPSubgraphExecutor::new(
369369
endpoint.clone(),
370370
http_client,

lib/router-config/src/lib.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,13 @@ pub mod query_planner;
88
pub mod supergraph;
99
pub mod traffic_shaping;
1010

11-
use std::collections::HashMap;
12-
1311
use config::{Config, Environment, File, FileFormat, FileSourceFile};
1412
use schemars::JsonSchema;
1513
use serde::{Deserialize, Serialize};
1614

1715
use crate::{
1816
http_server::HttpServerConfig, log::LoggingConfig, query_planner::QueryPlannerConfig,
19-
supergraph::SupergraphSource, traffic_shaping::TrafficShapingExecutorConfig,
17+
supergraph::SupergraphSource, traffic_shaping::TrafficShapingConfig,
2018
};
2119

2220
#[derive(Deserialize, Serialize, JsonSchema)]
@@ -58,16 +56,6 @@ pub struct HiveRouterConfig {
5856
pub cors: cors::CORSConfig,
5957
}
6058

61-
#[derive(Clone, Deserialize, Serialize, JsonSchema, Default)]
62-
pub struct TrafficShapingConfig {
63-
/// The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
64-
#[serde(default)]
65-
pub all: TrafficShapingExecutorConfig,
66-
/// Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
67-
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
68-
pub subgraphs: HashMap<String, TrafficShapingExecutorConfig>,
69-
}
70-
7159
#[derive(Debug, thiserror::Error)]
7260
pub enum RouterConfigError {
7361
#[error("Failed to load configuration: {0}")]

lib/router-config/src/traffic_shaping.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,23 @@ use std::time::Duration;
33
use schemars::JsonSchema;
44
use serde::{Deserialize, Serialize};
55

6-
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
7-
pub struct TrafficShapingExecutorConfig {
6+
use std::collections::HashMap;
7+
8+
#[derive(Clone, Deserialize, Serialize, JsonSchema)]
9+
pub struct TrafficShapingConfig {
10+
/// The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
11+
#[serde(default)]
12+
pub all: TrafficShapingExecutorConfig,
13+
/// Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
14+
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
15+
pub subgraphs: HashMap<String, TrafficShapingExecutorConfig>,
816
/// Limits the concurrent amount of requests/connections per host/subgraph.
917
#[serde(default = "default_max_connections_per_host")]
1018
pub max_connections_per_host: usize,
19+
}
1120

21+
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
22+
pub struct TrafficShapingExecutorConfig {
1223
/// Timeout for idle sockets being kept-alive.
1324
#[serde(default = "default_pool_idle_timeout_seconds")]
1425
pub pool_idle_timeout_seconds: u64,
@@ -60,14 +71,23 @@ where
6071
impl Default for TrafficShapingExecutorConfig {
6172
fn default() -> Self {
6273
Self {
63-
max_connections_per_host: default_max_connections_per_host(),
6474
pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(),
6575
dedupe_enabled: default_dedupe_enabled(),
6676
timeout: None,
6777
}
6878
}
6979
}
7080

81+
impl Default for TrafficShapingConfig {
82+
fn default() -> Self {
83+
Self {
84+
all: TrafficShapingExecutorConfig::default(),
85+
subgraphs: HashMap::new(),
86+
max_connections_per_host: default_max_connections_per_host(),
87+
}
88+
}
89+
}
90+
7191
fn default_max_connections_per_host() -> usize {
7292
100
7393
}

0 commit comments

Comments
 (0)