Skip to content

Commit fdd259b

Browse files
author
hamidr
committed
Refactoring finished
1 parent 68f906d commit fdd259b

File tree

9 files changed

+790
-653
lines changed

9 files changed

+790
-653
lines changed

CMakeLists.txt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ add_library(parser
4646
${PROJECT_SOURCE_DIR}/parser/number_parser.cpp
4747
${PROJECT_SOURCE_DIR}/parser/simple_string_parser.cpp)
4848

49+
add_library(async_redis
50+
${PROJECT_SOURCE_DIR}/connection.cpp
51+
${PROJECT_SOURCE_DIR}/monitor.cpp
52+
${PROJECT_SOURCE_DIR}/sentinel.cpp
53+
${PROJECT_SOURCE_DIR}/redis_client.cpp)
54+
4955

5056
## Compiler flags
5157
if(CMAKE_COMPILER_IS_GNUCXX)
@@ -54,6 +60,7 @@ if(CMAKE_COMPILER_IS_GNUCXX)
5460
endif()
5561

5662
target_link_libraries(event_loop ev)
63+
target_link_libraries(async_redis event_loop parser network)
5764

5865
install(TARGETS event_loop
5966
LIBRARY DESTINATION /usr/local/lib/
@@ -67,4 +74,4 @@ install(TARGETS parser
6774
install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include)
6875

6976
add_executable (a1.out ${CMAKE_SOURCE_DIR}/test/main.cpp)
70-
target_link_libraries(a1.out parser event_loop network)
77+
target_link_libraries(a1.out async_redis)

includes/connection.hpp

Lines changed: 10 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -6,122 +6,31 @@
66
#include <tuple>
77

88
#include <parser/base_resp_parser.h>
9-
#include <network/tcp_socket.hpp>
10-
#include <network/unix_socket.hpp>
9+
#include <network/async_socket.hpp>
1110

1211
namespace async_redis
1312
{
1413
class connection
1514
{
1615
using async_socket = network::async_socket;
17-
using tcp_socket = network::tcp_socket;
18-
using unix_socket = network::unix_socket;
1916

2017
public:
2118
using parser_t = parser::base_resp_parser::parser;
2219
using reply_cb_t = std::function<void (parser_t)>;
2320

24-
connection(event_loop::event_loop_ev& event_loop)
25-
: event_loop_(event_loop) {
26-
}
21+
connection(event_loop::event_loop_ev& event_loop);
2722

28-
void connect(async_socket::connect_handler_t handler, const std::string& ip, int port)
29-
{
30-
if (!socket_ || !socket_->is_valid())
31-
socket_ = std::make_unique<tcp_socket>(event_loop_);
23+
void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
24+
void connect(async_socket::connect_handler_t handler, const std::string& path);
3225

33-
static_cast<tcp_socket*>(socket_.get())->async_connect(ip, port, handler);
34-
}
35-
36-
void connect(async_socket::connect_handler_t handler, const std::string& path)
37-
{
38-
if (!socket_ || !socket_->is_valid())
39-
socket_ = std::make_unique<unix_socket>(event_loop_);
40-
41-
static_cast<unix_socket*>(socket_.get())->async_connect(path, handler);
42-
}
43-
44-
bool is_connected() const
45-
{ return socket_ && socket_->is_connected(); }
46-
47-
inline int pressure() const
48-
{ return req_queue_.size(); }
49-
50-
void disconnect() {
51-
socket_->close();
52-
//TODO: check the policy! Should we free queue or retry again?
53-
decltype(req_queue_) free_me;
54-
free_me.swap(req_queue_);
55-
}
56-
57-
bool pipelined_send(std::string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks)
58-
{
59-
if (!is_connected())
60-
return false;
61-
62-
return
63-
socket_->async_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) {
64-
if (sent_chunk_len == 0)
65-
return disconnect();
66-
67-
if (!req_queue_.size() && cbs.size())
68-
do_read();
69-
70-
for(auto &&cb : cbs)
71-
req_queue_.emplace(std::move(cb), nullptr);
72-
});
73-
}
74-
75-
bool send(const std::string&& command, const reply_cb_t& reply_cb)
76-
{
77-
if (!is_connected())
78-
return false;
79-
80-
bool read_it = !req_queue_.size();
81-
req_queue_.emplace(reply_cb, nullptr);
82-
83-
return
84-
socket_->async_write(std::move(command), [this, read_it](ssize_t sent_chunk_len) {
85-
if (sent_chunk_len == 0)
86-
return disconnect();
87-
88-
if (read_it)
89-
do_read();
90-
});
91-
}
26+
bool is_connected() const;
27+
void disconnect();
28+
bool pipelined_send(std::string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks);
29+
bool send(const std::string&& command, const reply_cb_t& reply_cb);
9230

9331
private:
94-
inline
95-
void do_read() {
96-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
97-
}
98-
99-
void reply_received(ssize_t len)
100-
{
101-
if (len == 0)
102-
return disconnect();
103-
104-
ssize_t acc = 0;
105-
while (acc < len && req_queue_.size())
106-
{
107-
auto& request = req_queue_.front();
108-
109-
auto &cb = std::get<0>(request);
110-
auto &parser = std::get<1>(request);
111-
112-
bool is_finished = false;
113-
acc += parser::base_resp_parser::append_chunk(parser, data_ + acc, len - acc, is_finished);
114-
115-
if (!is_finished)
116-
break;
117-
118-
cb(parser);
119-
req_queue_.pop(); //free the resources
120-
}
121-
122-
if (req_queue_.size())
123-
do_read();
124-
}
32+
void do_read();
33+
void reply_received(ssize_t len);
12534

12635
private:
12736
std::unique_ptr<async_socket> socket_;

0 commit comments

Comments
 (0)