Skip to content

Commit 8dd752d

Browse files
authored
Merge pull request #20 from hamidr/refactoring
Refactored!
2 parents 2217a7b + fdd259b commit 8dd752d

28 files changed

+1490
-1442
lines changed

CMakeLists.txt

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,39 @@ include_directories("/usr/local/include")
2828
link_directories("/usr/lib")
2929
link_directories("/usr/local/lib")
3030

31-
add_library(event_loop
31+
32+
add_library(event_loop
33+
${PROJECT_SOURCE_DIR}/event_loop/socket_watcher.cpp
3234
${PROJECT_SOURCE_DIR}/event_loop/event_loop_ev.cpp)
3335

34-
add_library(parser
36+
add_library(network
37+
${PROJECT_SOURCE_DIR}/network/async_socket.cpp
38+
${PROJECT_SOURCE_DIR}/network/unix_socket.cpp
39+
${PROJECT_SOURCE_DIR}/network/tcp_socket.cpp)
40+
41+
add_library(parser
42+
${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp
3543
${PROJECT_SOURCE_DIR}/parser/array_parser.cpp
3644
${PROJECT_SOURCE_DIR}/parser/bulk_string_parser.cpp
3745
${PROJECT_SOURCE_DIR}/parser/error_parser.cpp
3846
${PROJECT_SOURCE_DIR}/parser/number_parser.cpp
39-
${PROJECT_SOURCE_DIR}/parser/redis_response.cpp
4047
${PROJECT_SOURCE_DIR}/parser/simple_string_parser.cpp)
4148

49+
add_library(async_redis
50+
${PROJECT_SOURCE_DIR}/connection.cpp
51+
${PROJECT_SOURCE_DIR}/monitor.cpp
52+
${PROJECT_SOURCE_DIR}/sentinel.cpp
53+
${PROJECT_SOURCE_DIR}/redis_client.cpp)
54+
55+
56+
## Compiler flags
57+
if(CMAKE_COMPILER_IS_GNUCXX)
58+
set(CMAKE_CXX_FLAGS "-O2") ## Optimize
59+
set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary
60+
endif()
61+
4262
target_link_libraries(event_loop ev)
63+
target_link_libraries(async_redis event_loop parser network)
4364

4465
install(TARGETS event_loop
4566
LIBRARY DESTINATION /usr/local/lib/
@@ -52,5 +73,5 @@ install(TARGETS parser
5273

5374
install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include)
5475

55-
add_executable (test ${CMAKE_SOURCE_DIR}/test/main.cpp)
56-
target_link_libraries(test parser event_loop ev)
76+
add_executable (a1.out ${CMAKE_SOURCE_DIR}/test/main.cpp)
77+
target_link_libraries(a1.out async_redis)

examples/tcp_server.hpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,33 @@
1010
namespace async_redis {
1111
namespace tcp_server {
1212

13-
template<typename InputOutputHandler>
1413
class tcp_server
1514
{
1615
public:
17-
using tcp_socket = async_redis::network::tcp_socket<InputOutputHandler>;
16+
using tcp_socket = async_redis::network::tcp_socket;
17+
using async_socket = async_redis::network::async_socket;
1818

19-
tcp_server(InputOutputHandler &event_loop)
20-
: loop_(event_loop) {
21-
listener_ = std::make_shared<tcp_socket>(event_loop);
19+
tcp_server(event_loop::event_loop_ev &event_loop)
20+
: loop_(event_loop), listener_(event_loop) {
2221
}
2322

2423
void listen(int port) {
25-
if (!listener_->bind("127.0.0.1", port) || !listener_->listen())
24+
if (!listener_.bind("127.0.0.1", port) || !listener_.listen())
2625
throw;
2726

2827
auto receiver = std::bind(&tcp_server::accept, this, std::placeholders::_1);
29-
listener_->template async_accept<tcp_socket>(receiver);
28+
listener_.async_accept(receiver);
3029
}
3130

32-
void accept(std::shared_ptr<tcp_socket> socket) {
31+
void accept(std::shared_ptr<async_socket> socket) {
3332
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, socket);
3433
socket->async_read(buffer_, max_buffer_length, receiver);
3534

3635
conns_.emplace(socket, nullptr);
3736
}
3837

3938
private:
40-
void chunk_received(int len, std::shared_ptr<tcp_socket>& socket)
39+
void chunk_received(int len, std::shared_ptr<async_socket>& socket)
4140
{
4241
std::string command;
4342

@@ -78,10 +77,10 @@ namespace tcp_server {
7877
}
7978

8079
private:
81-
using socket_t = std::shared_ptr<tcp_socket>;
80+
using socket_t = std::shared_ptr<async_socket>;
8281

83-
socket_t listener_;
84-
InputOutputHandler& loop_;
82+
tcp_socket listener_;
83+
event_loop::event_loop_ev& loop_;
8584
std::unordered_map<socket_t, void*> conns_;
8685
enum { max_buffer_length = 1024 };
8786
char buffer_[max_buffer_length];

includes/connection.hpp

Lines changed: 27 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -5,109 +5,39 @@
55
#include <memory>
66
#include <tuple>
77

8-
namespace async_redis {
9-
namespace redis_impl
10-
{
11-
using std::string;
12-
13-
template<typename InputOutputHandler, typename SocketType, typename ParserPolicy>
14-
class connection
15-
{
16-
public:
17-
using parser_t = typename ParserPolicy::parser;
18-
using reply_cb_t = std::function<void (parser_t)>;
19-
20-
connection(InputOutputHandler &event_loop)
21-
: event_loop_(event_loop) {
22-
socket_ = std::make_unique<SocketType>(event_loop);
23-
}
24-
25-
template<typename ...Args>
26-
inline void connect(Args... args) {
27-
if (!socket_->is_valid())
28-
socket_ = std::make_unique<SocketType>(event_loop_);
29-
30-
socket_->template async_connect<SocketType>(0, std::forward<Args>(args)...);
31-
}
32-
33-
bool is_connected() const
34-
{ return socket_ && socket_->is_connected(); }
35-
36-
inline int pressure() const
37-
{ return req_queue_.size(); }
38-
39-
void disconnect() {
40-
socket_->close();
41-
decltype(req_queue_) free_me;
42-
free_me.swap(req_queue_);
43-
}
44-
45-
bool pipelined_send(string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks)
46-
{
47-
return
48-
socket_->async_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) {
49-
if (sent_chunk_len == 0)
50-
return disconnect();
8+
#include <parser/base_resp_parser.h>
9+
#include <network/async_socket.hpp>
5110

52-
if (!req_queue_.size() && cbs.size())
53-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
54-
55-
for(auto &&cb : cbs)
56-
req_queue_.emplace(std::move(cb), nullptr);
57-
});
58-
}
59-
60-
bool send(const string&& command, const reply_cb_t& reply_cb)
61-
{
62-
bool read_it = !req_queue_.size();
63-
req_queue_.emplace(reply_cb, nullptr);
64-
65-
return
66-
socket_->async_write(std::move(command), [this, read_it](ssize_t sent_chunk_len) {
67-
if (sent_chunk_len == 0)
68-
return disconnect();
69-
70-
if (read_it)
71-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
72-
});
73-
}
74-
75-
protected:
76-
void reply_received(ssize_t len)
77-
{
78-
if (len == 0)
79-
return disconnect();
80-
81-
ssize_t acc = 0;
82-
while (acc < len && req_queue_.size())
83-
{
84-
auto& request = req_queue_.front();
85-
86-
auto &cb = std::get<0>(request);
87-
auto &parser = std::get<1>(request);
11+
namespace async_redis
12+
{
13+
class connection
14+
{
15+
using async_socket = network::async_socket;
8816

89-
bool is_finished = false;
90-
acc += ParserPolicy(parser).append_chunk(data_ + acc, len - acc, is_finished);
17+
public:
18+
using parser_t = parser::base_resp_parser::parser;
19+
using reply_cb_t = std::function<void (parser_t)>;
9120

92-
if (!is_finished)
93-
break;
21+
connection(event_loop::event_loop_ev& event_loop);
9422

95-
cb(parser);
96-
req_queue_.pop(); //free the resources
97-
}
23+
void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
24+
void connect(async_socket::connect_handler_t handler, const std::string& path);
9825

99-
if (req_queue_.size())
100-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
101-
}
26+
bool is_connected() const;
27+
void disconnect();
28+
bool pipelined_send(std::string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks);
29+
bool send(const std::string&& command, const reply_cb_t& reply_cb);
10230

103-
private:
104-
std::unique_ptr<SocketType> socket_;
105-
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;
31+
private:
32+
void do_read();
33+
void reply_received(ssize_t len);
10634

107-
InputOutputHandler& event_loop_;
108-
enum { max_data_size = 1024 };
109-
char data_[max_data_size];
110-
};
35+
private:
36+
std::unique_ptr<async_socket> socket_;
37+
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;
11138

112-
}
39+
event_loop::event_loop_ev& event_loop_;
40+
enum { max_data_size = 1024 };
41+
char data_[max_data_size];
42+
};
11343
}

includes/event_loop/event_loop_ev.h

Lines changed: 6 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,15 @@
11
#pragma once
22

3-
#include <ev.h>
4-
#include <functional>
3+
#include <event_loop/socket_watcher.h>
54
#include <memory>
6-
#include <unordered_map>
7-
#include <queue>
85

96
namespace async_redis {
107
namespace event_loop
118
{
129
class event_loop_ev
1310
{
14-
using socket_id = int;
15-
using string = std::string;
16-
struct socket_queue;
17-
1811
public:
19-
using action = std::function<void()>;
20-
using socket_identifier_t = std::shared_ptr<socket_queue>;
12+
using socket_identifier_t = std::shared_ptr<socket_watcher>;
2113

2214
private:
2315
struct timer_watcher
@@ -32,50 +24,22 @@ namespace async_redis {
3224
}
3325
};
3426

35-
struct socket_queue
36-
{
37-
event_loop_ev& loop_;
38-
39-
ev_io write_watcher;
40-
ev_io read_watcher;
41-
42-
std::queue<action> write_handlers;
43-
std::queue<action> read_handlers;
44-
45-
socket_queue(event_loop_ev& loop, int fd)
46-
: loop_(loop)
47-
{
48-
ev_io_init(&read_watcher, &event_loop_ev::read_handler, fd, EV_READ);
49-
ev_io_init(&write_watcher, &event_loop_ev::write_handler, fd, EV_WRITE);
50-
51-
write_watcher.data = this;
52-
read_watcher.data = this;
53-
}
54-
55-
void stop() {
56-
loop_.stop(write_watcher);
57-
loop_.stop(read_watcher);
58-
}
59-
};
6027

6128
public:
6229
event_loop_ev();
6330
event_loop_ev(struct ev_loop *);
31+
6432
void run();
6533

6634
socket_identifier_t watch(int);
6735
void unwatch(socket_identifier_t&);
6836

69-
void async_write(socket_identifier_t& id, const action& cb);
70-
void async_read(socket_identifier_t& id, const action& cb);
71-
void async_timeout(double time, const action& cb );
37+
void async_write(socket_identifier_t& id, action&& cb);
38+
void async_read(socket_identifier_t& id, action&& cb);
39+
void async_timeout(double time, action&& cb );
7240

7341
private:
74-
static void read_handler(EV_P_ ev_io* w, int revents);
75-
static void write_handler(EV_P_ ev_io* w, int revents);
7642
static void timer_handler(EV_P_ ev_timer* w, int revents);
77-
void stop(ev_io&);
78-
void start(ev_io&);
7943

8044
private:
8145
struct ev_loop* loop_;

includes/event_loop/socket_watcher.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#pragma once
2+
3+
#include <ev.h>
4+
#include <queue>
5+
#include <functional>
6+
7+
namespace async_redis {
8+
namespace event_loop {
9+
10+
typedef std::function<void()> action;
11+
12+
class socket_watcher
13+
{
14+
public:
15+
socket_watcher(struct ev_loop* loop, int fd);
16+
17+
void stop();
18+
19+
void start_reading_with(action&& cb);
20+
void start_writing_with(action&& cb);
21+
22+
private:
23+
void call_read();
24+
void call_write();
25+
26+
static void read_handler(EV_P_ ev_io* w, int revents);
27+
static void write_handler(EV_P_ ev_io* w, int revents);
28+
29+
private:
30+
void start_reading();
31+
void start_writing();
32+
33+
void stop_reading();
34+
void stop_writing();
35+
36+
private:
37+
struct ev_loop* loop_;
38+
39+
ev_io write_watcher_;
40+
ev_io read_watcher_;
41+
42+
std::queue<action> write_handlers_;
43+
std::queue<action> read_handlers_;
44+
};
45+
46+
}
47+
}

0 commit comments

Comments
 (0)