Skip to content

Commit 7e9656e

Browse files
: v1/logging: avoid spinning up LogForwardActor mesh when redundant (#1722)
Summary: this diff changes `LoggingMeshClient::spawn` so that log forwarding is no longer unconditional. previously we always spawned a `LogForwardActor` mesh across the `ProcMesh` and stored it in the client. now we read `MESH_ENABLE_LOG_FORWARDING` and only create that mesh if the flag is set. when forwarding is disabled we don’t spawn any `LogForwardActor` actors at all and we record `forwarder_mesh` as `None`. `logger_mesh` (the per-proc `LoggerRuntimeActor` that manages python logging state) is still always spawned. `client_actor` is still spawned locally in the caller's process. because `forwarder_mesh` is now an `Option`, the rest of the API is updated to match the new contract. `set_mode` now either propagates `stream_to_client` to the forwarders when they exist, silently accepts"don’t stream" when they don’t, or returns an error if the caller tries to enable streaming in a configuration where we never created forwarders. `flush` now no-ops successfully in the case where there is no forwarding mesh instead of assuming that those actors are always present. the diff also adds the minimal test harness needed to exercise this logic end-to-end. there is a dedicated bootstrap binary in `monarch_hyperactor` (`monarch_hyperactor_test_bootstrap`) which initializes python and then runs the mesh bootstrap protocol, so remote procs can safely construct `LoggerRuntimeActor `without panicking on `Python::with_gil`. we also add `ensure_python` and `AwaitPyExt `on the rust side so the tokio tests can stand up a `ProcMesh`, call the python-facing `spawn()`, await the returned `PyPythonTask`, and inspect the resulting `LoggingMeshClient`. finally, there is a test that brings up a tiny mesh, forces the forwarding flag off and on, and in each case asserts that `forwarder_mesh` is `None` or `Some(..)` accordingly. follow-up diffs will extend coverage to `set_mode`, `flush`, and shutdown semantics, now that the plumbing exists. Differential Revision: D85919326
1 parent b249a03 commit 7e9656e

File tree

5 files changed

+393
-21
lines changed

5 files changed

+393
-21
lines changed

hyperactor_mesh/src/testresource.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ where
2727
}
2828

2929
assert!(
30-
name.starts_with("monarch/hyperactor_mesh/"),
31-
"invalid resource {}: must start with \"monarch/hyperactor_mesh/\"",
30+
name.starts_with("monarch/monarch_hyperactor/"),
31+
"invalid resource {}: must start with \"monarch/monarch_hyperactor/\"",
3232
name
3333
);
3434

3535
let path: PathBuf = name
3636
.replace(
37-
"monarch/hyperactor_mesh/",
38-
"../target/debug/hyperactor_mesh_test_",
37+
"monarch/monarch_hyperactor/",
38+
"../target/debug/monarch_hyperactor_test_",
3939
)
4040
.into();
4141

monarch_hyperactor/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,process_allocator-oss,test_monarch_hyperactor]
1+
# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,monarch_hyperactor_test_bootstrap,process_allocator-oss,test_monarch_hyperactor]
22

33
[package]
44
name = "monarch_hyperactor"
@@ -7,6 +7,11 @@ authors = ["Meta"]
77
edition = "2021"
88
license = "BSD-3-Clause"
99

10+
[[bin]]
11+
name = "monarch_hyperactor_test_bootstrap"
12+
path = "test/bootstrap.rs"
13+
edition = "2024"
14+
1015
[[bin]]
1116
name = "process_allocator"
1217
edition = "2024"
@@ -52,6 +57,7 @@ tokio-util = { version = "0.7.15", features = ["full"] }
5257
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
5358

5459
[dev-dependencies]
60+
buck-resources = "1"
5561
dir-diff = "0.3"
5662

5763
[features]

monarch_hyperactor/src/pytokio.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ use hyperactor::config::CONFIG;
5050
use hyperactor::config::ConfigAttr;
5151
use monarch_types::SerializablePyErr;
5252
use pyo3::IntoPyObjectExt;
53+
#[cfg(test)]
54+
use pyo3::PyClass;
5355
use pyo3::exceptions::PyRuntimeError;
5456
use pyo3::exceptions::PyStopIteration;
5557
use pyo3::exceptions::PyTimeoutError;
@@ -547,3 +549,52 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul
547549

548550
Ok(())
549551
}
552+
553+
/// Ensure the embedded Python interpreter is initialized exactly
554+
/// once.
555+
///
556+
/// Safe to call from multiple threads, multiple times.
557+
#[cfg(test)]
558+
pub(crate) fn ensure_python() {
559+
static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
560+
INIT.get_or_init(|| {
561+
pyo3::prepare_freethreaded_python();
562+
});
563+
}
564+
565+
#[cfg(test)]
566+
// Helper: let us "await" a `PyPythonTask` in Rust.
567+
//
568+
// Semantics:
569+
// - consume the `PyPythonTask`,
570+
// - take the inner future,
571+
// - `.await` it on tokio to get `Py<PyAny>`,
572+
// - turn that into `Py<T>`.
573+
pub(crate) trait AwaitPyExt {
574+
async fn await_py<T: PyClass>(self) -> Result<Py<T>, PyErr>;
575+
}
576+
577+
#[cfg(test)]
578+
impl AwaitPyExt for PyPythonTask {
579+
async fn await_py<T: PyClass>(mut self) -> Result<Py<T>, PyErr> {
580+
// Take ownership of the inner future.
581+
let fut = self
582+
.take_task()
583+
.expect("PyPythonTask already consumed in await_py");
584+
585+
// Await a Result<Py<PyAny>, PyErr>.
586+
let py_any: Py<PyAny> = fut.await?;
587+
588+
// Convert Py<PyAny> -> Py<T>.
589+
Python::with_gil(|py| {
590+
let bound_any = py_any.bind(py);
591+
592+
// Try extract a Py<T>.
593+
let obj: Py<T> = bound_any
594+
.extract::<Py<T>>()
595+
.expect("spawn() did not return expected Python type");
596+
597+
Ok(obj)
598+
})
599+
}
600+
}

0 commit comments

Comments
 (0)