Skip to content

Commit 2217a7b

Browse files
author
hamidr
committed
Add sentinel support
1 parent 0f2ac2d commit 2217a7b

File tree

4 files changed

+829
-140
lines changed

4 files changed

+829
-140
lines changed

examples/redis_sentinel.hpp

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#pragma once
2+
3+
#include <vector>
4+
#include <sentinel.hpp>
5+
6+
namespace async_redis {
7+
namespace redis_impl
8+
{
9+
template<typename InputOutputHandler>
10+
class redis_sentinel
11+
{
12+
using tcp_socket_t = ::async_redis::network::tcp_socket<InputOutputHandler>;
13+
14+
using sentinel_t = sentinel<InputOutputHandler>;
15+
using sentinel_ptr_t = std::shared_ptr<sentinel_t>;
16+
17+
public:
18+
enum State {
19+
SentinelsConnectingFailed = 0,
20+
GetMasterAddrFailed,
21+
MasterNotFound,
22+
FoundMaster,
23+
MasterChanged
24+
};
25+
26+
using master_watcher_cb_t = std::function<void (const string& ip, int port, State)>;
27+
28+
redis_sentinel(InputOutputHandler &io, const string& cluster_name)
29+
: io_(io), cluster_name_(cluster_name)
30+
{ }
31+
32+
void connect(const std::vector<std::pair<std::string, int>>& sentinel_addrs, master_watcher_cb_t&& master_watcher_cb)
33+
{
34+
for (auto &addr : sentinel_addrs) {
35+
const string &ip = addr.first;
36+
const int& port = addr.second;
37+
38+
string key = ip + ":" + std::to_string(port);
39+
sentinels_.emplace(key, std::make_unique<sentinel_t>(io_));
40+
41+
sentinels_[key]->connect(ip, port, std::bind(&redis_sentinel::sentinel_connected, this, key, std::placeholders::_1));
42+
}
43+
44+
master_watcher_ = std::make_unique<master_watcher_cb_t>(std::move(master_watcher_cb));
45+
}
46+
47+
private:
48+
void sentinel_connected(const string& key, bool res)
49+
{
50+
res ?
51+
++sentinels_connected_ :
52+
++sentinels_not_connected_;
53+
54+
if (sentinels_.size() == sentinels_connected_) {
55+
sentinels_connected_ = 0;
56+
sentinels_not_connected_ = 0;
57+
return get_master_addr();
58+
}
59+
60+
if ((sentinels_connected_ + sentinels_not_connected_) == sentinels_.size()) {
61+
sentinels_connected_ = 0;
62+
sentinels_not_connected_ = 0;
63+
64+
return call_master_watcher(nullptr, -1, State::SentinelsConnectingFailed);
65+
}
66+
}
67+
68+
void call_master_watcher(const string& ip, int port, State state)
69+
{
70+
if (master_watcher_)
71+
return (*master_watcher_)(ip, port, state);
72+
//TODO: LOG this
73+
}
74+
75+
void get_master_addr()
76+
{
77+
// <ip, port, total_sentin, total_got_res>
78+
auto state = std::make_shared<std::tuple<string, int, int, int>>(std::make_tuple(nullptr, -1, sentinels_.size(), 0));
79+
80+
auto &sen = sentinels_.begin()->second;
81+
for(auto &itr : sentinels_)
82+
{
83+
auto &sen = itr.second;
84+
sen->master_addr_by_name(cluster_name_,
85+
[this, sen, state](const string& ip, int port, bool res)
86+
{
87+
string &final_ip = std::get<0>(*state);
88+
int &final_port = std::get<1>(*state);
89+
int &num = std::get<2>(*state);
90+
int &got_res = std::get<3>(*state);
91+
92+
if (sentinels_.size() == num) {
93+
//First recieve
94+
final_port = port;
95+
final_ip = ip;
96+
} else {
97+
// other receives to override the ip and port
98+
99+
//TODO: what should we do if the sentinels dont agree on ip and port of master
100+
if (final_port != port || final_ip != ip)
101+
/* probably something is wrong! */;
102+
103+
final_port = port;
104+
final_ip = ip;
105+
}
106+
107+
if (res)
108+
++got_res;
109+
--num;
110+
111+
if (num == 0 && got_res > 0) {
112+
call_master_watcher(ip, port, State::FoundMaster);
113+
watch_for_master(sen);
114+
} else if (num == 0 && got_res == 0) {
115+
call_master_watcher(ip, port, State::MasterNotFound);
116+
}
117+
118+
}
119+
);
120+
}
121+
}
122+
123+
void watch_for_master(const sentinel_ptr_t& sen)
124+
{
125+
sen->watch_master_change(
126+
[this](const string& ip, int port) -> bool
127+
{
128+
call_master_watcher(ip, port, State::MasterChanged);
129+
return true;
130+
}
131+
);
132+
}
133+
134+
private:
135+
InputOutputHandler &io_;
136+
std::unordered_map<string, sentinel_ptr_t> sentinels_;
137+
std::unique_ptr<master_watcher_cb_t> master_watcher_;
138+
int sentinels_connected_ = 0;
139+
int sentinels_not_connected_ = 0;
140+
const string& cluster_name_;
141+
};
142+
}
143+
}

examples/tcp_server.hpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <memory>
5+
#include <functional>
6+
#include <iostream>
7+
#include <sstream>
8+
#include <unordered_map>
9+
10+
namespace async_redis {
11+
namespace tcp_server {
12+
13+
template<typename InputOutputHandler>
14+
class tcp_server
15+
{
16+
public:
17+
using tcp_socket = async_redis::network::tcp_socket<InputOutputHandler>;
18+
19+
tcp_server(InputOutputHandler &event_loop)
20+
: loop_(event_loop) {
21+
listener_ = std::make_shared<tcp_socket>(event_loop);
22+
}
23+
24+
void listen(int port) {
25+
if (!listener_->bind("127.0.0.1", port) || !listener_->listen())
26+
throw;
27+
28+
auto receiver = std::bind(&tcp_server::accept, this, std::placeholders::_1);
29+
listener_->template async_accept<tcp_socket>(receiver);
30+
}
31+
32+
void accept(std::shared_ptr<tcp_socket> socket) {
33+
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, socket);
34+
socket->async_read(buffer_, max_buffer_length, receiver);
35+
36+
conns_.emplace(socket, nullptr);
37+
}
38+
39+
private:
40+
void chunk_received(int len, std::shared_ptr<tcp_socket>& socket)
41+
{
42+
std::string command;
43+
44+
if (len <= 0) {
45+
conns_.erase(socket);
46+
return;
47+
}
48+
49+
for(int n = 0; n < len; ++n) {
50+
51+
char c = buffer_[n];
52+
switch(c)
53+
{
54+
case '\r':
55+
case '\n':
56+
continue;
57+
break;
58+
59+
default:
60+
command.push_back(c);
61+
}
62+
}
63+
64+
fprintf(stdout, ("cmd: " + command + "\n").data());
65+
fflush(stdout);
66+
67+
if (command == "close") {
68+
socket->async_write("good bye!", [this, &socket](ssize_t l) {
69+
loop_.async_timeout(1, [this, &socket]() {
70+
conns_.erase(socket);
71+
});
72+
});
73+
return; // dont read
74+
}
75+
76+
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, socket);
77+
socket->async_read(buffer_, max_buffer_length, receiver);
78+
}
79+
80+
private:
81+
using socket_t = std::shared_ptr<tcp_socket>;
82+
83+
socket_t listener_;
84+
InputOutputHandler& loop_;
85+
std::unordered_map<socket_t, void*> conns_;
86+
enum { max_buffer_length = 1024 };
87+
char buffer_[max_buffer_length];
88+
};
89+
90+
}
91+
}

0 commit comments

Comments
 (0)