Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/config/integration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ servers = [
]
password = "front-standalone-secret"
backend_password = "backend-standalone-secret"
backend_resp_version = "resp3"

[[clusters]]
name = "cluster"
Expand All @@ -24,3 +25,4 @@ auth = { password = "front-cluster-secret", users = [
{ username = "ops", password = "ops-secret" }
] }
backend_password = "backend-cluster-secret"
backend_resp_version = "resp3"
20 changes: 20 additions & 0 deletions docker/integration-test.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ wait_for_cluster() {

wait_for_cluster

if ! hello_standalone="$(REDISCLI_AUTH="$STANDALONE_PASS" redis-cli -h aster-proxy -p 6380 --raw HELLO 3 AUTH default "$STANDALONE_PASS")"; then
echo "HELLO 3 handshake against standalone proxy failed" >&2
exit 1
fi
if ! printf "%s\n" "$hello_standalone" | awk 'BEGIN{found=0} {if(prev=="proto" && $0=="3"){found=1} prev=$0} END{exit(found?0:1)}'; then
echo "HELLO 3 response from standalone proxy missing proto=3:" >&2
printf "%s\n" "$hello_standalone" >&2
exit 1
fi

if ! hello_cluster="$(REDISCLI_AUTH="$CLUSTER_USER_PASS" redis-cli -h aster-proxy -p 6381 --user "$CLUSTER_USER" --raw HELLO 3 AUTH "$CLUSTER_USER" "$CLUSTER_USER_PASS")"; then
echo "HELLO 3 handshake against cluster proxy failed" >&2
exit 1
fi
if ! printf "%s\n" "$hello_cluster" | awk 'BEGIN{found=0} {if(prev=="proto" && $0=="3"){found=1} prev=$0} END{exit(found?0:1)}'; then
echo "HELLO 3 response from cluster proxy missing proto=3:" >&2
printf "%s\n" "$hello_cluster" >&2
exit 1
fi

noauth_output="$(redis-cli -h aster-proxy -p 6380 PING 2>&1 || true)"
if echo "$noauth_output" | grep -q "PONG"; then
echo "Expected standalone proxy to require authentication" >&2
Expand Down
11 changes: 11 additions & 0 deletions src/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ impl BackendAuth {
.expect("AUTH command is always valid")
}

pub fn hello_credentials(&self) -> Option<(Bytes, Bytes)> {
match self.parts.len() {
2 => Some((
Bytes::from_static(DEFAULT_USER.as_bytes()),
self.parts[1].clone(),
)),
3 => Some((self.parts[1].clone(), self.parts[2].clone())),
_ => None,
}
}

pub async fn apply_to_stream(
&self,
framed: &mut Framed<TcpStream, RespCodec>,
Expand Down
170 changes: 152 additions & 18 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use crate::hotkey::Hotkey;
use crate::info::{InfoContext, ProxyMode};
use crate::metrics;
use crate::protocol::redis::{
BlockingKind, MultiDispatch, RedisCommand, RespCodec, RespValue, SlotMap, SubCommand,
SubResponse, SubscriptionKind, SLOT_COUNT,
BlockingKind, MultiDispatch, RedisCommand, RespCodec, RespValue, RespVersion, SlotMap,
SubCommand, SubResponse, SubscriptionKind, SLOT_COUNT,
};
use crate::slowlog::Slowlog;
use crate::utils::{crc16, trim_hash_tag};
Expand Down Expand Up @@ -78,6 +78,7 @@ impl ClusterProxy {
runtime.clone(),
REQUEST_TIMEOUT_MS,
backend_auth.clone(),
config.backend_resp_version,
));
let pool = Arc::new(ConnectionPool::new(cluster.clone(), connector.clone()));
let auth = config
Expand Down Expand Up @@ -143,17 +144,25 @@ impl ClusterProxy {
let client_id = ClientId::new();
let _guard = FrontConnectionGuard::new(&self.cluster);

let (mut sink, stream) = Framed::new(socket, RespCodec::default()).split();
let framed = Framed::new(socket, RespCodec::default());
let codec_handle = framed.codec().clone();
let (mut sink, stream) = framed.split();
let mut stream = stream.fuse();
let mut pending: FuturesOrdered<BoxFuture<'static, RespValue>> = FuturesOrdered::new();
let mut pending: FuturesOrdered<BoxFuture<'static, (RespValue, Option<RespVersion>)>> =
FuturesOrdered::new();
let mut _resp3_negotiated = codec_handle.version() == RespVersion::Resp3;
let mut inflight = 0usize;
let mut stream_closed = false;
let mut auth_state = self.auth.as_ref().map(|auth| auth.new_session());

loop {
tokio::select! {
Some(resp) = pending.next(), if inflight > 0 => {
Some((resp, version)) = pending.next(), if inflight > 0 => {
inflight -= 1;
if let Some(version) = version {
codec_handle.set_version(version);
_resp3_negotiated = version == RespVersion::Resp3;
}
sink.send(resp).await?;
}
frame_opt = stream.next(), if !stream_closed && inflight < PIPELINE_LIMIT => {
Expand All @@ -168,14 +177,13 @@ impl ClusterProxy {
cmd = new_cmd;
}
AuthAction::Reply(resp) => {
let fut = async move { resp };
let fut = async move { (resp, None) };
pending.push_back(Box::pin(fut));
inflight += 1;
continue;
}
}
}

if matches!(cmd.as_subscription(), SubscriptionKind::Channel | SubscriptionKind::Pattern) {
let kind_label = cmd.kind_label();
if cmd.args().len() <= 1 {
Expand Down Expand Up @@ -230,8 +238,12 @@ impl ClusterProxy {
}
};
while inflight > 0 {
if let Some(resp) = pending.next().await {
if let Some((resp, version)) = pending.next().await {
inflight -= 1;
if let Some(version) = version {
codec_handle.set_version(version);
_resp3_negotiated = version == RespVersion::Resp3;
}
sink.send(resp).await?;
} else {
inflight = 0;
Expand Down Expand Up @@ -291,7 +303,7 @@ impl ClusterProxy {
kind_label,
success,
);
let fut = async move { response };
let fut = async move { (response, None) };
pending.push_back(Box::pin(fut));
inflight += 1;
continue;
Expand All @@ -302,7 +314,7 @@ impl ClusterProxy {
cmd.kind_label(),
true,
);
let fut = async move { response };
let fut = async move { (response, None) };
pending.push_back(Box::pin(fut));
inflight += 1;
continue;
Expand All @@ -314,7 +326,7 @@ impl ClusterProxy {
cmd.kind_label(),
success,
);
let fut = async move { response };
let fut = async move { (response, None) };
pending.push_back(Box::pin(fut));
inflight += 1;
continue;
Expand All @@ -326,21 +338,31 @@ impl ClusterProxy {
cmd.kind_label(),
success,
);
let fut = async move { response };
let fut = async move { (response, None) };
pending.push_back(Box::pin(fut));
inflight += 1;
continue;
}
let requested_version = cmd.resp_version_request();
let guard = self.prepare_dispatch(client_id, cmd);
pending.push_back(Box::pin(guard));
let fut = async move {
let resp = guard.await;
let version = if !resp.is_error() {
requested_version
} else {
None
};
(resp, version)
};
pending.push_back(Box::pin(fut));
inflight += 1;
}
Err(err) => {
metrics::global_error_incr();
metrics::front_error(self.cluster.as_ref(), "parse");
metrics::front_command(self.cluster.as_ref(), "invalid", false);
let message = Bytes::from(format!("ERR {err}"));
let fut = async move { RespValue::Error(message) };
let fut = async move { (RespValue::Error(message), None) };
pending.push_back(Box::pin(fut));
inflight += 1;
}
Expand All @@ -364,7 +386,11 @@ impl ClusterProxy {
}
}

while let Some(resp) = pending.next().await {
while let Some((resp, version)) = pending.next().await {
if let Some(version) = version {
codec_handle.set_version(version);
_resp3_negotiated = version == RespVersion::Resp3;
}
sink.send(resp).await?;
}
sink.close().await?;
Expand Down Expand Up @@ -903,13 +929,15 @@ struct ClusterConnector {
heartbeat_interval: Duration,
reconnect_base_delay: Duration,
max_reconnect_attempts: usize,
backend_resp_version: RespVersion,
}

impl ClusterConnector {
fn new(
runtime: Arc<ClusterRuntime>,
default_timeout_ms: u64,
backend_auth: Option<BackendAuth>,
backend_resp_version: RespVersion,
) -> Self {
Self {
runtime,
Expand All @@ -918,6 +946,7 @@ impl ClusterConnector {
heartbeat_interval: Duration::from_secs(30),
reconnect_base_delay: Duration::from_millis(50),
max_reconnect_attempts: 3,
backend_resp_version,
}
}

Expand Down Expand Up @@ -950,11 +979,91 @@ impl ClusterConnector {
Ok(framed)
}

async fn negotiate_resp_version(
&self,
cluster: &str,
node: &BackendNode,
framed: &mut Framed<TcpStream, RespCodec>,
) -> Result<RespVersion> {
framed.codec_mut().set_version(RespVersion::Resp2);
if self.backend_resp_version != RespVersion::Resp3 {
return Ok(RespVersion::Resp2);
}

let timeout_duration = self.current_timeout();
let mut hello_parts = vec![
RespValue::BulkString(Bytes::from_static(b"HELLO")),
RespValue::BulkString(Bytes::from_static(b"3")),
];
if let Some(auth) = &self.backend_auth {
if let Some((username, password)) = auth.hello_credentials() {
hello_parts.push(RespValue::BulkString(Bytes::from_static(b"AUTH")));
hello_parts.push(RespValue::BulkString(username));
hello_parts.push(RespValue::BulkString(password));
}
}
let hello = RespValue::Array(hello_parts);

match timeout(timeout_duration, framed.send(hello)).await {
Ok(Ok(())) => {}
Ok(Err(err)) => {
metrics::backend_error(cluster, node.as_str(), "resp3_handshake");
return Err(err.context(format!("failed to send RESP3 HELLO to {}", node.as_str())));
}
Err(_) => {
metrics::backend_error(cluster, node.as_str(), "resp3_handshake");
return Err(anyhow!(
"backend {} timed out sending RESP3 HELLO",
node.as_str()
));
}
}

let reply = match timeout(timeout_duration, framed.next()).await {
Ok(Some(Ok(value))) => value,
Ok(Some(Err(err))) => {
metrics::backend_error(cluster, node.as_str(), "resp3_handshake");
return Err(err.context(format!(
"failed to read RESP3 HELLO reply from {}",
node.as_str()
)));
}
Ok(None) => {
metrics::backend_error(cluster, node.as_str(), "resp3_handshake");
return Err(anyhow!(
"backend {} closed connection during RESP3 HELLO",
node.as_str()
));
}
Err(_) => {
metrics::backend_error(cluster, node.as_str(), "resp3_handshake");
return Err(anyhow!(
"backend {} timed out waiting for RESP3 HELLO reply",
node.as_str()
));
}
};

if reply.is_error() {
info!(
cluster = %cluster,
backend = %node.as_str(),
"backend rejected RESP3 HELLO; falling back to RESP2"
);
framed.codec_mut().set_version(RespVersion::Resp2);
return Ok(RespVersion::Resp2);
}

framed.codec_mut().set_version(RespVersion::Resp3);
Ok(RespVersion::Resp3)
}

async fn execute(
&self,
framed: &mut Framed<TcpStream, RespCodec>,
command: RedisCommand,
) -> Result<RespValue> {
let requested_version = command.resp_version_request();
let blocking = command.as_blocking();
if let Ok(name) = std::str::from_utf8(command.command_name()) {
if name.eq_ignore_ascii_case("blpop") || name.eq_ignore_ascii_case("brpop") {
Expand All @@ -966,7 +1075,7 @@ impl ClusterConnector {
.await
.context("timed out sending command")??;

match blocking {
let response = match blocking {
BlockingKind::Queue { .. } | BlockingKind::Stream { .. } => match framed.next().await {
Some(Ok(value)) => Ok(value),
Some(Err(err)) => Err(err.into()),
Expand All @@ -978,7 +1087,14 @@ impl ClusterConnector {
Ok(None) => Err(anyhow!("backend closed connection")),
Err(_) => Err(anyhow!("timed out waiting for response")),
},
}?;

if let Some(version) = requested_version {
if !response.is_error() {
framed.codec_mut().set_version(version);
}
}
Ok(response)
}

async fn connect_with_retry(
Expand All @@ -990,15 +1106,33 @@ impl ClusterConnector {
for attempt in 0..self.max_reconnect_attempts {
let attempt_start = Instant::now();
match self.open_stream(node.as_str()).await {
Ok(stream) => {
Ok(mut stream) => {
metrics::backend_probe_duration(
cluster,
node.as_str(),
"connect",
attempt_start.elapsed(),
);
metrics::backend_probe_result(cluster, node.as_str(), "connect", true);
return Ok(stream);
match self
.negotiate_resp_version(cluster, node, &mut stream)
.await
{
Ok(_) => return Ok(stream),
Err(err) => {
warn!(
cluster = %cluster,
backend = %node.as_str(),
attempt = attempt + 1,
error = %err,
"failed to negotiate RESP version with backend"
);
last_error = Some(err);
if attempt + 1 < self.max_reconnect_attempts {
sleep(self.reconnect_base_delay).await;
}
}
}
}
Err(err) => {
let elapsed = attempt_start.elapsed();
Expand Down
Loading