Skip to content

Commit 255fd5f

Browse files
author
hamidr
committed
Adapt to ASIO
2 parents b0bc663 + 3fa1547 commit 255fd5f

File tree

13 files changed

+161
-168
lines changed

13 files changed

+161
-168
lines changed

.gitmodules

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
[submodule "libs/libevpp"]
2-
path = libs/libevpp
3-
url = https://github.com/hamidr/libevpp
1+
[submodule "libs/asio"]
2+
path = libs/asio
3+
url = https://github.com/chriskohlhoff/asio

CMakeLists.txt

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,7 @@ include_directories(${PROJECT_INCLUDE_DIR})
2525
include_directories(/usr/include/)
2626
include_directories(/usr/local/include)
2727

28-
link_directories(/usr/lib)
29-
link_directories(/usr/local/lib)
30-
31-
add_subdirectory(libs/libevpp/)
32-
33-
include_directories(libs/libevpp/includes)
34-
28+
include_directories(libs/asio/asio/include/)
3529

3630
add_library(parser
3731
${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp
@@ -54,7 +48,20 @@ if(CMAKE_COMPILER_IS_GNUCXX)
5448
set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary
5549
endif()
5650

57-
target_link_libraries(async_redis network parser)
51+
target_link_libraries(async_redis parser pthread)
52+
53+
install(TARGETS async_redis
54+
LIBRARY DESTINATION /usr/local/lib/
55+
ARCHIVE DESTINATION /usr/local/lib/
56+
)
57+
58+
install(TARGETS parser
59+
LIBRARY DESTINATION /usr/local/lib/
60+
ARCHIVE DESTINATION /usr/local/lib/
61+
)
62+
63+
64+
install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include)
5865

5966
add_executable (a2.out test/main.cpp)
6067
target_link_libraries(a2.out async_redis)

includes/async_redis/connection.hpp

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,27 @@
22

33
#include <queue>
44
#include <functional>
5-
#include <memory>
65
#include <tuple>
76

7+
#include <asio/io_context.hpp>
8+
#include <asio/ip/tcp.hpp>
89
#include <async_redis/parser/base_resp_parser.h>
9-
#include <libevpp/event_loop/event_loop_ev.h>
10-
#include <libevpp/network/async_socket.hpp>
11-
12-
using namespace libevpp;
1310

1411
namespace async_redis
1512
{
1613
class connection
1714
{
18-
using async_socket = libevpp::network::async_socket;
15+
connection(const connection&) = delete;
16+
connection& operator = (const connection&) = delete;
1917

2018
public:
19+
using connect_handler_t = std::function<void(bool)>;
2120
using parser_t = parser::base_resp_parser::parser;
2221
using reply_cb_t = std::function<void (parser_t&)>;
2322

24-
connection(event_loop::event_loop_ev& event_loop);
23+
connection(asio::io_context&);
2524

26-
void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
27-
void connect(async_socket::connect_handler_t handler, const std::string& path);
25+
void connect(connect_handler_t handler, const std::string& ip, int port);
2826

2927
bool is_connected() const;
3028
void disconnect();
@@ -33,14 +31,14 @@ namespace async_redis
3331

3432
private:
3533
void do_read();
36-
void reply_received(ssize_t len);
34+
void reply_received(const asio::error_code& ec, size_t len);
3735

3836
private:
39-
std::unique_ptr<async_socket> socket_;
37+
asio::ip::tcp::socket socket_;
4038
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;
4139

42-
event_loop::event_loop_ev& event_loop_;
4340
enum { max_data_size = 1024 };
4441
char data_[max_data_size];
42+
bool is_connected_ = false;
4543
};
4644
}

includes/async_redis/monitor.hpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
#pragma once
22

3-
#include <libevpp/network/async_socket.hpp>
43
#include <async_redis/parser/base_resp_parser.h>
54

65
#include <unordered_map>
76
#include <list>
87
#include <string>
98

9+
#include <asio/io_context.hpp>
10+
#include <asio/ip/tcp.hpp>
11+
1012
using std::string;
11-
using namespace libevpp;
1213

1314
namespace async_redis
1415
{
1516
class monitor
1617
{
17-
using async_socket = network::async_socket;
18+
monitor(const monitor&) = delete;
19+
monitor& operator = (const monitor&) = delete;
20+
21+
using connect_handler_t = std::function<void(bool)>;
1822

1923
public:
2024
enum EventState {
@@ -27,10 +31,8 @@ namespace async_redis
2731
using parser_t = parser::base_resp_parser::parser;
2832
using watcher_cb_t = std::function<void (const string&, parser_t&, EventState)>;
2933

30-
monitor(event_loop::event_loop_ev &event_loop);
31-
32-
void connect(async_socket::connect_handler_t handler, const std::string& ip, int port);
33-
void connect(async_socket::connect_handler_t handler, const std::string& path);
34+
monitor(asio::io_context &event_loop);
35+
void connect(connect_handler_t handler, const std::string& ip, int port);
3436

3537
bool is_connected() const;
3638
bool is_watching() const;
@@ -42,6 +44,7 @@ namespace async_redis
4244
bool punsubscribe(const std::list<string>& channels, watcher_cb_t&& cb);
4345

4446
private:
47+
void do_read();
4548
bool send_and_receive(string&& data);
4649
void handle_message_event(parser_t& channel, parser_t& value);
4750
void handle_subscribe_event(parser_t& channel, parser_t& clients);
@@ -52,18 +55,18 @@ namespace async_redis
5255
void handle_event(parser_t&& request);
5356
void report_disconnect();
5457

55-
void stream_received(ssize_t len);
58+
void stream_received(const asio::error_code& ec, size_t len);
5659

5760
private:
5861
parser_t parser_;
5962
std::unordered_map<std::string, watcher_cb_t> watchers_;
6063
std::unordered_map<std::string, watcher_cb_t> pwatchers_;
6164

62-
std::unique_ptr<async_socket> socket_;
63-
event_loop::event_loop_ev &io_;
65+
asio::ip::tcp::socket socket_;
6466
enum { max_data_size = 1024 };
6567
char data_[max_data_size];
6668
bool is_watching_ = false;
69+
bool is_connected_ = false;
6770
};
6871

6972
}

includes/async_redis/redis_client.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ namespace async_redis
1212

1313
class redis_client
1414
{
15+
redis_client(const redis_client&) = delete;
16+
redis_client& operator = (const redis_client&) = delete;
17+
1518
using reply_cb_t = connection::reply_cb_t;
16-
using connect_cb_t = network::async_socket::connect_handler_t;
19+
using connect_cb_t = connection::connect_handler_t;
1720

1821
public:
1922
class connect_exception : std::exception {};
2023
using parser_t = connection::parser_t;
2124

22-
redis_client(event_loop::event_loop_ev &eventIO, int n = 1);
25+
redis_client(asio::io_context &io, uint n = 1);
2326
bool is_connected() const;
2427

2528
template <typename ...Args>

includes/async_redis/sentinel.hpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
#pragma once
2-
#include <libevpp/network/async_socket.hpp>
32
#include <async_redis/parser/base_resp_parser.h>
43

5-
#include <async_redis/monitor.hpp>
64
#include <functional>
5+
#include <async_redis/monitor.hpp>
76
#include <async_redis/connection.hpp>
87

9-
using namespace libevpp;
10-
118
namespace async_redis {
129
class sentinel
1310
{
14-
using socket_t = libevpp::network::async_socket;
15-
using connect_cb_t = socket_t::connect_handler_t;
11+
using connect_cb_t = connection::connect_handler_t;
1612

1713
public:
1814
using parser_t = parser::base_resp_parser::parser;
@@ -22,7 +18,7 @@ namespace async_redis {
2218
Watching
2319
};
2420

25-
sentinel(event_loop::event_loop_ev &event_loop);
21+
sentinel(asio::io_context &io);
2622

2723
bool is_connected() const;
2824
bool connect(const string& ip, int port, connect_cb_t&& connector);

libs/asio

Submodule asio added at 14db637

libs/libevpp

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/connection.cpp

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,59 @@
11
#include "../includes/async_redis/connection.hpp"
22

3-
#include <queue>
4-
#include <functional>
5-
#include <memory>
6-
#include <tuple>
73

84
#include <async_redis/parser/base_resp_parser.h>
9-
#include <libevpp/network/tcp_socket.hpp>
10-
#include <libevpp/network/unix_socket.hpp>
11-
125

136
namespace async_redis
147
{
158

16-
using tcp_socket = network::tcp_socket;
17-
using unix_socket = network::unix_socket;
18-
19-
connection::connection(event_loop::event_loop_ev& event_loop)
20-
: event_loop_(event_loop) {
21-
}
9+
connection::connection(asio::io_context& io)
10+
: socket_(io)
11+
{ }
2212

23-
void connection::connect(async_socket::connect_handler_t handler, const std::string& ip, int port)
13+
void connection::connect(connect_handler_t handler, const std::string& ip, int port)
2414
{
25-
if (!socket_ || !socket_->is_valid())
26-
socket_ = std::make_unique<tcp_socket>(event_loop_);
15+
asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip), port);
2716

28-
static_cast<tcp_socket*>(socket_.get())->async_connect(ip, port, handler);
29-
}
30-
31-
void connection::connect(async_socket::connect_handler_t handler, const std::string& path)
32-
{
33-
if (!socket_ || !socket_->is_valid())
34-
socket_ = std::make_unique<unix_socket>(event_loop_);
35-
36-
static_cast<unix_socket*>(socket_.get())->async_connect(path, handler);
17+
socket_.async_connect(endpoint, [handler, this](const asio::error_code &ec) {
18+
is_connected_ = !ec;
19+
handler(!ec);
20+
});
3721
}
3822

3923
bool connection::is_connected() const
4024
{
41-
return socket_ && socket_->is_connected();
25+
return is_connected_;
4226
}
4327

4428
void connection::disconnect()
4529
{
46-
socket_->close();
30+
socket_.close();
4731
//TODO: check the policy! Should we free queue or retry again?
4832
decltype(req_queue_) free_me;
4933
free_me.swap(req_queue_);
34+
is_connected_ = false;
5035
}
5136

5237
bool connection::pipelined_send(std::string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks)
5338
{
5439
if (!is_connected())
5540
return false;
5641

57-
return
58-
socket_->async_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) {
59-
if (sent_chunk_len == 0)
42+
socket_.async_send(asio::buffer(pipelined_cmds.data(), pipelined_cmds.length()),
43+
[this, cbs = std::move(callbacks)](const asio::error_code &ec, size_t len)
44+
{
45+
if (ec)
6046
return disconnect();
6147

6248
if (!req_queue_.size() && cbs.size())
6349
do_read();
6450

6551
for(auto &&cb : cbs)
6652
req_queue_.emplace(std::move(cb), nullptr);
67-
});
53+
}
54+
);
55+
56+
return true;
6857
}
6958

7059
bool connection::send(const std::string&& command, const reply_cb_t& reply_cb)
@@ -75,24 +64,36 @@ bool connection::send(const std::string&& command, const reply_cb_t& reply_cb)
7564
bool read_it = !req_queue_.size();
7665
req_queue_.emplace(reply_cb, nullptr);
7766

78-
return
79-
socket_->async_write(std::move(command), [this, read_it](ssize_t sent_chunk_len) {
80-
if (sent_chunk_len == 0)
67+
socket_.async_send(asio::buffer(command.data(), command.length()),
68+
[this, read_it](const asio::error_code &ec, size_t len)
69+
{
70+
// std::cout << ec << std::endl;
71+
if (ec)
8172
return disconnect();
8273

83-
if (read_it)
84-
do_read();
85-
});
74+
if (read_it)
75+
do_read();
76+
}
77+
);
78+
return true;
8679
}
8780

8881
void connection::do_read()
8982
{
90-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
83+
socket_.async_read_some(
84+
asio::buffer(data_, max_data_size),
85+
std::bind(
86+
&connection::reply_received,
87+
this,
88+
std::placeholders::_1,
89+
std::placeholders::_2
90+
)
91+
);
9192
}
9293

93-
void connection::reply_received(ssize_t len)
94+
void connection::reply_received(const asio::error_code& ec, size_t len)
9495
{
95-
if (len == 0)
96+
if (ec)
9697
return disconnect();
9798

9899
ssize_t acc = 0;

0 commit comments

Comments
 (0)