diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index ee800676a..8078228ef 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -216,18 +216,40 @@ enum HostMeshAllocation { } impl HostMesh { - /// Fork a new `HostMesh` from this process, returning the new `HostMesh` - /// to the parent (owning) process, while running forever in child processes - /// (i.e., individual procs). + /// Bring up a local single-host mesh and, in the launcher + /// process, return a `HostMesh` handle for it. /// - /// All of the code preceding the call to `local` will run in each child proc; - /// thus it is important to call `local` early in the lifetime of the program, - /// and to ensure that it is reached unconditionally. + /// There are two execution modes: /// - /// This is intended for testing, development, examples. + /// - bootstrap-child mode: if `Bootstrap::get_from_env()` says + /// this process was launched as a bootstrap child, we call + /// `boot.bootstrap().await`, which hands control to the + /// bootstrap logic for this process (as defined by the + /// `BootstrapCommand` the parent used to spawn it). If that + /// call returns, we log the error and terminate. This branch + /// does not produce a `HostMesh`. + /// + /// - launcher mode: otherwise, we are the process that is setting + /// up the mesh. We create a `Host`, spawn a `HostMeshAgent` in + /// it, and build a single-host `HostMesh` around that. That + /// `HostMesh` is returned to the caller. + /// + /// This API is intended for tests, examples, and local bring-up, + /// not production. /// /// TODO: fix up ownership pub async fn local() -> v1::Result { + Self::local_with_bootstrap(BootstrapCommand::current()?).await + } + + /// Same as [`local`](Self::local), but the caller supplies the + /// `BootstrapCommand` instead of deriving it from the current + /// process.
+ /// + /// The provided `bootstrap_cmd` is used when spawning bootstrap + /// children and determines the behavior of + /// `boot.bootstrap().await` in those children. + pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> v1::Result { if let Ok(Some(boot)) = Bootstrap::get_from_env() { let err = boot.bootstrap().await; tracing::error!("failed to bootstrap local host mesh process: {}", err); @@ -236,7 +258,7 @@ impl HostMesh { let addr = config::global::get_cloned(DEFAULT_TRANSPORT).any(); - let manager = BootstrapProcManager::new(BootstrapCommand::current()?)?; + let manager = BootstrapProcManager::new(bootstrap_cmd)?; let (host, _handle) = Host::serve(manager, addr).await?; let addr = host.addr().clone(); let host_mesh_agent = host diff --git a/monarch_hyperactor/Cargo.toml b/monarch_hyperactor/Cargo.toml index 928cb8726..bb7afedf2 100644 --- a/monarch_hyperactor/Cargo.toml +++ b/monarch_hyperactor/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,process_allocator-oss,test_monarch_hyperactor] +# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,monarch_hyperactor_test_bootstrap,process_allocator-oss,test_monarch_hyperactor] [package] name = "monarch_hyperactor" @@ -7,6 +7,11 @@ authors = ["Meta"] edition = "2021" license = "BSD-3-Clause" +[[bin]] +name = "monarch_hyperactor_test_bootstrap" +path = "test/bootstrap.rs" +edition = "2024" + [[bin]] name = "process_allocator" edition = "2024" @@ -52,6 +57,7 @@ tokio-util = { version = "0.7.15", features = ["full"] } tracing = { version = "0.1.41", features = ["attributes", "valuable"] } [dev-dependencies] +buck-resources = "1" dir-diff = "0.3" [features] diff --git a/monarch_hyperactor/src/lib.rs b/monarch_hyperactor/src/lib.rs index 658982c64..14e4fa445 100644 --- a/monarch_hyperactor/src/lib.rs +++ b/monarch_hyperactor/src/lib.rs @@ -32,6 +32,7 @@ pub mod selection; pub mod shape; pub mod supervision; pub mod 
telemetry; +mod testresource; pub mod v1; pub mod value_mesh; diff --git a/monarch_hyperactor/src/pytokio.rs b/monarch_hyperactor/src/pytokio.rs index b0fd6daef..444a98ebe 100644 --- a/monarch_hyperactor/src/pytokio.rs +++ b/monarch_hyperactor/src/pytokio.rs @@ -50,6 +50,8 @@ use hyperactor::config::CONFIG; use hyperactor::config::ConfigAttr; use monarch_types::SerializablePyErr; use pyo3::IntoPyObjectExt; +#[cfg(test)] +use pyo3::PyClass; use pyo3::exceptions::PyRuntimeError; use pyo3::exceptions::PyStopIteration; use pyo3::exceptions::PyTimeoutError; @@ -547,3 +549,52 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul Ok(()) } + +/// Ensure the embedded Python interpreter is initialized exactly +/// once. +/// +/// Safe to call from multiple threads, multiple times. +#[cfg(test)] +pub(crate) fn ensure_python() { + static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + INIT.get_or_init(|| { + pyo3::prepare_freethreaded_python(); + }); +} + +#[cfg(test)] +// Helper: let us "await" a `PyPythonTask` in Rust. +// +// Semantics: +// - consume the `PyPythonTask`, +// - take the inner future, +// - `.await` it on tokio to get `Py`, +// - turn that into `Py`. +pub(crate) trait AwaitPyExt { + async fn await_py(self) -> Result, PyErr>; +} + +#[cfg(test)] +impl AwaitPyExt for PyPythonTask { + async fn await_py(mut self) -> Result, PyErr> { + // Take ownership of the inner future. + let fut = self + .take_task() + .expect("PyPythonTask already consumed in await_py"); + + // Await a Result, PyErr>. + let py_any: Py = fut.await?; + + // Convert Py -> Py. + Python::with_gil(|py| { + let bound_any = py_any.bind(py); + + // Try extract a Py. 
+ let obj: Py = bound_any + .extract::>() + .expect("spawn() did not return expected Python type"); + + Ok(obj) + }) + } +} diff --git a/monarch_hyperactor/src/testresource.rs b/monarch_hyperactor/src/testresource.rs new file mode 100644 index 000000000..20aef8156 --- /dev/null +++ b/monarch_hyperactor/src/testresource.rs @@ -0,0 +1,49 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#![cfg(test)] + +use std::path::PathBuf; + +/// Fetch the named (BUCK) resource, heuristically falling back on +/// the cargo-built path when possible. Beware! This is not actually a +/// true cargo dependency, so the binaries have to be built independently. +/// +/// We should convert these tests to integration tests, so that cargo can +/// also manage the binaries. +pub fn get(name: S) -> PathBuf +where + S: AsRef, +{ + let name = name.as_ref().to_owned(); + // TODO: actually check if we're running in Buck context or not.
+ if let Ok(path) = buck_resources::get(name.clone()) { + return path; + } + + assert!( + name.starts_with("monarch/monarch_hyperactor/"), + "invalid resource {}: must start with \"monarch/monarch_hyperactor/\"", + name + ); + + let path: PathBuf = name + .replace( + "monarch/monarch_hyperactor/", + "../target/debug/monarch_hyperactor_test_", + ) + .into(); + + assert!( + path.exists(), + "no cargo-built resource at {}", + path.display() + ); + + path +} diff --git a/monarch_hyperactor/src/v1/logging.rs b/monarch_hyperactor/src/v1/logging.rs index c6c39269d..b2ce75145 100644 --- a/monarch_hyperactor/src/v1/logging.rs +++ b/monarch_hyperactor/src/v1/logging.rs @@ -37,10 +37,20 @@ use crate::v1::proc_mesh::PyProcMesh; module = "monarch._rust_bindings.monarch_hyperactor.v1.logging" )] pub struct LoggingMeshClient { - // handles remote process log forwarding; no python runtime - forwarder_mesh: ActorMesh, - // handles python logger; has python runtime + // Per-proc LogForwardActor mesh (optional). When enabled, each + // remote proc forwards its stdout/stderr back to the client. This + // actor does not interact with the embedded Python runtime. + forwarder_mesh: Option>, + + // Per-proc LoggerRuntimeActor mesh. Runs on every proc in the + // mesh and drives that proc's Python logging configuration (log + // level, handlers, etc.). If the proc isn't running embedded + // Python, this is effectively a no-op. logger_mesh: ActorMesh, + + // Client-side LogClientActor. Lives in the client process; + // receives forwarded output, aggregates and buffers it, and + // coordinates sync flush barriers. client_actor: ActorHandle, } @@ -74,11 +84,40 @@ impl LoggingMeshClient { #[pymethods] impl LoggingMeshClient { + /// Initialize logging for a `ProcMesh` and return a + /// `LoggingMeshClient`. + /// + /// This wires up three pieces of logging infrastructure: + /// + /// 1. A single `LogClientActor` in the *client* process. 
This + /// actor receives forwarded stdout/stderr, buffers and + /// aggregates it, and coordinates sync flush barriers. + /// + /// 2. (Optional) A `LogForwardActor` on every remote proc in the + /// mesh. These forwarders read that proc's stdout/stderr and + /// stream it back to the client. We only spawn this mesh if + /// `MESH_ENABLE_LOG_FORWARDING` was `true` in the config. If + /// forwarding is disabled at startup, we do not spawn these + /// actors and `forwarder_mesh` will be `None`. + /// + /// 3. A `LoggerRuntimeActor` on every remote proc in the mesh. + /// This actor controls the Python logging runtime (log level, + /// handlers, etc.) in that process. This is always spawned, + /// even if log forwarding is disabled. + /// + /// The returned `LoggingMeshClient` holds handles to those + /// actors. Later, `set_mode(...)` can adjust per-proc log level + /// and (if forwarding was enabled) toggle whether remote output + /// is actually streamed back to the client. If forwarding was + /// disabled by config, requests to enable streaming will fail. #[staticmethod] fn spawn(instance: &PyInstance, proc_mesh: &PyProcMesh) -> PyResult { let proc_mesh = proc_mesh.mesh_ref()?; let instance = instance.clone(); + PyPythonTask::new(async move { + // 1. Spawn the client-side coordinator actor (lives in + // the caller's process). let client_actor: ActorHandle = instance_dispatch!(instance, async move |cx_instance| { cx_instance @@ -87,12 +126,30 @@ impl LoggingMeshClient { .await })?; let client_actor_ref = client_actor.bind(); - let forwarder_mesh = instance_dispatch!(instance, async |cx_instance| { - proc_mesh - .spawn(cx_instance, "log_forwarder", &client_actor_ref) - .await - }) - .map_err(anyhow::Error::from)?; + + // Read config to decide if we stand up per-proc + // stdout/stderr forwarding. + let forwarding_enabled = hyperactor::config::global::get( + hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING, + ); + + // 2. 
Optionally spawn per-proc `LogForwardActor` mesh + // (stdout/stderr forwarders). + let forwarder_mesh = if forwarding_enabled { + // Spawn a `LogForwardActor` on every proc. + let mesh = instance_dispatch!(instance, async |cx_instance| { + proc_mesh + .spawn(cx_instance, "log_forwarder", &client_actor_ref) + .await + }) + .map_err(anyhow::Error::from)?; + + Some(mesh) + } else { + None + }; + + // 3. Always spawn a `LoggerRuntimeActor` on every proc. let logger_mesh = instance_dispatch!(instance, async |cx_instance| { proc_mesh.spawn(cx_instance, "logger", &()).await }) @@ -106,6 +163,34 @@ impl LoggingMeshClient { }) } + /// Update logging behavior for this mesh. + /// + /// `stream_to_client` controls whether remote procs actively + /// stream their stdout/stderr back to the client process. + /// + /// - If log forwarding was enabled at startup, `forwarder_mesh` + /// is `Some` and we propagate this flag to every per-proc + /// `LogForwardActor`. + /// - If log forwarding was disabled at startup, `forwarder_mesh` + /// is `None`. + /// In that case: + /// * requesting `stream_to_client = false` is a no-op + /// (accepted), + /// * requesting `stream_to_client = true` is rejected, + /// because we did not spawn forwarders and we don't + /// dynamically create them later. + /// + /// `aggregate_window_sec` configures how the client-side + /// `LogClientActor` batches forwarded output. It is only + /// meaningful when streaming is enabled. Calling this with + /// `Some(..)` while `stream_to_client == false` is invalid and + /// returns an error. + /// + /// `level` is the desired Python logging level. We always + /// broadcast this to the per-proc `LoggerRuntimeActor` mesh so + /// each remote process can update its own Python logger + /// configuration, regardless of whether stdout/stderr forwarding + /// is active.
fn set_mode( &self, instance: &PyInstance, @@ -113,24 +198,52 @@ impl LoggingMeshClient { aggregate_window_sec: Option, level: u8, ) -> PyResult<()> { + // We can't ask for an aggregation window if we're not + // streaming. if aggregate_window_sec.is_some() && !stream_to_client { return Err(PyErr::new::( "cannot set aggregate window without streaming to client".to_string(), )); } - instance_dispatch!(instance, |cx_instance| { - self.forwarder_mesh - .cast(cx_instance, LogForwardMessage::SetMode { stream_to_client }) - }) - .map_err(|e| PyErr::new::(e.to_string()))?; + // Handle the forwarder side (stdout/stderr streaming back to + // client). + match (&self.forwarder_mesh, stream_to_client) { + // Forwarders exist (config enabled at startup). We can + // toggle live. + (Some(fwd_mesh), _) => { + instance_dispatch!(instance, |cx_instance| { + fwd_mesh.cast(cx_instance, LogForwardMessage::SetMode { stream_to_client }) + }) + .map_err(|e| PyErr::new::(e.to_string()))?; + } + + // Forwarders were never spawned (global forwarding + // disabled) and the caller is asking NOT to stream. + // That's effectively a no-op so we silently accept. + (None, false) => { + // Nothing to do. + } + + // Forwarders were never spawned, but caller is asking to + // stream. We can't satisfy this request without + // re-spawning infra, which we deliberately don't do at + // runtime. + (None, true) => { + return Err(PyErr::new::( + "log forwarding disabled by config at startup; cannot enable streaming_to_client", + )); + } + } + // Always update the per-proc Python logging level. instance_dispatch!(instance, |cx_instance| { self.logger_mesh .cast(cx_instance, LoggerRuntimeMessage::SetLogging { level }) }) .map_err(|e| PyErr::new::(e.to_string()))?; + // Always update the client actor's aggregation window.
self.client_actor .send(LogClientMessage::SetAggregate { aggregate_window_sec, @@ -140,13 +253,28 @@ impl LoggingMeshClient { Ok(()) } - // A sync flush mechanism for the client make sure all the stdout/stderr are streamed back and flushed. + /// Force a sync flush of remote stdout/stderr back to the client, + /// and wait for completion. + /// + /// If log forwarding was disabled at startup (so we never spawned + /// any `LogForwardActor`s), this becomes a no-op success: there's + /// nothing to flush from remote procs in that mode, and we don't + /// try to manufacture it dynamically. fn flush(&self, instance: &PyInstance) -> PyResult { - let forwarder_mesh = self.forwarder_mesh.deref().clone(); + let forwarder_mesh_opt = self + .forwarder_mesh + .as_ref() + .map(|mesh| mesh.deref().clone()); let client_actor = self.client_actor.clone(); let instance = instance.clone(); PyPythonTask::new(async move { + // If there's no forwarder mesh (forwarding disabled by + // config), we just succeed immediately. + let Some(forwarder_mesh) = forwarder_mesh_opt else { + return Ok(()); + }; + instance_dispatch!(instance, async move |cx_instance| { Self::flush_internal(cx_instance, client_actor, forwarder_mesh).await }) @@ -171,3 +299,127 @@ pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> { module.add_class::()?; Ok(()) } + +#[cfg(test)] +mod tests { + use anyhow::Result; + use hyperactor::Instance; + use hyperactor::channel::ChannelTransport; + use hyperactor::proc::Proc; + use hyperactor_mesh::v1::ProcMesh; + use hyperactor_mesh::v1::host_mesh::HostMesh; + use ndslice::Extent; + use ndslice::View; // .region(), .num_ranks() etc. + + use super::*; + use crate::pytokio::AwaitPyExt; + use crate::pytokio::ensure_python; + + /// Bring up a minimal "world" suitable for integration-style + /// tests.
+ pub async fn test_world() -> Result<(Proc, Instance<()>, HostMesh, ProcMesh)> { + ensure_python(); + + let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string()) + .await + .expect("failed to start root Proc"); + + let (instance, _handle) = proc + .instance("client") + .expect("failed to create proc Instance"); + + let host_mesh = HostMesh::local_with_bootstrap( + crate::testresource::get("monarch/monarch_hyperactor/bootstrap").into(), + ) + .await + .expect("failed to bootstrap HostMesh"); + + let proc_mesh = host_mesh + .spawn(&instance, "p0", Extent::unity()) + .await + .expect("failed to spawn ProcMesh"); + + Ok((proc, instance, host_mesh, proc_mesh)) + } + + #[tokio::test] + async fn test_world_smoke() { + let (proc, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed"); + + assert_eq!( + host_mesh.region().num_ranks(), + 1, + "should allocate exactly one host" + ); + assert_eq!( + proc_mesh.region().num_ranks(), + 1, + "should spawn exactly one proc" + ); + assert_eq!( + instance.self_id().proc_id(), + proc.proc_id(), + "returned Instance<()> should be bound to the root Proc" + ); + + host_mesh.shutdown(&instance).await.expect("host shutdown"); + } + + #[tokio::test] + async fn spawn_respects_forwarding_flag() { + let (_, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed"); + + let py_instance = PyInstance::from(&instance); + let py_proc_mesh = PyProcMesh::new_owned(proc_mesh); + + let lock = hyperactor::config::global::lock(); + + // Case 1: forwarding disabled => `forwarder_mesh` should be `None`. 
+ { + let _guard = lock.override_key( + hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING, + false, + ); + + let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh) + .expect("spawn PyPythonTask (forwarding disabled)"); + + let client_py: Py = client_task + .await_py() + .await + .expect("spawn failed (forwarding disabled)"); + + Python::with_gil(|py| { + let client_ref = client_py.borrow(py); + assert!( + client_ref.forwarder_mesh.is_none(), + "forwarder_mesh should be None when forwarding disabled" + ); + }); + } + + // Case 2: forwarding enabled => `forwarder_mesh` should be `Some`. + { + let _guard = + lock.override_key(hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING, true); + + let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh) + .expect("spawn PyPythonTask (forwarding enabled)"); + + let client_py: Py = client_task + .await_py() + .await + .expect("spawn failed (forwarding enabled)"); + + Python::with_gil(|py| { + let client_ref = client_py.borrow(py); + assert!( + client_ref.forwarder_mesh.is_some(), + "forwarder_mesh should be Some(..) when forwarding is enabled" + ); + }); + } + + host_mesh.shutdown(&instance).await.expect("host shutdown"); + } +} diff --git a/monarch_hyperactor/test/bootstrap.rs b/monarch_hyperactor/test/bootstrap.rs new file mode 100644 index 000000000..d141fc92c --- /dev/null +++ b/monarch_hyperactor/test/bootstrap.rs @@ -0,0 +1,31 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +use monarch_hyperactor as _; // Avoid "unused dependency" lint. + +#[tokio::main] +async fn main() { + // This causes folly to intercept SIGTERM. When run in + // '@fbcode//mode/dev-nosan' that translates into SEGFAULTs.
+ hyperactor::initialize_with_current_runtime(); + // SAFETY: Does not dereference pointers or rely on undefined + // memory. No other threads are likely to be modifying it + // concurrently. + unsafe { + libc::signal(libc::SIGTERM, libc::SIG_DFL); + } + + // Initialize the embedded Python interpreter before any actor + // code runs. Some per-proc actors (e.g. LoggerRuntimeActor) call + // into Python during `new()`. If Python isn't initialized yet, + // PyO3 will panic ("The Python interpreter is not initialized"). + pyo3::prepare_freethreaded_python(); + + // Enter the hyperactor-mesh bootstrap protocol. + hyperactor_mesh::bootstrap_or_die().await; +}