Skip to content

Commit fff6bcd

Browse files
committed
fix: sim data recv only after cold started and parents all done
1 parent b7be6a3 commit fff6bcd

File tree

8 files changed

+140
-114
lines changed

8 files changed

+140
-114
lines changed

serverless_sim/Cargo.toml

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,19 @@ rand_pcg = "0.3.1"
3232
async-trait = "0.1.51"
3333
parking_lot = "0.12"
3434
rand_distr = "0.4.3"
35+
futures = "0.3"
36+
cpu-time = "1.0.0"
37+
38+
[target.'cfg(windows)'.dependencies]
3539
thread-priority = "0.3"
36-
# futures = "0.3"
37-
# cpu-time = "1.0.0"
3840

39-
# [dependencies.windows]
40-
# version = "0.52"
41-
# features = [
42-
# "Win32_Foundation",
43-
# "Win32_System_Threading",
44-
# "Win32_System_Diagnostics_Etw",
45-
# "Win32_System_Time",
46-
# "Win32_Foundation",
47-
# "Win32_Security"
48-
# ]
41+
[dependencies.windows]
42+
version = "0.52"
43+
features = [
44+
"Win32_Foundation",
45+
"Win32_System_Threading",
46+
"Win32_System_Diagnostics_Etw",
47+
"Win32_System_Time",
48+
"Win32_Foundation",
49+
"Win32_Security",
50+
]

serverless_sim/src/mechanism_thread.rs

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::sync::mpsc;
22

3-
use enum_as_inner::EnumAsInner;
4-
use thread_priority::{ set_current_thread_priority, ThreadPriority, WinAPIThreadPriority };
5-
// use windows::Win32::System::Threading::{ SetThreadPriority, GetCurrentThread, THREAD_PRIORITY };
3+
#[cfg(target_os = "windows")]
4+
use thread_priority::{set_current_thread_priority, ThreadPriority, WinAPIThreadPriority};
5+
#[cfg(target_os = "windows")]
6+
use windows::Win32::System::Threading::{GetCurrentThread, SetThreadPriority, THREAD_PRIORITY};
67

78
use crate::actions::ESActionWrapper;
8-
use crate::mechanism::{ DownCmd, Mechanism, MechanismImpl, ScheCmd, SimEnvObserve, UpCmd };
9+
use crate::mechanism::{DownCmd, Mechanism, MechanismImpl, ScheCmd, SimEnvObserve, UpCmd};
910

1011
use crate::util;
1112
use crate::with_env_sub::WithEnvHelp;
@@ -43,6 +44,8 @@ pub fn spawn(mech: MechanismImpl) -> mpsc::Sender<MechScheduleOnce> {
4344
// THREAD_PRIORITY(WinAPIThreadPriority::TimeCritical as i32)
4445
// ).unwrap();
4546
// }
47+
48+
#[cfg(target_os = "windows")]
4649
if let Err(e) = set_current_thread_priority(ThreadPriority::Max) {
4750
eprintln!("设置线程优先级失败: {:?}", e);
4851
}
@@ -71,7 +74,11 @@ fn mechanism_loop(rx: mpsc::Receiver<MechScheduleOnce>, mech: MechanismImpl) {
7174
// let passed_ms = measure.passed_100ns();
7275
let end_ms = util::now_ms();
7376
// log::info!("master mech run cpu:{:?}, total:{} ms", begin_cpu.elapsed(), end_ms - begin_ms);
74-
let mech_latency = if mech.config.no_mech_latency { 0 } else { end_ms - begin_ms };
77+
let mech_latency = if mech.config.no_mech_latency {
78+
0
79+
} else {
80+
end_ms - begin_ms
81+
};
7582
res.responser
7683
.send(MechScheduleOnceRes::End {
7784
mech_run_ms: mech_latency,
@@ -85,11 +92,15 @@ fn mechanism_loop(rx: mpsc::Receiver<MechScheduleOnce>, mech: MechanismImpl) {
8592
pub mod tests {
8693
use std::sync::mpsc;
8794

88-
use crate::{ actions::ESActionWrapper, mechanism_thread::MechScheduleOnceRes, sim_env::SimEnv };
95+
use crate::{actions::ESActionWrapper, mechanism_thread::MechScheduleOnceRes, sim_env::SimEnv};
8996

9097
#[test]
9198
pub fn test_algo_latency() {
92-
use std::{ cell::RefCell, rc::Rc, sync::{ atomic::AtomicU64, Arc } };
99+
use std::{
100+
cell::RefCell,
101+
rc::Rc,
102+
sync::{atomic::AtomicU64, Arc},
103+
};
93104

94105
use crate::config::Config;
95106
let _ = env_logger::try_init();
@@ -120,24 +131,20 @@ pub mod tests {
120131
ESActionWrapper::Int(0),
121132
None,
122133
None,
123-
Some(
124-
Box::new(move |env: &SimEnv| {
125-
*begin_frame.borrow_mut() = env.current_frame();
126-
})
127-
),
128-
Some(
129-
Box::new(move |env: &SimEnv| {
130-
// calltime = env.current_frame() - begin_frame;
131-
assert!(
132-
env.current_frame() - *begin_frame2.borrow() == calltime,
133-
"begin_frame:{} current_frame:{} calltime:{}",
134-
begin_frame2.borrow(),
135-
env.current_frame(),
136-
calltime
137-
);
138-
calltime += 1;
139-
})
140-
)
134+
Some(Box::new(move |env: &SimEnv| {
135+
*begin_frame.borrow_mut() = env.current_frame();
136+
})),
137+
Some(Box::new(move |env: &SimEnv| {
138+
// calltime = env.current_frame() - begin_frame;
139+
assert!(
140+
env.current_frame() - *begin_frame2.borrow() == calltime,
141+
"begin_frame:{} current_frame:{} calltime:{}",
142+
begin_frame2.borrow(),
143+
env.current_frame(),
144+
calltime
145+
);
146+
calltime += 1;
147+
})),
141148
);
142149
}
143150
}

serverless_sim/src/request.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl Request {
104104
),
105105
);
106106
}
107+
// log::info!("endtime_fn: {:?}", endtime_fn);
107108
let first = endtime_fn.iter().next().unwrap().1.clone().1;
108109
let mut cur: (usize, FnId) = endtime_fn.iter().next_back().unwrap().1.clone();
109110
let mut recur_path = vec![cur.1];
@@ -112,7 +113,12 @@ impl Request {
112113
break;
113114
}
114115
// use cur fn's begin time to get prev fn's end time
115-
let prev: (usize, FnId) = endtime_fn.get(&cur.0).unwrap().clone();
116+
let prev: (usize, FnId) = endtime_fn
117+
.get(&cur.0)
118+
.unwrap_or_else(|| {
119+
panic!("can't find fn end at {}", cur.0);
120+
})
121+
.clone();
116122
recur_path.push(prev.1);
117123
if prev.1 == first {
118124
break;

serverless_sim/src/sim_env.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,8 @@ impl SimEnv {
385385
rng
386386
);
387387
}
388-
// mkdir
389-
std::fs::create_dir("cache");
388+
// mkdir, allow failure
389+
let _ = std::fs::create_dir("cache");
390390
// write to file
391391
let mut file = std::fs::File::create(cache_req_freq).unwrap();
392392
serde_json::to_writer(&mut file, &*self.help.fn_call_frequency()).unwrap();
@@ -507,7 +507,8 @@ impl SimEnv {
507507

508508
// 自增 frame
509509
let mut cur_frame = self.core.current_frame.borrow_mut();
510-
// log::info!("frame done: {}", *cur_frame);
510+
511+
log::info!("frame done: {}", *cur_frame);
511512
*cur_frame += 1;
512513
}
513514
}

serverless_sim/src/sim_events/on_task_done.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@ impl SimEnv {
2323
}
2424
pub fn on_task_done(&self, req: &mut Request, fnid: FnId) {
2525
self.check_sub_tasks_ready_sche(req, fnid);
26-
assert!(req
27-
.fn_metric
28-
.get_mut(&fnid)
29-
.unwrap()
26+
let fn_metric = req.fn_metric.get_mut(&fnid).unwrap();
27+
assert!(fn_metric
3028
.fn_done_time
3129
.replace(self.current_frame())
3230
.is_none());
31+
assert!(fn_metric.ready_sche_time.is_some());
3332
}
3433
}

serverless_sim/src/sim_events/on_task_ready_schedule.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@ use crate::{fn_dag::FnId, request::Request, sim_env::SimEnv};
22

33
impl SimEnv {
44
pub fn on_task_ready_sche(&self, req: &mut Request, fnid: FnId) {
5-
assert!(req
6-
.fn_metric
7-
.get_mut(&fnid)
8-
.unwrap()
9-
.ready_sche_time
10-
.is_none());
11-
req.fn_metric.get_mut(&fnid).unwrap().ready_sche_time = Some(self.current_frame());
5+
let fnmetric = req.fn_metric.get_mut(&fnid).unwrap();
6+
assert!(fnmetric.ready_sche_time.is_none());
7+
fnmetric.ready_sche_time = Some(self.current_frame());
8+
assert!(fnmetric.data_recv_done_time.is_none());
129
// Happend in this frame. So real ready is next frame
1310
}
1411
}

serverless_sim/src/sim_loop.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#[cfg(target_os = "windows")]
12
use thread_priority::{set_current_thread_priority, ThreadPriority};
23

34
use crate::{
@@ -62,9 +63,11 @@ impl SimEnv {
6263
mut hook_algo_end: Option<Box<dyn FnMut(&SimEnv) + 'static>>,
6364
) -> (f32, String) {
6465
// 尝试设置当前线程的优先级
66+
#[cfg(target_os = "windows")]
6567
if let Err(e) = set_current_thread_priority(ThreadPriority::Min) {
6668
eprintln!("设置线程优先级失败: {:?}", e);
6769
}
70+
6871
self.avoid_gc();
6972
let mut master_mech_resp_rx: Option<Receiver<MechScheduleOnceRes>> = None;
7073
let mut frame_when_master_mech_begin = 0;
@@ -180,6 +183,7 @@ impl SimEnv {
180183
hook_algo_begin.as_mut().map(|f| f(self));
181184
}
182185
if !self.one_frame(&mut hook_frame_begin, &mut hook_req_gen) {
186+
log::info!("simulation end");
183187
break;
184188
}
185189

0 commit comments

Comments
 (0)