Skip to content

Commit 78ac60d

Browse files
committed
feat: support run commands in parallam
1 parent 217b263 commit 78ac60d

File tree

2 files changed

+46
-39
lines changed

2 files changed

+46
-39
lines changed

agent/examples/publish_command.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ async fn main() -> anyhow::Result<()> {
1515
"ls -ls".to_string()
1616
};
1717

18-
let config_path:PathBuf = "./ctrl_config.yml".parse().unwrap();
19-
//let config_path:PathBuf = "./config.prod.yml".parse().unwrap();
18+
//let config_path:PathBuf = "./ctrl_config.yml".parse().unwrap();
19+
let config_path:PathBuf = "./config.prod.yml".parse().unwrap();
2020

2121
let is_terminal = std::io::stdout().is_terminal();
2222

agent/src/handler.rs

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,64 +3,71 @@ use tokio::sync::broadcast;
33
use tracing::{info};
44
use crate::connection::Connection;
55
use std::process::{Command, Stdio};
6+
use std::sync::Arc;
67
use crate::message::{RequestMessage, ResponseMessage};
78

89

910
type Receiver = broadcast::Receiver<(String,RequestMessage)>;
1011
pub struct Handler {
1112
cmd_rx: Receiver,
12-
connection: Connection,
13-
publish_topic: String,
13+
connection: Arc<Connection>,
14+
publish_topic: Arc<String>,
1415
}
1516

1617

1718

1819
impl Handler {
1920
pub fn new(cmd_rx: Receiver, connection: Connection, publish_topic: String) -> Self {
20-
Self { cmd_rx, connection, publish_topic}
21+
Self { cmd_rx, connection: Arc::new(connection), publish_topic: Arc::new(publish_topic)}
2122
}
2223

23-
pub async fn run(&mut self, _shutdown_rx:broadcast::Receiver<bool>) {
24-
25-
while let Ok((_, cmd)) = self.cmd_rx.recv().await {
26-
info!("begin to handle cmd: {:?}",&cmd);
27-
match cmd {
28-
RequestMessage::Cmd { command,request_id } => {
29-
let command_parsed = shellish_parse::parse(&command,false);
30-
let command_parsed = match command_parsed {
31-
Ok(result) => result,
32-
Err(e) => {
33-
let _ = self.connection.publish_response(&self.publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
34-
continue;
35-
}
36-
};
37-
38-
let mut seq:u32 = 1;
39-
let mut command = Command::new(&command_parsed[0]);
40-
command.args(&command_parsed[1..]);
41-
42-
command.stdout(Stdio::piped()).stderr(Stdio::piped());
43-
match command.spawn() {
44-
Ok(mut child) => {
45-
let pid = child.id();
46-
if let Some(stdout) = child.stdout.take() {
47-
let reader = BufReader::new(stdout);
48-
for line in reader.lines().filter_map(|line| line.ok()) {
49-
//TODO: handle publish error
50-
let _ = self.connection.publish_response(
51-
&self.publish_topic, ResponseMessage::Ok {request_id: request_id.clone(),data:line, seq: seq, pid: pid}).await;
52-
seq +=1;
53-
}
24+
async fn run_command(connection:Arc<Connection>, cmd:RequestMessage, publish_topic:Arc<String>) {
25+
match cmd {
26+
RequestMessage::Cmd { command,request_id } => {
27+
let command_parsed = shellish_parse::parse(&command,false);
28+
let command_parsed = match command_parsed {
29+
Ok(result) => result,
30+
Err(e) => {
31+
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
32+
return;
33+
}
34+
};
35+
36+
let mut seq:u32 = 1;
37+
let mut command = Command::new(&command_parsed[0]);
38+
command.args(&command_parsed[1..]);
39+
40+
command.stdout(Stdio::piped()).stderr(Stdio::piped());
41+
match command.spawn() {
42+
Ok(mut child) => {
43+
let pid = child.id();
44+
if let Some(stdout) = child.stdout.take() {
45+
let reader = BufReader::new(stdout);
46+
for line in reader.lines().filter_map(|line| line.ok()) {
47+
//TODO: handle publish error
48+
let _ = connection.publish_response(
49+
&publish_topic, ResponseMessage::Ok {request_id: request_id.clone(),data:line, seq: seq, pid: pid}).await;
50+
seq +=1;
5451
}
5552
}
56-
Err(e) => {
57-
let _ = self.connection.publish_response(&self.publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
58-
}
53+
}
54+
Err(e) => {
55+
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
5956
}
6057
}
6158
}
6259
}
6360
}
61+
pub async fn run(&mut self, _shutdown_rx:broadcast::Receiver<bool>) {
62+
while let Ok((_, cmd)) = self.cmd_rx.recv().await {
63+
info!("begin to handle cmd: {:?}",&cmd);
64+
let connection = self.connection.clone();
65+
let publish_topic = self.publish_topic.clone();
66+
tokio::spawn(async move {
67+
Self::run_command(connection, cmd, publish_topic).await;
68+
});
69+
}
70+
}
6471
}
6572

6673
#[cfg(test)]

0 commit comments

Comments
 (0)