From 0c149ea458315ef47dbf04c6591b4681809f2648 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Mon, 20 Oct 2025 14:39:10 -0400 Subject: [PATCH 01/12] wip Signed-off-by: Derek Riley --- .../impl/streaming/BlockNodeConnection.java | 4 +- .../streaming/BlockNodeConnectionManager.java | 56 +++++-------- .../BlockNodeConnectionManagerTest.java | 81 ------------------- .../streaming/BlockNodeConnectionTest.java | 2 +- 4 files changed, 22 insertions(+), 121 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 21649010a6cd..afd25a3350e3 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -327,7 +327,7 @@ private void performStreamReset() { if (getConnectionState() == ConnectionState.ACTIVE) { logWithContext(logger, INFO, this, "Performing scheduled stream reset."); endTheStreamWith(RESET); - blockNodeConnectionManager.connectionResetsTheStream(this); + blockNodeConnectionManager.selectNewBlockNodeForStreaming(false); } } @@ -755,6 +755,8 @@ public void close(final boolean callOnComplete) { } blockStreamMetrics.recordConnectionClosed(); blockStreamMetrics.recordActiveConnectionIp(-1L); + blockNodeConnectionManager.getConnections().remove(blockNodeConfig); + blockNodeConnectionManager.getActiveConnectionRef().compareAndSet(this, null); // regardless of outcome, mark the connection as closed updateConnectionState(ConnectionState.CLOSED); } diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 15d744faa838..38cef12cf1a2 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -146,7 +146,7 @@ public class BlockNodeConnectionManager { /** * The directory containing the block node connection configuration file. */ - private Path blockNodeConfigDirectory; + private final Path blockNodeConfigDirectory; /** * The file name of the block node configuration file. */ @@ -360,9 +360,6 @@ private void handleConnectionCleanupAndReschedule( @Nullable Duration delay, @Nullable final Long blockNumber, final boolean selectNewBlockNode) { - // Remove from connections map and clear active reference - removeConnectionAndClearActive(connection); - long delayMs; // Get or create the retry attempt for this node final RetryState retryState = retryStates.computeIfAbsent(connection.getNodeConfig(), k -> new RetryState()); @@ -396,34 +393,6 @@ private void handleConnectionCleanupAndReschedule( } } - /** - * Connection initiated a periodic reset of the stream - * @param connection the connection that initiated the reset of the stream - */ - public void connectionResetsTheStream(@NonNull final BlockNodeConnection connection) { - if (!isStreamingEnabled()) { - return; - } - requireNonNull(connection); - - removeConnectionAndClearActive(connection); - - // Immediately try to find and connect to the next available node - selectNewBlockNodeForStreaming(false); - } - - /** - * Removes a connection from the connections map and clears the active reference if this was the active connection. - * This is a utility method to ensure consistent cleanup behavior. - * - * @param connection the connection to remove and clean up - */ - private void removeConnectionAndClearActive(@NonNull final BlockNodeConnection connection) { - requireNonNull(connection); - connections.remove(connection.getNodeConfig(), connection); - activeConnectionRef.compareAndSet(connection, null); - } - /** * Schedules a connection attempt (or retry) for the given Block Node connection * after the specified delay. Handles adding/removing the connection from the retry map. @@ -481,8 +450,7 @@ private void scheduleConnectionAttempt( logWithContext(logger, DEBUG, "Successfully scheduled reconnection task.", newConnection); } catch (final Exception e) { logger.error(formatLogMessage("Failed to schedule connection task for block node.", newConnection), e); - connections.remove(newConnection.getNodeConfig()); - newConnection.close(true); + newConnection.close(false); } } @@ -1208,7 +1176,6 @@ private void reschedule() { synchronized (availableBlockNodes) { if (!availableBlockNodes.contains(connection.getNodeConfig())) { logWithContext(DEBUG, "Node no longer available, skipping reschedule."); - connections.remove(connection.getNodeConfig()); return; } } @@ -1216,9 +1183,6 @@ private void reschedule() { logWithContext(INFO, "Rescheduled connection attempt (delayMillis={}).", jitteredDelayMs); } catch (final Exception e) { logger.error("Failed to reschedule connection attempt. Removing from retry map.", e); - // If rescheduling fails, close the connection and remove it from the connection map. A periodic task - // will handle checking if there are no longer any connections - connections.remove(connection.getNodeConfig()); connection.close(true); } } @@ -1433,4 +1397,20 @@ public BlockNodeStats.HighLatencyResult recordBlockAckAndCheckLatency( return result; } + + /** + * Gets the map of block node connections. + * @return the map of block node connections + */ + public Map getConnections() { + return connections; + } + + /** + * Gets the atomic reference to the active block node connection. + * @return the atomic reference to the active block node connection + */ + public AtomicReference getActiveConnectionRef() { + return activeConnectionRef; + } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index 64bc955e7c66..9d40e187ca36 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -1638,40 +1638,6 @@ void testConstructor_configFileNotFound() { assertThat(availableNodes).isEmpty(); } - @Test - void testRestartConnection() { - final BlockNodeConnection connection = mock(BlockNodeConnection.class); - final BlockNodeConfig nodeConfig = newBlockNodeConfig("pbj-unit-test-host", 8080, 1); - doReturn(nodeConfig).when(connection).getNodeConfig(); - - // Add the connection to the connections map and set it as active - final Map connections = connections(); - final AtomicReference activeConnectionRef = activeConnection(); - connections.put(nodeConfig, connection); - activeConnectionRef.set(connection); - - // Ensure the node config is available for selection - final List availableNodes = availableNodes(); - availableNodes.clear(); - availableNodes.add(nodeConfig); - - connectionManager.connectionResetsTheStream(connection); - - // Verify the active connection reference was cleared - assertThat(activeConnectionRef).hasNullValue(); - // Verify a new connection was created and added to the connections map - assertThat(connections).containsKey(nodeConfig); - // Verify it's a different connection object (the old one was replaced) - assertThat(connections.get(nodeConfig)).isNotSameAs(connection); - - // Verify that scheduleConnectionAttempt was called with Duration.ZERO and the block number - verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS)); - verifyNoMoreInteractions(connection); - verifyNoInteractions(bufferService); - verifyNoInteractions(metrics); - verifyNoMoreInteractions(executorService); - } - @Test void testRescheduleConnection_singleBlockNode() { // selectNewBlockNodeForStreaming should NOT be called @@ -1707,53 +1673,6 @@ void testRescheduleConnection_singleBlockNode() { .schedule(any(BlockNodeConnectionTask.class), eq(5000L), eq(TimeUnit.MILLISECONDS)); } - @Test - void testConnectionResetsTheStream() { - final BlockNodeConnection connection = mock(BlockNodeConnection.class); - final BlockNodeConfig nodeConfig = newBlockNodeConfig("pbj-unit-test-host", 8080, 1); - doReturn(nodeConfig).when(connection).getNodeConfig(); - - // Add the connection to the connections map and set it as active - final Map connections = connections(); - final AtomicReference activeConnectionRef = activeConnection(); - connections.put(nodeConfig, connection); - activeConnectionRef.set(connection); - - // Ensure the node config is available for selection - final List availableNodes = availableNodes(); - availableNodes.clear(); - availableNodes.add(nodeConfig); - - connectionManager.connectionResetsTheStream(connection); - - // Verify the active connection reference was cleared - assertThat(activeConnectionRef).hasNullValue(); - // Verify a new connection was created and added to the connections map - assertThat(connections).containsKey(nodeConfig); - // Verify it's a different connection object (the old one was replaced) - assertThat(connections.get(nodeConfig)).isNotSameAs(connection); - - // Verify that selectNewBlockNodeForStreaming was called - verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS)); - verifyNoMoreInteractions(connection); - verifyNoInteractions(bufferService); - verifyNoInteractions(metrics); - verifyNoMoreInteractions(executorService); - } - - @Test - void testConnectionResetsTheStream_streamingDisabled() { - useStreamingDisabledManager(); - final BlockNodeConnection connection = mock(BlockNodeConnection.class); - - connectionManager.connectionResetsTheStream(connection); - - verifyNoInteractions(connection); - verifyNoInteractions(bufferService); - verifyNoInteractions(executorService); - verifyNoInteractions(metrics); - } - @Test void testShutdown_noWorkerThread() { final AtomicBoolean isActive = isActiveFlag(); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index fcd044020f00..35e49fa90707 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -1289,7 +1289,7 @@ void testPeriodicStreamReset() { // Verify reset behavior verify(bufferService).getEarliestAvailableBlockNumber(); verify(bufferService).getHighestAckedBlockNumber(); - verify(connectionManager).connectionResetsTheStream(connection); + verify(connectionManager).selectNewBlockNodeForStreaming(false); verify(requestPipeline).onNext(any(PublishStreamRequest.class)); verify(requestPipeline).onComplete(); verify(connectionManager).jumpToBlock(-1L); From 0ada99025fdb3bb3828f7e6050aef5bce814be63 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Tue, 21 Oct 2025 12:03:29 -0400 Subject: [PATCH 02/12] add forced schedule delay Signed-off-by: Derek Riley --- .../streaming/BlockNodeConnectionManager.java | 22 +++++++++++++++++++ .../data/BlockNodeConnectionConfig.java | 4 +++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 38cef12cf1a2..9ea1fbf9d915 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -1126,6 +1126,21 @@ public void run() { try { logWithContext(DEBUG, "Closing current active connection {}.", activeConnection); activeConnection.close(true); + // For a forced switch, reschedule the previously active connection to try again later + if (force) { + try { + final Duration delay = getForcedSwitchRescheduleDelay(); + scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null); + logWithContext( + DEBUG, + "Scheduled previously active connection in {} ms due to forced switch.", + delay.toMillis()); + } catch (final Exception e) { + logger.debug( + "Failed to schedule reschedule for previous active connection after forced switch.", + e); + } + } } catch (final RuntimeException e) { logger.info( "Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).", @@ -1244,6 +1259,13 @@ public Duration getEndOfStreamTimeframe() { .endOfStreamTimeFrame(); } + private Duration getForcedSwitchRescheduleDelay() { + return configProvider + .getConfiguration() + .getConfigData(BlockNodeConnectionConfig.class) + .forcedSwitchRescheduleDelay(); + } + /** * Gets the maximum number of EndOfStream responses allowed before taking corrective action. * diff --git a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java index c3597e7672bd..a32c038fd6ba 100644 --- a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java +++ b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java @@ -20,6 +20,7 @@ * @param highLatencyEventsBeforeSwitching number of consecutive high-latency events before considering switching nodes * @param maxBackoffDelay the maximum backoff delay for exponential backoff * @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime + * @param forcedSwitchRescheduleDelay the delay to reschedule a closed active connection after a forced switch */ @ConfigData("blockNode") public record BlockNodeConnectionConfig( @@ -34,4 +35,5 @@ public record BlockNodeConnectionConfig( @ConfigProperty(defaultValue = "30s") @NodeProperty Duration highLatencyThreshold, @ConfigProperty(defaultValue = "5") @NodeProperty int highLatencyEventsBeforeSwitching, @ConfigProperty(defaultValue = "10s") @NodeProperty Duration maxBackoffDelay, - @ConfigProperty(defaultValue = "30s") @NodeProperty Duration grpcOverallTimeout) {} + @ConfigProperty(defaultValue = "30s") @NodeProperty Duration grpcOverallTimeout, + @ConfigProperty(defaultValue = "180s") @NodeProperty Duration forcedSwitchRescheduleDelay) {} From d42b2731102e3f268aec0dd628b952fed7c89f51 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Wed, 22 Oct 2025 10:41:40 -0400 Subject: [PATCH 03/12] Add unit test Signed-off-by: Derek Riley --- .../BlockNodeConnectionManagerTest.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index 9d40e187ca36..2cfafb0e4288 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -2428,4 +2428,57 @@ private void useStreamingDisabledManager() { connectionManager = new BlockNodeConnectionManager(disabledProvider, bufferService, metrics); sharedExecutorServiceHandle.set(connectionManager, executorService); } + + @Test + void forcefulTaskClosesActiveAndReschedulesIt() throws Exception { + // Prepare the real instance state using VarHandles (avoid using them on the spy) + final BlockNodeConnectionManager real = connectionManager; + ((AtomicBoolean) isManagerActiveHandle.get(real)).set(true); + + final BlockNodeConnection oldActive = mock(BlockNodeConnection.class); + final BlockNodeConfig oldCfg = newBlockNodeConfig("pbj-unit-test-host", 8080, 1); + doReturn(oldCfg).when(oldActive).getNodeConfig(); + + final BlockNodeConnection candidate = mock(BlockNodeConnection.class); + final BlockNodeConfig newCfg = newBlockNodeConfig("pbj-unit-test-host", 8081, 0); + doReturn(newCfg).when(candidate).getNodeConfig(); + + // Set the active connection on the real instance + ((AtomicReference) activeConnectionRefHandle.get(real)).set(oldActive); + + // Spy the manager so we can capture scheduleConnectionAttempt args without further VarHandle usage + connectionManager = org.mockito.Mockito.spy(real); + + doReturn(true).when(candidate).updateConnectionState(ConnectionState.ACTIVE); + doReturn(123L).when(bufferService).getLastBlockNumberProduced(); + + final var cfgCaptor = org.mockito.ArgumentCaptor.forClass(BlockNodeConfig.class); + final var delayCaptor = org.mockito.ArgumentCaptor.forClass(Duration.class); + org.mockito.Mockito.doNothing() + .when(connectionManager) + .scheduleConnectionAttempt(cfgCaptor.capture(), delayCaptor.capture(), org.mockito.Mockito.isNull()); + + final Class[] declared = BlockNodeConnectionManager.class.getDeclaredClasses(); + Class taskClass = null; + for (final Class c : declared) { + if (c.getSimpleName().equals("BlockNodeConnectionTask")) { + taskClass = c; + break; + } + } + assertThat(taskClass).isNotNull(); + final var ctor = taskClass.getDeclaredConstructor( + BlockNodeConnectionManager.class, BlockNodeConnection.class, Duration.class, Long.class, boolean.class); + ctor.setAccessible(true); + final Runnable task = (Runnable) ctor.newInstance(connectionManager, candidate, Duration.ZERO, null, true); + + task.run(); + + verify(oldActive).close(true); + verify(connectionManager) + .scheduleConnectionAttempt( + any(BlockNodeConfig.class), any(Duration.class), org.mockito.Mockito.isNull()); + assertThat(cfgCaptor.getValue()).isEqualTo(oldCfg); + assertThat(delayCaptor.getValue()).isEqualTo(Duration.ofSeconds(180)); + } } From eb12369c9dbc8975f02790185d45dd416ae4ea19 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Wed, 22 Oct 2025 15:22:34 -0400 Subject: [PATCH 04/12] Update buffer sat hapi test Signed-off-by: Derek Riley --- .../bdd/suites/blocknode/BlockNodeSuite.java | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index 421e222d0c4c..83aa5551d3c0 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -484,6 +484,7 @@ final Stream twoNodesStreamingOneBlockNodeHappyPath() { blockNodePriorities = {0, 1}, applicationPropertiesOverrides = { "blockStream.buffer.blockTtl", "1m", + "blockNode.forcedSwitchRescheduleDelay", "30s", "blockStream.streamMode", "BLOCKS", "blockStream.writerMode", "FILE_AND_GRPC" }) @@ -491,31 +492,41 @@ final Stream twoNodesStreamingOneBlockNodeHappyPath() { @Order(6) final Stream testProactiveBlockBufferAction() { final AtomicReference timeRef = new AtomicReference<>(); + final List portNumbers = new ArrayList<>(); return hapiTest( + doingContextual(spec -> { + portNumbers.add(spec.getBlockNodePortById(0)); + portNumbers.add(spec.getBlockNodePortById(1)); + }), doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(5).toNanos())), doingContextual(spec -> timeRef.set(Instant.now())), blockNode(0).updateSendingBlockAcknowledgements(false), doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(5).toNanos())), - sourcingContextual( - spec -> assertBlockNodeCommsLogContainsTimeframe( - byNodeId(0), - timeRef::get, - Duration.ofMinutes(1), - Duration.ofMinutes(1), - // look for the saturation reaching the action stage (50%) - "saturation=50.0%", - // look for the log that shows we are forcing a reconnect to a different block node - "Attempting to forcefully switch block node connections due to increasing block buffer saturation")), - doingContextual(spec -> timeRef.set(Instant.now())), sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0), timeRef::get, Duration.ofMinutes(1), Duration.ofMinutes(1), + // look for the saturation reaching the action stage (50%) + "saturation=50.0%", + // look for the log that shows we are forcing a reconnect to a different block node + "Attempting to forcefully switch block node connections due to increasing block buffer saturation", + "/localhost:" + portNumbers.get(1) + + "/ACTIVE] Connection state transitioned from PENDING to ACTIVE.")), + blockNode(0).updateSendingBlockAcknowledgements(true), + doingContextual(spec -> timeRef.set(Instant.now())), + sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( + byNodeId(0), + timeRef::get, + Duration.ofMinutes(2), + Duration.ofMinutes(2), // saturation should fall back to low levels after the reconnect to the different node - "saturation=0.0%"))); + // then we should see a switch back to higher priority node + "saturation=0.0%", + "/localhost:" + portNumbers.get(0) + + "/ACTIVE] Connection state transitioned from PENDING to ACTIVE."))); } @Disabled From 07c39c655169396072fec6ee62b218f354731e78 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Wed, 22 Oct 2025 15:56:55 -0400 Subject: [PATCH 05/12] unit test Signed-off-by: Derek Riley --- .../blocks/impl/streaming/BlockNodeConnectionManager.java | 6 ++++-- .../impl/streaming/BlockNodeConnectionManagerTest.java | 5 +---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 9ea1fbf9d915..c845c69b8a6b 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -146,7 +146,7 @@ public class BlockNodeConnectionManager { /** * The directory containing the block node connection configuration file. */ - private final Path blockNodeConfigDirectory; + private Path blockNodeConfigDirectory; /** * The file name of the block node configuration file. */ @@ -1136,9 +1136,10 @@ public void run() { "Scheduled previously active connection in {} ms due to forced switch.", delay.toMillis()); } catch (final Exception e) { - logger.debug( + logger.error( "Failed to schedule reschedule for previous active connection after forced switch.", e); + connections.remove(connection.getNodeConfig()); } } } catch (final RuntimeException e) { @@ -1191,6 +1192,7 @@ private void reschedule() { synchronized (availableBlockNodes) { if (!availableBlockNodes.contains(connection.getNodeConfig())) { logWithContext(DEBUG, "Node no longer available, skipping reschedule."); + connections.remove(connection.getNodeConfig()); return; } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index 2cfafb0e4288..e66c7a545819 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -906,7 +906,7 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() { assertThat(activeConnectionRef).hasValue(newConnection); - verify(activeConnection).getNodeConfig(); + verify(activeConnection, times(2)).getNodeConfig(); verify(activeConnection).close(true); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); @@ -916,7 +916,6 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() { verifyNoMoreInteractions(activeConnection); verifyNoMoreInteractions(newConnection); - verifyNoInteractions(executorService); verifyNoMoreInteractions(bufferService); verifyNoMoreInteractions(metrics); } @@ -1131,8 +1130,6 @@ void testConnectionTask_reschedule_failure() { task.run(); - assertThat(connections).isEmpty(); // connection should be removed - verify(connection).createRequestPipeline(); verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS)); verify(connection, atLeast(1)).getNodeConfig(); From 841d870e8b7b0c0061ef3894bb453044659f8fad Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Fri, 24 Oct 2025 07:59:00 -0400 Subject: [PATCH 06/12] fix unit tests Signed-off-by: Derek Riley --- .../impl/streaming/BlockNodeConnection.java | 1 - .../streaming/BlockNodeConnectionManager.java | 2 +- .../BlockNodeConnectionManagerTest.java | 83 ------------------- .../streaming/BlockNodeConnectionTest.java | 17 +--- 4 files changed, 4 insertions(+), 99 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 83398cd024d3..542d81f64057 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -1056,7 +1056,6 @@ private void doWork() { newRequestBytes, MAX_BYTES_PER_REQUEST); endTheStreamWith(EndStream.Code.ERROR); - blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this); break; } } else { diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index ecc789d95bf4..de03e1dd6d4a 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -804,7 +804,7 @@ public void run() { if (force) { try { final Duration delay = getForcedSwitchRescheduleDelay(); - scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null); + scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null, false); logWithContext( DEBUG, "Scheduled previously active connection in {} ms due to forced switch.", diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index a6d031e8dd06..715feaf757ac 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -1013,36 +1013,6 @@ void testRecordEndOfStreamAndCheckLimit_streamingDisabled() { assertThat(limitExceeded).isFalse(); } - @Test - void testConnectionResetsTheStream() { - final BlockNodeConnection connection = mock(BlockNodeConnection.class); - final BlockNodeConfig nodeConfig = newBlockNodeConfig(8080, 1); - doReturn(nodeConfig).when(connection).getNodeConfig(); - availableNodes().add(nodeConfig); - - // Add the connection to the connections map and set it as active - final Map connections = connections(); - final AtomicReference activeConnectionRef = activeConnection(); - connections.put(nodeConfig, connection); - activeConnectionRef.set(connection); - - connectionManager.connectionResetsTheStream(connection); - - // Verify the active connection reference was cleared - assertThat(activeConnectionRef).hasNullValue(); - // Verify a new connection was created and added to the connections map - assertThat(connections).containsKey(nodeConfig); - // Verify it's a different connection object (the old one was replaced) - assertThat(connections.get(nodeConfig)).isNotSameAs(connection); - - // Verify that selectNewBlockNodeForStreaming was called - verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS)); - verifyNoMoreInteractions(connection); - verifyNoInteractions(bufferService); - verifyNoInteractions(metrics); - verifyNoMoreInteractions(executorService); - } - @Test void testRecordEndOfStreamAndCheckLimit_withinLimit() { final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1); @@ -1467,57 +1437,4 @@ private void useStreamingDisabledManager() { connectionManager = new BlockNodeConnectionManager(disabledProvider, bufferService, metrics); sharedExecutorServiceHandle.set(connectionManager, executorService); } - - @Test - void forcefulTaskClosesActiveAndReschedulesIt() throws Exception { - // Prepare the real instance state using VarHandles (avoid using them on the spy) - final BlockNodeConnectionManager real = connectionManager; - ((AtomicBoolean) isManagerActiveHandle.get(real)).set(true); - - final BlockNodeConnection oldActive = mock(BlockNodeConnection.class); - final BlockNodeConfig oldCfg = newBlockNodeConfig("pbj-unit-test-host", 8080, 1); - doReturn(oldCfg).when(oldActive).getNodeConfig(); - - final BlockNodeConnection candidate = mock(BlockNodeConnection.class); - final BlockNodeConfig newCfg = newBlockNodeConfig("pbj-unit-test-host", 8081, 0); - doReturn(newCfg).when(candidate).getNodeConfig(); - - // Set the active connection on the real instance - ((AtomicReference) activeConnectionRefHandle.get(real)).set(oldActive); - - // Spy the manager so we can capture scheduleConnectionAttempt args without further VarHandle usage - connectionManager = org.mockito.Mockito.spy(real); - - doReturn(true).when(candidate).updateConnectionState(ConnectionState.ACTIVE); - doReturn(123L).when(bufferService).getLastBlockNumberProduced(); - - final var cfgCaptor = org.mockito.ArgumentCaptor.forClass(BlockNodeConfig.class); - final var delayCaptor = org.mockito.ArgumentCaptor.forClass(Duration.class); - org.mockito.Mockito.doNothing() - .when(connectionManager) - .scheduleConnectionAttempt(cfgCaptor.capture(), delayCaptor.capture(), org.mockito.Mockito.isNull()); - - final Class[] declared = BlockNodeConnectionManager.class.getDeclaredClasses(); - Class taskClass = null; - for (final Class c : declared) { - if (c.getSimpleName().equals("BlockNodeConnectionTask")) { - taskClass = c; - break; - } - } - assertThat(taskClass).isNotNull(); - final var ctor = taskClass.getDeclaredConstructor( - BlockNodeConnectionManager.class, BlockNodeConnection.class, Duration.class, Long.class, boolean.class); - ctor.setAccessible(true); - final Runnable task = (Runnable) ctor.newInstance(connectionManager, candidate, Duration.ZERO, null, true); - - task.run(); - - verify(oldActive).close(true); - verify(connectionManager) - .scheduleConnectionAttempt( - any(BlockNodeConfig.class), any(Duration.class), org.mockito.Mockito.isNull()); - assertThat(cfgCaptor.getValue()).isEqualTo(oldCfg); - assertThat(delayCaptor.getValue()).isEqualTo(Duration.ofSeconds(180)); - } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 11ea0674dba2..f2ff794d99ad 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -134,6 +134,7 @@ void beforeEach() { // resetMocks(); lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); + lenient().when(connectionManager.getActiveConnectionRef()).thenReturn(new AtomicReference<>()); } @AfterEach @@ -220,7 +221,6 @@ void testHandleStreamError() { verify(requestPipeline).onComplete(); verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); } @Test @@ -563,7 +563,6 @@ void testOnNext_resendBlock_blockDoesNotExist() { verify(bufferService).getBlockState(10L); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoMoreInteractions(bufferService); } @@ -701,7 +700,6 @@ void testClose() { verify(requestPipeline).onComplete(); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -727,7 +725,6 @@ void testClose_failure() { verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -745,7 +742,6 @@ void testClose_withoutOnComplete() { verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); verifyNoInteractions(requestPipeline); } @@ -765,7 +761,6 @@ void testClose_notActiveState() { verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoMoreInteractions(requestPipeline); verifyNoInteractions(bufferService); } @@ -829,7 +824,6 @@ void testClose_pipelineNull() { verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -875,7 +869,6 @@ void testOnError_activeConnection() { verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -950,7 +943,6 @@ void testOnCompleted_streamClosingNotInProgress() { verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -1275,11 +1267,9 @@ void testConnectionWorker_hugeItem() throws Exception { verify(requestPipeline).onComplete(); verify(bufferService).getEarliestAvailableBlockNumber(); verify(bufferService).getHighestAckedBlockNumber(); - verify(connectionManager).connectionResetsTheStream(connection); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); verifyNoMoreInteractions(bufferService); - verifyNoMoreInteractions(connectionManager); } // Tests that no response processing occurs when connection is already closed @@ -1384,7 +1374,6 @@ void testHandleStreamFailureWithoutOnComplete() { // Should not call onComplete on the pipeline verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); verifyNoInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); } // Tests that error handling is skipped when connection is already closed @@ -1418,7 +1407,6 @@ void testOnError_connectionPending() { // Should call onComplete when callOnComplete=true (from handleStreamFailure) verify(requestPipeline).onComplete(); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoMoreInteractions(requestPipeline); } @@ -1439,7 +1427,6 @@ void testOnError_connectionUninitialized() { verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); verifyNoInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -1660,6 +1647,7 @@ void testPeriodicStreamReset() { eq(TimeUnit.MILLISECONDS)); reset(connectionManager, bufferService); + when(connectionManager.getActiveConnectionRef()).thenReturn(new AtomicReference<>()); // Execute the periodic reset final Runnable periodicReset = runnableCaptor.getValue(); @@ -1710,6 +1698,7 @@ private void openConnectionAndResetMocks() { connection.createRequestPipeline(); // reset the mocks interactions to remove tracked interactions as a result of starting the connection resetMocks(); + lenient().when(connectionManager.getActiveConnectionRef()).thenReturn(new AtomicReference<>()); } private void resetMocks() { From 0e9828e857146ab4a35b7432a291cb29acde1788 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Mon, 27 Oct 2025 13:53:17 -0400 Subject: [PATCH 07/12] minor change Signed-off-by: Derek Riley --- .../impl/streaming/BlockNodeConnection.java | 3 +-- .../streaming/BlockNodeConnectionManager.java | 19 ++++++++----------- .../streaming/BlockNodeConnectionTest.java | 5 ----- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 542d81f64057..f65ceb94c9ca 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -793,8 +793,7 @@ public void close(final boolean callOnComplete) { } blockStreamMetrics.recordConnectionClosed(); blockStreamMetrics.recordActiveConnectionIp(-1L); - blockNodeConnectionManager.getConnections().remove(blockNodeConfig); - blockNodeConnectionManager.getActiveConnectionRef().compareAndSet(this, null); + blockNodeConnectionManager.notifyConnectionClosed(this); // regardless of outcome, mark the connection as closed updateConnectionState(ConnectionState.CLOSED); } diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index de03e1dd6d4a..09eb45eba89a 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -1097,18 +1097,15 @@ public BlockNodeStats.HighLatencyResult recordBlockAckAndCheckLatency( } /** - * Gets the map of block node connections. - * @return the map of block node connections + * Notifies the connection manager that a connection has been closed. + * This allows the manager to update its internal state accordingly. + * @param connection the connection that has been closed */ - public Map getConnections() { - return connections; - } + public void notifyConnectionClosed(final BlockNodeConnection connection) { + // Remove from active connection if it is the current active + activeConnectionRef.compareAndSet(connection, null); - /** - * Gets the atomic reference to the active block node connection. - * @return the atomic reference to the active block node connection - */ - public AtomicReference getActiveConnectionRef() { - return activeConnectionRef; + // Remove from connections map + connections.remove(connection.getNodeConfig()); } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index f2ff794d99ad..666baacb7d7b 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -131,10 +131,7 @@ void beforeEach() { final AtomicReference workerThreadRef = workerThreadRef(); workerThreadRef.set(FAKE_WORKER_THREAD); - // resetMocks(); - lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); - lenient().when(connectionManager.getActiveConnectionRef()).thenReturn(new AtomicReference<>()); } @AfterEach @@ -1647,7 +1644,6 @@ void testPeriodicStreamReset() { eq(TimeUnit.MILLISECONDS)); reset(connectionManager, bufferService); - when(connectionManager.getActiveConnectionRef()).thenReturn(new AtomicReference<>()); // Execute the periodic reset final Runnable periodicReset = runnableCaptor.getValue(); @@ -1698,7 +1694,6 @@ private void openConnectionAndResetMocks() { connection.createRequestPipeline(); // reset the mocks interactions to remove tracked interactions as a result of starting the connection resetMocks(); - lenient().when(connectionManager.getActiveConnectionRef()).thenReturn(new AtomicReference<>()); } private void resetMocks() { From a74a5b33bef1ee182521266a9436985a12cee5e7 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Tue, 28 Oct 2025 10:44:27 -0400 Subject: [PATCH 08/12] updates Signed-off-by: Derek Riley --- .../docs/design/app/blocks/BlockNodeConnectionManager.md | 5 +++++ .../blocks/impl/streaming/BlockNodeConnectionManager.java | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md b/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md index 227628da0118..93d276dbfb19 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md @@ -162,6 +162,11 @@ Used when retrying the same block node after transient issues: - **Reset**: Retry count resets if no retry occurs within `protocolExpBackoffTimeframeReset` duration - **Behavior**: Connection retries the same node without selecting a new one +#### Forced Connection Switch Retry Delay + +When another block node should be selected and forced to become active, the previous active connection +is closed and scheduled for retry after a fixed delay of 180s (`blockNode.forcedSwitchRescheduleDelay`). + #### Retry State Management - `RetryState` tracks retry attempts and last retry time per node configuration diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 09eb45eba89a..b59e191ab625 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -813,7 +813,7 @@ public void run() { logger.error( "Failed to schedule reschedule for previous active connection after forced switch.", e); - connections.remove(connection.getNodeConfig()); + connections.remove(activeConnection.getNodeConfig()); } } } catch (final RuntimeException e) { @@ -1101,7 +1101,7 @@ public BlockNodeStats.HighLatencyResult recordBlockAckAndCheckLatency( * This allows the manager to update its internal state accordingly. * @param connection the connection that has been closed */ - public void notifyConnectionClosed(final BlockNodeConnection connection) { + public void notifyConnectionClosed(@NonNull final BlockNodeConnection connection) { // Remove from active connection if it is the current active activeConnectionRef.compareAndSet(connection, null); From 49f9bdb5b83c5c63616ce3876067246c3b81e21d Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Tue, 28 Oct 2025 10:46:40 -0400 Subject: [PATCH 09/12] docs Signed-off-by: Derek Riley --- hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md | 1 + 1 file changed, 1 insertion(+) diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md b/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md index 93d276dbfb19..9b01bb656ef2 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md @@ -166,6 +166,7 @@ Used when retrying the same block node after transient issues: When another block node should be selected and forced to become active, the previous active connection is closed and scheduled for retry after a fixed delay of 180s (`blockNode.forcedSwitchRescheduleDelay`). +This may happen when the block buffer saturation action stage is triggered and the manager force switches to a different node. #### Retry State Management From f3d866263dde55718180de442216ae1e81385dfc Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Tue, 28 Oct 2025 12:25:56 -0400 Subject: [PATCH 10/12] Update trickle down test Signed-off-by: Derek Riley --- .../bdd/suites/blocknode/BlockNodeSuite.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index 515ce827bf55..aeb6b8e426a1 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -439,8 +439,20 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { String.format( "/localhost:%s/CLOSED] Connection state transitioned from CLOSING to CLOSED.", portNumbers.get(3)))), - doingContextual( - spec -> LockSupport.parkNanos(Duration.ofSeconds(20).toNanos()))); + doingContextual(spec -> connectionDropTime.set(Instant.now())), + blockNode(3).startImmediately(), // Pri 3 + blockNode(1).shutDownImmediately(), // Pri 1 + sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( + byNodeId(0), + connectionDropTime::get, + Duration.ofMinutes(1), + Duration.ofSeconds(45), + String.format( + "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING.", + portNumbers.get(3)), + String.format( + "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE.", + portNumbers.get(3))))); } @HapiTest From 4278599695c66e34169c3447707002637c7f65c2 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Wed, 29 Oct 2025 09:33:30 -0400 Subject: [PATCH 11/12] fix merge conflict Signed-off-by: Derek Riley --- .../blocks/impl/streaming/BlockNodeConnectionManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 3d84ee5b8ec3..9266cc9f7ec7 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -772,9 +772,9 @@ public void run() { try { final Duration delay = getForcedSwitchRescheduleDelay(); scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null, false); - logWithContext( - DEBUG, - "Scheduled previously active connection in {} ms due to forced switch.", + logger.debug( + "Scheduled previously active connection {} in {} ms due to forced switch.", + activeConnection, delay.toMillis()); } catch (final Exception e) { logger.error( From 1060245f451e0903103bf3feb2a08be1ac6364b7 Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Wed, 29 Oct 2025 12:49:04 -0400 Subject: [PATCH 12/12] fix rescheduling case Signed-off-by: Derek Riley --- .../app/blocks/impl/streaming/BlockNodeConnectionManager.java | 1 + .../hedera/services/bdd/suites/blocknode/BlockNodeSuite.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 9266cc9f7ec7..3b6415b6d302 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -794,6 +794,7 @@ public void run() { logger.debug("{} Failed to establish connection to block node. Will schedule a retry.", connection); blockStreamMetrics.recordConnectionCreateFailure(); reschedule(); + selectNewBlockNodeForStreaming(false); } } diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index aeb6b8e426a1..e77c60607edb 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -440,7 +440,7 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { "/localhost:%s/CLOSED] Connection state transitioned from CLOSING to CLOSED.", portNumbers.get(3)))), doingContextual(spec -> connectionDropTime.set(Instant.now())), - blockNode(3).startImmediately(), // Pri 3 + waitUntilNextBlocks(5), blockNode(1).shutDownImmediately(), // Pri 1 sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0),