1 change: 0 additions & 1 deletion crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -1662,7 +1662,6 @@ fn extract_sender_from_message(msg: &NetMessage) -> Option<PeerKeyLocation> {
         // Update messages have sender in some variants
         NetMessageV1::Update(update_msg) => match update_msg {
             UpdateMsg::SeekNode { sender, .. } => Some(sender.clone()),
-            UpdateMsg::SuccessfulUpdate { sender, .. } => Some(sender.clone()),
             UpdateMsg::Broadcasting { sender, .. } => Some(sender.clone()),
             UpdateMsg::BroadcastTo { sender, .. } => Some(sender.clone()),
             _ => None,
15 changes: 14 additions & 1 deletion crates/core/src/operations/mod.rs
@@ -133,8 +133,21 @@ where
         if let Some(target) = msg.target() {
             tracing::debug!(%id, %target, "sending updated op state");
             network_bridge.send(&target.peer, msg).await?;
+            op_manager.push(id, updated_state).await?;
+        } else {
+            tracing::debug!(%id, "queueing op state for local processing");
+            debug_assert!(
+                matches!(
+                    msg,
+                    NetMessage::V1(NetMessageV1::Update(
+                        crate::operations::update::UpdateMsg::Broadcasting { .. }
+                    ))
+                ),
+                "Only Update::Broadcasting messages should be re-queued locally"
+            );
+            op_manager.notify_op_change(msg, updated_state).await?;
+            return Err(OpError::StatePushed);
         }
-        op_manager.push(id, updated_state).await?;
     }
 }

43 changes: 35 additions & 8 deletions crates/core/src/operations/put.rs
@@ -477,6 +477,40 @@ impl Operation for PutOp {
             let sender = op_manager.ring.connection_manager.own_location();
             let mut broadcasted_to = *broadcasted_to;

+            if upstream.peer == sender.peer {
+                // Originator reached the subscription tree. This path should be filtered
+                // out by the deduplication layer, so treat it as a warning if it happens
+                // to help surface potential bugs.
+                tracing::warn!(
+                    tx = %id,
+                    %key,
+                    "PUT originator re-entered broadcast loop; dedup should have completed"
+                );
+                new_state = Some(PutState::Finished { key: *key });
+            } else {
+                // Notify the upstream hop right away so the request
+                // path does not wait for the broadcast to finish.
+                let ack = PutMsg::SuccessfulPut {
+                    id: *id,
+                    target: upstream.clone(),
+                    key: *key,
+                    sender: sender.clone(),
+                };
+
+                tracing::trace!(
+                    tx = %id,
+                    %key,
+                    upstream = %upstream.peer,
+                    "Forwarding SuccessfulPut upstream before broadcast"
+                );
+
+                conn_manager
+                    .send(&upstream.peer, NetMessage::from(ack))
+                    .await?;
+
+                new_state = None;
+            }
+
             // Broadcast to all peers in parallel
             let mut broadcasting = Vec::with_capacity(broadcast_to.len());
             for peer in broadcast_to.iter() {
@@ -526,14 +560,7 @@
"Successfully broadcasted put into contract {key} to {broadcasted_to} peers"
);

// Subscriber nodes have been notified of the change, the operation is completed
return_msg = Some(PutMsg::SuccessfulPut {
id: *id,
target: upstream.clone(),
key: *key,
sender,
});
new_state = None;
return_msg = None;
Review thread on `return_msg = None;`:

Collaborator:
@sanity didn't we say we were gonna return successful puts? did you change your mind?
Collaborator:
if we do this, ensure in tests we are asserting the contracts being put are cached in the expected peers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still send the SuccessfulPut upstream, but we now emit it immediately when the target node starts broadcasting (conn_manager.send(&upstream.peer, …) a few lines below) instead of bubbling it back through return_msg. That keeps the request path latency the same while the fan-out continues in the background.

[AI-assisted]
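
For illustration, a minimal, synchronous Rust sketch of the "ack before fan-out" ordering described above. `Peer`, `PutMsg`, and `send` here are hypothetical stand-ins, not the crate's real types; the actual code awaits `conn_manager.send` inside an async operation handler.

```rust
// Sketch only: simplified stand-ins for the crate's async messaging API.

#[derive(Debug)]
struct Peer(&'static str);

#[derive(Debug)]
enum PutMsg {
    SuccessfulPut { key: u64 },
    BroadcastTo { key: u64 },
}

// Stand-in for `conn_manager.send(&peer, msg).await`.
fn send(peer: &Peer, msg: PutMsg) {
    println!("-> {}: {:?}", peer.0, msg);
}

// Ack the upstream hop first, then fan out. The request path is unblocked
// as soon as the ack is sent; previously the ack was only returned via
// `return_msg` after the whole broadcast loop had finished.
fn broadcast_with_early_ack(key: u64, upstream: &Peer, subscribers: &[Peer]) {
    send(upstream, PutMsg::SuccessfulPut { key });
    for peer in subscribers {
        send(peer, PutMsg::BroadcastTo { key });
    }
}

fn main() {
    let upstream = Peer("upstream");
    let subs = [Peer("sub-a"), Peer("sub-b")];
    broadcast_with_early_ack(42, &upstream, &subs);
}
```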

Collaborator (Author):
Yep, test_put_contract already exercises this by fetching the contract from both nodes immediately after the PUT. The helper routes through verify_contract_exists, so we assert the bundle is cached on each peer before the test completes.

[AI-assisted]
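
As a sketch of that assertion pattern (the `Node` type and its method below are hypothetical; the real test_put_contract and verify_contract_exists helpers live in the crate's test suite and differ in shape):

```rust
// Sketch only: a stand-in for asserting that each peer caches the contract
// after a PUT completes.
use std::collections::HashSet;

struct Node {
    name: &'static str,
    cache: HashSet<u64>, // contract keys this peer has cached
}

impl Node {
    // Stand-in for the test helper named in the thread above.
    fn verify_contract_exists(&self, key: u64) -> bool {
        self.cache.contains(&key)
    }
}

fn main() {
    // After the PUT broadcast, both peers should hold the contract bundle.
    let key = 42;
    let nodes = [
        Node { name: "node-a", cache: HashSet::from([key]) },
        Node { name: "node-b", cache: HashSet::from([key]) },
    ];
    for node in &nodes {
        assert!(
            node.verify_contract_exists(key),
            "contract {key} should be cached on {} after the PUT",
            node.name
        );
    }
    println!("contract cached on both peers");
}
```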

             }
             PutMsg::SuccessfulPut { id, .. } => {
                 match self.state {