diff --git a/.github/workflows/support/citr/log4j2.xml b/.github/workflows/support/citr/log4j2.xml
index 354babe3addf..3b59e72a0787 100644
--- a/.github/workflows/support/citr/log4j2.xml
+++ b/.github/workflows/support/citr/log4j2.xml
@@ -76,8 +76,8 @@
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/configuration/compose/log4j2.xml b/hedera-node/configuration/compose/log4j2.xml
index b94cce524e37..aeeffcd38017 100644
--- a/hedera-node/configuration/compose/log4j2.xml
+++ b/hedera-node/configuration/compose/log4j2.xml
@@ -74,8 +74,8 @@
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/configuration/dev/log4j2.xml b/hedera-node/configuration/dev/log4j2.xml
index a44548155af2..0768dba99542 100644
--- a/hedera-node/configuration/dev/log4j2.xml
+++ b/hedera-node/configuration/dev/log4j2.xml
@@ -82,8 +82,8 @@
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/configuration/mainnet/log4j2.xml b/hedera-node/configuration/mainnet/log4j2.xml
index 740df09ee361..394d6c54967d 100644
--- a/hedera-node/configuration/mainnet/log4j2.xml
+++ b/hedera-node/configuration/mainnet/log4j2.xml
@@ -82,8 +82,8 @@
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/configuration/preprod/log4j2.xml b/hedera-node/configuration/preprod/log4j2.xml
index 98f9dc23c3e0..9e196810c015 100644
--- a/hedera-node/configuration/preprod/log4j2.xml
+++ b/hedera-node/configuration/preprod/log4j2.xml
@@ -82,8 +82,8 @@
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/configuration/previewnet/log4j2.xml b/hedera-node/configuration/previewnet/log4j2.xml
index d01319b44124..2988300e3db8 100644
--- a/hedera-node/configuration/previewnet/log4j2.xml
+++ b/hedera-node/configuration/previewnet/log4j2.xml
@@ -82,8 +82,8 @@
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/configuration/testnet/log4j2.xml b/hedera-node/configuration/testnet/log4j2.xml
index 574856cea81c..6d0134a05693 100644
--- a/hedera-node/configuration/testnet/log4j2.xml
+++ b/hedera-node/configuration/testnet/log4j2.xml
@@ -82,8 +82,8 @@
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md
index 277f6a4a77d8..8b47306882ec 100644
--- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md
+++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md
@@ -6,7 +6,7 @@
2. [Definitions](#definitions)
3. [Component Responsibilities](#component-responsibilities)
4. [Component Interaction](#component-interaction)
-5. [State Management](#state-management)
+5. [Lifecycle](#lifecycle)
6. [State Machine Diagrams](#state-machine-diagrams)
7. [Error Handling](#error-handling)
@@ -29,6 +29,7 @@ It manages connection state, handles communication, and reports errors to the `B
- Establish and maintain the connection transport.
- Handle incoming and outgoing message flow.
+- Detect unresponsive block nodes via configurable timeouts on pipeline operations.
- Report connection errors promptly.
- Coordinate with `BlockNodeConnectionManager` on lifecycle events.
- Notify the block buffer when a block has been acknowledged and therefore eligible to be pruned.
@@ -129,6 +130,7 @@ stateDiagram-v2
ACTIVE --> CLOSING : ResendBlock unavailable
ACTIVE --> CLOSING : gRPC onError
ACTIVE --> CLOSING : Stream failure
+ ACTIVE --> CLOSING : Pipeline operation timeout
ACTIVE --> CLOSING : Manual close
ACTIVE --> ACTIVE : BlockAcknowledgement
ACTIVE --> ACTIVE : SkipBlock
@@ -227,4 +229,49 @@ The connection implements a configurable rate limiting mechanism for EndOfStream
blockNode.maxRequestDelay
The maximum amount of time between attempting to send block items to a block node, regardless of the number of items ready to send.
+
+
pipelineOperationTimeout
+
The maximum duration allowed for pipeline onNext() and onComplete() operations before considering the block node unresponsive. Default: 30 seconds.
+
+### Pipeline Operation Timeout
+
+To detect unresponsive block nodes during message transmission and connection establishment, the connection implements configurable timeouts for pipeline operations.
+
+#### Timeout Behavior
+
+Pipeline operations (`onNext()`, `onComplete()`, and pipeline creation) can block on network I/O, so each one is executed on a dedicated virtual-thread executor and bounded with `Future.get(timeout)`. The executor is provided via constructor dependency injection, allowing flexible configuration and easier testing.
+
+- **Pipeline creation timeout**: When establishing the gRPC connection via `createRequestPipeline()`, both the gRPC client creation and bidirectional stream setup are executed with timeout protection. If the operation does not complete within the configured timeout period:
+ - The Future is cancelled to interrupt the blocked operation
+ - The timeout metric is incremented
+ - A `RuntimeException` is thrown with the underlying `TimeoutException`
+ - The connection remains in UNINITIALIZED state
+ - The connection manager's error handling will schedule a retry with exponential backoff
+- **onNext() timeout**: When sending block items via `sendRequest()`, the operation is submitted to the connection's dedicated executor and the calling thread blocks waiting for completion with a timeout. If the operation does not complete within the configured timeout period:
+ - The Future is cancelled to interrupt the blocked operation
+ - The timeout metric is incremented
+ - `handleStreamFailure()` is triggered (only if connection is still ACTIVE)
+ - The connection follows standard failure handling with exponential backoff retry
+ - The connection manager will select a different block node for the next attempt if one is available
+ - `TimeoutException` is caught and handled internally
+- **onComplete() timeout**: When closing the stream via `closePipeline()`, the operation is submitted to the same dedicated executor with the same timeout mechanism. If the operation does not complete within the configured timeout period:
+ - The Future is cancelled to interrupt the blocked operation
+ - The timeout metric is incremented
+ - Since the connection is already in CLOSING state, only the timeout is logged
+ - The connection completes the close operation normally
+
+**Note**: The dedicated executor (typically a virtual thread executor in production) is provided during construction and shut down when the connection closes, with a 5-second grace period for termination to avoid resource leaks. If tasks do not complete within the grace period, `shutdownNow()` is called to terminate them forcefully.
+
+#### Exception Handling
+
+The implementation handles multiple exception scenarios across all timeout-protected operations:
+- **TimeoutException**: Pipeline operation exceeded the timeout - triggers failure handling for `onNext()` and pipeline creation, logged for `onComplete()`
+- **InterruptedException**: Thread was interrupted while waiting - interrupt status is restored via `Thread.currentThread().interrupt()` before propagating the exception (for `onNext()` and pipeline creation) or logging it (for `onComplete()` and executor shutdown)
+- **ExecutionException**: Error occurred during pipeline operation execution - the underlying cause is unwrapped and re-thrown (for `onNext()` and pipeline creation) or logged (for `onComplete()`)
+
+All exception scenarios include appropriate DEBUG-level logging with context information to aid in troubleshooting.
+
+#### Metrics
+
+A new metric `conn_pipelineOperationTimeout` tracks the total number of timeout events for pipeline creation, `onNext()`, and `onComplete()` operations, enabling operators to monitor block node responsiveness and connection establishment issues.
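The pattern described in this design section can be summarized in a minimal, standalone sketch: submit the potentially blocking pipeline call to a dedicated executor, bound the wait with `Future.get(timeout)`, and cancel on timeout. Apart from the JDK types, the names below are illustrative placeholders rather than the production `BlockNodeConnection` API; the actual implementation follows in the Java diff below.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class TimeoutBoundedPipelineCall {
    public static void main(final String[] args) {
        // Dedicated virtual-thread executor, analogous to the per-connection pipelineExecutor.
        final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        final long timeoutMs = 30_000L; // mirrors the 30-second default mentioned above

        // Stand-in for a potentially blocking pipeline.onNext()/onComplete() call.
        final Runnable blockingOperation = () -> {
            try {
                Thread.sleep(100); // simulate network I/O
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        final Future<?> future = executor.submit(blockingOperation);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException e) {
            future.cancel(true); // interrupt the stuck operation
            // here: increment the timeout metric and trigger stream-failure handling
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status before propagating
        } catch (final ExecutionException e) {
            // here: unwrap e.getCause() and rethrow or log, depending on the operation
        } finally {
            executor.shutdown();
        }
    }
}
```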
diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java
index bd370fbb2b67..f1b2d6ef8388 100644
--- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java
+++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java
@@ -29,10 +29,14 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -106,6 +110,10 @@ private record Options(Optional<String> authority, String contentType) implement
* The reset period for the stream. This is used to periodically reset the stream to ensure increased stability and reliability.
*/
private final Duration streamResetPeriod;
+ /**
+ * Timeout for pipeline creation, onNext(), and onComplete() operations to detect unresponsive block nodes.
+ */
+ private final Duration pipelineOperationTimeout;
/**
* Flag that indicates if this stream is currently shutting down, as initiated by this consensus node.
*/
@@ -124,6 +132,13 @@ private record Options(Optional authority, String contentType) implement
* Scheduled executor service that is used to schedule periodic reset of the stream to help ensure stream health.
*/
private final ScheduledExecutorService executorService;
+ /**
+ * Dedicated executor service for pipeline operations using virtual threads.
+ * This isolates pipeline operations from the shared scheduled executor to prevent
+ * blocking when the connection manager is busy with other tasks. Virtual threads
+ * make this approach lightweight despite creating one executor per connection.
+ */
+ private final ExecutorService pipelineExecutor;
/**
* This task runs every 24 hours (initial delay of 24 hours) when a connection is active.
* The task helps maintain stream stability by forcing periodic reconnections.
@@ -198,6 +213,9 @@ boolean isTerminal() {
* @param blockBufferService the block stream state manager for block node connections
* @param blockStreamMetrics the block stream metrics for block node connections
* @param executorService the scheduled executor service used to perform async connection reconnects
+ * @param pipelineExecutor the executor service used for the block processing pipeline
+ * @param initialBlockToStream the initial block number to start streaming from, or null to use default
+ * @param clientFactory the factory for creating block stream clients
*/
public BlockNodeConnection(
@NonNull final ConfigProvider configProvider,
@@ -206,6 +224,7 @@ public BlockNodeConnection(
@NonNull final BlockBufferService blockBufferService,
@NonNull final BlockStreamMetrics blockStreamMetrics,
@NonNull final ScheduledExecutorService executorService,
+ @NonNull final ExecutorService pipelineExecutor,
@Nullable final Long initialBlockToStream,
@NonNull final BlockNodeClientFactory clientFactory) {
this.configProvider = requireNonNull(configProvider, "configProvider must not be null");
@@ -216,10 +235,12 @@ public BlockNodeConnection(
this.blockStreamMetrics = requireNonNull(blockStreamMetrics, "blockStreamMetrics must not be null");
this.connectionState = new AtomicReference<>(ConnectionState.UNINITIALIZED);
this.executorService = requireNonNull(executorService, "executorService must not be null");
+ this.pipelineExecutor = requireNonNull(pipelineExecutor, "pipelineExecutor must not be null");
final var blockNodeConnectionConfig =
configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class);
this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod();
this.clientFactory = requireNonNull(clientFactory, "clientFactory must not be null");
+ this.pipelineOperationTimeout = blockNodeConnectionConfig.pipelineOperationTimeout();
connectionId = String.format("%04d", connectionIdCounter.incrementAndGet());
@@ -234,13 +255,33 @@ public BlockNodeConnection(
*/
public synchronized void createRequestPipeline() {
if (requestPipelineRef.get() == null) {
- blockStreamPublishServiceClient = createNewGrpcClient();
- final Pipeline<? super PublishStreamRequest> pipeline =
- blockStreamPublishServiceClient.publishBlockStream(this);
- requestPipelineRef.set(pipeline);
- logger.debug("{} Request pipeline initialized.", this);
- updateConnectionState(ConnectionState.PENDING);
- blockStreamMetrics.recordConnectionOpened();
+ // Execute entire pipeline creation (including gRPC client creation) with timeout
+ // to prevent blocking on network operations
+ final Future<?> future = pipelineExecutor.submit(() -> {
+ blockStreamPublishServiceClient = createNewGrpcClient();
+ final Pipeline<? super PublishStreamRequest> pipeline =
+ blockStreamPublishServiceClient.publishBlockStream(this);
+ requestPipelineRef.set(pipeline);
+ });
+
+ try {
+ future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ logger.debug("{} Request pipeline initialized.", this);
+ updateConnectionState(ConnectionState.PENDING);
+ blockStreamMetrics.recordConnectionOpened();
+ } catch (final TimeoutException e) {
+ future.cancel(true);
+ logger.debug("{} Pipeline creation timed out after {}ms", this, pipelineOperationTimeout.toMillis());
+ blockStreamMetrics.recordPipelineOperationTimeout();
+ throw new RuntimeException("Pipeline creation timed out", e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.debug("{} Interrupted while creating pipeline", this, e);
+ throw new RuntimeException("Interrupted while creating pipeline", e);
+ } catch (final ExecutionException e) {
+ logger.debug("{} Error creating pipeline", this, e.getCause());
+ throw new RuntimeException("Error creating pipeline", e.getCause());
+ }
} else {
logger.debug("{} Request pipeline already available.", this);
}
@@ -648,7 +689,7 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) {
highestAckedBlockNumber);
try {
sendRequest(endStream);
- } catch (RuntimeException e) {
+ } catch (final RuntimeException e) {
logger.warn("{} Error sending EndStream request", this, e);
}
close(true);
@@ -687,7 +728,27 @@ private boolean sendRequest(
}
final long startMs = System.currentTimeMillis();
- pipeline.onNext(request);
+
+ final Future<?> future = pipelineExecutor.submit(() -> pipeline.onNext(request));
+ try {
+ future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (final TimeoutException e) {
+ future.cancel(true); // Cancel the task if it times out
+ if (getConnectionState() == ConnectionState.ACTIVE) {
+ logger.debug(
+ "{} Pipeline onNext() timed out after {}ms", this, pipelineOperationTimeout.toMillis());
+ blockStreamMetrics.recordPipelineOperationTimeout();
+ handleStreamFailure();
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupt status
+ logger.debug("{} Interrupted while waiting for pipeline.onNext()", this, e);
+ throw new RuntimeException("Interrupted while waiting for pipeline.onNext()", e);
+ } catch (final ExecutionException e) {
+ logger.debug("{} Error executing pipeline.onNext()", this, e.getCause());
+ throw new RuntimeException("Error executing pipeline.onNext()", e.getCause());
+ }
+
final long durationMs = System.currentTimeMillis() - startMs;
blockStreamMetrics.recordRequestLatency(durationMs);
@@ -774,6 +835,16 @@ public void close(final boolean callOnComplete) {
} catch (final Exception e) {
logger.error("{} Error occurred while closing gRPC client.", this, e);
}
+ try {
+ pipelineExecutor.shutdown();
+ if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ pipelineExecutor.shutdownNow();
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ pipelineExecutor.shutdownNow();
+ logger.error("{} Error occurred while shutting down pipeline executor.", this, e);
+ }
blockStreamMetrics.recordConnectionClosed();
blockStreamMetrics.recordActiveConnectionIp(-1L);
blockNodeConnectionManager.notifyConnectionClosed(this);
@@ -792,8 +863,24 @@ private void closePipeline(final boolean callOnComplete) {
try {
final ConnectionState state = getConnectionState();
if (state == ConnectionState.CLOSING && callOnComplete) {
- pipeline.onComplete();
- logger.debug("{} Request pipeline successfully closed.", this);
+ final Future<?> future = pipelineExecutor.submit(pipeline::onComplete);
+ try {
+ future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ logger.debug("{} Request pipeline successfully closed.", this);
+ } catch (final TimeoutException e) {
+ future.cancel(true); // Cancel the task if it times out
+ logger.debug(
+ "{} Pipeline onComplete() timed out after {}ms",
+ this,
+ pipelineOperationTimeout.toMillis());
+ blockStreamMetrics.recordPipelineOperationTimeout();
+ // Connection is already closing, just log the timeout
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupt status
+ logger.debug("{} Interrupted while waiting for pipeline.onComplete()", this);
+ } catch (final ExecutionException e) {
+ logger.debug("{} Error executing pipeline.onComplete()", this, e.getCause());
+ }
}
} catch (final Exception e) {
logger.warn("{} Error while completing request pipeline.", this, e);
diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java
index 3b6415b6d302..cb0fbf372b86 100644
--- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java
+++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java
@@ -548,6 +548,7 @@ private BlockNodeConnection createConnection(
blockBufferService,
blockStreamMetrics,
sharedExecutorService,
+ Executors.newVirtualThreadPerTaskExecutor(),
initialBlockToStream,
clientFactory);
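The manager wires in one `Executors.newVirtualThreadPerTaskExecutor()` per connection, keeping pipeline I/O isolated from the shared scheduled executor; virtual threads make a per-connection executor cheap. The sketch below, with hypothetical helper names, illustrates this create-then-shutdown lifecycle under the 5-second grace period described in the design doc; it is not the production wiring.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PipelineExecutorLifecycle {
    // One lightweight virtual-thread executor per connection, mirroring the wiring above.
    static ExecutorService newPipelineExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }

    // Shutdown sequence analogous to the one added to BlockNodeConnection.close():
    // orderly shutdown, bounded wait, then forced shutdown if tasks are still running.
    static void shutdown(final ExecutorService pipelineExecutor) {
        pipelineExecutor.shutdown();
        try {
            if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                pipelineExecutor.shutdownNow();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            pipelineExecutor.shutdownNow();
        }
    }
}
```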
diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java
index b68c07b011d5..ce824fe804d2 100644
--- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java
+++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java
@@ -61,6 +61,7 @@ public class BlockStreamMetrics {
private Counter conn_endOfStreamLimitCounter;
private DoubleGauge conn_ackLatencyGauge;
private Counter conn_highLatencyCounter;
+ private Counter conn_pipelineOperationTimeoutCounter;
// buffer metrics
private static final long BACK_PRESSURE_ACTIVE = 3;
@@ -282,6 +283,11 @@ private void registerConnectivityMetrics() {
final Counter.Config highLatencyCfg = newCounter(GROUP_CONN, "highLatencyEvents")
.withDescription("Count of high latency events from the active block node connection");
conn_highLatencyCounter = metrics.getOrCreate(highLatencyCfg);
+
+ final Counter.Config pipelineTimeoutCfg = newCounter(GROUP_CONN, "pipelineOperationTimeout")
+ .withDescription(
+ "Number of times a pipeline onNext() or onComplete() operation timed out on a block node connection");
+ conn_pipelineOperationTimeoutCounter = metrics.getOrCreate(pipelineTimeoutCfg);
}
/**
@@ -357,6 +363,13 @@ public void recordHighLatencyEvent() {
conn_highLatencyCounter.increment();
}
+ /**
+ * Record that a pipeline creation, onNext(), or onComplete() operation timed out.
+ */
+ public void recordPipelineOperationTimeout() {
+ conn_pipelineOperationTimeoutCounter.increment();
+ }
+
// Connection RECV metrics -----------------------------------------------------------------------------------------
private void registerConnectionRecvMetrics() {
diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java
index 561b41f16d47..f37c7394cb41 100644
--- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java
+++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java
@@ -42,10 +42,15 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -98,12 +103,13 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase {
private BlockStreamMetrics metrics;
private Pipeline<? super PublishStreamRequest> requestPipeline;
private ScheduledExecutorService executorService;
+ private ExecutorService pipelineExecutor;
private BlockNodeStats.HighLatencyResult latencyResult;
private BlockNodeClientFactory clientFactory;
@BeforeEach
@SuppressWarnings("unchecked")
- void beforeEach() {
+ void beforeEach() throws Exception {
final ConfigProvider configProvider = createConfigProvider(createDefaultConfigProvider());
nodeConfig = newBlockNodeConfig(8080, 1);
connectionManager = mock(BlockNodeConnectionManager.class);
@@ -112,14 +118,44 @@ void beforeEach() {
metrics = mock(BlockStreamMetrics.class);
requestPipeline = mock(Pipeline.class);
executorService = mock(ScheduledExecutorService.class);
+ pipelineExecutor = mock(ExecutorService.class);
latencyResult = mock(BlockNodeStats.HighLatencyResult.class);
+ // Set up default behavior for pipelineExecutor using a real executor
+ // This ensures proper Future semantics while still being fast for tests
+ // Individual tests can override this with their own specific mocks for timeout scenarios
+ final ExecutorService realExecutor = Executors.newCachedThreadPool();
+ lenient()
+ .doAnswer(invocation -> {
+ Runnable runnable = invocation.getArgument(0);
+ return realExecutor.submit(runnable);
+ })
+ .when(pipelineExecutor)
+ .submit(any(Runnable.class));
+
+ // Also handle shutdown for cleanup
+ lenient()
+ .doAnswer(invocation -> {
+ realExecutor.shutdown();
+ return null;
+ })
+ .when(pipelineExecutor)
+ .shutdown();
+
+ lenient()
+ .doAnswer(invocation -> {
+ long timeout = invocation.getArgument(0);
+ TimeUnit unit = invocation.getArgument(1);
+ return realExecutor.awaitTermination(timeout, unit);
+ })
+ .when(pipelineExecutor)
+ .awaitTermination(anyLong(), any(TimeUnit.class));
+
clientFactory = mock(BlockNodeClientFactory.class);
lenient()
.doReturn(grpcServiceClient)
.when(clientFactory)
.createClient(any(WebClient.class), any(PbjGrpcClientConfig.class), any(RequestOptions.class));
-
connection = new BlockNodeConnection(
configProvider,
nodeConfig,
@@ -127,6 +163,7 @@ void beforeEach() {
bufferService,
metrics,
executorService,
+ pipelineExecutor,
null,
clientFactory);
@@ -192,6 +229,7 @@ void testConstructorWithInitialBlock() {
bufferService,
metrics,
executorService,
+ pipelineExecutor,
100L,
clientFactory);
@@ -200,6 +238,96 @@ void testConstructorWithInitialBlock() {
assertThat(streamingBlockNumber).hasValue(100L);
}
+ /**
+ * Tests TimeoutException handling during pipeline creation.
+ * Uses mocks to simulate a timeout without actually waiting, making the test fast.
+ */
+ @Test
+ void testCreateRequestPipeline_timeoutException() throws Exception {
+ // Create a mock Future that will throw TimeoutException when get() is called
+ @SuppressWarnings("unchecked")
+ final Future
-
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n
diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java
index f68db0f49888..60d4d5efb713 100644
--- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java
+++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java
@@ -24,7 +24,7 @@ public abstract class AbstractNode implements HederaNode {
private static final String HGCAA_LOG = "hgcaa.log";
private static final String SWIRLDS_LOG = "swirlds.log";
private static final String LOG4J2_XML = "log4j2.xml";
- private static final String BLOCK_NODE_COMMS_LOG = "blocknode-comms.log";
+ private static final String BLOCK_NODE_COMMS_LOG = "block-node-comms.log";
protected NodeMetadata metadata;