Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ Used when retrying the same block node after transient issues:
- **Reset**: Retry count resets if no retry occurs within `protocolExpBackoffTimeframeReset` duration
- **Behavior**: Connection retries the same node without selecting a new one

#### Forced Connection Switch Retry Delay

When another block node should be selected and forced to become active, the previous active connection
is closed and scheduled for retry after a fixed delay of 180s (`blockNode.forcedSwitchRescheduleDelay`).
This may happen when the block buffer saturation action stage is triggered and the manager force switches to a different node.

#### Retry State Management

- `RetryState` tracks retry attempts and last retry time per node configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private void performStreamReset() {
if (getConnectionState() == ConnectionState.ACTIVE) {
logger.info("{} Performing scheduled stream reset.", this);
endTheStreamWith(RESET);
blockNodeConnectionManager.connectionResetsTheStream(this);
blockNodeConnectionManager.selectNewBlockNodeForStreaming(false);
}
}

Expand Down Expand Up @@ -753,6 +753,7 @@ public void close(final boolean callOnComplete) {
}
blockStreamMetrics.recordConnectionClosed();
blockStreamMetrics.recordActiveConnectionIp(-1L);
blockNodeConnectionManager.notifyConnectionClosed(this);
// regardless of outcome, mark the connection as closed
updateConnectionState(ConnectionState.CLOSED);
}
Expand Down Expand Up @@ -1012,7 +1013,6 @@ private void doWork() {
newRequestBytes,
MAX_BYTES_PER_REQUEST);
endTheStreamWith(EndStream.Code.ERROR);
blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this);
break;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,7 @@
@Nullable final Duration delay,
@Nullable final Long blockNumber,
final boolean selectNewBlockNode) {
// Remove from connections map and clear active reference
removeConnectionAndClearActive(connection);

final long delayMs;
long delayMs;
// Get or create the retry attempt for this node
final RetryState retryState = retryStates.computeIfAbsent(connection.getNodeConfig(), k -> new RetryState());
final int retryAttempt;
Expand Down Expand Up @@ -335,34 +332,6 @@
}
}

/**
* Connection initiated a reset of the stream
* @param connection the connection that initiated the reset of the stream
*/
public void connectionResetsTheStream(@NonNull final BlockNodeConnection connection) {
if (!isStreamingEnabled()) {
return;
}
requireNonNull(connection);

removeConnectionAndClearActive(connection);

// Immediately try to find and connect to the next available node
selectNewBlockNodeForStreaming(false);
}

/**
* Removes a connection from the connections map and clears the active reference if this was the active connection.
* This is a utility method to ensure consistent cleanup behavior.
*
* @param connection the connection to remove and clean up
*/
private void removeConnectionAndClearActive(@NonNull final BlockNodeConnection connection) {
requireNonNull(connection);
connections.remove(connection.getNodeConfig(), connection);
activeConnectionRef.compareAndSet(connection, null);
}

private void scheduleConnectionAttempt(
@NonNull final BlockNodeConfig blockNodeConfig,
@NonNull final Duration initialDelay,
Expand All @@ -388,7 +357,6 @@
logger.debug("{} Successfully scheduled reconnection task.", newConnection);
} catch (final Exception e) {
logger.error("{} Failed to schedule connection task for block node.", newConnection, e);
connections.remove(newConnection.getNodeConfig());
newConnection.close(true);
}
}
Expand Down Expand Up @@ -799,6 +767,22 @@
try {
logger.debug("{} Closing current active connection {}.", connection, activeConnection);
activeConnection.close(true);
// For a forced switch, reschedule the previously active connection to try again later
if (force) {
try {
final Duration delay = getForcedSwitchRescheduleDelay();
scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null, false);
logger.debug(
"Scheduled previously active connection {} in {} ms due to forced switch.",
activeConnection,
delay.toMillis());
} catch (final Exception e) {
logger.error(

Check warning on line 780 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java#L779-L780

Added lines #L779 - L780 were not covered by tests
"Failed to schedule reschedule for previous active connection after forced switch.",
e);
connections.remove(activeConnection.getNodeConfig());

Check warning on line 783 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java#L783

Added line #L783 was not covered by tests
}
}
} catch (final RuntimeException e) {
logger.info(
"Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).",
Expand All @@ -810,6 +794,7 @@
logger.debug("{} Failed to establish connection to block node. Will schedule a retry.", connection);
blockStreamMetrics.recordConnectionCreateFailure();
reschedule();
selectNewBlockNodeForStreaming(false);
}
}

Expand Down Expand Up @@ -857,9 +842,6 @@
logger.info("{} Rescheduled connection attempt (delayMillis={}).", connection, jitteredDelayMs);
} catch (final Exception e) {
logger.error("{} Failed to reschedule connection attempt. Removing from retry map.", connection, e);
// If rescheduling fails, close the connection and remove it from the connection map. A periodic task
// will handle checking if there are no longer any connections
connections.remove(connection.getNodeConfig());
connection.close(true);
}
}
Expand Down Expand Up @@ -921,6 +903,13 @@
.endOfStreamTimeFrame();
}

private Duration getForcedSwitchRescheduleDelay() {
return configProvider
.getConfiguration()
.getConfigData(BlockNodeConnectionConfig.class)
.forcedSwitchRescheduleDelay();
}

/**
* Gets the maximum number of EndOfStream responses allowed before taking corrective action.
*
Expand Down Expand Up @@ -1074,4 +1063,17 @@

return result;
}

/**
* Notifies the connection manager that a connection has been closed.
* This allows the manager to update its internal state accordingly.
* @param connection the connection that has been closed
*/
public void notifyConnectionClosed(@NonNull final BlockNodeConnection connection) {
// Remove from active connection if it is the current active
activeConnectionRef.compareAndSet(connection, null);

// Remove from connections map
connections.remove(connection.getNodeConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() {

assertThat(activeConnectionRef).hasValue(newConnection);

verify(activeConnection).getNodeConfig();
verify(activeConnection, times(2)).getNodeConfig();
verify(activeConnection).close(true);
verify(newConnection, times(2)).getNodeConfig();
verify(newConnection).createRequestPipeline();
Expand All @@ -625,7 +625,6 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() {

verifyNoMoreInteractions(activeConnection);
verifyNoMoreInteractions(newConnection);
verifyNoInteractions(executorService);
verifyNoMoreInteractions(bufferService);
verifyNoMoreInteractions(metrics);
}
Expand Down Expand Up @@ -832,8 +831,6 @@ void testConnectionTask_reschedule_failure() {

task.run();

assertThat(connections).isEmpty(); // connection should be removed

verify(connection).createRequestPipeline();
verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS));
verify(connection, atLeast(1)).getNodeConfig();
Expand Down Expand Up @@ -920,40 +917,6 @@ void testConstructor_configFileNotFound() {
assertThat(availableNodes).isEmpty();
}

@Test
void testRestartConnection() {
final BlockNodeConnection connection = mock(BlockNodeConnection.class);
final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1);
doReturn(nodeConfig).when(connection).getNodeConfig();

// Add the connection to the connections map and set it as active
final Map<BlockNodeConfig, BlockNodeConnection> connections = connections();
final AtomicReference<BlockNodeConnection> activeConnectionRef = activeConnection();
connections.put(nodeConfig, connection);
activeConnectionRef.set(connection);

// Ensure the node config is available for selection
final List<BlockNodeConfig> availableNodes = availableNodes();
availableNodes.clear();
availableNodes.add(nodeConfig);

connectionManager.connectionResetsTheStream(connection);

// Verify the active connection reference was cleared
assertThat(activeConnectionRef).hasNullValue();
// Verify a new connection was created and added to the connections map
assertThat(connections).containsKey(nodeConfig);
// Verify it's a different connection object (the old one was replaced)
assertThat(connections.get(nodeConfig)).isNotSameAs(connection);

// Verify that scheduleConnectionAttempt was called with Duration.ZERO and the block number
verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS));
verifyNoMoreInteractions(connection);
verifyNoInteractions(bufferService);
verifyNoInteractions(metrics);
verifyNoMoreInteractions(executorService);
}

@Test
void testRescheduleConnection_singleBlockNode() {
// selectNewBlockNodeForStreaming should NOT be called
Expand Down Expand Up @@ -987,19 +950,6 @@ void testRescheduleConnection_singleBlockNode() {
.schedule(any(BlockNodeConnectionTask.class), eq(5000L), eq(TimeUnit.MILLISECONDS));
}

@Test
void testConnectionResetsTheStream_streamingDisabled() {
useStreamingDisabledManager();
final BlockNodeConnection connection = mock(BlockNodeConnection.class);

connectionManager.connectionResetsTheStream(connection);

verifyNoInteractions(connection);
verifyNoInteractions(bufferService);
verifyNoInteractions(executorService);
verifyNoInteractions(metrics);
}

@Test
void testStart_streamingDisabled() {
useStreamingDisabledManager();
Expand Down Expand Up @@ -1096,36 +1046,6 @@ void testRecordEndOfStreamAndCheckLimit_streamingDisabled() {
assertThat(limitExceeded).isFalse();
}

@Test
void testConnectionResetsTheStream() {
final BlockNodeConnection connection = mock(BlockNodeConnection.class);
final BlockNodeConfig nodeConfig = newBlockNodeConfig(8080, 1);
doReturn(nodeConfig).when(connection).getNodeConfig();
availableNodes().add(nodeConfig);

// Add the connection to the connections map and set it as active
final Map<BlockNodeConfig, BlockNodeConnection> connections = connections();
final AtomicReference<BlockNodeConnection> activeConnectionRef = activeConnection();
connections.put(nodeConfig, connection);
activeConnectionRef.set(connection);

connectionManager.connectionResetsTheStream(connection);

// Verify the active connection reference was cleared
assertThat(activeConnectionRef).hasNullValue();
// Verify a new connection was created and added to the connections map
assertThat(connections).containsKey(nodeConfig);
// Verify it's a different connection object (the old one was replaced)
assertThat(connections.get(nodeConfig)).isNotSameAs(connection);

// Verify that selectNewBlockNodeForStreaming was called
verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS));
verifyNoMoreInteractions(connection);
verifyNoInteractions(bufferService);
verifyNoInteractions(metrics);
verifyNoMoreInteractions(executorService);
}

@Test
void testRecordEndOfStreamAndCheckLimit_withinLimit() {
final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1);
Expand Down
Loading
Loading