Skip to content

Commit aae11d1

Browse files
committed
Removed mio from UDP pathways, since it seemed to be introducing a lot of loss and ordering irrelgularities
1 parent 2373aac commit aae11d1

File tree

2 files changed

+57
-77
lines changed

2 files changed

+57
-77
lines changed

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> {
292292

293293

294294
if is_alive() { //if interrupted while waiting for the server to respond, there's no reason to continue
295-
log::info!("informing server that testing can begin");
295+
log::info!("informing server that testing can begin...");
296296
//tell the server to start
297297
send(&mut stream, &prepare_begin())?;
298298

src/stream/udp.rs

Lines changed: 56 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,9 @@ pub mod receiver {
7474

7575
use chrono::{NaiveDateTime};
7676

77-
use mio::net::UdpSocket;
78-
use mio::{Events, Ready, Poll, PollOpt, Token};
77+
use std::net::UdpSocket;
7978

80-
const POLL_TIMEOUT:Duration = Duration::from_millis(250);
79+
const READ_TIMEOUT:Duration = Duration::from_millis(50);
8180
const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3);
8281

8382
struct UdpReceiverIntervalHistory {
@@ -99,8 +98,6 @@ pub mod receiver {
9998
next_packet_id: u64,
10099

101100
socket: UdpSocket,
102-
mio_poll_token: Token,
103-
mio_poll: Poll,
104101
}
105102
impl UdpReceiver {
106103
pub fn new(test_definition:super::UdpTestDefinition, stream_idx:&u8, port:&u16, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult<UdpReceiver> {
@@ -109,6 +106,7 @@ pub mod receiver {
109106
IpAddr::V6(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()),
110107
IpAddr::V4(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()),
111108
};
109+
socket.set_read_timeout(Some(READ_TIMEOUT))?;
112110
if !cfg!(windows) { //NOTE: features unsupported on Windows
113111
if *receive_buffer != 0 {
114112
log::debug!("setting receive-buffer to {}...", receive_buffer);
@@ -117,15 +115,6 @@ pub mod receiver {
117115
}
118116
log::debug!("bound UDP receive socket for stream {}: {}", stream_idx, socket.local_addr()?);
119117

120-
let mio_poll_token = Token(0);
121-
let mio_poll = Poll::new()?;
122-
mio_poll.register(
123-
&socket,
124-
mio_poll_token,
125-
Ready::readable(),
126-
PollOpt::edge(),
127-
)?;
128-
129118
Ok(UdpReceiver{
130119
active: true,
131120
test_definition: test_definition,
@@ -134,8 +123,6 @@ pub mod receiver {
134123
next_packet_id: 0,
135124

136125
socket: socket,
137-
mio_poll_token: mio_poll_token,
138-
mio_poll: mio_poll,
139126
})
140127
}
141128

@@ -227,7 +214,6 @@ pub mod receiver {
227214
}
228215
impl super::TestStream for UdpReceiver {
229216
fn run_interval(&mut self) -> Option<super::BoxResult<Box<dyn super::IntervalResult + Sync + Send>>> {
230-
let mut events = Events::with_capacity(1); //only watching one socket
231217
let mut buf = vec![0_u8; self.test_definition.length.into()];
232218

233219
let mut bytes_received:u64 = 0;
@@ -251,66 +237,57 @@ pub mod receiver {
251237
return Some(Err(Box::new(simple_error::simple_error!("UDP reception for stream {} timed out, likely because the end-signal was lost", self.stream_idx))));
252238
}
253239

254-
log::trace!("awaiting UDP packet on stream {}...", self.stream_idx);
255-
let poll_result = self.mio_poll.poll(&mut events, Some(POLL_TIMEOUT));
256-
if poll_result.is_err() {
257-
return Some(Err(Box::new(poll_result.unwrap_err())));
258-
}
259-
for event in events.iter() {
260-
if event.token() == self.mio_poll_token {
261-
loop {
262-
match self.socket.recv_from(&mut buf) {
263-
Ok((packet_size, peer_addr)) => {
264-
log::trace!("received {} bytes in UDP packet {} from {}", packet_size, self.stream_idx, peer_addr);
265-
if packet_size == 16 { //possible end-of-test message
266-
if &buf[0..16] == self.test_definition.test_id { //test's over
267-
self.stop();
268-
break;
269-
}
270-
}
271-
if packet_size < super::TEST_HEADER_SIZE as usize {
272-
log::warn!("received malformed packet with size {} for UDP stream {} from {}", packet_size, self.stream_idx, peer_addr);
273-
continue
274-
}
275-
276-
if self.process_packet(&buf, &mut history) {
277-
//NOTE: duplicate packets increase this count; this is intentional because the stack still processed data
278-
bytes_received += packet_size as u64 + super::UDP_HEADER_SIZE as u64;
279-
280-
let elapsed_time = start.elapsed();
281-
if elapsed_time >= super::INTERVAL {
282-
return Some(Ok(Box::new(super::UdpReceiveResult{
283-
timestamp: super::get_unix_timestamp(),
284-
285-
stream_idx: self.stream_idx,
286-
287-
duration: elapsed_time.as_secs_f32(),
288-
289-
bytes_received: bytes_received,
290-
packets_received: history.packets_received,
291-
packets_lost: history.packets_lost,
292-
packets_out_of_order: history.packets_out_of_order,
293-
packets_duplicated: history.packets_duplicated,
294-
295-
unbroken_sequence: history.unbroken_sequence,
296-
jitter_seconds: history.jitter_seconds,
297-
})))
298-
}
299-
} else {
300-
log::warn!("received packet unrelated to UDP stream {} from {}", self.stream_idx, peer_addr);
301-
continue
302-
}
303-
},
304-
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { //receive timeout
240+
log::trace!("awaiting UDP packets on stream {}...", self.stream_idx);
241+
loop {
242+
match self.socket.recv_from(&mut buf) {
243+
Ok((packet_size, peer_addr)) => {
244+
log::trace!("received {} bytes in UDP packet {} from {}", packet_size, self.stream_idx, peer_addr);
245+
if packet_size == 16 { //possible end-of-test message
246+
if &buf[0..16] == self.test_definition.test_id { //test's over
247+
self.stop();
305248
break;
306-
},
307-
Err(e) => {
308-
return Some(Err(Box::new(e)));
309-
},
249+
}
310250
}
311-
}
312-
} else {
313-
log::warn!("got event for unbound token: {:?}", event);
251+
if packet_size < super::TEST_HEADER_SIZE as usize {
252+
log::warn!("received malformed packet with size {} for UDP stream {} from {}", packet_size, self.stream_idx, peer_addr);
253+
continue;
254+
}
255+
256+
if self.process_packet(&buf, &mut history) {
257+
//NOTE: duplicate packets increase this count; this is intentional because the stack still processed data
258+
bytes_received += packet_size as u64 + super::UDP_HEADER_SIZE as u64;
259+
260+
let elapsed_time = start.elapsed();
261+
if elapsed_time >= super::INTERVAL {
262+
return Some(Ok(Box::new(super::UdpReceiveResult{
263+
timestamp: super::get_unix_timestamp(),
264+
265+
stream_idx: self.stream_idx,
266+
267+
duration: elapsed_time.as_secs_f32(),
268+
269+
bytes_received: bytes_received,
270+
packets_received: history.packets_received,
271+
packets_lost: history.packets_lost,
272+
packets_out_of_order: history.packets_out_of_order,
273+
packets_duplicated: history.packets_duplicated,
274+
275+
unbroken_sequence: history.unbroken_sequence,
276+
jitter_seconds: history.jitter_seconds,
277+
})))
278+
}
279+
} else {
280+
log::warn!("received packet unrelated to UDP stream {} from {}", self.stream_idx, peer_addr);
281+
continue;
282+
}
283+
},
284+
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { //receive timeout
285+
//break;
286+
continue;
287+
},
288+
Err(e) => {
289+
return Some(Err(Box::new(e)));
290+
},
314291
}
315292
}
316293
}
@@ -358,10 +335,12 @@ pub mod sender {
358335
use std::os::unix::io::AsRawFd;
359336
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
360337

361-
use mio::net::UdpSocket;
338+
use std::net::UdpSocket;
362339

363340
use std::thread::{sleep};
364341

342+
const WRITE_TIMEOUT:Duration = Duration::from_millis(50);
343+
365344
pub struct UdpSender {
366345
active: bool,
367346
test_definition: super::UdpTestDefinition,
@@ -384,6 +363,7 @@ pub mod sender {
384363
IpAddr::V6(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()),
385364
IpAddr::V4(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()),
386365
};
366+
socket.set_write_timeout(Some(WRITE_TIMEOUT))?;
387367
if !cfg!(windows) { //NOTE: features unsupported on Windows
388368
if *send_buffer != 0 {
389369
log::debug!("setting send-buffer to {}...", send_buffer);

0 commit comments

Comments
 (0)