Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion codex-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ seccompiler = "0.5.0"
sentry = "0.34.0"
serde = "1"
serde_json = "1"
serde_yaml = "0.9"
serde_with = "3.16"
serde_yaml = "0.9"
serial_test = "3.2.0"
sha1 = "0.10.6"
sha2 = "0.10"
Expand Down Expand Up @@ -288,6 +288,7 @@ opt-level = 0
# Uncomment to debug local changes.
# ratatui = { path = "../../ratatui" }
crossterm = { git = "https://github.com/nornagon/crossterm", branch = "nornagon/color-query" }
portable-pty = { git = "https://github.com/pakrym/wezterm", branch = "PSUEDOCONSOLE_INHERIT_CURSOR" }
ratatui = { git = "https://github.com/nornagon/ratatui", branch = "nornagon-v0.29.0-patch" }

# Uncomment to debug local changes.
Expand Down
10 changes: 10 additions & 0 deletions codex-rs/core/tests/common/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,13 @@ macro_rules! skip_if_no_network {
}
}};
}

#[macro_export]
macro_rules! skip_if {
($cond:expr, $return_value:expr $(,)?) => {{
if $cond {
eprintln!("Skipping test because condition was true.");
return $return_value;
}
}};
}
94 changes: 93 additions & 1 deletion codex-rs/core/tests/suite/unified_exec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![cfg(not(target_os = "windows"))]
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fs;
Expand All @@ -22,6 +21,7 @@ use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if;
use core_test_support::skip_if_no_network;
use core_test_support::skip_if_sandbox;
use core_test_support::test_codex::TestCodex;
Expand Down Expand Up @@ -155,6 +155,7 @@ fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, ParsedUnifie
async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let builder = test_codex().with_config(|config| {
config.include_apply_patch_tool = true;
Expand Down Expand Up @@ -279,6 +280,7 @@ async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> {
async fn unified_exec_emits_exec_command_begin_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -350,6 +352,7 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> {
async fn unified_exec_resolves_relative_workdir() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -427,6 +430,7 @@ async fn unified_exec_resolves_relative_workdir() -> Result<()> {
async fn unified_exec_respects_workdir_override() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -505,6 +509,7 @@ async fn unified_exec_respects_workdir_override() -> Result<()> {
async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -591,6 +596,7 @@ async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -662,6 +668,7 @@ async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -761,6 +768,7 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -857,6 +865,7 @@ async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> {
async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -978,6 +987,7 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1085,6 +1095,7 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1177,6 +1188,7 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1338,6 +1350,7 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1442,6 +1455,7 @@ async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()
async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1553,6 +1567,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
async fn unified_exec_streams_after_lagged_output() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1684,6 +1699,7 @@ PY
async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1790,6 +1806,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
async fn unified_exec_formats_large_output_summary() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1875,6 +1892,7 @@ PY
async fn unified_exec_runs_under_sandbox() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1943,11 +1961,85 @@ async fn unified_exec_runs_under_sandbox() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_runs_on_all_platforms() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why skipping windows on this one?


let server = start_mock_server().await;

let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;

let call_id = "uexec";
let args = serde_json::json!({
"cmd": "echo 'hello'",
"yield_time_ms": 500,
});

let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;

let session_model = session_configured.model.clone();

codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "summarize large output".into(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
})
.await?;

wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;

let requests = server.received_requests().await.expect("recorded requests");
assert!(!requests.is_empty(), "expected at least one POST request");

let bodies = requests
.iter()
.map(|req| req.body_json::<Value>().expect("request json"))
.collect::<Vec<_>>();

let outputs = collect_tool_outputs(&bodies)?;
let output = outputs.get(call_id).expect("missing output");

// TODO: Weaker match because windows produces control characters
assert_regex_match(".*?hello[\r\n]+", &output.output);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if!(cfg!(target_os = "windows"), Ok(()));

let server = start_mock_server().await;

Expand Down
2 changes: 1 addition & 1 deletion codex-rs/utils/pty/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
portable-pty = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] }
17 changes: 17 additions & 0 deletions codex-rs/utils/pty/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::fmt;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::Path;
Expand All @@ -9,13 +10,17 @@ use std::time::Duration;
use anyhow::Result;
use portable_pty::native_pty_system;
use portable_pty::CommandBuilder;
use portable_pty::PtyPair;
use portable_pty::PtySize;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex as TokioMutex;
use tokio::task::JoinHandle;

#[allow(dead_code)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To suppress the unused field error.

struct PtyPairWrapper(PtyPair);

#[derive(Debug)]
pub struct ExecCommandSession {
writer_tx: mpsc::Sender<Vec<u8>>,
Expand All @@ -26,6 +31,15 @@ pub struct ExecCommandSession {
wait_handle: StdMutex<Option<JoinHandle<()>>>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
// PtyPair must be preserved because the process will receive Control+C if the
// slave is closed
_pair: StdMutex<PtyPairWrapper>,
}

impl fmt::Debug for PtyPairWrapper {
fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
}
}

impl ExecCommandSession {
Expand All @@ -39,6 +53,7 @@ impl ExecCommandSession {
wait_handle: JoinHandle<()>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
pair: PtyPair,
) -> (Self, broadcast::Receiver<Vec<u8>>) {
let initial_output_rx = output_tx.subscribe();
(
Expand All @@ -51,6 +66,7 @@ impl ExecCommandSession {
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
exit_code,
_pair: StdMutex::new(PtyPairWrapper(pair)),
},
initial_output_rx,
)
Expand Down Expand Up @@ -201,6 +217,7 @@ pub async fn spawn_pty_process(
wait_handle,
exit_status,
exit_code,
pair,
);

Ok(SpawnedPty {
Expand Down
Loading