diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md
index 6645844372a3..d2583c00323f 100644
--- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md
+++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md
@@ -29,6 +29,7 @@ It manages connection state, handles communication, and reports errors to the `B
- Establish and maintain the connection transport.
- Handle incoming and outgoing message flow.
+- Detect unresponsive block nodes via configurable timeouts on pipeline operations.
- Report connection errors promptly.
- Coordinate with `BlockNodeConnectionManager` on lifecycle events.
- Notify the block buffer (via connection manager) when a block has been acknowledged and therefore eligible to be
@@ -68,6 +69,7 @@ stateDiagram-v2
ACTIVE --> CLOSING : ResendBlock unavailable
ACTIVE --> CLOSING : gRPC onError
ACTIVE --> CLOSING : Stream failure
+ ACTIVE --> CLOSING : Pipeline operation timeout
ACTIVE --> CLOSING : Manual close
ACTIVE --> ACTIVE : BlockAcknowledgement
ACTIVE --> ACTIVE : SkipBlock
@@ -160,4 +162,49 @@ The connection implements a configurable rate limiting mechanism for EndOfStream
endOfStreamScheduleDelay
The delay duration before attempting reconnection when the rate limit is exceeded.
+
+
+pipelineOperationTimeout
+
+The maximum duration allowed for pipeline creation, onNext(), and onComplete() operations before considering the block node unresponsive. Default: 30 seconds.
+
+### Pipeline Operation Timeout
+
+To detect unresponsive block nodes during message transmission and connection establishment, the connection implements configurable timeouts for pipeline operations.
+
+#### Timeout Behavior
+
+Pipeline operations (`onNext()`, `onComplete()`, and pipeline creation) are potentially blocking I/O operations. They are executed on a dedicated virtual thread executor, with the timeout enforced via `Future.get(timeout)`. Each connection instance creates its own virtual thread executor so that pipeline operations are isolated from other tasks and are not delayed when the shared executor is busy. A simplified sketch of this pattern appears at the end of this subsection.
+
+- **Pipeline creation timeout**: When establishing the gRPC connection via `createRequestPipeline()`, both the gRPC client creation and bidirectional stream setup are executed with timeout protection. If the operation does not complete within the configured timeout period:
+ - The Future is cancelled to interrupt the blocked operation
+ - The timeout metric is incremented
+ - A `RuntimeException` is thrown with the underlying `TimeoutException`
+ - The connection remains in UNINITIALIZED state
+ - The connection manager's error handling will schedule a retry with exponential backoff
+- **onNext() timeout**: When sending block items via `sendRequest()`, the operation is submitted to the connection's dedicated executor and the calling thread blocks waiting for completion with a timeout. If the operation does not complete within the configured timeout period:
+ - The Future is cancelled to interrupt the blocked operation
+ - The timeout metric is incremented
+ - `handleStreamFailure()` is triggered (only if connection is still ACTIVE)
+ - The connection follows standard failure handling with exponential backoff retry
+ - The connection manager will select a different block node for the next attempt if one is available
+ - `TimeoutException` is caught and handled internally
+- **onComplete() timeout**: When closing the stream via `closePipeline()`, the operation is submitted to the same dedicated executor with the same timeout mechanism. If the operation does not complete within the configured timeout period:
+ - The Future is cancelled to interrupt the blocked operation
+ - The timeout metric is incremented
+ - Since the connection is already in CLOSING state, only the timeout is logged
+ - The connection completes the close operation normally
+
+**Note**: The dedicated virtual thread executor is shut down when the connection closes, with a 5-second grace period for termination to avoid resource leaks. If tasks do not complete within the grace period, `shutdownNow()` is called to terminate them forcefully.
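+
+The snippet below is a minimal sketch of this pattern, under stated assumptions: the class name `TimeoutGuardedPipeline` and method `runWithTimeout()` are illustrative only, and the actual `BlockNodeConnection` code additionally performs logging, metrics updates, and connection-state checks for each operation.
+
+```java
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+// Hypothetical helper illustrating the timeout-protected pipeline operation pattern.
+final class TimeoutGuardedPipeline implements AutoCloseable {
+
+    // One virtual-thread executor per connection keeps blocking pipeline I/O off shared executors.
+    private final ExecutorService pipelineExecutor = Executors.newVirtualThreadPerTaskExecutor();
+    // Configurable in the real implementation; 30 seconds is the documented default.
+    private final Duration pipelineOperationTimeout = Duration.ofSeconds(30);
+
+    // Runs a potentially blocking pipeline operation (pipeline creation, onNext(), or onComplete())
+    // and enforces the timeout via Future.get(timeout).
+    void runWithTimeout(final Runnable pipelineOperation) {
+        final Future<?> future = pipelineExecutor.submit(pipelineOperation);
+        try {
+            future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException e) {
+            future.cancel(true); // interrupt the blocked operation
+            // Real code increments conn_pipelineOperationTimeout and, for onNext(), triggers failure handling.
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore interrupt status before propagating
+            throw new RuntimeException("Interrupted while waiting for pipeline operation", e);
+        } catch (final ExecutionException e) {
+            throw new RuntimeException("Error executing pipeline operation", e.getCause());
+        }
+    }
+
+    // Mirrors the shutdown described in the note above: 5-second grace period, then shutdownNow().
+    @Override
+    public void close() {
+        pipelineExecutor.shutdown();
+        try {
+            if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                pipelineExecutor.shutdownNow();
+            }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            pipelineExecutor.shutdownNow();
+        }
+    }
+}
+```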
+
+#### Exception Handling
+
+The implementation handles multiple exception scenarios across all timeout-protected operations:
+- **TimeoutException**: Pipeline operation exceeded the timeout - triggers failure handling for `onNext()` and pipeline creation, logged for `onComplete()`
+- **InterruptedException**: Thread was interrupted while waiting - interrupt status is restored via `Thread.currentThread().interrupt()` before propagating the exception (for `onNext()` and pipeline creation) or logging it (for `onComplete()` and executor shutdown)
+- **ExecutionException**: Error occurred during pipeline operation execution - the underlying cause is unwrapped and re-thrown (for `onNext()` and pipeline creation) or logged (for `onComplete()`)
+
+All exception scenarios include appropriate DEBUG-level logging with context information to aid in troubleshooting.
+
+#### Metrics
+
+A new metric `conn_pipelineOperationTimeout` tracks the total number of timeout events for pipeline creation, `onNext()`, and `onComplete()` operations, enabling operators to monitor block node responsiveness and connection establishment issues.
diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java
index 21649010a6cd..986cce7aae4b 100644
--- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java
+++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java
@@ -30,10 +30,15 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -99,6 +104,10 @@ private record Options(Optional<String> authority, String contentType) implements RequestOptions {
* The reset period for the stream. This is used to periodically reset the stream to ensure increased stability and reliability.
*/
private final Duration streamResetPeriod;
+ /**
* Timeout for pipeline creation, onNext(), and onComplete() operations to detect unresponsive block nodes.
+ */
+ private final Duration pipelineOperationTimeout;
/**
* Flag that indicates if this stream is currently shutting down, as initiated by this consensus node.
*/
@@ -117,6 +126,13 @@ private record Options(Optional<String> authority, String contentType) implements RequestOptions {
* Scheduled executor service that is used to schedule periodic reset of the stream to help ensure stream health.
*/
private final ScheduledExecutorService executorService;
+ /**
+ * Dedicated executor service for pipeline operations using virtual threads.
+ * This isolates pipeline operations from the shared scheduled executor to prevent
+ * blocking when the connection manager is busy with other tasks. Virtual threads
+ * make this approach lightweight despite creating one executor per connection.
+ */
+ private final ExecutorService pipelineExecutor;
/**
* This task runs every 24 hours (initial delay of 24 hours) when a connection is active.
* The task helps maintain stream stability by forcing periodic reconnections.
@@ -197,10 +213,12 @@ public BlockNodeConnection(
this.blockStreamMetrics = requireNonNull(blockStreamMetrics, "blockStreamMetrics must not be null");
this.connectionState = new AtomicReference<>(ConnectionState.UNINITIALIZED);
this.executorService = requireNonNull(executorService, "executorService must not be null");
+ this.pipelineExecutor = Executors.newVirtualThreadPerTaskExecutor();
final var blockNodeConnectionConfig =
configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class);
this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod();
this.clientFactory = requireNonNull(clientFactory, "clientFactory must not be null");
+ this.pipelineOperationTimeout = blockNodeConnectionConfig.pipelineOperationTimeout();
connectionId = String.format("%04d", connectionIdCounter.incrementAndGet());
}
@@ -209,13 +227,38 @@ public BlockNodeConnection(
*/
public synchronized void createRequestPipeline() {
if (requestPipelineRef.get() == null) {
- blockStreamPublishServiceClient = createNewGrpcClient();
- final Pipeline<? super PublishStreamRequest> pipeline =
- blockStreamPublishServiceClient.publishBlockStream(this);
- requestPipelineRef.set(pipeline);
- logWithContext(logger, DEBUG, this, "Request pipeline initialized.");
- updateConnectionState(ConnectionState.PENDING);
- blockStreamMetrics.recordConnectionOpened();
+ // Execute entire pipeline creation (including gRPC client creation) with timeout
+ // to prevent blocking on network operations
+ final Future<?> future = pipelineExecutor.submit(() -> {
+ blockStreamPublishServiceClient = createNewGrpcClient();
+ final Pipeline<? super PublishStreamRequest> pipeline =
+ blockStreamPublishServiceClient.publishBlockStream(this);
+ requestPipelineRef.set(pipeline);
+ });
+
+ try {
+ future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ logWithContext(logger, DEBUG, this, "Request pipeline initialized.");
+ updateConnectionState(ConnectionState.PENDING);
+ blockStreamMetrics.recordConnectionOpened();
+ } catch (final TimeoutException e) {
+ future.cancel(true);
+ logWithContext(
+ logger,
+ DEBUG,
+ this,
+ "Pipeline creation timed out after {}ms",
+ pipelineOperationTimeout.toMillis());
+ blockStreamMetrics.recordPipelineOperationTimeout();
+ throw new RuntimeException("Pipeline creation timed out", e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logWithContext(logger, DEBUG, this, "Interrupted while creating pipeline", e);
+ throw new RuntimeException("Interrupted while creating pipeline", e);
+ } catch (final ExecutionException e) {
+ logWithContext(logger, DEBUG, this, "Error creating pipeline", e.getCause());
+ throw new RuntimeException("Error creating pipeline", e.getCause());
+ }
} else {
logWithContext(logger, DEBUG, this, "Request pipeline already available.");
}
@@ -638,7 +681,7 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) {
highestAckedBlockNumber);
try {
sendRequest(endStream);
- } catch (RuntimeException e) {
+ } catch (final RuntimeException e) {
logger.warn(formatLogMessage("Error sending EndStream request", this), e);
}
close(true);
@@ -677,7 +720,31 @@ public void sendRequest(@NonNull final PublishStreamRequest request) {
request.protobufSize());
}
final long startMs = System.currentTimeMillis();
- pipeline.onNext(request);
+
+ final Future<?> future = pipelineExecutor.submit(() -> pipeline.onNext(request));
+ try {
+ future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (final TimeoutException e) {
+ future.cancel(true); // Cancel the task if it times out
+ if (getConnectionState() == ConnectionState.ACTIVE) {
+ logWithContext(
+ logger,
+ DEBUG,
+ this,
+ "Pipeline onNext() timed out after {}ms",
+ pipelineOperationTimeout.toMillis());
+ blockStreamMetrics.recordPipelineOperationTimeout();
+ handleStreamFailure();
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupt status
+ logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onNext()", e);
+ throw new RuntimeException("Interrupted while waiting for pipeline.onNext()", e);
+ } catch (final ExecutionException e) {
+ logWithContext(logger, DEBUG, this, "Error executing pipeline.onNext()", e.getCause());
+ throw new RuntimeException("Error executing pipeline.onNext()", e.getCause());
+ }
+
final long durationMs = System.currentTimeMillis() - startMs;
blockStreamMetrics.recordRequestLatency(durationMs);
logWithContext(logger, TRACE, this, "Request took {}ms to send", this, durationMs);
@@ -753,6 +820,16 @@ public void close(final boolean callOnComplete) {
} catch (final Exception e) {
logger.error(formatLogMessage("Error occurred while closing gRPC client.", this), e);
}
+ try {
+ pipelineExecutor.shutdown();
+ if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ pipelineExecutor.shutdownNow();
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ pipelineExecutor.shutdownNow();
+ logger.error(formatLogMessage("Error occurred while shutting down pipeline executor.", this), e);
+ }
blockStreamMetrics.recordConnectionClosed();
blockStreamMetrics.recordActiveConnectionIp(-1L);
// regardless of outcome, mark the connection as closed
@@ -770,8 +847,26 @@ private void closePipeline(final boolean callOnComplete) {
try {
final ConnectionState state = getConnectionState();
if (state == ConnectionState.CLOSING && callOnComplete) {
- pipeline.onComplete();
- logWithContext(logger, DEBUG, this, "Request pipeline successfully closed.");
+ final Future<?> future = pipelineExecutor.submit(pipeline::onComplete);
+ try {
+ future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ logWithContext(logger, DEBUG, this, "Request pipeline successfully closed.");
+ } catch (final TimeoutException e) {
+ future.cancel(true); // Cancel the task if it times out
+ logWithContext(
+ logger,
+ DEBUG,
+ this,
+ "Pipeline onComplete() timed out after {}ms",
+ pipelineOperationTimeout.toMillis());
+ blockStreamMetrics.recordPipelineOperationTimeout();
+ // Connection is already closing, just log the timeout
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupt status
+ logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onComplete()");
+ } catch (final ExecutionException e) {
+ logWithContext(logger, DEBUG, this, "Error executing pipeline.onComplete()", e.getCause());
+ }
}
} catch (final Exception e) {
logger.warn(formatLogMessage("Error while completing request pipeline.", this), e);
diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java
index b68c07b011d5..ce824fe804d2 100644
--- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java
+++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java
@@ -61,6 +61,7 @@ public class BlockStreamMetrics {
private Counter conn_endOfStreamLimitCounter;
private DoubleGauge conn_ackLatencyGauge;
private Counter conn_highLatencyCounter;
+ private Counter conn_pipelineOperationTimeoutCounter;
// buffer metrics
private static final long BACK_PRESSURE_ACTIVE = 3;
@@ -282,6 +283,11 @@ private void registerConnectivityMetrics() {
final Counter.Config highLatencyCfg = newCounter(GROUP_CONN, "highLatencyEvents")
.withDescription("Count of high latency events from the active block node connection");
conn_highLatencyCounter = metrics.getOrCreate(highLatencyCfg);
+
+ final Counter.Config pipelineTimeoutCfg = newCounter(GROUP_CONN, "pipelineOperationTimeout")
+ .withDescription(
+ "Number of times a pipeline onNext() or onComplete() operation timed out on a block node connection");
+ conn_pipelineOperationTimeoutCounter = metrics.getOrCreate(pipelineTimeoutCfg);
}
/**
@@ -357,6 +363,13 @@ public void recordHighLatencyEvent() {
conn_highLatencyCounter.increment();
}
+ /**
+ * Record that a pipeline creation, onNext(), or onComplete() operation timed out.
+ */
+ public void recordPipelineOperationTimeout() {
+ conn_pipelineOperationTimeoutCounter.increment();
+ }
+
// Connection RECV metrics -----------------------------------------------------------------------------------------
private void registerConnectionRecvMetrics() {
diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java
index fcd044020f00..4bffc792ddf0 100644
--- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java
+++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java
@@ -6,6 +6,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
@@ -27,12 +28,18 @@
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions;
import io.helidon.webclient.api.WebClient;
+import java.lang.reflect.Field;
import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.hiero.block.api.BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient;
import org.hiero.block.api.PublishStreamRequest;
import org.hiero.block.api.PublishStreamRequest.EndStream;
@@ -62,6 +69,7 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase {
private Pipeline<? super PublishStreamRequest> requestPipeline;
private ScheduledExecutorService executorService;
private BlockNodeStats.HighLatencyResult latencyResult;
+ private BlockNodeClientFactory clientFactory;
@BeforeEach
@SuppressWarnings("unchecked")
@@ -76,12 +84,11 @@ void beforeEach() {
executorService = mock(ScheduledExecutorService.class);
latencyResult = mock(BlockNodeStats.HighLatencyResult.class);
- final BlockNodeClientFactory clientFactory = mock(BlockNodeClientFactory.class);
+ clientFactory = mock(BlockNodeClientFactory.class);
lenient()
.doReturn(grpcServiceClient)
.when(clientFactory)
.createClient(any(WebClient.class), any(PbjGrpcClientConfig.class), any(RequestOptions.class));
-
connection = new BlockNodeConnection(
configProvider, nodeConfig, connectionManager, bufferService, metrics, executorService, clientFactory);
@@ -107,6 +114,114 @@ void testCreateRequestPipeline_alreadyExists() {
verifyNoMoreInteractions(grpcServiceClient);
}
+ /**
+ * Tests TimeoutException handling during pipeline creation.
+ * Uses mocks to simulate a timeout without actually waiting, making the test fast.
+ */
+ @Test
+ void testCreateRequestPipeline_timeoutException() throws Exception {
+ // Create a mock Future that will throw TimeoutException when get() is called
+ @SuppressWarnings("unchecked")
+ final Future