Skip to content

Commit f8a678a

Browse files
committed
refactor
1 parent 09999fe commit f8a678a

File tree

4 files changed

+74
-69
lines changed

4 files changed

+74
-69
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pm2-web-rust"
3-
version = "0.1.1"
3+
version = "0.1.2"
44
edition = "2021"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

src/main.rs

Lines changed: 70 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async fn logs_handler(req: HttpRequest, stream: web::Payload) -> Result<HttpResp
7777
if session.text(x).await.is_err() {
7878
removed_clients_ch_s.send(uuid).await;
7979
session.close(None).await;
80-
return
80+
break;
8181
}
8282
}
8383
},
@@ -86,7 +86,7 @@ async fn logs_handler(req: HttpRequest, stream: web::Payload) -> Result<HttpResp
8686
if session.text(x).await.is_err() {
8787
removed_clients_ch_s.send(uuid).await;
8888
session.close(None).await;
89-
return;
89+
break;
9090
}
9191
}
9292
}
@@ -113,87 +113,92 @@ async fn main() -> Result<(), std::io::Error> {
113113

114114
let (j1, j2) = pm2::PM2::start(stats_ch_s, logs_ch_s, Duration::from_secs(3));
115115

116-
tokio::task::spawn(async move { if j1.await.is_err() { process::exit(1); } });
117-
tokio::task::spawn(async move { if j2.await.is_err() { process::exit(1); } });
118-
119-
tokio::task::spawn(async move {
120-
if tokio::task::spawn(async move {
121-
let mut clients: HashMap<Uuid, Client> = HashMap::new();
122-
let mut stats = String::new();
123-
let mut logs: VecDeque<String> = VecDeque::with_capacity(10);
124-
loop {
125-
tokio::select! {
126-
data = stats_ch_r.recv() => {
127-
if let Some(x) = data {
128-
stats = x;
129-
for client in clients.values() {
130-
tokio::select! {
131-
_ = client.stats_ch.send(stats.clone()) => {},
132-
else => ()
133-
}
116+
let j3 = tokio::task::spawn(async move {
117+
let mut clients: HashMap<Uuid, Client> = HashMap::new();
118+
let mut stats = String::new();
119+
let mut logs: VecDeque<String> = VecDeque::with_capacity(10);
120+
loop {
121+
tokio::select! {
122+
data = stats_ch_r.recv() => {
123+
if let Some(x) = data {
124+
stats = x;
125+
for client in clients.values() {
126+
tokio::select! {
127+
_ = client.stats_ch.send(stats.clone()) => {},
128+
else => ()
134129
}
135130
}
136-
},
137-
data = logs_ch_r.recv() => {
138-
while logs.len() >= 200 {
139-
logs.pop_front();
140-
}
141-
if let Some(data) = data {
142-
let data_clone = data.clone();
143-
logs.push_back(data);
144-
for client in clients.values() {
145-
tokio::select! {
146-
_ = client.logs_ch.send(data_clone.clone()) => (),
147-
else => ()
148-
}
131+
}
132+
},
133+
data = logs_ch_r.recv() => {
134+
while logs.len() >= 200 {
135+
logs.pop_front();
136+
}
137+
if let Some(data) = data {
138+
let data_clone = data.clone();
139+
logs.push_back(data);
140+
for client in clients.values() {
141+
tokio::select! {
142+
_ = client.logs_ch.send(data_clone.clone()) => (),
143+
else => ()
149144
}
150145
}
151146
}
152-
client = clients_ch_r.recv() => {
153-
if let Some(client) = client {
154-
println!("client connected: {}", client.uuid);
155-
156-
if !stats.is_empty() {
157-
tokio::select! {
158-
_ = client.stats_ch.send(stats.clone()) => (),
159-
else => ()
160-
}
147+
}
148+
client = clients_ch_r.recv() => {
149+
if let Some(client) = client {
150+
println!("client connected: {}", client.uuid);
151+
152+
if !stats.is_empty() {
153+
tokio::select! {
154+
_ = client.stats_ch.send(stats.clone()) => (),
155+
else => ()
161156
}
162-
163-
for log in logs.iter().cloned() {
164-
tokio::select! {
165-
_ = client.logs_ch.send(log) => (),
166-
else => ()
167-
}
157+
}
158+
159+
for log in logs.iter().cloned() {
160+
tokio::select! {
161+
_ = client.logs_ch.send(log) => (),
162+
else => ()
168163
}
169-
170-
clients.insert(client.uuid, client);
171-
println!("num connected clients: {}", clients.len());
172164
}
165+
166+
clients.insert(client.uuid, client);
167+
println!("num connected clients: {}", clients.len());
173168
}
174-
uuid = removed_clients_ch_r.recv() => {
175-
if let Some(uuid) = uuid {
176-
println!("client disconnected: {}", uuid);
177-
clients.remove(&uuid);
178-
println!("num connected clients: {}", clients.len());
179-
}
180-
},
181169
}
170+
uuid = removed_clients_ch_r.recv() => {
171+
if let Some(uuid) = uuid {
172+
println!("client disconnected: {}", uuid);
173+
clients.remove(&uuid);
174+
println!("num connected clients: {}", clients.len());
175+
}
176+
},
182177
}
183-
}).await.is_err() { process::exit(0); }
178+
}
184179
});
185180

186-
HttpServer::new(move || {
181+
let result = HttpServer::new(move || {
187182
App::new()
188183
// .wrap(Logger::default())
189184
.app_data(clients_ch_s.clone())
190185
.app_data(removed_clients_ch_s.clone())
191186
.route("/script.js", web::get().to(js_handler))
192187
.service(web::resource("/logs").route(web::get().to(logs_handler)))
193188
.service(Files::new("/", "./static/").index_file("index.html"))
194-
})
195-
.bind(("0.0.0.0", 6060))?
196-
.workers(2)
197-
.run()
198-
.await
189+
}).workers(2).bind(("0.0.0.0", 6060));
190+
191+
let j4 = tokio::task::spawn(match result {
192+
Ok(x) => x,
193+
Err(err) => {
194+
println!("{}", err);
195+
process::exit(1);
196+
}
197+
}.run());
198+
199+
if tokio::try_join!(j1, j2, j3, j4).is_err() {
200+
process::exit(1);
201+
}
202+
203+
Ok(())
199204
}

src/pm2.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl PM2 {
143143
tokio::time::sleep(interval).await;
144144
}
145145
}),
146-
tokio::task::spawn_blocking(move || loop {
146+
tokio::task::spawn_blocking(move || {
147147
if let Ok(child) = Command::new("pm2")
148148
.arg("logs")
149149
.arg("--format")
@@ -228,7 +228,7 @@ impl PM2 {
228228
}
229229
}
230230
}
231-
std::thread::sleep(interval);
231+
panic!("error while reading logs")
232232
}),
233233
);
234234
}

0 commit comments

Comments
 (0)