Skip to content

Commit 437df75

Browse files
actor_mesh: test: test_pingpong_full_mesh (#274)
Summary: Pull Request resolved: #274 add test to verify moore-neighbor ping-pong messaging across full 3×3×3 actor mesh. ghstack-source-id: 290740898 Reviewed By: mariusae Differential Revision: D76680489 fbshipit-source-id: bda70bd550e0ae16a3f54d8aaef3f24fed22e7f9
1 parent f8dd06f commit 437df75

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,58 @@ mod tests {
622622
assert!(done_rx.recv().await.unwrap());
623623
}
624624

625+
#[tokio::test]
626+
async fn test_pingpong_full_mesh() {
627+
use hyperactor::test_utils::pingpong::PingPongActor;
628+
use hyperactor::test_utils::pingpong::PingPongActorParams;
629+
use hyperactor::test_utils::pingpong::PingPongMessage;
630+
631+
use futures::future::join_all;
632+
633+
const X: usize = 3;
634+
const Y: usize = 3;
635+
const Z: usize = 3;
636+
let alloc = $allocator
637+
.allocate(AllocSpec {
638+
shape: shape! { x = X, y = Y, z = Z },
639+
constraints: Default::default(),
640+
})
641+
.await
642+
.unwrap();
643+
644+
let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
645+
let (undeliverable_tx, _undeliverable_rx) = proc_mesh.client().open_port();
646+
let params = PingPongActorParams::new(undeliverable_tx.bind(), None);
647+
let actor_mesh = proc_mesh.spawn::<PingPongActor>("pingpong", &params).await.unwrap();
648+
let slice = actor_mesh.shape().slice();
649+
650+
let mut futures = Vec::new();
651+
for rank in slice.iter() {
652+
let actor = actor_mesh.get(rank).unwrap();
653+
let coords = (&slice.coordinates(rank).unwrap()[..]).try_into().unwrap();
654+
let sizes = (&slice.sizes())[..].try_into().unwrap();
655+
let neighbors = ndslice::utils::stencil::moore_neighbors::<3>();
656+
for neighbor_coords in ndslice::utils::apply_stencil(&coords, sizes, &neighbors) {
657+
if let Ok(neighbor_rank) = slice.location(&neighbor_coords) {
658+
let neighbor = actor_mesh.get(neighbor_rank).unwrap();
659+
let (done_tx, done_rx) = proc_mesh.client().open_once_port();
660+
actor
661+
.send(
662+
proc_mesh.client(),
663+
PingPongMessage(4, neighbor.clone(), done_tx.bind()),
664+
)
665+
.unwrap();
666+
futures.push(done_rx.recv());
667+
}
668+
}
669+
}
670+
let results = join_all(futures).await;
671+
assert_eq!(results.len(), 316); // 5180 messages
672+
for result in results {
673+
assert_eq!(result.unwrap(), true);
674+
}
675+
}
676+
625677
#[tokio::test]
626678
async fn test_cast() {
627679
let alloc = $allocator

0 commit comments

Comments
 (0)