From fb5c06a15ec1cd423b8e6ab316fe13ddfaa3b307 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 30 Oct 2025 20:42:43 +0100 Subject: [PATCH 1/4] fix: return SuccessfulPut before PUT broadcast completes --- crates/core/src/operations/put.rs | 42 +++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 5b9ffb9f9..9a90e6a50 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -477,6 +477,39 @@ impl Operation for PutOp { let sender = op_manager.ring.connection_manager.own_location(); let mut broadcasted_to = *broadcasted_to; + if upstream.peer == sender.peer { + // Originator reached the subscription tree. We can + // complete the local operation immediately. + tracing::trace!( + tx = %id, + %key, + "PUT originator reached subscription tree; completing locally" + ); + new_state = Some(PutState::Finished { key: *key }); + } else { + // Notify the upstream hop right away so the request + // path does not wait for the broadcast to finish. + let ack = PutMsg::SuccessfulPut { + id: *id, + target: upstream.clone(), + key: *key, + sender: sender.clone(), + }; + + tracing::trace!( + tx = %id, + %key, + upstream = %upstream.peer, + "Forwarding SuccessfulPut upstream before broadcast" + ); + + conn_manager + .send(&upstream.peer, NetMessage::from(ack)) + .await?; + + new_state = None; + } + // Broadcast to all peers in parallel let mut broadcasting = Vec::with_capacity(broadcast_to.len()); for peer in broadcast_to.iter() { @@ -526,14 +559,7 @@ impl Operation for PutOp { "Successfully broadcasted put into contract {key} to {broadcasted_to} peers" ); - // Subscriber nodes have been notified of the change, the operation is completed - return_msg = Some(PutMsg::SuccessfulPut { - id: *id, - target: upstream.clone(), - key: *key, - sender, - }); - new_state = None; + return_msg = None; } PutMsg::SuccessfulPut { id, .. } => { match self.state { From 04e96c5955d9d5d1363470d209aa09d963c44590 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 31 Oct 2025 02:02:52 +0100 Subject: [PATCH 2/4] refactor: remove SuccessfulUpdate and fix fire-and-forget updates --- .../src/node/network_bridge/p2p_protoc.rs | 1 - crates/core/src/operations/mod.rs | 6 +- crates/core/src/operations/update.rs | 594 +++++++++--------- crates/core/src/tracing/mod.rs | 13 - network-monitor/src/mock_data.ts | 10 +- network-monitor/src/type_definitions.ts | 1 - 6 files changed, 323 insertions(+), 302 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 617712817..4a39bfc43 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1662,7 +1662,6 @@ fn extract_sender_from_message(msg: &NetMessage) -> Option { // Update messages have sender in some variants NetMessageV1::Update(update_msg) => match update_msg { UpdateMsg::SeekNode { sender, .. } => Some(sender.clone()), - UpdateMsg::SuccessfulUpdate { sender, .. } => Some(sender.clone()), UpdateMsg::Broadcasting { sender, .. } => Some(sender.clone()), UpdateMsg::BroadcastTo { sender, .. } => Some(sender.clone()), _ => None, diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index c274f888d..c8e1a6aec 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -133,8 +133,12 @@ where if let Some(target) = msg.target() { tracing::debug!(%id, %target, "sending updated op state"); network_bridge.send(&target.peer, msg).await?; + op_manager.push(id, updated_state).await?; + } else { + tracing::debug!(%id, "queueing op state for local processing"); + op_manager.notify_op_change(msg, updated_state).await?; + return Err(OpError::StatePushed); } - op_manager.push(id, updated_state).await?; } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index dd870257c..14e455d12 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1,11 +1,12 @@ // TODO: complete update logic in the network +use either::Either; use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use freenet_stdlib::prelude::*; pub(crate) use self::messages::UpdateMsg; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::contract::{ContractHandlerEvent, StoreResponse}; -use crate::message::{InnerMessage, NetMessage, Transaction}; +use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction}; use crate::node::IsOperationCompleted; use crate::ring::{Location, PeerKeyLocation, RingError}; use crate::{ @@ -60,6 +61,12 @@ struct UpdateStats { target: Option, } +struct UpdateExecution { + value: WrappedState, + summary: StateSummary<'static>, + changed: bool, +} + pub(crate) struct UpdateResult {} impl TryFrom for UpdateResult { @@ -173,11 +180,7 @@ impl Operation for UpdateOp { // Target is us - process the request // Determine if this is a local request (from our own client) or remote request let is_local_request = - matches!(&self.state, Some(UpdateState::PrepareRequest { .. })) - || matches!( - &self.state, - Some(UpdateState::AwaitingResponse { upstream: None, .. }) - ); + matches!(&self.state, Some(UpdateState::PrepareRequest { .. })); let upstream = if is_local_request { None // No upstream - we are the initiator } else { @@ -214,7 +217,11 @@ impl Operation for UpdateOp { ); // Update contract locally - let updated_value = update_contract( + let UpdateExecution { + value: updated_value, + summary, + changed, + } = update_contract( op_manager, *key, value.clone(), @@ -222,43 +229,65 @@ impl Operation for UpdateOp { ) .await?; - // Get broadcast targets for propagating UPDATE to subscribers - let broadcast_to = - op_manager.get_broadcast_targets_update(key, &request_sender.peer); + if !changed { + tracing::debug!( + tx = %id, + %key, + "UPDATE yielded no state change, skipping broadcast" + ); - // Create success message to send back - let raw_state = State::from(updated_value); - let summary = StateSummary::from(raw_state.into_bytes()); + if upstream.is_none() { + new_state = Some(UpdateState::Finished { + key: *key, + summary: summary.clone(), + }); + } else { + new_state = None; + } - if broadcast_to.is_empty() { - // No peers to broadcast to - just send success response - return_msg = Some(UpdateMsg::SuccessfulUpdate { - id: *id, - target: request_sender.clone(), - summary: summary.clone(), - key: *key, - sender: self_location.clone(), - }); - new_state = Some(UpdateState::Finished { key: *key, summary }); + return_msg = None; } else { - // Broadcast to other peers - match try_to_broadcast( - *id, - true, // last_hop - we're handling locally - op_manager, - self.state, - (broadcast_to, request_sender.clone()), - *key, - value.clone(), - false, - ) - .await - { - Ok((state, msg)) => { - new_state = state; - return_msg = msg; + // Get broadcast targets for propagating UPDATE to subscribers + let broadcast_to = op_manager + .get_broadcast_targets_update(key, &request_sender.peer); + + if broadcast_to.is_empty() { + tracing::debug!( + tx = %id, + %key, + "No broadcast targets, completing UPDATE locally" + ); + + if upstream.is_none() { + new_state = Some(UpdateState::Finished { + key: *key, + summary: summary.clone(), + }); + } else { + new_state = None; + } + + return_msg = None; + } else { + // Broadcast to other peers + match try_to_broadcast( + *id, + true, // last_hop - we're handling locally + op_manager, + self.state, + (broadcast_to, request_sender.clone()), + *key, + updated_value.clone(), + false, + ) + .await + { + Ok((state, msg)) => { + new_state = state; + return_msg = msg; + } + Err(err) => return Err(err), } - Err(err) => return Err(err), } } } else { @@ -285,10 +314,9 @@ impl Operation for UpdateOp { key: *key, related_contracts: related_contracts.clone(), }); - // Transition to AwaitingResponse state to wait for SuccessfulUpdate new_state = Some(UpdateState::AwaitingResponse { - key: *key, - upstream, + _key: *key, + _upstream: upstream, }); } else { // No peers available and we don't have the contract - error @@ -333,7 +361,11 @@ impl Operation for UpdateOp { if has_contract { tracing::debug!("Contract found locally - handling UPDATE"); - let updated_value = update_contract( + let UpdateExecution { + value: updated_value, + summary: _summary, // summary not used here yet + changed, + } = update_contract( op_manager, *key, value.clone(), @@ -347,49 +379,48 @@ impl Operation for UpdateOp { target.location ); - // Get broadcast targets - let broadcast_to = - op_manager.get_broadcast_targets_update(key, &sender.peer); - - // If no peers to broadcast to, send success response directly - if broadcast_to.is_empty() { + if !changed { tracing::debug!( tx = %id, %key, - "No broadcast targets for SeekNode - completing with SuccessfulUpdate" + "SeekNode update produced no change, stopping propagation" ); - - // Create success message to send back to sender - let raw_state = State::from(updated_value); - let summary = StateSummary::from(raw_state.into_bytes()); - - return_msg = Some(UpdateMsg::SuccessfulUpdate { - id: *id, - target: sender.clone(), - summary, - key: *key, - sender: op_manager.ring.connection_manager.own_location(), - }); new_state = None; + return_msg = None; } else { - // Have peers to broadcast to - use try_to_broadcast - match try_to_broadcast( - *id, - true, - op_manager, - self.state, - (broadcast_to, sender.clone()), - *key, - value.clone(), - false, - ) - .await - { - Ok((state, msg)) => { - new_state = state; - return_msg = msg; + // Get broadcast targets + let broadcast_to = + op_manager.get_broadcast_targets_update(key, &sender.peer); + + // If no peers to broadcast to, nothing else to do + if broadcast_to.is_empty() { + tracing::debug!( + tx = %id, + %key, + "No broadcast targets for SeekNode - completing locally" + ); + new_state = None; + return_msg = None; + } else { + // Have peers to broadcast to - use try_to_broadcast + match try_to_broadcast( + *id, + true, + op_manager, + self.state, + (broadcast_to, sender.clone()), + *key, + updated_value.clone(), + false, + ) + .await + { + Ok((state, msg)) => { + new_state = state; + return_msg = msg; + } + Err(err) => return Err(err), } - Err(err) => return Err(err), } } } else { @@ -418,8 +449,8 @@ impl Operation for UpdateOp { related_contracts: related_contracts.clone(), }); new_state = Some(UpdateState::AwaitingResponse { - key: *key, - upstream: Some(sender.clone()), + _key: *key, + _upstream: Some(sender.clone()), }); } else { // No more peers to try - error @@ -445,7 +476,11 @@ impl Operation for UpdateOp { } tracing::debug!("Attempting contract value update - BroadcastTo - update"); - let new_value = update_contract( + let UpdateExecution { + value: updated_value, + summary: _summary, + changed, + } = update_contract( op_manager, *key, new_value.clone(), @@ -454,31 +489,42 @@ impl Operation for UpdateOp { .await?; tracing::debug!("Contract successfully updated - BroadcastTo - update"); - let broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); + if !changed { + tracing::debug!( + tx = %id, + %key, + "BroadcastTo update produced no change, ending propagation" + ); + new_state = None; + return_msg = None; + } else { + let broadcast_to = + op_manager.get_broadcast_targets_update(key, &sender.peer); - tracing::debug!( - "Successfully updated a value for contract {} @ {:?} - BroadcastTo - update", - key, - target.location - ); + tracing::debug!( + "Successfully updated a value for contract {} @ {:?} - BroadcastTo - update", + key, + target.location + ); - match try_to_broadcast( - *id, - false, - op_manager, - self.state, - (broadcast_to, sender.clone()), - *key, - new_value, - true, - ) - .await - { - Ok((state, msg)) => { - new_state = state; - return_msg = msg; + match try_to_broadcast( + *id, + false, + op_manager, + self.state, + (broadcast_to, sender.clone()), + *key, + updated_value.clone(), + true, + ) + .await + { + Ok((state, msg)) => { + new_state = state; + return_msg = msg; + } + Err(err) => return Err(err), } - Err(err) => return Err(err), } } UpdateMsg::Broadcasting { @@ -487,7 +533,7 @@ impl Operation for UpdateOp { broadcasted_to, key, new_value, - upstream, + upstream: _upstream, .. } => { let sender = op_manager.ring.connection_manager.own_location(); @@ -537,87 +583,9 @@ impl Operation for UpdateOp { "Successfully broadcasted update contract {key} to {broadcasted_to} peers - Broadcasting" ); - let raw_state = State::from(new_value.clone()); - - let summary = StateSummary::from(raw_state.into_bytes()); - - // Subscriber nodes have been notified of the change, the operation is complete - return_msg = Some(UpdateMsg::SuccessfulUpdate { - id: *id, - target: upstream.clone(), - summary, - sender: sender.clone(), - key: *key, - }); - + // Subscriber nodes have been notified of the change new_state = None; - } - UpdateMsg::SuccessfulUpdate { id, summary, .. } => { - match self.state { - Some(UpdateState::AwaitingResponse { key, upstream }) => { - tracing::debug!( - tx = %id, - %key, - this_peer = ?op_manager.ring.connection_manager.get_peer_key(), - ?upstream, - "Peer completed contract value update - SuccessfulUpdate", - ); - - tracing::debug!( - "UPDATE operation {} completed successfully for key {} with summary length {}", - id, - key, - summary.size() - ); - - if let Some(upstream) = upstream { - // Intermediate node: Forward success to upstream and complete operation - tracing::debug!( - "UPDATE: Forwarding SuccessfulUpdate to upstream peer {:?} and completing operation", - upstream - ); - return_msg = Some(UpdateMsg::SuccessfulUpdate { - id: *id, - target: upstream, - summary: summary.clone(), - key, - sender: op_manager.ring.connection_manager.own_location(), - }); - // Set state to None so OpManager marks this operation as completed - // This prevents duplicate SuccessfulUpdate messages from being processed - new_state = None; - } else { - // Operation originated locally (no upstream peer) - // Preserve Finished state so client can receive the result - tracing::debug!( - "UPDATE: Operation {} completed successfully (originated locally)", - id - ); - new_state = Some(UpdateState::Finished { - key, - summary: summary.clone(), - }); - return_msg = None; - } - } - Some(UpdateState::ReceivedRequest) => { - // This is the target node that processed the update - // It should have already sent the response, this shouldn't happen - tracing::error!( - tx = %id, - "UPDATE: Unexpected SuccessfulUpdate in ReceivedRequest state", - ); - return Err(OpError::invalid_transition(self.id)); - } - _ => { - tracing::error!( - state = ?self.state, - "invalid transition in UpdateMsg::SuccessfulUpdate -> match self.state" - ); - - return Err(OpError::invalid_transition(self.id)); - } - }; + return_msg = None; } _ => return Err(OpError::UnexpectedOpState), } @@ -644,24 +612,17 @@ async fn try_to_broadcast( match state { Some(UpdateState::ReceivedRequest | UpdateState::BroadcastOngoing) => { if broadcast_to.is_empty() && !last_hop { - // broadcast complete tracing::debug!( "Empty broadcast list while updating value for contract {} - try_to_broadcast", key ); return_msg = None; + new_state = None; if is_from_a_broadcasted_to_peer { - new_state = None; return Ok((new_state, return_msg)); } - - // means the whole tx finished so can return early - new_state = Some(UpdateState::AwaitingResponse { - key, - upstream: Some(upstream), - }); } else if !broadcast_to.is_empty() { tracing::debug!( "Callback to start broadcasting to other nodes. List size {}", @@ -678,29 +639,9 @@ async fn try_to_broadcast( upstream, sender: op_manager.ring.connection_manager.own_location(), }); - - let op = UpdateOp { - id, - state: new_state, - stats: None, - }; - op_manager - .notify_op_change(NetMessage::from(return_msg.unwrap()), OpEnum::Update(op)) - .await?; - return Err(OpError::StatePushed); } else { - let raw_state = State::from(new_value); - - let summary = StateSummary::from(raw_state.into_bytes()); - new_state = None; - return_msg = Some(UpdateMsg::SuccessfulUpdate { - id, - target: upstream, - summary, - key, - sender: op_manager.ring.connection_manager.own_location(), - }); + return_msg = None; } } _ => return Err(OpError::invalid_transition(id)), @@ -775,7 +716,28 @@ async fn update_contract( key: ContractKey, state: WrappedState, related_contracts: RelatedContracts<'static>, -) -> Result { +) -> Result { + let previous_state = match op_manager + .notify_contract_handler(ContractHandlerEvent::GetQuery { + key, + return_contract_code: false, + }) + .await + { + Ok(ContractHandlerEvent::GetResponse { + response: Ok(StoreResponse { state, .. }), + .. + }) => state, + Ok(other) => { + tracing::trace!(?other, %key, "Unexpected get response while preparing update summary"); + None + } + Err(err) => { + tracing::debug!(%key, %err, "Failed to fetch existing contract state before update"); + None + } + }; + let update_data = UpdateData::State(State::from(state)); match op_manager .notify_contract_handler(ContractHandlerEvent::UpdateQuery { @@ -787,15 +749,51 @@ async fn update_contract( { Ok(ContractHandlerEvent::UpdateResponse { new_value: Ok(new_val), - }) => Ok(new_val), + }) => { + let new_bytes = State::from(new_val.clone()).into_bytes(); + let summary = StateSummary::from(new_bytes.clone()); + let changed = match previous_state.as_ref() { + Some(prev_state) => { + let prev_bytes = State::from(prev_state.clone()).into_bytes(); + prev_bytes != new_bytes + } + None => true, + }; + + Ok(UpdateExecution { + value: new_val, + summary, + changed, + }) + } Ok(ContractHandlerEvent::UpdateResponse { - new_value: Err(_rr), + new_value: Err(err), }) => { - tracing::error!("Failed to update contract value"); + tracing::error!(%key, %err, "Failed to update contract value"); Err(OpError::UnexpectedOpState) } + Ok(ContractHandlerEvent::UpdateNoChange { .. }) => { + if let Some(prev_state) = previous_state { + let prev_bytes = State::from(prev_state.clone()).into_bytes(); + let summary = StateSummary::from(prev_bytes.clone()); + Ok(UpdateExecution { + value: prev_state, + summary, + changed: false, + }) + } else { + tracing::warn!( + %key, + "Contract reported UpdateNoChange but no previous state was available" + ); + Err(OpError::UnexpectedOpState) + } + } Err(err) => Err(err.into()), - Ok(_) => Err(OpError::UnexpectedOpState), + Ok(other) => { + tracing::error!(?other, %key, "Unexpected event from contract handler during update"); + Err(OpError::UnexpectedOpState) + } } } @@ -925,7 +923,11 @@ pub(crate) async fn request_update( // 2. Either seeding the contract OR has subscribers (verified above) // Note: This handles both truly isolated nodes and nodes where subscribers exist // but no suitable remote caching peer was found. - let updated_value = update_contract(op_manager, key, value, related_contracts).await?; + let UpdateExecution { + value: updated_value, + summary, + changed, + } = update_contract(op_manager, key, value, related_contracts).await?; tracing::debug!( tx = %id, @@ -933,9 +935,21 @@ pub(crate) async fn request_update( "Successfully updated contract locally on isolated node" ); + if !changed { + tracing::debug!( + tx = %id, + %key, + "Local update resulted in no change; finishing without broadcast" + ); + deliver_update_result(op_manager, id, key, summary.clone()).await?; + return Ok(()); + } + // Check if there are any subscribers to broadcast to let broadcast_to = op_manager.get_broadcast_targets_update(&key, &sender.peer); + deliver_update_result(op_manager, id, key, summary.clone()).await?; + if broadcast_to.is_empty() { // No subscribers - operation complete tracing::debug!( @@ -943,31 +957,6 @@ pub(crate) async fn request_update( %key, "No broadcast targets, completing UPDATE operation locally" ); - - // Set up state for SuccessfulUpdate message handling - update_op.state = Some(UpdateState::AwaitingResponse { - key, - upstream: None, - }); - - // Create a StateSummary from the updated value - let raw_state = State::from(updated_value); - let summary = StateSummary::from(raw_state.into_bytes()); - - // Create a SuccessfulUpdate message to trigger the completion handling - let success_msg = UpdateMsg::SuccessfulUpdate { - id, - target: sender.clone(), - summary, - sender: sender.clone(), - key, - }; - - // Use notify_op_change to trigger the completion handling - op_manager - .notify_op_change(NetMessage::from(success_msg), OpEnum::Update(update_op)) - .await?; - return Ok(()); } else { // There are subscribers - broadcast the update @@ -980,7 +969,7 @@ pub(crate) async fn request_update( let broadcast_state = Some(UpdateState::ReceivedRequest); - let (new_state, return_msg) = try_to_broadcast( + let (_new_state, return_msg) = try_to_broadcast( id, false, op_manager, @@ -992,17 +981,20 @@ pub(crate) async fn request_update( ) .await?; - update_op.state = new_state; - if let Some(msg) = return_msg { op_manager - .notify_op_change(NetMessage::from(msg), OpEnum::Update(update_op)) - .await?; - } else { - // Complete the operation locally if no further messages needed - let raw_state = State::from(WrappedState::new(vec![])); - let summary = StateSummary::from(raw_state.into_bytes()); - update_op.state = Some(UpdateState::Finished { key, summary }); + .to_event_listener + .notifications_sender() + .send(Either::Left(NetMessage::from(msg))) + .await + .map_err(|error| { + tracing::error!( + tx = %id, + %error, + "Failed to enqueue UPDATE broadcast message" + ); + OpError::NotificationError + })?; } return Ok(()); @@ -1023,7 +1015,11 @@ pub(crate) async fn request_update( // Apply update locally - this ensures the initiating peer serves the updated state // even if the remote UPDATE times out or fails - let updated_value = update_contract(op_manager, key, value.clone(), related_contracts.clone()) + let UpdateExecution { + value: updated_value, + summary, + changed: _changed, + } = update_contract(op_manager, key, value.clone(), related_contracts.clone()) .await .map_err(|e| { tracing::error!( @@ -1045,10 +1041,7 @@ pub(crate) async fn request_update( stats.target = Some(target.clone()); } - let new_state = Some(UpdateState::AwaitingResponse { - key, - upstream: None, - }); + deliver_update_result(op_manager, id, key, summary.clone()).await?; let msg = UpdateMsg::RequestUpdate { id, key, @@ -1058,15 +1051,67 @@ pub(crate) async fn request_update( value: updated_value, // Send the updated value, not the original }; + op_manager + .to_event_listener + .notifications_sender() + .send(Either::Left(NetMessage::from(msg))) + .await + .map_err(|error| { + tracing::error!( + tx = %id, + %error, + "Failed to enqueue UPDATE RequestUpdate message" + ); + OpError::NotificationError + })?; + + Ok(()) +} + +async fn deliver_update_result( + op_manager: &OpManager, + id: Transaction, + key: ContractKey, + summary: StateSummary<'static>, +) -> Result<(), OpError> { let op = UpdateOp { - state: new_state, id, - stats: update_op.stats, + state: Some(UpdateState::Finished { + key, + summary: summary.clone(), + }), + stats: None, }; + let host_result = op.to_host_result(); + op_manager - .notify_op_change(NetMessage::from(msg), OpEnum::Update(op)) - .await?; + .result_router_tx + .send((id, host_result)) + .await + .map_err(|error| { + tracing::error!( + tx = %id, + %error, + "Failed to send UPDATE result to result router" + ); + OpError::NotificationError + })?; + + if let Err(error) = op_manager + .to_event_listener + .notifications_sender() + .send(Either::Right(NodeEvent::TransactionCompleted(id))) + .await + { + tracing::warn!( + tx = %id, + %error, + "Failed to notify transaction completion for UPDATE" + ); + } + + op_manager.completed(id); Ok(()) } @@ -1080,7 +1125,7 @@ impl IsOperationCompleted for UpdateOp { mod messages { use std::{borrow::Borrow, fmt::Display}; - use freenet_stdlib::prelude::{ContractKey, RelatedContracts, StateSummary, WrappedState}; + use freenet_stdlib::prelude::{ContractKey, RelatedContracts, WrappedState}; use serde::{Deserialize, Serialize}; use crate::{ @@ -1099,15 +1144,6 @@ mod messages { related_contracts: RelatedContracts<'static>, value: WrappedState, }, - /// Value successfully inserted/updated. - SuccessfulUpdate { - id: Transaction, - target: PeerKeyLocation, - #[serde(deserialize_with = "StateSummary::deser_state_summary")] - summary: StateSummary<'static>, - sender: PeerKeyLocation, - key: ContractKey, - }, AwaitUpdate { id: Transaction, }, @@ -1145,7 +1181,6 @@ mod messages { fn id(&self) -> &Transaction { match self { UpdateMsg::RequestUpdate { id, .. } => id, - UpdateMsg::SuccessfulUpdate { id, .. } => id, UpdateMsg::AwaitUpdate { id, .. } => id, UpdateMsg::SeekNode { id, .. } => id, UpdateMsg::Broadcasting { id, .. } => id, @@ -1156,7 +1191,6 @@ mod messages { fn target(&self) -> Option> { match self { UpdateMsg::RequestUpdate { target, .. } => Some(target), - UpdateMsg::SuccessfulUpdate { target, .. } => Some(target), UpdateMsg::SeekNode { target, .. } => Some(target), UpdateMsg::BroadcastTo { target, .. } => Some(target), _ => None, @@ -1179,7 +1213,6 @@ mod messages { match self { Self::RequestUpdate { sender, .. } => Some(sender), Self::SeekNode { sender, .. } => Some(sender), - Self::SuccessfulUpdate { sender, .. } => Some(sender), Self::BroadcastTo { sender, .. } => Some(sender), _ => None, } @@ -1190,7 +1223,6 @@ mod messages { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { UpdateMsg::RequestUpdate { id, .. } => write!(f, "RequestUpdate(id: {id})"), - UpdateMsg::SuccessfulUpdate { id, .. } => write!(f, "SuccessfulUpdate(id: {id})"), UpdateMsg::AwaitUpdate { id } => write!(f, "AwaitUpdate(id: {id})"), UpdateMsg::SeekNode { id, .. } => write!(f, "SeekNode(id: {id})"), UpdateMsg::Broadcasting { id, .. } => write!(f, "Broadcasting(id: {id})"), @@ -1204,8 +1236,8 @@ mod messages { pub enum UpdateState { ReceivedRequest, AwaitingResponse { - key: ContractKey, - upstream: Option, + _key: ContractKey, + _upstream: Option, }, Finished { key: ContractKey, diff --git a/crates/core/src/tracing/mod.rs b/crates/core/src/tracing/mod.rs index 25e202b6e..335a6e308 100644 --- a/crates/core/src/tracing/mod.rs +++ b/crates/core/src/tracing/mod.rs @@ -312,19 +312,6 @@ impl<'a> NetEventLog<'a> { timestamp: chrono::Utc::now().timestamp() as u64, }) } - NetMessageV1::Update(UpdateMsg::SuccessfulUpdate { - id, - target, - sender, - key, - .. - }) => EventKind::Update(UpdateEvent::UpdateSuccess { - id: *id, - requester: sender.clone(), - target: target.clone(), - key: *key, - timestamp: chrono::Utc::now().timestamp() as u64, - }), NetMessageV1::Update(UpdateMsg::Broadcasting { new_value, broadcast_to, diff --git a/network-monitor/src/mock_data.ts b/network-monitor/src/mock_data.ts index 6319ff84f..f94a20fd5 100644 --- a/network-monitor/src/mock_data.ts +++ b/network-monitor/src/mock_data.ts @@ -84,7 +84,7 @@ const mock_peers_in_tx: TxPeersTableInterface = { peer_id: "0xdef", location: "0.16234", last_state: OpState.Finished, - last_message: MessageType.SuccessfulUpdate, + last_message: MessageType.Broadcasting, started: "18-01-2024", finalized: "19-01-2024", }, @@ -116,7 +116,7 @@ const mock_peers_in_tx: TxPeersTableInterface = { peer_id: "0xabc", location: "0.16234", last_state: OpState.Finished, - last_message: MessageType.SuccessfulUpdate, + last_message: MessageType.Broadcasting, started: "18-01-2024", finalized: "19-01-2024", }, @@ -150,7 +150,7 @@ const mock_peers_in_tx: TxPeersTableInterface = { peer_id: "0xabc", location: "0.16234", last_state: OpState.Finished, - last_message: MessageType.SuccessfulUpdate, + last_message: MessageType.Broadcasting, started: "18-01-2024", finalized: "19-01-2024", }, @@ -174,7 +174,7 @@ const mock_peers_in_tx: TxPeersTableInterface = { peer_id: "0xdef", location: "0.1789234", last_state: OpState.Finished, - last_message: MessageType.SuccessfulUpdate, + last_message: MessageType.Broadcasting, started: "12-01-2024", finalized: "13-01-2024", }, @@ -182,7 +182,7 @@ const mock_peers_in_tx: TxPeersTableInterface = { peer_id: "0xabc", location: "0.16234", last_state: OpState.Finished, - last_message: MessageType.SuccessfulUpdate, + last_message: MessageType.Broadcasting, started: "18-01-2024", finalized: "19-01-2024", }, diff --git a/network-monitor/src/type_definitions.ts b/network-monitor/src/type_definitions.ts index 69a4cc789..96081c013 100644 --- a/network-monitor/src/type_definitions.ts +++ b/network-monitor/src/type_definitions.ts @@ -84,7 +84,6 @@ export enum MessageType { SeekNode = "SeekNode", BroadcastTo = "BroadcastTo", Broadcasting = "Broadcasting", - SuccessfulUpdate = "SuccessfulUpdate", UpdateForward = "UpdateForward", } From 994b918ae0e80380791eb0bbcbdbaddd54bc51ee Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 31 Oct 2025 03:08:54 +0100 Subject: [PATCH 3/4] refactor: tighten update state handling post-review --- crates/core/src/operations/mod.rs | 9 +++ crates/core/src/operations/update.rs | 84 +++++++++++++++++----------- 2 files changed, 60 insertions(+), 33 deletions(-) diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index c8e1a6aec..f85033e5d 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -136,6 +136,15 @@ where op_manager.push(id, updated_state).await?; } else { tracing::debug!(%id, "queueing op state for local processing"); + debug_assert!( + matches!( + msg, + NetMessage::V1(NetMessageV1::Update( + crate::operations::update::UpdateMsg::Broadcasting { .. } + )) + ), + "Only Update::Broadcasting messages should be re-queued locally" + ); op_manager.notify_op_change(msg, updated_state).await?; return Err(OpError::StatePushed); } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 14e455d12..0e4ceb65a 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -314,10 +314,7 @@ impl Operation for UpdateOp { key: *key, related_contracts: related_contracts.clone(), }); - new_state = Some(UpdateState::AwaitingResponse { - _key: *key, - _upstream: upstream, - }); + new_state = None; } else { // No peers available and we don't have the contract - error tracing::error!( @@ -448,10 +445,7 @@ impl Operation for UpdateOp { key: *key, related_contracts: related_contracts.clone(), }); - new_state = Some(UpdateState::AwaitingResponse { - _key: *key, - _upstream: Some(sender.clone()), - }); + new_state = None; } else { // No more peers to try - error tracing::error!( @@ -470,11 +464,6 @@ impl Operation for UpdateOp { sender, target, } => { - if let Some(UpdateState::AwaitingResponse { .. }) = self.state { - tracing::debug!("Trying to broadcast to a peer that was the initiator of the op because it received the client request, or is in the middle of a seek node process"); - return Err(OpError::StatePushed); - } - tracing::debug!("Attempting contract value update - BroadcastTo - update"); let UpdateExecution { value: updated_value, @@ -738,7 +727,7 @@ async fn update_contract( } }; - let update_data = UpdateData::State(State::from(state)); + let update_data = UpdateData::State(State::from(state.clone())); match op_manager .notify_contract_handler(ContractHandlerEvent::UpdateQuery { key, @@ -773,21 +762,51 @@ async fn update_contract( Err(OpError::UnexpectedOpState) } Ok(ContractHandlerEvent::UpdateNoChange { .. }) => { - if let Some(prev_state) = previous_state { - let prev_bytes = State::from(prev_state.clone()).into_bytes(); - let summary = StateSummary::from(prev_bytes.clone()); - Ok(UpdateExecution { - value: prev_state, - summary, - changed: false, - }) - } else { - tracing::warn!( - %key, - "Contract reported UpdateNoChange but no previous state was available" - ); - Err(OpError::UnexpectedOpState) - } + let resolved_state = match previous_state { + Some(prev_state) => prev_state, + None => { + match op_manager + .notify_contract_handler(ContractHandlerEvent::GetQuery { + key, + return_contract_code: false, + }) + .await + { + Ok(ContractHandlerEvent::GetResponse { + response: + Ok(StoreResponse { + state: Some(current), + .. + }), + .. + }) => current, + Ok(other) => { + tracing::debug!( + ?other, + %key, + "Fallback fetch for UpdateNoChange returned no state; using requested state" + ); + state.clone() + } + Err(err) => { + tracing::debug!( + %key, + %err, + "Fallback fetch for UpdateNoChange failed; using requested state" + ); + state.clone() + } + } + } + }; + + let bytes = State::from(resolved_state.clone()).into_bytes(); + let summary = StateSummary::from(bytes); + Ok(UpdateExecution { + value: resolved_state, + summary, + changed: false, + }) } Err(err) => Err(err.into()), Ok(other) => { @@ -1074,6 +1093,9 @@ async fn deliver_update_result( key: ContractKey, summary: StateSummary<'static>, ) -> Result<(), OpError> { + // NOTE: UPDATE is modeled as fire-and-forget: once the merge succeeds on the + // seed/subscriber peer we surface success to the host immediately and allow + // the broadcast fan-out to proceed asynchronously. let op = UpdateOp { id, state: Some(UpdateState::Finished { @@ -1235,10 +1257,6 @@ mod messages { #[derive(Debug)] pub enum UpdateState { ReceivedRequest, - AwaitingResponse { - _key: ContractKey, - _upstream: Option, - }, Finished { key: ContractKey, summary: StateSummary<'static>, From 5d54fc12648101496be5304e4ff465d99a2a5eab Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 31 Oct 2025 14:53:01 +0100 Subject: [PATCH 4/4] refactor: clarify update logging and byte-equality note --- crates/core/src/operations/put.rs | 9 +++++---- crates/core/src/operations/update.rs | 4 ++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 6bbdb7567..09c36c629 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -478,12 +478,13 @@ impl Operation for PutOp { let mut broadcasted_to = *broadcasted_to; if upstream.peer == sender.peer { - // Originator reached the subscription tree. We can - // complete the local operation immediately. - tracing::trace!( + // Originator reached the subscription tree. This path should be filtered + // out by the deduplication layer, so treat it as a warning if it happens + // to help surface potential bugs. + tracing::warn!( tx = %id, %key, - "PUT originator reached subscription tree; completing locally" + "PUT originator re-entered broadcast loop; dedup should have completed" ); new_state = Some(PutState::Finished { key: *key }); } else { diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 0e4ceb65a..4b21ccc72 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -749,6 +749,10 @@ async fn update_contract( None => true, }; + // NOTE: change detection currently relies on byte-level equality. Contracts that + // produce semantically identical states with different encodings must normalize their + // serialization (e.g., sort map keys) to avoid redundant broadcasts. + Ok(UpdateExecution { value: new_val, summary,