Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion codex-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ seccompiler = "0.5.0"
sentry = "0.34.0"
serde = "1"
serde_json = "1"
serde_yaml = "0.9"
serde_with = "3.16"
serde_yaml = "0.9"
serial_test = "3.2.0"
sha1 = "0.10.6"
sha2 = "0.10"
Expand Down Expand Up @@ -288,6 +288,7 @@ opt-level = 0
# Uncomment to debug local changes.
# ratatui = { path = "../../ratatui" }
crossterm = { git = "https://github.com/nornagon/crossterm", branch = "nornagon/color-query" }
portable-pty = { git = "https://github.com/pakrym/wezterm", branch = "PSUEDOCONSOLE_INHERIT_CURSOR" }
ratatui = { git = "https://github.com/nornagon/ratatui", branch = "nornagon-v0.29.0-patch" }

# Uncomment to debug local changes.
Expand Down
4 changes: 1 addition & 3 deletions codex-rs/core/tests/common/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,7 @@ macro_rules! skip_if_no_network {
/// Early-returns `$return_value` from the enclosing function when the code is
/// compiled for Windows, after printing a note that the test was skipped.
///
/// On every other target the macro expands to a no-op branch and execution
/// continues with the statement that follows the invocation. A trailing comma
/// after the return expression is accepted.
macro_rules! skip_if_windows {
    ($return_value:expr $(,)?) => {{
        // `cfg!` expands to a boolean literal, so the branch below is still
        // type-checked on every platform even though it only runs on Windows.
        if cfg!(target_os = "windows") {
            println!("Skipping test because it cannot execute on Windows.");
            return $return_value;
        }
    }};
}
Expand Down
92 changes: 91 additions & 1 deletion codex-rs/core/tests/suite/unified_exec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![cfg(not(target_os = "windows"))]
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fs;
Expand All @@ -24,6 +23,7 @@ use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::skip_if_sandbox;
use core_test_support::skip_if_windows;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::TestCodexHarness;
use core_test_support::test_codex::test_codex;
Expand Down Expand Up @@ -155,6 +155,7 @@ fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, ParsedUnifie
async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let builder = test_codex().with_config(|config| {
config.include_apply_patch_tool = true;
Expand Down Expand Up @@ -279,6 +280,7 @@ async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> {
async fn unified_exec_emits_exec_command_begin_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -350,6 +352,7 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> {
async fn unified_exec_resolves_relative_workdir() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -427,6 +430,7 @@ async fn unified_exec_resolves_relative_workdir() -> Result<()> {
async fn unified_exec_respects_workdir_override() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -505,6 +509,7 @@ async fn unified_exec_respects_workdir_override() -> Result<()> {
async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -591,6 +596,7 @@ async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -662,6 +668,7 @@ async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -761,6 +768,7 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -857,6 +865,7 @@ async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> {
async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -978,6 +987,7 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1085,6 +1095,7 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1177,6 +1188,7 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1338,6 +1350,7 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1442,6 +1455,7 @@ async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()
async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1553,6 +1567,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
async fn unified_exec_streams_after_lagged_output() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1684,6 +1699,7 @@ PY
async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1790,6 +1806,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
async fn unified_exec_formats_large_output_summary() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -1875,6 +1892,7 @@ PY
async fn unified_exec_runs_under_sandbox() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down Expand Up @@ -2067,11 +2085,83 @@ async fn unified_exec_python_prompt_under_seatbelt() -> Result<()> {
Ok(())
}

/// Runs a trivial `echo` through the `exec_command` tool with the unified exec
/// feature enabled and checks that the echoed text appears in the tool output.
///
/// This test has no `skip_if_windows!` guard, so it is not skipped on Windows;
/// it is still skipped by the network and sandbox guards.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_runs_on_all_platforms() -> Result<()> {
    skip_if_no_network!(Ok(()));
    skip_if_sandbox!(Ok(()));

    let server = start_mock_server().await;

    let mut harness = test_codex().with_config(|cfg| {
        cfg.features.enable(Feature::UnifiedExec);
    });
    let TestCodex {
        codex,
        cwd,
        session_configured,
        ..
    } = harness.build(&server).await?;

    let call_id = "uexec";
    let exec_args = serde_json::json!({
        "cmd": "echo 'hello crossplat'",
    });

    // The first SSE stream carries the `exec_command` function call; the
    // second carries the closing assistant message.
    let tool_call_stream = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(
            call_id,
            "exec_command",
            &serde_json::to_string(&exec_args)?,
        ),
        ev_completed("resp-1"),
    ]);
    let final_stream = sse(vec![
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    mount_sse_sequence(&server, vec![tool_call_stream, final_stream]).await;

    let user_turn = Op::UserTurn {
        items: vec![UserInput::Text {
            text: "summarize large output".into(),
        }],
        final_output_json_schema: None,
        cwd: cwd.path().to_path_buf(),
        approval_policy: AskForApproval::Never,
        sandbox_policy: SandboxPolicy::DangerFullAccess,
        model: session_configured.model.clone(),
        effort: None,
        summary: ReasoningSummary::Auto,
    };
    codex.submit(user_turn).await?;

    wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;

    let recorded = server.received_requests().await.expect("recorded requests");
    assert!(!recorded.is_empty(), "expected at least one POST request");

    // Decode every recorded request body as JSON so the tool outputs can be
    // looked up by call id.
    let bodies: Vec<Value> = recorded
        .iter()
        .map(|request| request.body_json::<Value>().expect("request json"))
        .collect();

    let tool_outputs = collect_tool_outputs(&bodies)?;
    let exec_output = tool_outputs.get(call_id).expect("missing output");

    // TODO: Weaker match because Windows produces control characters
    assert_regex_match(".*hello crossplat.*", &exec_output.output);

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

Expand Down
2 changes: 1 addition & 1 deletion codex-rs/utils/pty/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
portable-pty = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] }
30 changes: 30 additions & 0 deletions codex-rs/utils/pty/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::fmt;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::Path;
Expand All @@ -9,13 +10,20 @@ use std::time::Duration;
use anyhow::Result;
use portable_pty::native_pty_system;
use portable_pty::CommandBuilder;
use portable_pty::MasterPty;
use portable_pty::PtySize;
use portable_pty::SlavePty;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex as TokioMutex;
use tokio::task::JoinHandle;

/// Holds the ends of a PTY pair so that their handles stay alive for as long as
/// the wrapper itself is kept around.
pub struct PtyPairWrapper {
/// Slave end of the PTY. `spawn_pty_process` stores `Some` only when
/// `cfg!(windows)` is true (to keep the process from receiving Control+C)
/// and `None` otherwise.
pub _slave: Option<Box<dyn SlavePty + Send>>,
/// Master end of the PTY; always stored.
pub _master: Box<dyn MasterPty + Send>,
}

#[derive(Debug)]
pub struct ExecCommandSession {
writer_tx: mpsc::Sender<Vec<u8>>,
Expand All @@ -26,6 +34,15 @@ pub struct ExecCommandSession {
wait_handle: StdMutex<Option<JoinHandle<()>>>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
// PtyPair must be preserved because the process will receive Control+C if the
// slave is closed
_pair: StdMutex<PtyPairWrapper>,
}

impl fmt::Debug for PtyPairWrapper {
    /// Formats the wrapper as `PtyPairWrapper { .. }`.
    ///
    /// The PTY handles are not printed. Only the type name is emitted, so the
    /// `Debug` output is never empty; an empty representation would leave a
    /// blank value wherever this type is embedded in another type's derived
    /// `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PtyPairWrapper").finish_non_exhaustive()
    }
}

impl ExecCommandSession {
Expand All @@ -39,6 +56,7 @@ impl ExecCommandSession {
wait_handle: JoinHandle<()>,
exit_status: Arc<AtomicBool>,
exit_code: Arc<StdMutex<Option<i32>>>,
pair: PtyPairWrapper,
) -> (Self, broadcast::Receiver<Vec<u8>>) {
let initial_output_rx = output_tx.subscribe();
(
Expand All @@ -51,6 +69,7 @@ impl ExecCommandSession {
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
exit_code,
_pair: StdMutex::new(pair),
},
initial_output_rx,
)
Expand Down Expand Up @@ -192,6 +211,16 @@ pub async fn spawn_pty_process(
let _ = exit_tx.send(code);
});

let pair = PtyPairWrapper {
_slave: if cfg!(windows) {
// Keep the slave handle alive on Windows to prevent the process from receiving Control+C
Some(pair.slave)
} else {
None
},
_master: pair.master,
};

let (session, output_rx) = ExecCommandSession::new(
writer_tx,
output_tx,
Expand All @@ -201,6 +230,7 @@ pub async fn spawn_pty_process(
wait_handle,
exit_status,
exit_code,
pair,
);

Ok(SpawnedPty {
Expand Down
Loading