@@ -37,10 +37,20 @@ use crate::v1::proc_mesh::PyProcMesh;
3737 module = "monarch._rust_bindings.monarch_hyperactor.v1.logging"
3838) ]
pub struct LoggingMeshClient {
    // Per-proc `LogForwardActor` mesh (optional). Present only when
    // `MESH_ENABLE_LOG_FORWARDING` was enabled at startup: each remote
    // proc then forwards its stdout/stderr back to the client. These
    // actors do not interact with the embedded Python runtime.
    forwarder_mesh: Option<ActorMesh<LogForwardActor>>,

    // Per-proc `LoggerRuntimeActor` mesh. Runs on every proc in the
    // mesh and drives that proc's Python logging configuration (log
    // level, handlers, etc.). If the proc isn't running embedded
    // Python, this is effectively a no-op. Always spawned, even when
    // log forwarding is disabled.
    logger_mesh: ActorMesh<LoggerRuntimeActor>,

    // Client-side `LogClientActor`. Lives in the client process;
    // receives forwarded output, aggregates and buffers it, and
    // coordinates sync flush barriers.
    client_actor: ActorHandle<LogClientActor>,
}
4656
@@ -74,11 +84,40 @@ impl LoggingMeshClient {
7484
7585#[ pymethods]
7686impl LoggingMeshClient {
87+ /// Initialize logging for a `ProcMesh` and return a
88+ /// `LoggingMeshClient`.
89+ ///
90+ /// This wires up three pieces of logging infrastructure:
91+ ///
92+ /// 1. A single `LogClientActor` in the *client* process. This
93+ /// actor receives forwarded stdout/stderr, buffers and
94+ /// aggregates it, and coordinates sync flush barriers.
95+ ///
96+ /// 2. (Optional) A `LogForwardActor` on every remote proc in the
97+ /// mesh. These forwarders read that proc's stdout/stderr and
98+ /// stream it back to the client. We only spawn this mesh if
99+ /// `MESH_ENABLE_LOG_FORWARDING` was `true` in the config. If
100+ /// forwarding is disabled at startup, we do not spawn these
101+ /// actors and `forwarder_mesh` will be `None`.
102+ ///
103+ /// 3. A `LoggerRuntimeActor` on every remote proc in the mesh.
104+ /// This actor controls the Python logging runtime (log level,
105+ /// handlers, etc.) in that process. This is always spawned,
106+ /// even if log forwarding is disabled.
107+ ///
108+ /// The returned `LoggingMeshClient` holds handles to those
109+ /// actors. Later, `set_mode(...)` can adjust per-proc log level
110+ /// and (if forwarding was enabled) toggle whether remote output
111+ /// is actually streamed back to the client. If forwarding was
112+ /// disabled by config, requests to enable streaming will fail.
77113 #[ staticmethod]
78114 fn spawn ( instance : & PyInstance , proc_mesh : & PyProcMesh ) -> PyResult < PyPythonTask > {
79115 let proc_mesh = proc_mesh. mesh_ref ( ) ?;
80116 let instance = instance. clone ( ) ;
117+
81118 PyPythonTask :: new ( async move {
119+ // 1. Spawn the client-side coordinator actor (lives in
120+ // the caller's process).
82121 let client_actor: ActorHandle < LogClientActor > =
83122 instance_dispatch ! ( instance, async move |cx_instance| {
84123 cx_instance
@@ -87,12 +126,30 @@ impl LoggingMeshClient {
87126 . await
88127 } ) ?;
89128 let client_actor_ref = client_actor. bind ( ) ;
90- let forwarder_mesh = instance_dispatch ! ( instance, async |cx_instance| {
91- proc_mesh
92- . spawn( cx_instance, "log_forwarder" , & client_actor_ref)
93- . await
94- } )
95- . map_err ( anyhow:: Error :: from) ?;
129+
130+ // Read config to decide if we stand up per-proc
131+ // stdout/stderr forwarding.
132+ let forwarding_enabled = hyperactor:: config:: global:: get (
133+ hyperactor_mesh:: bootstrap:: MESH_ENABLE_LOG_FORWARDING ,
134+ ) ;
135+
136+ // 2. Optionally spawn per-proc `LogForwardActor` mesh
137+ // (stdout/stderr forwarders).
138+ let forwarder_mesh = if forwarding_enabled {
139+ // Spawn a `LogFwdActor` on every proc.
140+ let mesh = instance_dispatch ! ( instance, async |cx_instance| {
141+ proc_mesh
142+ . spawn( cx_instance, "log_forwarder" , & client_actor_ref)
143+ . await
144+ } )
145+ . map_err ( anyhow:: Error :: from) ?;
146+
147+ Some ( mesh)
148+ } else {
149+ None
150+ } ;
151+
152+ // 3. Always spawn a `LoggerRuntimeActor` on every proc.
96153 let logger_mesh = instance_dispatch ! ( instance, async |cx_instance| {
97154 proc_mesh. spawn( cx_instance, "logger" , & ( ) ) . await
98155 } )
@@ -106,31 +163,87 @@ impl LoggingMeshClient {
106163 } )
107164 }
108165
166+ /// Update logging behavior for this mesh.
167+ ///
168+ /// `stream_to_client` controls whether remote procs actively
169+ /// stream their stdout/stderr back to the client process.
170+ ///
171+ /// - If log forwarding was enabled at startup, `forwarder_mesh`
172+ /// is `Some` and we propagate this flag to every per-proc
173+ /// `LogForwardActor`.
174+ /// - If log forwarding was disabled at startup, `forwarder_mesh`
175+ /// is `None`.
176+ /// In that case:
177+ /// * requesting `stream_to_client = false` is a no-op
178+ /// (accepted),
179+ /// * requesting `stream_to_client = true` is rejected,
180+ /// because we did not spawn forwarders and we don't
181+ /// dynamically create them later.
182+ ///
183+ /// `aggregate_window_sec` configures how the client-side
184+ /// `LogClientActor` batches forwarded output. It is only
185+ /// meaningful when streaming is enabled. Calling this with
186+ /// `Some(..)` while `stream_to_client == false` is invalid and
187+ /// returns an error.
188+ ///
189+ /// `level` is the desired Python logging level. We always
190+ /// broadcast this to the per-proc `LoggerRuntimeActor` mesh so
191+ /// each remote process can update its own Python logger
192+ /// configuration, regardless of whether stdout/stderr forwarding
193+ /// is active.
109194 fn set_mode (
110195 & self ,
111196 instance : & PyInstance ,
112197 stream_to_client : bool ,
113198 aggregate_window_sec : Option < u64 > ,
114199 level : u8 ,
115200 ) -> PyResult < ( ) > {
201+ // We can't ask for an aggregation window if we're not
202+ // streaming.
116203 if aggregate_window_sec. is_some ( ) && !stream_to_client {
117204 return Err ( PyErr :: new :: < pyo3:: exceptions:: PyRuntimeError , _ > (
118205 "cannot set aggregate window without streaming to client" . to_string ( ) ,
119206 ) ) ;
120207 }
121208
122- instance_dispatch ! ( instance, |cx_instance| {
123- self . forwarder_mesh
124- . cast( cx_instance, LogForwardMessage :: SetMode { stream_to_client } )
125- } )
126- . map_err ( |e| PyErr :: new :: < pyo3:: exceptions:: PyRuntimeError , _ > ( e. to_string ( ) ) ) ?;
209+ // Handle the forwarder side (stdout/stderr streaming back to
210+ // client).
211+ match ( & self . forwarder_mesh , stream_to_client) {
212+ // Forwarders exits (config enabled at startup). We can
213+ // toggle live.
214+ ( Some ( fwd_mesh) , _) => {
215+ instance_dispatch ! ( instance, |cx_instance| {
216+ fwd_mesh. cast( cx_instance, LogForwardMessage :: SetMode { stream_to_client } )
217+ } )
218+ . map_err ( |e| PyErr :: new :: < pyo3:: exceptions:: PyRuntimeError , _ > ( e. to_string ( ) ) ) ?;
219+ }
220+
221+ // Forwarders were never spawned (global forwarding
222+ // disabled) and the caller is asking NOT to stream.
223+ // That's effectively a no-op so we silently accept.
224+ ( None , false ) => {
225+ // Nothing to do.
226+ }
227+
228+ // Forwarders were never spawned, but caller is asking to
229+ // stream. We can't satisfy this request without
230+ // re-spawning infra, which we deliberately don't do at
231+ // runtime.
232+ ( None , true ) => {
233+ return Err ( PyErr :: new :: < pyo3:: exceptions:: PyRuntimeError , _ > (
234+ "log forwarding disabled by config at startup; cannot enable streaming_to_client" ,
235+ ) ) ;
236+ }
237+ }
127238
239+ // Always update the per-proc Python logging level.
128240 instance_dispatch ! ( instance, |cx_instance| {
129241 self . logger_mesh
130242 . cast( cx_instance, LoggerRuntimeMessage :: SetLogging { level } )
131243 } )
132244 . map_err ( |e| PyErr :: new :: < pyo3:: exceptions:: PyRuntimeError , _ > ( e. to_string ( ) ) ) ?;
133245
246+ // Always update the client actor's aggregation window.
134247 self . client_actor
135248 . send ( LogClientMessage :: SetAggregate {
136249 aggregate_window_sec,
@@ -140,13 +253,28 @@ impl LoggingMeshClient {
140253 Ok ( ( ) )
141254 }
142255
143- // A sync flush mechanism for the client make sure all the stdout/stderr are streamed back and flushed.
256+ /// Force a sync flush of remote stdout/stderr back to the client,
257+ /// and wait for completion.
258+ ///
259+ /// If log forwarding was disabled at startup (so we never spawned
260+ /// any `LogForwardActor`s), this becomes a no-op success: there's
261+ /// nothing to flush from remote procs in that mode, and we don't
262+ /// try to manufacture it dynamically.
144263 fn flush ( & self , instance : & PyInstance ) -> PyResult < PyPythonTask > {
145- let forwarder_mesh = self . forwarder_mesh . deref ( ) . clone ( ) ;
264+ let forwarder_mesh_opt = self
265+ . forwarder_mesh
266+ . as_ref ( )
267+ . map ( |mesh| mesh. deref ( ) . clone ( ) ) ;
146268 let client_actor = self . client_actor . clone ( ) ;
147269 let instance = instance. clone ( ) ;
148270
149271 PyPythonTask :: new ( async move {
272+ // If there's no forwarer mesh (forwarding disabled by
273+ // config), we just succeed immediately.
274+ let Some ( forwarder_mesh) = forwarder_mesh_opt else {
275+ return Ok ( ( ) ) ;
276+ } ;
277+
150278 instance_dispatch ! ( instance, async move |cx_instance| {
151279 Self :: flush_internal( cx_instance, client_actor, forwarder_mesh) . await
152280 } )
@@ -171,3 +299,97 @@ pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
171299 module. add_class :: < LoggingMeshClient > ( ) ?;
172300 Ok ( ( ) )
173301}
302+
#[cfg(test)]
mod tests {
    use hyperactor::Instance;
    use hyperactor::channel::ChannelTransport;
    use hyperactor::proc::Proc;
    use hyperactor_mesh::alloc::AllocSpec;
    use hyperactor_mesh::alloc::Allocator;
    use hyperactor_mesh::alloc::ProcessAllocator;
    use hyperactor_mesh::v1::ProcMesh;
    use hyperactor_mesh::v1::host_mesh::HostMesh;
    use ndslice::Extent;
    use ndslice::View; // .region(), .num_ranks() etc.
    use ndslice::extent;
    use tokio::process::Command;

    /// Bring up a minimal "world" suitable for integration-style
    /// tests.
    ///
    /// This does all the real work:
    /// - spawns a root `Proc` locally,
    /// - creates a client `Instance<()>` on that proc for
    ///   sending/receiving messages,
    /// - allocates a 1-host `HostMesh` via the bootstrap binary,
    /// - spawns a 1-rank `ProcMesh` on that host.
    #[cfg(fbcode_build)] // Uses buck resources.
    pub async fn test_world() -> anyhow::Result<(
        Proc,         // root proc
        Instance<()>, // client instance handle for messaging
        HostMesh,     // allocated host mesh
        ProcMesh,     // spawned proc mesh
    )> {
        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
            .await
            .expect("failed to start root Proc");

        let (instance, _handle) = proc
            .instance("client")
            .expect("failed to create Proc instance");

        let mut allocator = ProcessAllocator::new(Command::new(
            buck_resources::get("monarch/monarch_hyperactor/bootstrap")
                .expect("missing bootstrap exe"),
        ));

        let alloc = allocator
            .allocate(AllocSpec {
                extent: extent!(replicas = 1),
                constraints: Default::default(),
                proc_name: None,
                transport: ChannelTransport::Unix,
            })
            .await
            .expect("allocator.allocate failed");

        let host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
            .await
            .expect("HostMesh::allocate failed");

        let proc_mesh = host_mesh
            .spawn(&instance, "p0", Extent::unity())
            .await
            .expect("host_mesh.spawn failed");

        Ok((proc, instance, host_mesh, proc_mesh))
    }

    #[cfg(fbcode_build)]
    #[tokio::test]
    async fn test_world_smoke() {
        // Failure messages reference `test_world` (this helper's real
        // name) rather than a stale/misspelled "bootstrap_cannonical".
        let (proc, instance, host_mesh, proc_mesh) =
            test_world().await.expect("test_world failed");

        assert_eq!(
            host_mesh.region().num_ranks(),
            1,
            "test_world() should allocate exactly one host"
        );

        assert_eq!(
            proc_mesh.region().num_ranks(),
            1,
            "test_world() should spawn exactly one proc"
        );

        assert_eq!(
            instance.self_id().proc_id(),
            proc.proc_id(),
            "returned Instance<()> should be bound to the root Proc"
        );

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
}
0 commit comments