Skip to content

Commit 68f906d

Browse files
author
hamidr
committed
Cleaning socket watcher
1 parent 49203d3 commit 68f906d

File tree

9 files changed

+184
-122
lines changed

9 files changed

+184
-122
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ link_directories("/usr/local/lib")
3030

3131

3232
add_library(event_loop
33+
${PROJECT_SOURCE_DIR}/event_loop/socket_watcher.cpp
3334
${PROJECT_SOURCE_DIR}/event_loop/event_loop_ev.cpp)
3435

3536
add_library(network

includes/event_loop/event_loop_ev.h

Lines changed: 6 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,15 @@
11
#pragma once
22

3-
#include <ev.h>
4-
#include <functional>
3+
#include <event_loop/socket_watcher.h>
54
#include <memory>
6-
#include <unordered_map>
7-
#include <queue>
85

96
namespace async_redis {
107
namespace event_loop
118
{
129
class event_loop_ev
1310
{
14-
using socket_id = int;
15-
using string = std::string;
16-
struct socket_queue;
17-
1811
public:
19-
using action = std::function<void()>;
20-
using socket_identifier_t = std::shared_ptr<socket_queue>;
12+
using socket_identifier_t = std::shared_ptr<socket_watcher>;
2113

2214
private:
2315
struct timer_watcher
@@ -32,50 +24,22 @@ namespace async_redis {
3224
}
3325
};
3426

35-
struct socket_queue
36-
{
37-
event_loop_ev& loop_;
38-
39-
ev_io write_watcher;
40-
ev_io read_watcher;
41-
42-
std::queue<action> write_handlers;
43-
std::queue<action> read_handlers;
44-
45-
socket_queue(event_loop_ev& loop, int fd)
46-
: loop_(loop)
47-
{
48-
ev_io_init(&read_watcher, &event_loop_ev::read_handler, fd, EV_READ);
49-
ev_io_init(&write_watcher, &event_loop_ev::write_handler, fd, EV_WRITE);
50-
51-
write_watcher.data = this;
52-
read_watcher.data = this;
53-
}
54-
55-
void stop() {
56-
loop_.stop(write_watcher);
57-
loop_.stop(read_watcher);
58-
}
59-
};
6027

6128
public:
6229
event_loop_ev();
6330
event_loop_ev(struct ev_loop *);
31+
6432
void run();
6533

6634
socket_identifier_t watch(int);
6735
void unwatch(socket_identifier_t&);
6836

69-
void async_write(socket_identifier_t& id, const action& cb);
70-
void async_read(socket_identifier_t& id, const action& cb);
71-
void async_timeout(double time, const action& cb );
37+
void async_write(socket_identifier_t& id, action&& cb);
38+
void async_read(socket_identifier_t& id, action&& cb);
39+
void async_timeout(double time, action&& cb );
7240

7341
private:
74-
static void read_handler(EV_P_ ev_io* w, int revents);
75-
static void write_handler(EV_P_ ev_io* w, int revents);
7642
static void timer_handler(EV_P_ ev_timer* w, int revents);
77-
void stop(ev_io&);
78-
void start(ev_io&);
7943

8044
private:
8145
struct ev_loop* loop_;

includes/event_loop/socket_watcher.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#pragma once
2+
3+
#include <ev.h>
4+
#include <queue>
5+
#include <functional>
6+
7+
namespace async_redis {
8+
namespace event_loop {
9+
10+
typedef std::function<void()> action;
11+
12+
class socket_watcher
13+
{
14+
public:
15+
socket_watcher(struct ev_loop* loop, int fd);
16+
17+
void stop();
18+
19+
void start_reading_with(action&& cb);
20+
void start_writing_with(action&& cb);
21+
22+
private:
23+
void call_read();
24+
void call_write();
25+
26+
static void read_handler(EV_P_ ev_io* w, int revents);
27+
static void write_handler(EV_P_ ev_io* w, int revents);
28+
29+
private:
30+
void start_reading();
31+
void start_writing();
32+
33+
void stop_reading();
34+
void stop_writing();
35+
36+
private:
37+
struct ev_loop* loop_;
38+
39+
ev_io write_watcher_;
40+
ev_io read_watcher_;
41+
42+
std::queue<action> write_handlers_;
43+
std::queue<action> read_handlers_;
44+
};
45+
46+
}
47+
}

includes/monitor.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <parser/array_parser.h>
44

5+
#include <unordered_map>
56
#include <list>
67
#include <cassert>
78

includes/network/async_socket.hpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <sys/socket.h>
44

55
#include <string>
6+
#include <functional>
67
#include <event_loop/event_loop_ev.h>
78

89
namespace async_redis {
@@ -21,9 +22,9 @@ namespace async_redis {
2122
using socket_t = struct sockaddr;
2223

2324
using socket_identifier_t = event_loop::event_loop_ev::socket_identifier_t;
24-
using recv_cb_t = std::function<void (ssize_t)>;
25-
using ready_cb_t = std::function<void (ssize_t)>;
26-
using connect_handler_t = std::function<void (bool)>;
25+
using recv_cb_t = std::function<void (ssize_t)>;
26+
using ready_cb_t = std::function<void (ssize_t)>;
27+
using connect_handler_t = std::function<void (bool)>;
2728

2829
async_socket(event_loop::event_loop_ev& io);
2930

@@ -36,8 +37,8 @@ namespace async_redis {
3637
bool listen(int backlog = 0);
3738
int accept();
3839
bool close();
39-
bool async_write(const string& data, const ready_cb_t& cb);
40-
bool async_read(char *buffer, int max_len, const recv_cb_t& cb);
40+
bool async_write(const string& data, ready_cb_t cb);
41+
bool async_read(char *buffer, int max_len, recv_cb_t cb);
4142
void async_accept(const std::function<void(std::shared_ptr<async_socket>)>& cb);
4243

4344
bool is_connected() const;

src/event_loop/event_loop_ev.cpp

Lines changed: 6 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,94 +18,33 @@ void event_loop_ev::run()
1818
ev_run (loop_, 0);
1919
}
2020

21-
void event_loop_ev::async_write(socket_identifier_t& watcher, const action& cb)
21+
void event_loop_ev::async_write(socket_identifier_t& watcher, action&& cb)
2222
{
23-
auto &handlers = watcher->write_handlers;
24-
handlers.push(cb);
25-
26-
if (watcher->write_handlers.size() == 1) {
27-
ev_io *w = &watcher->write_watcher;
28-
ev_io_start(loop_, w);
29-
}
23+
watcher->start_writing_with(std::move(cb));
3024
}
3125

32-
void event_loop_ev::async_read(socket_identifier_t& watcher, const action& cb)
26+
void event_loop_ev::async_read(socket_identifier_t& watcher, action&& cb)
3327
{
34-
auto &handlers = watcher->read_handlers;
35-
handlers.push(cb);
36-
37-
if (watcher->read_handlers.size() == 1) {
38-
ev_io *w = &watcher->read_watcher;
39-
ev_io_start(loop_, w);
40-
}
28+
watcher->start_reading_with(std::move(cb));
4129
}
4230

43-
void event_loop_ev::async_timeout(double time, const action& cb )
31+
void event_loop_ev::async_timeout(double time, action&& cb )
4432
{
4533
timer_watcher *w = new timer_watcher(time, cb);
4634
ev_timer_start (loop_, &w->timer);
4735
}
4836

49-
void event_loop_ev::read_handler(EV_P_ ev_io* w, int revents)
50-
{
51-
if (revents & EV_ERROR)
52-
return;
53-
54-
socket_queue* sq = reinterpret_cast<socket_queue*>(w->data);
55-
auto &handlers = sq->read_handlers;
56-
57-
if (handlers.size())
58-
{
59-
auto &action = handlers.front();
60-
action();
61-
handlers.pop();
62-
}
63-
64-
if (!handlers.size())
65-
ev_io_stop(loop, &sq->read_watcher);
66-
}
67-
68-
void event_loop_ev::write_handler(EV_P_ ev_io* w, int revents)
69-
{
70-
if (revents & EV_ERROR)
71-
return;
72-
73-
socket_queue* sq = reinterpret_cast<socket_queue*>(w->data);
74-
75-
auto &handlers = sq->write_handlers;
76-
77-
if (handlers.size())
78-
{
79-
auto &action = handlers.front();
80-
action();
81-
handlers.pop();
82-
}
83-
84-
if (!handlers.size())
85-
ev_io_stop(loop, &sq->write_watcher);
86-
}
87-
8837
void event_loop_ev::timer_handler(EV_P_ ev_timer* w, int revents)
8938
{
9039
timer_watcher *watcher = reinterpret_cast<timer_watcher*>(w);
9140
watcher->timeout_cb();
9241
delete watcher;
9342
}
9443

95-
void event_loop_ev::stop(ev_io& io)
96-
{
97-
ev_clear_pending(loop_, &io);
98-
ev_io_stop(loop_, &io);
99-
}
100-
101-
void event_loop_ev::start(ev_io& io)
102-
{
103-
ev_io_start(loop_, &io);
104-
}
10544

10645
event_loop_ev::socket_identifier_t event_loop_ev::watch(int fd)
10746
{
108-
return std::make_shared<event_loop_ev::socket_queue>(*this, fd);
47+
return std::make_shared<socket_watcher>(loop_, fd);
10948
}
11049

11150
void event_loop_ev::unwatch(socket_identifier_t& id)

src/event_loop/socket_watcher.cpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#include "../../includes/event_loop/socket_watcher.h"
2+
3+
namespace async_redis {
4+
namespace event_loop {
5+
6+
socket_watcher::socket_watcher(struct ev_loop* loop, int fd)
7+
: loop_(loop)
8+
{
9+
ev_io_init(&read_watcher_, &socket_watcher::read_handler, fd, EV_READ);
10+
ev_io_init(&write_watcher_, &socket_watcher::write_handler, fd, EV_WRITE);
11+
12+
write_watcher_.data = this;
13+
read_watcher_.data = this;
14+
}
15+
16+
void socket_watcher::start_writing_with(action&& cb)
17+
{
18+
write_handlers_.push(std::move(cb));
19+
20+
if (write_handlers_.size() != 1)
21+
return;
22+
23+
start_writing();
24+
}
25+
26+
void socket_watcher::start_reading_with(action&& cb)
27+
{
28+
read_handlers_.push(std::move(cb));
29+
30+
if (read_handlers_.size() != 1)
31+
return;
32+
33+
start_reading();
34+
}
35+
36+
void socket_watcher::call_read()
37+
{
38+
if (read_handlers_.size())
39+
{
40+
auto &action = read_handlers_.front();
41+
action();
42+
read_handlers_.pop();
43+
}
44+
45+
if (!read_handlers_.size())
46+
stop_reading();
47+
}
48+
49+
50+
void socket_watcher::call_write()
51+
{
52+
if (write_handlers_.size())
53+
{
54+
auto &action = write_handlers_.front();
55+
action();
56+
write_handlers_.pop();
57+
}
58+
59+
if (!write_handlers_.size())
60+
stop_writing();
61+
}
62+
63+
void socket_watcher::write_handler(EV_P_ ev_io* w, int revents)
64+
{
65+
if (revents & EV_ERROR)
66+
return;
67+
68+
socket_watcher* sq = reinterpret_cast<socket_watcher*>(w->data);
69+
sq->call_write();
70+
}
71+
72+
void socket_watcher::read_handler(EV_P_ ev_io* w, int revents)
73+
{
74+
if (revents & EV_ERROR)
75+
return;
76+
77+
socket_watcher* sq = reinterpret_cast<socket_watcher*>(w->data);
78+
sq->call_read();
79+
}
80+
81+
82+
void socket_watcher::stop_reading()
83+
{
84+
ev_clear_pending(loop_, &read_watcher_);
85+
ev_io_stop(loop_, &read_watcher_);
86+
}
87+
88+
void socket_watcher::stop_writing()
89+
{
90+
ev_clear_pending(loop_, &write_watcher_);
91+
ev_io_stop(loop_, &write_watcher_);
92+
}
93+
94+
void socket_watcher::start_writing()
95+
{
96+
ev_io_start(loop_, &write_watcher_);
97+
}
98+
99+
void socket_watcher::start_reading()
100+
{
101+
ev_io_start(loop_, &read_watcher_);
102+
}
103+
104+
void socket_watcher::stop()
105+
{
106+
stop_writing();
107+
stop_reading();
108+
}
109+
110+
}
111+
}

0 commit comments

Comments
 (0)