Skip to content

Commit 018dba9

Browse files
committed
Response Cache Plugin
1 parent 4c405b7 commit 018dba9

File tree

6 files changed

+227
-0
lines changed

6 files changed

+227
-0
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/executor/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ ahash = "0.8.12"
3333
regex-automata = "0.4.10"
3434
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }
3535

36+
ntex = { version = "2", features = ["tokio"] }
3637
ntex-http = "0.1.15"
3738
hyper-tls = { version = "0.6.0", features = ["vendored"] }
3839
hyper-util = { version = "0.1.16", features = [
@@ -47,6 +48,7 @@ itoa = "1.0.15"
4748
ryu = "1.0.20"
4849
indexmap = "2.10.0"
4950
bumpalo = "3.19.0"
51+
redis = "0.32.7"
5052

5153
[dev-dependencies]
5254
subgraphs = { path = "../../bench/subgraphs" }

lib/executor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ pub mod executors;
44
pub mod headers;
55
pub mod introspection;
66
pub mod json_writer;
7+
pub mod plugins;
78
pub mod projection;
89
pub mod response;
910
pub mod utils;
1011
pub mod variables;
1112

1213
pub use execution::plan::execute_query_plan;
1314
pub use executors::map::SubgraphExecutorMap;
15+
pub use plugins::response_cache::*;

lib/executor/src/plugins/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod response_cache;
2+
pub mod traits;
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use dashmap::DashMap;
2+
use ntex::web::HttpResponse;
3+
use redis::Commands;
4+
use sonic_rs::json;
5+
6+
use crate::{
7+
plugins::traits::{
8+
ControlFlow, OnExecuteEnd, OnExecuteEndPayload, OnExecuteStart, OnExecuteStartPayload,
9+
OnSchemaReload, OnSchemaReloadPayload,
10+
},
11+
utils::consts::TYPENAME_FIELD_NAME,
12+
};
13+
14+
pub struct ResponseCachePlugin {
15+
redis_client: redis::Client,
16+
ttl_per_type: DashMap<String, u64>,
17+
}
18+
19+
impl ResponseCachePlugin {
20+
pub fn try_new(redis_url: &str) -> Result<Self, redis::RedisError> {
21+
let redis_client = redis::Client::open(redis_url)?;
22+
Ok(Self {
23+
redis_client,
24+
ttl_per_type: DashMap::new(),
25+
})
26+
}
27+
}
28+
29+
pub struct ResponseCacheContext {
30+
key: String,
31+
}
32+
33+
impl OnExecuteStart for ResponseCachePlugin {
34+
fn on_execute_start(&self, payload: OnExecuteStartPayload) -> ControlFlow {
35+
let key = format!(
36+
"response_cache:{}:{:?}",
37+
payload.query_plan, payload.variable_values
38+
);
39+
payload
40+
.router_http_request
41+
.extensions_mut()
42+
.insert(ResponseCacheContext { key: key.clone() });
43+
if let Ok(mut conn) = self.redis_client.get_connection() {
44+
let cached_response: Option<Vec<u8>> = conn.get(&key).ok();
45+
if let Some(cached_response) = cached_response {
46+
return ControlFlow::Break(
47+
HttpResponse::Ok()
48+
.header("Content-Type", "application/json")
49+
.body(cached_response),
50+
);
51+
}
52+
}
53+
ControlFlow::Continue
54+
}
55+
}
56+
57+
impl OnExecuteEnd for ResponseCachePlugin {
58+
fn on_execute_end(&self, payload: OnExecuteEndPayload) -> ControlFlow {
59+
// Do not cache if there are errors
60+
if !payload.errors.is_empty() {
61+
return ControlFlow::Continue;
62+
}
63+
if let Some(key) = payload
64+
.router_http_request
65+
.extensions()
66+
.get::<ResponseCacheContext>()
67+
.map(|ctx| &ctx.key)
68+
{
69+
if let Ok(mut conn) = self.redis_client.get_connection() {
70+
if let Ok(serialized) = sonic_rs::to_vec(&payload.data) {
71+
// Decide on the ttl somehow
72+
// Get the type names
73+
let mut max_ttl = 0;
74+
75+
// Imagine this code is traversing the response data to find type names
76+
if let Some(obj) = payload.data.as_object() {
77+
if let Some(typename) = obj
78+
.iter()
79+
.position(|(k, _)| k == &TYPENAME_FIELD_NAME)
80+
.and_then(|idx| obj[idx].1.as_str())
81+
{
82+
if let Some(ttl) = self.ttl_per_type.get(typename).map(|v| *v) {
83+
max_ttl = max_ttl.max(ttl);
84+
}
85+
}
86+
}
87+
88+
// If no ttl found, default to 60 seconds
89+
if max_ttl == 0 {
90+
max_ttl = 60;
91+
}
92+
93+
// Insert the ttl into extensions for client awareness
94+
payload
95+
.extensions
96+
.insert("response_cache_ttl".to_string(), json!(max_ttl));
97+
98+
// Set the cache with the decided ttl
99+
let _: () = conn.set_ex(key, serialized, max_ttl).unwrap_or(());
100+
}
101+
}
102+
}
103+
ControlFlow::Continue
104+
}
105+
}
106+
107+
impl OnSchemaReload for ResponseCachePlugin {
108+
fn on_schema_reload(&self, payload: OnSchemaReloadPayload) {
109+
// Visit the schema and update ttl_per_type based on some directive
110+
payload
111+
.new_schema
112+
.document
113+
.definitions
114+
.iter()
115+
.for_each(|def| {
116+
if let graphql_parser::schema::Definition::TypeDefinition(type_def) = def {
117+
if let graphql_parser::schema::TypeDefinition::Object(obj_type) = type_def {
118+
for directive in &obj_type.directives {
119+
if directive.name == "cacheControl" {
120+
for arg in &directive.arguments {
121+
if arg.0 == "maxAge" {
122+
if let graphql_parser::query::Value::Int(max_age) = &arg.1 {
123+
if let Some(max_age) = max_age.as_i64() {
124+
self.ttl_per_type
125+
.insert(obj_type.name.clone(), max_age as u64);
126+
}
127+
}
128+
}
129+
}
130+
}
131+
}
132+
}
133+
}
134+
});
135+
}
136+
}

lib/executor/src/plugins/traits.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use hive_router_query_planner::consumer_schema::ConsumerSchema;
4+
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
5+
use ntex::web::HttpRequest;
6+
use ntex::web::HttpResponse;
7+
8+
use crate::response::graphql_error::GraphQLError;
9+
use crate::response::value::Value;
10+
11+
pub enum ControlFlow {
12+
Continue,
13+
Break(HttpResponse),
14+
}
15+
16+
pub struct ExecutionResult<'exec> {
17+
pub data: &'exec mut Value<'exec>,
18+
pub errors: &'exec mut Vec<GraphQLError>,
19+
pub extensions: &'exec mut Option<HashMap<String, Value<'exec>>>,
20+
}
21+
22+
pub struct OnExecuteStartPayload<'exec> {
23+
pub router_http_request: &'exec HttpRequest,
24+
pub query_plan: Arc<QueryPlan>,
25+
26+
pub data: &'exec mut Value<'exec>,
27+
pub errors: &'exec mut Vec<GraphQLError>,
28+
pub extensions: Option<&'exec mut sonic_rs::Value>,
29+
30+
pub skip_execution: bool,
31+
32+
pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
33+
}
34+
35+
pub trait OnExecuteStart {
36+
fn on_execute_start(&self, payload: OnExecuteStartPayload) -> ControlFlow;
37+
}
38+
39+
pub struct OnExecuteEndPayload<'exec> {
40+
pub router_http_request: &'exec HttpRequest,
41+
pub query_plan: Arc<QueryPlan>,
42+
43+
pub data: &'exec Value<'exec>,
44+
pub errors: &'exec Vec<GraphQLError>,
45+
pub extensions: &'exec mut HashMap<String, sonic_rs::Value>,
46+
47+
pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
48+
}
49+
50+
pub trait OnExecuteEnd {
51+
fn on_execute_end(&self, payload: OnExecuteEndPayload) -> ControlFlow;
52+
}
53+
54+
pub struct OnSchemaReloadPayload {
55+
pub old_schema: &'static ConsumerSchema,
56+
pub new_schema: &'static mut ConsumerSchema,
57+
}
58+
59+
pub trait OnSchemaReload {
60+
fn on_schema_reload(&self, payload: OnSchemaReloadPayload);
61+
}

0 commit comments

Comments
 (0)