Skip to content

Commit e8d93c1

Browse files
committed
Move peer discovery from "services" to "peers-clear"
This is necessary to implement TLS as service only report the clear service port. It will be easy to expand the "services_name" to return peers-tls
1 parent 6ac1a06 commit e8d93c1

File tree

5 files changed

+143
-32
lines changed

5 files changed

+143
-32
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ error-chain = "0.12"
3535
parking_lot = "0.9"
3636
pwhash = "0.3"
3737
serde = { version = "1.0", features = ["derive"], optional = true }
38+
logos = "0.12.0"
3839

3940
[features]
4041
serialization = ["serde"]

src/cluster/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod node;
1717
pub mod node_validator;
1818
pub mod partition;
1919
pub mod partition_tokenizer;
20+
pub mod peers;
2021

2122
use std::collections::HashMap;
2223
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};

src/cluster/node.rs

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use crate::errors::{ErrorKind, Result, ResultExt};
3030
use crate::net::{ConnectionPool, Host, PooledConnection};
3131
use crate::policy::ClientPolicy;
3232

33+
use super::peers::parse_peers_info;
34+
3335
pub const PARTITIONS: usize = 4096;
3436

3537
#[derive(Debug)]
@@ -138,9 +140,9 @@ impl Node {
138140

139141
const fn services_name(&self) -> &'static str {
140142
if self.client_policy.use_services_alternate {
141-
"services-alternate"
143+
"peers-clear-alt"
142144
} else {
143-
"services"
145+
"peers-clear-std"
144146
}
145147
}
146148

@@ -197,35 +199,7 @@ impl Node {
197199
Some(friend_string) => friend_string,
198200
};
199201

200-
let friend_names = friend_string.split(';');
201-
for friend in friend_names {
202-
let mut friend_info = friend.split(':');
203-
if friend_info.clone().count() != 2 {
204-
error!(
205-
"Node info from asinfo:services is malformed. Expected HOST:PORT, but got \
206-
'{}'",
207-
friend
208-
);
209-
continue;
210-
}
211-
212-
let host = friend_info.next().unwrap();
213-
let port = u16::from_str(friend_info.next().unwrap())?;
214-
let alias = match self.client_policy.ip_map {
215-
Some(ref ip_map) if ip_map.contains_key(host) => {
216-
Host::new(ip_map.get(host).unwrap(), port)
217-
}
218-
_ => Host::new(host, port),
219-
};
220-
221-
if current_aliases.contains_key(&alias) {
222-
self.reference_count.fetch_add(1, Ordering::Relaxed);
223-
} else if !friends.contains(&alias) {
224-
friends.push(alias);
225-
}
226-
}
227-
228-
Ok(friends)
202+
parse_peers_info(friend_string)
229203
}
230204

231205
fn update_partitions(&self, info_map: &HashMap<String, String>) -> Result<()> {

src/cluster/peers.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use crate::errors::{ErrorKind, Result};
2+
use crate::net::{Host, ToHosts};
3+
use logos::{Lexer, Logos};
4+
5+
#[derive(Logos, Debug, PartialEq)]
6+
enum Token {
7+
#[token("[")]
8+
OpenBracket,
9+
10+
#[token("]")]
11+
CloseBracket,
12+
13+
#[regex("[0-9a-zA-Z-./_:]+")]
14+
Text,
15+
16+
#[error]
17+
#[regex(r"[ \t\f,]+", logos::skip)]
18+
Error,
19+
}
20+
21+
fn parse_error(lex: &Lexer<Token>, source: &str) -> String {
22+
format!(
23+
"Failed to parse peers: {}, at {:?} ({})",
24+
source,
25+
lex.span(),
26+
lex.slice()
27+
)
28+
}
29+
30+
pub fn parse_peers_info(info_peers: &str) -> Result<Vec<Host>> {
31+
let mut lex = Token::lexer(info_peers);
32+
33+
let _peer_gen = match lex.next() {
34+
Some(Token::Text) => lex.slice(),
35+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
36+
};
37+
let default_port_str = match lex.next() {
38+
Some(Token::Text) => lex.slice(),
39+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
40+
};
41+
42+
let default_port = match default_port_str.parse::<u16>() {
43+
Ok(port) => port,
44+
Err(_) => bail!(ErrorKind::BadResponse(format!(
45+
"Invalid default port: {}",
46+
default_port_str
47+
))),
48+
};
49+
50+
match lex.next() {
51+
Some(Token::OpenBracket) => parse_peers(info_peers, &mut lex, default_port),
52+
_ => Ok(Vec::new()),
53+
}
54+
}
55+
56+
fn parse_peers(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
57+
let mut peers = Vec::new();
58+
loop {
59+
match lex.next() {
60+
Some(Token::OpenBracket) => peers.extend(parse_peer(info_peers, lex, default_port)?),
61+
Some(Token::CloseBracket) => return Ok(peers),
62+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
63+
}
64+
lex.next(); // Close brackets
65+
}
66+
}
67+
68+
fn parse_peer(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
69+
let _id = match lex.next() {
70+
Some(Token::Text) => lex.slice(),
71+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
72+
};
73+
let _tls_hostname = match lex.next() {
74+
Some(Token::Text) => lex.slice(),
75+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
76+
};
77+
match lex.next() {
78+
Some(Token::OpenBracket) => (),
79+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
80+
};
81+
82+
let hosts = match lex.next() {
83+
Some(Token::Text) => lex.slice(),
84+
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
85+
}
86+
.to_hosts_with_default_port(default_port)?;
87+
88+
lex.next(); // Close brackets
89+
Ok(hosts)
90+
}
91+
92+
#[cfg(test)]
93+
mod tests {
94+
use std::vec;
95+
96+
use super::*;
97+
98+
#[test]
99+
fn parse_peers_works() {
100+
let work = "6,3000,[[12A0,aerospike.preprod.crto.in,[192.168.1.13:4333]],[11A1,aerospike.preprod.crto.in,[192.168.3.78]],[11A1,aerospike.preprod.crto.in,[localhost]]]";
101+
let fail = "6,3foobar,[[12A0,aerospike.preprod.crto.in,[192.168.1.13:4333]],[11A1,aerospike.preprod.crto.in,[192.168.3.78:4333]]]";
102+
let empty = "6,3000,[]";
103+
assert!(parse_peers_info(fail).is_err());
104+
let work = parse_peers_info(work).unwrap();
105+
println!("{:?}", work);
106+
assert!(
107+
work == vec![
108+
Host {
109+
name: "192.168.1.13".to_string(),
110+
port: 4333
111+
},
112+
Host {
113+
name: "192.168.3.78".to_string(),
114+
port: 3000
115+
},
116+
Host {
117+
name: "localhost".to_string(),
118+
port: 3000
119+
}
120+
]
121+
);
122+
let empty = parse_peers_info(empty).unwrap();
123+
assert!(empty == vec![]);
124+
}
125+
}

src/net/host.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,24 @@ pub trait ToHosts {
6767
///
6868
/// Any errors encountered during conversion will be returned as an `Err`.
6969
fn to_hosts(&self) -> Result<Vec<Host>>;
70+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>>;
7071
}
7172

7273
impl ToHosts for Vec<Host> {
7374
fn to_hosts(&self) -> Result<Vec<Host>> {
7475
Ok(self.clone())
7576
}
77+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
78+
Ok(self.clone())
79+
}
7680
}
7781

7882
impl ToHosts for String {
7983
fn to_hosts(&self) -> Result<Vec<Host>> {
80-
let mut parser = Parser::new(self, 3000);
84+
self.to_hosts_with_default_port(3000)
85+
}
86+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
87+
let mut parser = Parser::new(self, default_port);
8188
parser
8289
.read_hosts()
8390
.chain_err(|| ErrorKind::InvalidArgument(format!("Invalid hosts list: '{}'", self)))
@@ -88,6 +95,9 @@ impl<'a> ToHosts for &'a str {
8895
fn to_hosts(&self) -> Result<Vec<Host>> {
8996
(*self).to_string().to_hosts()
9097
}
98+
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
99+
(*self).to_string().to_hosts_with_default_port(default_port)
100+
}
91101
}
92102

93103
#[cfg(test)]

0 commit comments

Comments
 (0)