From 250a35c2d8255098ee46eba036a941476e1930bd Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Mon, 1 Sep 2025 15:11:42 +0300 Subject: [PATCH 1/7] Add node option to ignore checkpoints. Log peer id in p2p via a tracing span instead of doing it explicitly. Propagate current tracing span when calling subsystems, so that peer id is logged by chainstate functions as well. --- blockprod/src/lib.rs | 1 + chainstate/src/config.rs | 11 +++ chainstate/src/detail/ban_score.rs | 3 +- chainstate/src/detail/chainstateref/mod.rs | 59 ++++++++----- chainstate/src/detail/error.rs | 10 ++- chainstate/src/detail/error_classification.rs | 3 +- .../chainstate_interface_impl_delegation.rs | 1 + .../test-suite/src/tests/syncing_tests.rs | 25 +++--- node-lib/src/config_files/chainstate/mod.rs | 9 ++ node-lib/src/config_files/mod.rs | 4 + node-lib/src/options.rs | 4 + node-lib/tests/cli.rs | 53 ++++++----- p2p/src/peer_manager/mod.rs | 38 ++++---- p2p/src/sync/peer/block_manager.rs | 87 ++++++------------- p2p/src/sync/peer/transaction_manager.rs | 33 ++----- subsystem/src/calls/handle.rs | 19 +++- 16 files changed, 190 insertions(+), 170 deletions(-) diff --git a/blockprod/src/lib.rs b/blockprod/src/lib.rs index 522040194..958196ee9 100644 --- a/blockprod/src/lib.rs +++ b/blockprod/src/lib.rs @@ -274,6 +274,7 @@ mod tests { max_db_commit_attempts: Default::default(), max_orphan_blocks: Default::default(), min_max_bootstrap_import_buffer_sizes: Default::default(), + allow_checkpoints_mismatch: Default::default(), }; let mempool_config = MempoolConfig::new(); diff --git a/chainstate/src/config.rs b/chainstate/src/config.rs index 5412603af..a68e091a5 100644 --- a/chainstate/src/config.rs +++ b/chainstate/src/config.rs @@ -38,17 +38,24 @@ make_config_setting!(MaxTipAge, Duration, Duration::from_secs(60 * 60 * 24)); pub struct ChainstateConfig { /// The number of maximum attempts to process a block. pub max_db_commit_attempts: MaxDbCommitAttempts, + /// The maximum capacity of the orphan blocks pool. pub max_orphan_blocks: MaxOrphanBlocks, + /// When importing bootstrap file, this controls the buffer sizes (min, max) /// (see bootstrap import function for more information) pub min_max_bootstrap_import_buffer_sizes: MinMaxBootstrapImportBufferSizes, + /// The initial block download is finished if the difference between the current time and the /// tip time is less than this value. pub max_tip_age: MaxTipAge, + /// If true, additional computationally-expensive consistency checks will be performed by /// the chainstate. The default value depends on the chain type. pub enable_heavy_checks: Option, + + /// If true, blocks and block headers will not be rejected if checkpoints mismatch is detected. + pub allow_checkpoints_mismatch: Option, } impl ChainstateConfig { @@ -90,4 +97,8 @@ impl ChainstateConfig { ChainType::Regtest => true, } } + + pub fn checkpoints_mismatch_allowed(&self) -> bool { + self.allow_checkpoints_mismatch.unwrap_or(false) + } } diff --git a/chainstate/src/detail/ban_score.rs b/chainstate/src/detail/ban_score.rs index 4e70c75a9..4e5659eac 100644 --- a/chainstate/src/detail/ban_score.rs +++ b/chainstate/src/detail/ban_score.rs @@ -317,8 +317,7 @@ impl BanScore for CheckBlockError { CheckBlockError::MerkleRootCalculationFailed(_, _) => 100, CheckBlockError::BlockRewardMaturityError(err) => err.ban_score(), CheckBlockError::PropertyQueryError(_) => 100, - CheckBlockError::CheckpointMismatch(_, _) => 100, - CheckBlockError::ParentCheckpointMismatch(_, _, _) => 100, + CheckBlockError::CheckpointMismatch { .. } => 100, CheckBlockError::GetAncestorError(_) => 100, CheckBlockError::AttemptedToAddBlockBeforeReorgLimit(_, _, _) => 100, CheckBlockError::EpochSealError(err) => err.ban_score(), diff --git a/chainstate/src/detail/chainstateref/mod.rs b/chainstate/src/detail/chainstateref/mod.rs index 63aa5ba39..1890a0bf9 100644 --- a/chainstate/src/detail/chainstateref/mod.rs +++ b/chainstate/src/detail/chainstateref/mod.rs @@ -76,7 +76,7 @@ pub use in_memory_reorg::InMemoryReorgError; pub struct ChainstateRef<'a, S, V> { chain_config: &'a ChainConfig, - _chainstate_config: &'a ChainstateConfig, + chainstate_config: &'a ChainstateConfig, tx_verification_strategy: &'a V, db_tx: S, time_getter: &'a TimeGetter, @@ -141,7 +141,7 @@ impl<'a, S: BlockchainStorageRead, V: TransactionVerificationStrategy> Chainstat ) -> Self { ChainstateRef { chain_config, - _chainstate_config: chainstate_config, + chainstate_config, db_tx, tx_verification_strategy, time_getter, @@ -157,7 +157,7 @@ impl<'a, S: BlockchainStorageRead, V: TransactionVerificationStrategy> Chainstat ) -> Self { ChainstateRef { chain_config, - _chainstate_config: chainstate_config, + chainstate_config, db_tx, tx_verification_strategy, time_getter, @@ -457,6 +457,34 @@ impl<'a, S: BlockchainStorageRead, V: TransactionVerificationStrategy> Chainstat Ok(result) } + fn enforce_checkpoint_impl( + &self, + height: BlockHeight, + expected: &Id, + given: &Id, + ) -> Result<(), CheckBlockError> { + if given != expected { + // Note: we only log the mismatch if we're going to ignore it (because if it's + // not ignored, we'll log the error anyway). + if self.chainstate_config.checkpoints_mismatch_allowed() { + log::warn!( + "Checkpoint mismatch at height {}, expected: {:x}, actual: {:x}", + height, + expected, + given, + ); + } else { + return Err(CheckBlockError::CheckpointMismatch { + height, + expected: *expected, + given: *given, + }); + } + } + + Ok(()) + } + // If the header height is at an exact checkpoint height, check that the block id matches the checkpoint id. // Return true if the header height is at an exact checkpoint height. fn enforce_exact_checkpoint_assuming_height( @@ -464,15 +492,10 @@ impl<'a, S: BlockchainStorageRead, V: TransactionVerificationStrategy> Chainstat header: &SignedBlockHeader, header_height: BlockHeight, ) -> Result { - if let Some(e) = self.chain_config.height_checkpoints().checkpoint_at_height(&header_height) + if let Some(expected_id) = + self.chain_config.height_checkpoints().checkpoint_at_height(&header_height) { - let expected_id = Id::::new(e.to_hash()); - if expected_id != header.get_id() { - return Err(CheckBlockError::CheckpointMismatch( - expected_id, - header.get_id(), - )); - } + self.enforce_checkpoint_impl(header_height, expected_id, &header.get_id().into())?; Ok(true) } else { Ok(false) @@ -499,17 +522,13 @@ impl<'a, S: BlockchainStorageRead, V: TransactionVerificationStrategy> Chainstat let parent_checkpoint_block_index = self.get_ancestor(&prev_block_index, expected_checkpoint_height)?; - let parent_checkpoint_id = parent_checkpoint_block_index.block_id(); - if parent_checkpoint_id != expected_checkpoint_id { - return Err(CheckBlockError::ParentCheckpointMismatch( - expected_checkpoint_height, - expected_checkpoint_id, - parent_checkpoint_id, - )); - } - + self.enforce_checkpoint_impl( + expected_checkpoint_height, + &expected_checkpoint_id, + &parent_checkpoint_id, + )?; Ok(()) } diff --git a/chainstate/src/detail/error.rs b/chainstate/src/detail/error.rs index f9523b3fc..a409f8e97 100644 --- a/chainstate/src/detail/error.rs +++ b/chainstate/src/detail/error.rs @@ -159,10 +159,12 @@ pub enum CheckBlockError { InvalidBlockRewardOutputType(Id), #[error("Block reward maturity error: {0}")] BlockRewardMaturityError(#[from] tx_verifier::timelock_check::OutputMaturityError), - #[error("Checkpoint mismatch: expected {0} vs given {1}")] - CheckpointMismatch(Id, Id), - #[error("Parent checkpoint mismatch at height {0}: expected {1} vs given {2}")] - ParentCheckpointMismatch(BlockHeight, Id, Id), + #[error("Checkpoint mismatch at height {height}: expected {expected:x}, given {given:x}")] + CheckpointMismatch { + height: BlockHeight, + expected: Id, + given: Id, + }, #[error("CRITICAL: Failed to retrieve ancestor of submitted block: {0}")] GetAncestorError(#[from] GetAncestorError), #[error("Attempted to add a block before reorg limit (attempted at height: {0} while current height is: {1} and min allowed is: {2})")] diff --git a/chainstate/src/detail/error_classification.rs b/chainstate/src/detail/error_classification.rs index ad4326a84..0641f7387 100644 --- a/chainstate/src/detail/error_classification.rs +++ b/chainstate/src/detail/error_classification.rs @@ -169,8 +169,7 @@ impl BlockProcessingErrorClassification for CheckBlockError { | CheckBlockError::ParentBlockMissing { .. } | CheckBlockError::BlockTimeOrderInvalid(_, _) | CheckBlockError::InvalidBlockRewardOutputType(_) - | CheckBlockError::CheckpointMismatch(_, _) - | CheckBlockError::ParentCheckpointMismatch(_, _, _) + | CheckBlockError::CheckpointMismatch { .. } | CheckBlockError::AttemptedToAddBlockBeforeReorgLimit(_, _, _) | CheckBlockError::InvalidParent { .. } => BlockProcessingErrorClass::BadBlock, diff --git a/chainstate/src/interface/chainstate_interface_impl_delegation.rs b/chainstate/src/interface/chainstate_interface_impl_delegation.rs index 3ad9f8e14..b5742ec68 100644 --- a/chainstate/src/interface/chainstate_interface_impl_delegation.rs +++ b/chainstate/src/interface/chainstate_interface_impl_delegation.rs @@ -480,6 +480,7 @@ mod tests { min_max_bootstrap_import_buffer_sizes: Default::default(), max_tip_age: Default::default(), enable_heavy_checks: Some(true), + allow_checkpoints_mismatch: Default::default(), }; let chainstate_storage = Store::new_empty().unwrap(); diff --git a/chainstate/test-suite/src/tests/syncing_tests.rs b/chainstate/test-suite/src/tests/syncing_tests.rs index 0431e32c9..66f08fd22 100644 --- a/chainstate/test-suite/src/tests/syncing_tests.rs +++ b/chainstate/test-suite/src/tests/syncing_tests.rs @@ -640,6 +640,7 @@ fn initial_block_download(#[case] seed: Seed) { min_max_bootstrap_import_buffer_sizes: Default::default(), max_tip_age: Duration::from_secs(1).into(), enable_heavy_checks: Some(true), + allow_checkpoints_mismatch: Default::default(), }) .with_initial_time_since_genesis(2) .build(); @@ -798,8 +799,9 @@ fn headers_check_with_checkpoints(#[case] seed: Seed) { .enumerate() .map(|(idx, header)| (BlockHeight::new(idx as u64 + 2), header.block_id().into())) .collect::>(); + let bad_checkpoint_height = BlockHeight::new(5); let good_block_id = Id::new(Uint256::from_u64(12345).into()); - let bad_block_id = checkpoints.insert(BlockHeight::new(5), good_block_id).unwrap(); + let bad_block_id = checkpoints.insert(bad_checkpoint_height, good_block_id).unwrap(); let mut tf = TestFramework::builder(&mut rng) .with_chain_config( @@ -814,10 +816,11 @@ fn headers_check_with_checkpoints(#[case] seed: Seed) { assert_eq!( err, ChainstateError::ProcessBlockError(chainstate::BlockError::CheckBlockFailed( - CheckBlockError::CheckpointMismatch( - tf.to_chain_block_id(&good_block_id), - tf.to_chain_block_id(&bad_block_id) - ) + CheckBlockError::CheckpointMismatch { + height: bad_checkpoint_height, + expected: good_block_id, + given: bad_block_id + } )) ); } @@ -826,9 +829,10 @@ fn headers_check_with_checkpoints(#[case] seed: Seed) { { let good_block_id = Id::new(Uint256::from_u64(12345).into()); let bad_block_id = block_headers[5].block_id().into(); + let bad_checkpoint_height = BlockHeight::new(7); let checkpoints = [ (BlockHeight::new(3), block_headers[1].block_id().into()), - (BlockHeight::new(7), good_block_id), + (bad_checkpoint_height, good_block_id), ] .into_iter() .collect::>(); @@ -846,10 +850,11 @@ fn headers_check_with_checkpoints(#[case] seed: Seed) { assert_eq!( err, ChainstateError::ProcessBlockError(chainstate::BlockError::CheckBlockFailed( - CheckBlockError::CheckpointMismatch( - tf.to_chain_block_id(&good_block_id), - tf.to_chain_block_id(&bad_block_id) - ) + CheckBlockError::CheckpointMismatch { + height: bad_checkpoint_height, + expected: good_block_id, + given: bad_block_id + } )) ); } diff --git a/node-lib/src/config_files/chainstate/mod.rs b/node-lib/src/config_files/chainstate/mod.rs index d5809e2f4..f40b7552d 100644 --- a/node-lib/src/config_files/chainstate/mod.rs +++ b/node-lib/src/config_files/chainstate/mod.rs @@ -24,18 +24,25 @@ use chainstate::ChainstateConfig; pub struct ChainstateConfigFile { /// The number of maximum attempts to process a block. pub max_db_commit_attempts: Option, + /// The maximum capacity of the orphan blocks pool. pub max_orphan_blocks: Option, + /// When importing bootstrap file, this controls the buffer sizes (min, max) /// (see bootstrap import function for more information) pub min_max_bootstrap_import_buffer_sizes: Option<(usize, usize)>, + /// A maximum tip age in seconds. /// /// The initial block download is finished if the difference between the current time and the /// tip time is less than this value. pub max_tip_age: Option, + /// If true, additional computationally-expensive consistency checks will be performed by the chainstate. pub enable_heavy_checks: Option, + + /// If true, blocks and block headers will not be rejected if checkpoints mismatch is detected. + pub allow_checkpoints_mismatch: Option, } impl From for ChainstateConfig { @@ -46,6 +53,7 @@ impl From for ChainstateConfig { min_max_bootstrap_import_buffer_sizes, max_tip_age, enable_heavy_checks, + allow_checkpoints_mismatch, } = config_file; ChainstateConfig { @@ -54,6 +62,7 @@ impl From for ChainstateConfig { min_max_bootstrap_import_buffer_sizes: min_max_bootstrap_import_buffer_sizes.into(), max_tip_age: max_tip_age.map(Duration::from_secs).into(), enable_heavy_checks, + allow_checkpoints_mismatch, } } } diff --git a/node-lib/src/config_files/mod.rs b/node-lib/src/config_files/mod.rs index 70228abb8..3cff99202 100644 --- a/node-lib/src/config_files/mod.rs +++ b/node-lib/src/config_files/mod.rs @@ -152,6 +152,7 @@ fn chainstate_config( min_max_bootstrap_import_buffer_sizes, max_tip_age, enable_heavy_checks, + allow_checkpoints_mismatch, } = chainstate_config; let storage_backend = options.storage_backend.clone().unwrap_or(storage_backend); @@ -159,6 +160,8 @@ fn chainstate_config( let max_orphan_blocks = options.max_orphan_blocks.or(max_orphan_blocks); let max_tip_age = options.max_tip_age.or(max_tip_age); let enable_heavy_checks = options.enable_chainstate_heavy_checks.or(enable_heavy_checks); + let allow_checkpoints_mismatch = + options.allow_checkpoints_mismatch.or(allow_checkpoints_mismatch); let chainstate_config = ChainstateConfigFile { max_db_commit_attempts, @@ -166,6 +169,7 @@ fn chainstate_config( min_max_bootstrap_import_buffer_sizes, max_tip_age, enable_heavy_checks, + allow_checkpoints_mismatch, }; ChainstateLauncherConfigFile { storage_backend, diff --git a/node-lib/src/options.rs b/node-lib/src/options.rs index 2a71c8865..9668a6f85 100644 --- a/node-lib/src/options.rs +++ b/node-lib/src/options.rs @@ -343,6 +343,10 @@ pub struct RunOptions { /// Defaults to true for regtest and false in other cases. #[clap(long, value_name = "VAL")] pub enable_chainstate_heavy_checks: Option, + + /// If true, blocks and block headers will not be rejected if checkpoints mismatch is detected. + #[clap(long, action = clap::ArgAction::SetTrue, hide = true)] + pub allow_checkpoints_mismatch: Option, } pub fn default_data_dir(chain_type: ChainType) -> PathBuf { diff --git a/node-lib/tests/cli.rs b/node-lib/tests/cli.rs index 0ed1303aa..d2de5c64e 100644 --- a/node-lib/tests/cli.rs +++ b/node-lib/tests/cli.rs @@ -123,6 +123,7 @@ fn read_config_override_values() { let rpc_cookie_file = "cookie_file"; let min_tx_relay_fee_rate = 321; let enable_chainstate_heavy_checks = true; + let allow_checkpoints_mismatch = true; let options = RunOptions { blockprod_min_peers_to_produce_blocks: Some(blockprod_min_peers_to_produce_blocks), @@ -161,34 +162,35 @@ fn read_config_override_values() { min_tx_relay_fee_rate: Some(min_tx_relay_fee_rate), force_allow_run_as_root_outer: Default::default(), enable_chainstate_heavy_checks: Some(enable_chainstate_heavy_checks), + allow_checkpoints_mismatch: Some(allow_checkpoints_mismatch), }; let config = NodeConfigFile::read(&chain_config, &config_path, &options).unwrap(); assert_eq!( - config.blockprod.clone().unwrap().min_peers_to_produce_blocks, + config.blockprod.as_ref().unwrap().min_peers_to_produce_blocks, Some(blockprod_min_peers_to_produce_blocks), ); assert_eq!( - config.blockprod.clone().unwrap().skip_ibd_check, + config.blockprod.as_ref().unwrap().skip_ibd_check, Some(blockprod_skip_ibd_check) ); assert_eq!( - config.blockprod.clone().unwrap().use_current_time_if_non_pos, + config.blockprod.as_ref().unwrap().use_current_time_if_non_pos, Some(blockprod_use_current_time_if_non_pos) ); assert_eq!( - config.chainstate.clone().unwrap().chainstate_config.max_db_commit_attempts, + config.chainstate.as_ref().unwrap().chainstate_config.max_db_commit_attempts, Some(max_db_commit_attempts) ); assert_eq!( - config.chainstate.clone().unwrap().chainstate_config.max_orphan_blocks, + config.chainstate.as_ref().unwrap().chainstate_config.max_orphan_blocks, Some(max_orphan_blocks) ); assert_eq!( - config.chainstate.clone().unwrap().chainstate_config.max_tip_age, + config.chainstate.as_ref().unwrap().chainstate_config.max_tip_age, Some(max_tip_age) ); @@ -198,69 +200,74 @@ fn read_config_override_values() { ); assert_eq!( - config.chainstate.clone().unwrap().chainstate_config.enable_heavy_checks, + config.chainstate.as_ref().unwrap().chainstate_config.enable_heavy_checks, Some(enable_chainstate_heavy_checks) ); assert_eq!( - config.p2p.clone().unwrap().networking_enabled, + config.chainstate.as_ref().unwrap().chainstate_config.allow_checkpoints_mismatch, + Some(allow_checkpoints_mismatch) + ); + + assert_eq!( + config.p2p.as_ref().unwrap().networking_enabled, Some(p2p_networking_enabled) ); assert_eq!( - config.p2p.clone().unwrap().bind_addresses, + config.p2p.as_ref().unwrap().bind_addresses, Some(vec!(p2p_bind_addr)) ); assert_eq!( - config.p2p.clone().unwrap().socks5_proxy, + config.p2p.as_ref().unwrap().socks5_proxy, Some(p2p_socks5_proxy.to_owned()) ); assert_eq!( - config.p2p.clone().unwrap().disable_noise, + config.p2p.as_ref().unwrap().disable_noise, Some(p2p_disable_noise) ); assert_eq!( - config.p2p.clone().unwrap().boot_nodes, + config.p2p.as_ref().unwrap().boot_nodes, Some(vec!(p2p_boot_node)) ); assert_eq!( - config.p2p.clone().unwrap().reserved_nodes, + config.p2p.as_ref().unwrap().reserved_nodes, Some(vec!(p2p_reserved_node)) ); assert_eq!( - config.p2p.clone().unwrap().max_inbound_connections, + config.p2p.as_ref().unwrap().max_inbound_connections, Some(p2p_max_inbound_connections) ); assert_eq!( - config.p2p.clone().unwrap().discouragement_threshold, + config.p2p.as_ref().unwrap().discouragement_threshold, Some(p2p_discouragement_threshold) ); assert_eq!( - config.p2p.clone().unwrap().discouragement_duration, + config.p2p.as_ref().unwrap().discouragement_duration, Some(p2p_discouragement_duration) ); assert_eq!( - config.p2p.clone().unwrap().outbound_connection_timeout, + config.p2p.as_ref().unwrap().outbound_connection_timeout, Some(p2p_timeout) ); assert_eq!( - config.p2p.clone().unwrap().ping_check_period, + config.p2p.as_ref().unwrap().ping_check_period, Some(p2p_ping_check_period) ); assert_eq!( - config.p2p.clone().unwrap().ping_timeout, + config.p2p.as_ref().unwrap().ping_timeout, Some(p2p_ping_timeout) ); assert_eq!( - config.p2p.clone().unwrap().sync_stalling_timeout, + config.p2p.as_ref().unwrap().sync_stalling_timeout, Some(p2p_sync_stalling_timeout) ); assert_eq!( - config.p2p.clone().unwrap().max_clock_diff, + config.p2p.as_ref().unwrap().max_clock_diff, Some(p2p_max_clock_diff) ); - assert_eq!(config.p2p.clone().unwrap().node_type, Some(node_type)); + assert_eq!(config.p2p.as_ref().unwrap().node_type, Some(node_type)); assert_eq!( - config.p2p.clone().unwrap().force_dns_query_if_no_global_addresses_known, + config.p2p.as_ref().unwrap().force_dns_query_if_no_global_addresses_known, Some(p2p_force_dns_query_if_no_global_addresses_known) ); diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index 13409e2e4..61e49d313 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -678,7 +678,7 @@ where assert!(old_value.is_none()); } Err(e) => { - log::debug!("outbound connection to {address:?} failed: {e}"); + log::debug!("Outbound connection to {address:?} failed: {e}"); match outbound_connect_type { OutboundConnectType::Automatic { block_relay_only: _, @@ -728,7 +728,7 @@ where reason: Option, response_sender: Option>>, ) { - log::debug!("disconnect peer {peer_id}"); + log::debug!("Disconnect peer {peer_id}"); let res = self.try_disconnect(peer_id, reason); match res { @@ -743,7 +743,7 @@ where assert!(old_value.is_none()); } Err(e) => { - log::debug!("disconnecting new peer {peer_id} failed: {e}"); + log::warn!("Disconnecting peer {peer_id} failed: {e}"); if let Some(response_sender) = response_sender { response_sender.send(Err(e)); } @@ -883,7 +883,7 @@ where &self.p2p_config.peer_manager_config, &mut make_pseudo_rng(), ) { - log::info!("inbound peer {peer_id} is selected for eviction"); + log::info!("Inbound peer {peer_id} is selected for eviction"); self.disconnect( peer_id, PeerDisconnectionDbAction::Keep, @@ -904,7 +904,7 @@ where self.time_getter.get_time(), &mut make_pseudo_rng(), ) { - log::info!("block relay peer {peer_id} is selected for eviction"); + log::info!("Block relay peer {peer_id} is selected for eviction"); self.disconnect( peer_id, PeerDisconnectionDbAction::Keep, @@ -922,7 +922,7 @@ where self.time_getter.get_time(), &mut make_pseudo_rng(), ) { - log::info!("full relay peer {peer_id} is selected for eviction"); + log::info!("Full relay peer {peer_id} is selected for eviction"); self.disconnect( peer_id, PeerDisconnectionDbAction::Keep, @@ -976,7 +976,7 @@ where self.peer_connectivity_handle.accept(peer_id)?; log::info!( - "new peer accepted, peer_id: {peer_id}, address: {peer_address:?}, role: {peer_role:?}, protocol_version: {:?}", + "New peer accepted, peer_id: {peer_id}, address: {peer_address:?}, role: {peer_role:?}, protocol_version: {:?}", info.protocol_version ); @@ -1108,7 +1108,7 @@ where ); if let Err(accept_err) = &accept_res { - log::debug!("connection rejected for peer {peer_id}: {accept_err}"); + log::debug!("Connection rejected for peer {peer_id}: {accept_err}"); let disconnection_reason = DisconnectionReason::from_error(accept_err); @@ -1125,7 +1125,7 @@ where let disconnect_result = self.peer_connectivity_handle.disconnect(peer_id, disconnection_reason); if let Err(err) = disconnect_result { - log::error!("disconnect failed unexpectedly: {err:?}"); + log::error!("Disconnect failed unexpectedly: {err:?}"); } if peer_role.is_outbound() { @@ -1181,7 +1181,7 @@ where // The peer will not be in `peers` for rejected connections if let Some(peer) = self.peers.remove(&peer_id) { log::info!( - "peer disconnected, peer_id: {}, address: {:?}", + "Peer disconnected, peer_id: {}, address: {:?}", peer.info.peer_id, peer.peer_address ); @@ -1464,7 +1464,7 @@ where .get_mut(&peer_id) .expect("peer sending AnnounceAddrRequest must be known"); if !peer.address_rate_limiter.accept(self.time_getter.get_time()) { - log::debug!("address announcement is rate limited from peer {peer_id}"); + log::debug!("Address announcement is rate limited from peer {peer_id}"); return; } @@ -1602,14 +1602,14 @@ where peer.ping_min = Some(ping_time_min); } else { log::debug!( - "wrong nonce in ping response from peer {}, received: {}, expected: {}", + "Wrong nonce in ping response from peer {}, received: {}, expected: {}", peer_id, nonce, sent_ping.nonce, ); } } else { - log::debug!("unexpected ping response received from peer {}", peer_id); + log::debug!("Unexpected ping response received from peer {}", peer_id); } } } @@ -1648,17 +1648,17 @@ where } PeerManagerEvent::NewTipReceived { peer_id, block_id } => { if let Some(peer) = self.peers.get_mut(&peer_id) { - log::debug!("new tip {block_id} received from peer {peer_id}"); + log::debug!("New tip {block_id} received from peer {peer_id}"); peer.last_tip_block_time = Some(self.time_getter.get_time()); } } PeerManagerEvent::NewChainstateTip(block_id) => { - log::debug!("new tip {block_id} added to chainstate"); + log::debug!("New tip {block_id} added to chainstate"); self.last_chainstate_tip_block_time = Some(self.time_getter.get_time()); } PeerManagerEvent::NewValidTransactionReceived { peer_id, txid } => { if let Some(peer) = self.peers.get_mut(&peer_id) { - log::debug!("new transaction {txid} received from peer {peer_id}"); + log::debug!("New transaction {txid} received from peer {peer_id}"); peer.last_tx_time = Some(self.time_getter.get_time()); } } @@ -1963,10 +1963,10 @@ where let timeout_time = (sent_ping.timestamp + *self.p2p_config.ping_timeout) .expect("Both times are local, so this can't happen"); if now >= timeout_time { - log::info!("ping check: dead peer detected: {peer_id}"); + log::info!("Ping check: dead peer detected: {peer_id}"); dead_peers.push(*peer_id); } else { - log::debug!("ping check: slow peer detected: {peer_id}"); + log::debug!("Ping check: slow peer detected: {peer_id}"); } } None => { @@ -2153,7 +2153,7 @@ where // Skip heartbeat to give the stored anchor peers more time to connect to prevent churn! // The stored anchor peers should be the first connected block relay peers. for anchor_address in anchor_peers { - log::debug!("try to connect to anchor peer {anchor_address}"); + log::debug!("Try to connect to anchor peer {anchor_address}"); // The first peers should become anchor peers self.connect( anchor_address, diff --git a/p2p/src/sync/peer/block_manager.rs b/p2p/src/sync/peer/block_manager.rs index eb94b2d8c..6c2717a00 100644 --- a/p2p/src/sync/peer/block_manager.rs +++ b/p2p/src/sync/peer/block_manager.rs @@ -154,6 +154,7 @@ where *self.id } + #[tracing::instrument(skip_all, name = "", fields(peer_id = %self.id()))] pub async fn run(&mut self) { match self.main_loop().await { // The unexpected "channel closed" error will be handled by the sync manager. @@ -167,7 +168,7 @@ where let last_sync_status = self.get_sync_status(); if self.common_services.has_service(Service::Blocks) { - log::debug!("[peer id = {}] Asking for headers initially", self.id()); + log::debug!("Asking for headers initially"); self.request_headers().await?; } @@ -243,11 +244,10 @@ where log::debug!( concat!( - "[peer id = {}] In handle_new_tip: have_sent_all_headers = {}, ", + "In handle_new_tip: have_sent_all_headers = {}, ", "best_sent_block_header = {:?}, best_sent_block = {:?}, ", "peers_best_block_that_we_have = {:?}" ), - self.id(), self.have_sent_all_headers, self.outgoing.best_sent_block_header, best_sent_block_id, @@ -289,8 +289,7 @@ where if headers.is_empty() { log::debug!( - "[peer id = {}] Got new tip event with block id {}, but there is nothing to send", - self.id(), + "Got new tip event with block id {}, but there is nothing to send", new_tip_id, ); } else if best_block_id != new_tip_id { @@ -298,27 +297,19 @@ where // so we may ignore this one (and it makes sense to ignore it to avoid sending // the same header list multiple times). log::debug!( - "[peer id = {}] Got new tip event with block id {}, but the tip has changed since then to {}", - self.id(), + "Got new tip event with block id {}, but the tip has changed since then to {}", new_tip_id, best_block_id ); } else { - log::debug!( - "[peer id = {}] Sending header list of length {}", - self.id(), - headers.len() - ); + log::debug!("Sending header list of length {}", headers.len()); return self.send_headers(HeaderList::new(headers)); } } else { // Note: if we got here, then we haven't received a single header request or // response from the peer yet (otherwise peers_best_block_that_we_have would be // set at least to the genesis). There is no point in doing anything specific here. - log::warn!( - "[peer id = {}] Ignoring new tip event, because we don't know what to send", - self.id() - ); + log::warn!("Ignoring new tip event, because we don't know what to send"); } } @@ -326,10 +317,7 @@ where } async fn handle_local_event(&mut self, event: LocalEvent) -> Result<()> { - log::debug!( - "[peer id = {}] Handling local peer mgr event: {event:?}", - self.id() - ); + log::debug!("Handling local peer mgr event: {event:?}"); match event { LocalEvent::ChainstateNewTip(new_tip_id) => self.handle_new_tip(&new_tip_id).await, @@ -341,14 +329,13 @@ where let locator = self.chainstate_handle.call(|this| Ok(this.get_locator()?)).await?; if locator.len() > *self.p2p_config.protocol_config.msg_max_locator_count { log::warn!( - "[peer id = {}] Sending locator of the length {}, which exceeds the maximum length {:?}", - self.id(), + "Sending locator of the length {}, which exceeds the maximum length {:?}", locator.len(), self.p2p_config.protocol_config.msg_max_locator_count ); } - log::debug!("[peer id = {}] Sending header list request", self.id()); + log::debug!("Sending header list request"); self.send_message(BlockSyncMessage::HeaderListRequest(HeaderListRequest::new( locator, )))?; @@ -360,10 +347,7 @@ where } async fn handle_message(&mut self, message: BlockSyncMessage) -> Result<()> { - log::trace!( - "[peer id = {}] Handling block sync message from the peer: {message:?}", - self.id() - ); + log::trace!("Handling block sync message from the peer: {message:?}"); let res = match message { BlockSyncMessage::HeaderListRequest(r) => { @@ -385,7 +369,7 @@ where /// Processes a header request by sending requested data to the peer. async fn handle_header_request(&mut self, locator: Locator) -> Result<()> { - log::debug!("[peer id = {}] Handling header request", self.id()); + log::debug!("Handling header request"); if locator.len() > *self.p2p_config.protocol_config.msg_max_locator_count { return Err(P2pError::ProtocolError(ProtocolError::LocatorSizeExceeded( @@ -395,7 +379,9 @@ where } if self.chainstate_handle.is_initial_block_download().await? { - log::debug!("[peer id = {}] Responding with empty headers list because the node is in initial block download", self.id()); + log::debug!( + "Responding with empty headers list because the node is in initial block download" + ); // Respond with an empty list to avoid being marked as stalled. // Note: sending actual headers when in IBD is in general not a good idea, because // we may not be on the correct chain. E.g. the current best block might be below @@ -449,8 +435,7 @@ where ); log::debug!( - "[peer id = {}] Handling block request: {}-{} ({})", - self.id(), + "Handling block request: {}-{} ({})", block_ids.first().expect("block_ids is not empty"), block_ids.last().expect("block_ids is not empty"), block_ids.len(), @@ -462,8 +447,7 @@ where // be able to ask it for blocks. // TODO: return an error with a non-zero ban score instead? log::warn!( - "[peer id = {}] The node is in initial block download, but the peer is asking us for blocks", - self.id() + "The node is in initial block download, but the peer is asking us for blocks" ); return Ok(()); } @@ -574,8 +558,7 @@ where let sleep_time = std::cmp::min(clock_diff, self.p2p_config.effective_max_clock_diff()); log::debug!( - "[peer id = {}] Block timestamp from the future ({} seconds)", - self.id(), + "Block timestamp from the future ({} seconds)", sleep_time.as_secs() ); tokio::time::sleep(sleep_time).await; @@ -583,7 +566,7 @@ where } async fn handle_header_list(&mut self, headers: Vec) -> Result<()> { - log::debug!("[peer id = {}] Handling header list", self.id()); + log::debug!("Handling header list"); self.peer_activity.set_expecting_headers_since(None); @@ -701,11 +684,7 @@ where async fn handle_block_response(&mut self, block: Block) -> Result<()> { let block_id = block.get_id(); - log::debug!( - "[peer id = {}] Handling block response, block id = {}", - self.id(), - block_id - ); + log::debug!("Handling block response, block id = {block_id}"); if self.incoming.requested_blocks.front() != Some(&block.get_id()) { let idx = self.incoming.requested_blocks.iter().position(|id| id == &block.get_id()); @@ -740,7 +719,6 @@ where let block = self.chainstate_handle.call(|c| Ok(c.preliminary_block_check(block)?)).await?; // Process the block and also determine the new value for peers_best_block_that_we_have. - let peer_id = self.id(); let old_peers_best_block_that_we_have = self.incoming.peers_best_block_that_we_have; let (best_block, new_tip_received) = self .chainstate_handle @@ -748,11 +726,7 @@ where // If the block already exists in the block tree, skip it. let new_tip_received = if c.get_block_index_for_persisted_block(&block.get_id())?.is_some() { - log::debug!( - "[peer id = {}] The peer sent a block that already exists ({})", - peer_id, - block_id - ); + log::debug!("The peer sent a block that already exists ({block_id})"); false } else { let block_index = c.process_block(block, BlockSource::Peer)?; @@ -820,8 +794,7 @@ where let block_ids: Vec<_> = headers.into_iter().map(|h| h.get_id()).collect(); log::debug!( - "[peer id = {}] Requesting blocks from the peer: {}-{} ({})", - self.id(), + "Requesting blocks from the peer: {}-{} ({})", block_ids.first().expect("block_ids is not empty"), block_ids.last().expect("block_ids is not empty"), block_ids.len(), @@ -869,11 +842,7 @@ where self.outgoing.best_sent_block = Some(block_index); } - log::debug!( - "[peer id = {}] Sending block with id = {} to the peer", - self.id(), - block.get_id() - ); + log::debug!("Sending block with id = {} to the peer", block.get_id()); self.send_message(BlockSyncMessage::BlockResponse(BlockResponse::new(block))) } @@ -894,8 +863,8 @@ where // Nodes can disconnect each other if all of them are in the initial block download state, // but this should never occur in a normal network and can be worked around in the tests. let (sender, receiver) = oneshot_nofail::channel(); - log::warn!("[peer id = {}] Disconnecting the peer for ignoring requests, headers_req_stalling = {}, blocks_req_stalling = {}", - self.id(), headers_req_stalling, blocks_req_stalling); + log::warn!("Disconnecting the peer for ignoring requests, headers_req_stalling = {}, blocks_req_stalling = {}", + headers_req_stalling, blocks_req_stalling); self.peer_mgr_event_sender.send(PeerManagerEvent::Disconnect( self.id(), PeerDisconnectionDbAction::Keep, @@ -911,11 +880,7 @@ where async fn handle_stalling_interval(&mut self) { let result = self.disconnect_if_stalling().await; if let Err(err) = result { - log::warn!( - "[peer id = {}] Disconnecting peer failed: {}", - self.id(), - err - ); + log::warn!("Disconnecting peer failed: {err}"); } } } diff --git a/p2p/src/sync/peer/transaction_manager.rs b/p2p/src/sync/peer/transaction_manager.rs index 306668361..672b87ffb 100644 --- a/p2p/src/sync/peer/transaction_manager.rs +++ b/p2p/src/sync/peer/transaction_manager.rs @@ -124,6 +124,7 @@ where *self.id } + #[tracing::instrument(skip_all, name = "", fields(peer_id = %self.id()))] pub async fn run(&mut self) { match self.main_loop().await { // The unexpected "channel closed" error will be handled by the sync manager. @@ -175,10 +176,7 @@ where } fn handle_local_event(&mut self, event: LocalEvent) -> Result<()> { - log::debug!( - "[peer id = {}] Handling local peer mgr event: {event:?}", - self.id() - ); + log::debug!("Handling local peer mgr event: {event:?}"); match event { LocalEvent::ChainstateNewTip(_) => Ok(()), @@ -200,10 +198,7 @@ where } async fn handle_message(&mut self, message: TransactionSyncMessage) -> Result<()> { - log::trace!( - "[peer id = {}] Handling tx sync message from the peer: {message:?}", - self.id() - ); + log::trace!("Handling tx sync message from the peer: {message:?}"); let res = match message { TransactionSyncMessage::NewTransaction(id) => { @@ -268,11 +263,7 @@ where // because we purge "requested_transactions" from time to time. So it's technically // possible for such a response to be "solicited" but forgotten later. // So we just ignore the response. - log::warn!( - "[peer id = {}] Ignoring unsolicited TransactionResponse for tx {}", - self.id, - id - ); + log::warn!("Ignoring unsolicited TransactionResponse for tx {id}"); return Ok(()); } @@ -307,16 +298,13 @@ where } async fn handle_transaction_announcement(&mut self, tx: Id) -> Result<()> { - log::debug!( - "[peer id = {}] Handling transaction announcement: {tx}", - self.id() - ); + log::debug!("Handling transaction announcement: {tx}"); self.add_known_transaction(tx); if self.chainstate_handle.is_initial_block_download().await? { log::debug!( - "[peer id = {}] Ignoring transaction announcement because the node is in initial block download", self.id() + "Ignoring transaction announcement because the node is in initial block download" ); return Ok(()); } @@ -346,11 +334,7 @@ where // But still, it doesn't make sense to request an already requested tx again. // Also, we don't punish the peer, mainly for consistency with other places, where // we handle requested_transactions-related mischiefs leniently. - log::warn!( - "[peer id = {}] Ignoring duplicate announcement for tx {}", - self.id, - tx - ); + log::warn!("Ignoring duplicate announcement for tx {tx}"); return Ok(()); } @@ -366,8 +350,7 @@ where // in such a situation. Note that after certain time, older requests will be purged // from requested_transactions, after which we'll start to handle peer's tx // announcements again. - log::warn!("[peer id = {}] Ignoring announcement for tx {} because requested_transactions is over the limit", - self.id, tx); + log::warn!("Ignoring announcement for tx {tx} because requested_transactions is over the limit"); return Ok(()); } diff --git a/subsystem/src/calls/handle.rs b/subsystem/src/calls/handle.rs index c02368ced..705833a52 100644 --- a/subsystem/src/calls/handle.rs +++ b/subsystem/src/calls/handle.rs @@ -18,6 +18,7 @@ use futures::future::BoxFuture; use logging::log; +use tracing::Instrument as _; use utils::shallow_clone::ShallowClone; use crate::{ @@ -121,10 +122,14 @@ impl Handle { &self, func: impl FnOnce(&mut T) -> BoxFuture + Send + 'static, ) -> CallResult { + // Note: need to retrieve the tracing span in advance and then call `instrument` with it + // (as opposed to calling `in_current_span` on the future directly - this won't work because + // the current span will be different at the point of the call). + let current_tracing_span = tracing::Span::current(); let (rtx, rrx) = tokio::sync::oneshot::channel::(); let result = self.0.submit_async_mut(move |subsys| { Box::pin(async move { - if rtx.send(func(subsys).await).is_err() { + if rtx.send(func(subsys).instrument(current_tracing_span).await).is_err() { log::trace!("Subsystem call (mut) result ignored"); } }) @@ -137,10 +142,12 @@ impl Handle { &self, func: impl FnOnce(&T) -> BoxFuture + Send + 'static, ) -> CallResult { + // Same note about the tracing span as in `call_async_mut` above. + let current_tracing_span = tracing::Span::current(); let (rtx, rrx) = tokio::sync::oneshot::channel::(); let result = self.0.submit_async(move |subsys| { Box::pin(async move { - if rtx.send(func(subsys).await).is_err() { + if rtx.send(func(subsys).instrument(current_tracing_span).await).is_err() { log::trace!("Subsystem call result ignored"); } }) @@ -153,7 +160,10 @@ impl Handle { &self, func: impl FnOnce(&mut T) -> R + Send + 'static, ) -> CallResult { - self.call_async_mut(|this| Box::pin(core::future::ready(func(this)))) + // Note: originally we were creating the future via `core::future::ready` instead of using + // an `async` block. But `instrument`-ing the `Ready` future (which happens inside + // `call_async_mut`) doesn't work for some reason. + self.call_async_mut(|this| Box::pin(async { func(this) })) } /// Call a procedure to the subsystem (immutable). @@ -161,6 +171,7 @@ impl Handle { &self, func: impl FnOnce(&T) -> R + Send + 'static, ) -> CallResult { - self.call_async(|this| Box::pin(core::future::ready(func(this)))) + // Same note about the async block as in `call_mut` above. + self.call_async(|this| Box::pin(async { func(this) })) } } From d805c6f4f8d34e71f956c36d727f42c4c52a6feb Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Fri, 19 Sep 2025 18:40:10 +0300 Subject: [PATCH 2/7] Add last_tip_block_time to the peer info returned via node RPC. The "New peer accepted" log line now also includes the user agent and its version. --- chainstate/src/detail/mod.rs | 22 ++++++++++++---------- node-daemon/docs/RPC.md | 3 +++ p2p/src/disconnection_reason.rs | 3 ++- p2p/src/interface/types.rs | 6 +++++- p2p/src/peer_manager/mod.rs | 12 ++++++++++-- p2p/src/peer_manager/peer_context.rs | 1 + wallet/wallet-rpc-daemon/docs/RPC.md | 3 +++ 7 files changed, 36 insertions(+), 14 deletions(-) diff --git a/chainstate/src/detail/mod.rs b/chainstate/src/detail/mod.rs index a4702e004..1557c133d 100644 --- a/chainstate/src/detail/mod.rs +++ b/chainstate/src/detail/mod.rs @@ -261,15 +261,13 @@ impl Chainstate Ok(()) } - fn broadcast_new_tip_event(&mut self, new_block_index: &Option) { - if let Some(new_block_index) = new_block_index { - let new_height = new_block_index.block_height(); - let new_id = *new_block_index.block_id(); - let event = ChainstateEvent::NewTip(new_id, new_height); - - self.rpc_events.broadcast(&event); - self.subsystem_events.broadcast(event); - } + fn broadcast_new_tip_event(&mut self, new_block_index: &BlockIndex) { + let new_height = new_block_index.block_height(); + let new_id = *new_block_index.block_id(); + let event = ChainstateEvent::NewTip(new_id, new_height); + + self.rpc_events.broadcast(&event); + self.subsystem_events.broadcast(event); } /// Create a read-write transaction, call `main_action` on it and commit. @@ -609,7 +607,11 @@ impl Chainstate None => result, }; - self.broadcast_new_tip_event(&result); + if let Some(new_block_index) = &result { + self.broadcast_new_tip_event(new_block_index); + } else { + log::debug!("Stale block received: {block_id:x}"); + } if let Some(ref bi) = result { let compact_target = match bi.block_header().consensus_data() { diff --git a/node-daemon/docs/RPC.md b/node-daemon/docs/RPC.md index e5882a82a..f5a1f7755 100644 --- a/node-daemon/docs/RPC.md +++ b/node-daemon/docs/RPC.md @@ -1089,6 +1089,9 @@ Returns: "ping_min": EITHER OF 1) number 2) null, + "last_tip_block_time": EITHER OF + 1) number + 2) null, }, .. ] ``` diff --git a/p2p/src/disconnection_reason.rs b/p2p/src/disconnection_reason.rs index 8b3d67b0d..1a1baf311 100644 --- a/p2p/src/disconnection_reason.rs +++ b/p2p/src/disconnection_reason.rs @@ -28,6 +28,7 @@ use crate::{ /// /// Note: we derive `thiserror::Error` here just for the convenience of implementing `Display`. /// But conceptually this enum is not an error and it's not supposed to be used with `Result`. +// TODO: use `derive_more::Display` instead of `thiserror::Error`. #[derive(Error, Debug, Clone, PartialEq, Eq)] pub enum DisconnectionReason { #[error("Your address is banned")] @@ -57,7 +58,7 @@ pub enum DisconnectionReason { remote_time: Time, accepted_peer_time: std::ops::RangeInclusive