diff --git a/.github/workflows/support/citr/log4j2.xml b/.github/workflows/support/citr/log4j2.xml index 354babe3addf..3b59e72a0787 100644 --- a/.github/workflows/support/citr/log4j2.xml +++ b/.github/workflows/support/citr/log4j2.xml @@ -76,8 +76,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n diff --git a/hedera-node/configuration/compose/log4j2.xml b/hedera-node/configuration/compose/log4j2.xml index b94cce524e37..aeeffcd38017 100644 --- a/hedera-node/configuration/compose/log4j2.xml +++ b/hedera-node/configuration/compose/log4j2.xml @@ -74,8 +74,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n diff --git a/hedera-node/configuration/dev/log4j2.xml b/hedera-node/configuration/dev/log4j2.xml index a44548155af2..0768dba99542 100644 --- a/hedera-node/configuration/dev/log4j2.xml +++ b/hedera-node/configuration/dev/log4j2.xml @@ -82,8 +82,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n diff --git a/hedera-node/configuration/mainnet/log4j2.xml b/hedera-node/configuration/mainnet/log4j2.xml index 740df09ee361..394d6c54967d 100644 --- a/hedera-node/configuration/mainnet/log4j2.xml +++ b/hedera-node/configuration/mainnet/log4j2.xml @@ -82,8 +82,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n diff --git a/hedera-node/configuration/preprod/log4j2.xml b/hedera-node/configuration/preprod/log4j2.xml index 98f9dc23c3e0..9e196810c015 100644 --- a/hedera-node/configuration/preprod/log4j2.xml +++ b/hedera-node/configuration/preprod/log4j2.xml @@ -82,8 +82,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n diff --git a/hedera-node/configuration/previewnet/log4j2.xml b/hedera-node/configuration/previewnet/log4j2.xml index d01319b44124..2988300e3db8 100644 --- a/hedera-node/configuration/previewnet/log4j2.xml +++ b/hedera-node/configuration/previewnet/log4j2.xml @@ -82,8 +82,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n 
diff --git a/hedera-node/configuration/testnet/log4j2.xml b/hedera-node/configuration/testnet/log4j2.xml index 574856cea81c..6d0134a05693 100644 --- a/hedera-node/configuration/testnet/log4j2.xml +++ b/hedera-node/configuration/testnet/log4j2.xml @@ -82,8 +82,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md index 277f6a4a77d8..8b47306882ec 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md @@ -6,7 +6,7 @@ 2. [Definitions](#definitions) 3. [Component Responsibilities](#component-responsibilities) 4. [Component Interaction](#component-interaction) -5. [State Management](#state-management) +5. [Lifecycle](#lifecycle) 6. [State Machine Diagrams](#state-machine-diagrams) 7. [Error Handling](#error-handling) @@ -29,6 +29,7 @@ It manages connection state, handles communication, and reports errors to the `B - Establish and maintain the connection transport. - Handle incoming and outgoing message flow. +- Detect unresponsive block nodes via configurable timeouts on pipeline operations. - Report connection errors promptly. - Coordinate with `BlockNodeConnectionManager` on lifecycle events. - Notify the block buffer when a block has been acknowledged and therefore eligible to be pruned. @@ -129,6 +130,7 @@ stateDiagram-v2 ACTIVE --> CLOSING : ResendBlock unavailable ACTIVE --> CLOSING : gRPC onError ACTIVE --> CLOSING : Stream failure + ACTIVE --> CLOSING : Pipeline operation timeout ACTIVE --> CLOSING : Manual close ACTIVE --> ACTIVE : BlockAcknowledgement ACTIVE --> ACTIVE : SkipBlock @@ -227,4 +229,49 @@ The connection implements a configurable rate limiting mechanism for EndOfStream
blockNode.maxRequestDelay
The maximum amount of time between attempting to send block items to a block node, regardless of the number of items ready to send.
+ +
pipelineOperationTimeout
+
The maximum duration allowed for pipeline creation, onNext(), and onComplete() operations before the block node is considered unresponsive. Default: 30 seconds.
+ +### Pipeline Operation Timeout + +To detect unresponsive block nodes during message transmission and connection establishment, the connection implements configurable timeouts for pipeline operations. + +#### Timeout Behavior + +Pipeline operations (`onNext()`, `onComplete()`, and pipeline creation) are potentially blocking I/O operations that are executed on a dedicated virtual thread executor with timeout enforcement using `Future.get(timeout)`. The executor is provided via dependency injection through the constructor, allowing for flexible configuration and easier testing. + +- **Pipeline creation timeout**: When establishing the gRPC connection via `createRequestPipeline()`, both the gRPC client creation and bidirectional stream setup are executed with timeout protection. If the operation does not complete within the configured timeout period: + - The Future is cancelled to interrupt the blocked operation + - The timeout metric is incremented + - A `RuntimeException` is thrown with the underlying `TimeoutException` + - The connection remains in UNINITIALIZED state + - The connection manager's error handling will schedule a retry with exponential backoff +- **onNext() timeout**: When sending block items via `sendRequest()`, the operation is submitted to the connection's dedicated executor and the calling thread blocks waiting for completion with a timeout. 
If the operation does not complete within the configured timeout period: + - The Future is cancelled to interrupt the blocked operation + - The timeout metric is incremented + - `handleStreamFailure()` is triggered (only if connection is still ACTIVE) + - The connection follows standard failure handling with exponential backoff retry + - The connection manager will select a different block node for the next attempt if one is available + - `TimeoutException` is caught and handled internally +- **onComplete() timeout**: When closing the stream via `closePipeline()`, the operation is submitted to the same dedicated executor with the same timeout mechanism. If the operation does not complete within the configured timeout period: + - The Future is cancelled to interrupt the blocked operation + - The timeout metric is incremented + - Since the connection is already in CLOSING state, only the timeout is logged + - The connection completes the close operation normally + +**Note**: The dedicated executor (typically a virtual thread executor in production) is provided during construction and properly shut down when the connection closes with a 5-second grace period for termination, ensuring no resource leaks. If tasks don't complete within the grace period, `shutdownNow()` is called to forcefully terminate them. 
+ +#### Exception Handling + +The implementation handles multiple exception scenarios across all timeout-protected operations: +- **TimeoutException**: Pipeline operation exceeded the timeout - triggers failure handling for `onNext()` and pipeline creation, logged for `onComplete()` +- **InterruptedException**: Thread was interrupted while waiting - interrupt status is restored via `Thread.currentThread().interrupt()` before propagating the exception (for `onNext()` and pipeline creation) or logging it (for `onComplete()` and executor shutdown) +- **ExecutionException**: Error occurred during pipeline operation execution - the underlying cause is unwrapped and re-thrown (for `onNext()` and pipeline creation) or logged (for `onComplete()`) + +All exception scenarios include appropriate DEBUG-level logging with context information to aid in troubleshooting. + +#### Metrics + +A new metric `conn_pipelineOperationTimeout` tracks the total number of timeout events for pipeline creation, `onNext()`, and `onComplete()` operations, enabling operators to monitor block node responsiveness and connection establishment issues. 
diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index bd370fbb2b67..f1b2d6ef8388 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -29,10 +29,14 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Flow; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -106,6 +110,10 @@ private record Options(Optional authority, String contentType) implement * The reset period for the stream. This is used to periodically reset the stream to ensure increased stability and reliability. */ private final Duration streamResetPeriod; + /** + * Timeout for pipeline onNext() and onComplete() operations to detect unresponsive block nodes. + */ + private final Duration pipelineOperationTimeout; /** * Flag that indicates if this stream is currently shutting down, as initiated by this consensus node. */ @@ -124,6 +132,13 @@ private record Options(Optional authority, String contentType) implement * Scheduled executor service that is used to schedule periodic reset of the stream to help ensure stream health. */ private final ScheduledExecutorService executorService; + /** + * Dedicated executor service for pipeline operations using virtual threads. 
+ * This isolates pipeline operations from the shared scheduled executor to prevent + * blocking when the connection manager is busy with other tasks. Virtual threads + * make this approach lightweight despite creating one executor per connection. + */ + private final ExecutorService pipelineExecutor; /** * This task runs every 24 hours (initial delay of 24 hours) when a connection is active. * The task helps maintain stream stability by forcing periodic reconnections. @@ -198,6 +213,9 @@ boolean isTerminal() { * @param blockBufferService the block stream state manager for block node connections * @param blockStreamMetrics the block stream metrics for block node connections * @param executorService the scheduled executor service used to perform async connection reconnects + * @param pipelineExecutor the executor service used for the block processing pipeline + * @param initialBlockToStream the initial block number to start streaming from, or null to use default + * @param clientFactory the factory for creating block stream clients */ public BlockNodeConnection( @NonNull final ConfigProvider configProvider, @@ -206,6 +224,7 @@ public BlockNodeConnection( @NonNull final BlockBufferService blockBufferService, @NonNull final BlockStreamMetrics blockStreamMetrics, @NonNull final ScheduledExecutorService executorService, + @NonNull final ExecutorService pipelineExecutor, @Nullable final Long initialBlockToStream, @NonNull final BlockNodeClientFactory clientFactory) { this.configProvider = requireNonNull(configProvider, "configProvider must not be null"); @@ -216,10 +235,12 @@ public BlockNodeConnection( this.blockStreamMetrics = requireNonNull(blockStreamMetrics, "blockStreamMetrics must not be null"); this.connectionState = new AtomicReference<>(ConnectionState.UNINITIALIZED); this.executorService = requireNonNull(executorService, "executorService must not be null"); + this.pipelineExecutor = requireNonNull(pipelineExecutor, "pipelineExecutor must not be null"); final 
var blockNodeConnectionConfig = configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class); this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod(); this.clientFactory = requireNonNull(clientFactory, "clientFactory must not be null"); + this.pipelineOperationTimeout = blockNodeConnectionConfig.pipelineOperationTimeout(); connectionId = String.format("%04d", connectionIdCounter.incrementAndGet()); @@ -234,13 +255,33 @@ public BlockNodeConnection( */ public synchronized void createRequestPipeline() { if (requestPipelineRef.get() == null) { - blockStreamPublishServiceClient = createNewGrpcClient(); - final Pipeline pipeline = - blockStreamPublishServiceClient.publishBlockStream(this); - requestPipelineRef.set(pipeline); - logger.debug("{} Request pipeline initialized.", this); - updateConnectionState(ConnectionState.PENDING); - blockStreamMetrics.recordConnectionOpened(); + // Execute entire pipeline creation (including gRPC client creation) with timeout + // to prevent blocking on network operations + final Future future = pipelineExecutor.submit(() -> { + blockStreamPublishServiceClient = createNewGrpcClient(); + final Pipeline pipeline = + blockStreamPublishServiceClient.publishBlockStream(this); + requestPipelineRef.set(pipeline); + }); + + try { + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); + logger.debug("{} Request pipeline initialized.", this); + updateConnectionState(ConnectionState.PENDING); + blockStreamMetrics.recordConnectionOpened(); + } catch (final TimeoutException e) { + future.cancel(true); + logger.debug("{} Pipeline creation timed out after {}ms", this, pipelineOperationTimeout.toMillis()); + blockStreamMetrics.recordPipelineOperationTimeout(); + throw new RuntimeException("Pipeline creation timed out", e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + logger.debug("{} Interrupted while creating pipeline", this, e); + throw new 
RuntimeException("Interrupted while creating pipeline", e); + } catch (final ExecutionException e) { + logger.debug("{} Error creating pipeline", this, e.getCause()); + throw new RuntimeException("Error creating pipeline", e.getCause()); + } } else { logger.debug("{} Request pipeline already available.", this); } @@ -648,7 +689,7 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) { highestAckedBlockNumber); try { sendRequest(endStream); - } catch (RuntimeException e) { + } catch (final RuntimeException e) { logger.warn("{} Error sending EndStream request", this, e); } close(true); @@ -687,7 +728,27 @@ private boolean sendRequest( } final long startMs = System.currentTimeMillis(); - pipeline.onNext(request); + + final Future future = pipelineExecutor.submit(() -> pipeline.onNext(request)); + try { + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { + future.cancel(true); // Cancel the task if it times out + if (getConnectionState() == ConnectionState.ACTIVE) { + logger.debug( + "{} Pipeline onNext() timed out after {}ms", this, pipelineOperationTimeout.toMillis()); + blockStreamMetrics.recordPipelineOperationTimeout(); + handleStreamFailure(); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupt status + logger.debug("{} Interrupted while waiting for pipeline.onNext()", this, e); + throw new RuntimeException("Interrupted while waiting for pipeline.onNext()", e); + } catch (final ExecutionException e) { + logger.debug("{} Error executing pipeline.onNext()", this, e.getCause()); + throw new RuntimeException("Error executing pipeline.onNext()", e.getCause()); + } + final long durationMs = System.currentTimeMillis() - startMs; blockStreamMetrics.recordRequestLatency(durationMs); @@ -774,6 +835,16 @@ public void close(final boolean callOnComplete) { } catch (final Exception e) { logger.error("{} Error occurred while closing gRPC 
client.", this, e); } + try { + pipelineExecutor.shutdown(); + if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + pipelineExecutor.shutdownNow(); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + pipelineExecutor.shutdownNow(); + logger.error("{} Error occurred while shutting down pipeline executor.", this, e); + } blockStreamMetrics.recordConnectionClosed(); blockStreamMetrics.recordActiveConnectionIp(-1L); blockNodeConnectionManager.notifyConnectionClosed(this); @@ -792,8 +863,24 @@ private void closePipeline(final boolean callOnComplete) { try { final ConnectionState state = getConnectionState(); if (state == ConnectionState.CLOSING && callOnComplete) { - pipeline.onComplete(); - logger.debug("{} Request pipeline successfully closed.", this); + final Future future = pipelineExecutor.submit(pipeline::onComplete); + try { + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); + logger.debug("{} Request pipeline successfully closed.", this); + } catch (final TimeoutException e) { + future.cancel(true); // Cancel the task if it times out + logger.debug( + "{} Pipeline onComplete() timed out after {}ms", + this, + pipelineOperationTimeout.toMillis()); + blockStreamMetrics.recordPipelineOperationTimeout(); + // Connection is already closing, just log the timeout + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupt status + logger.debug("{} Interrupted while waiting for pipeline.onComplete()", this); + } catch (final ExecutionException e) { + logger.debug("{} Error executing pipeline.onComplete()", this, e.getCause()); + } } } catch (final Exception e) { logger.warn("{} Error while completing request pipeline.", this, e); diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java 
b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 3b6415b6d302..cb0fbf372b86 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -548,6 +548,7 @@ private BlockNodeConnection createConnection( blockBufferService, blockStreamMetrics, sharedExecutorService, + Executors.newVirtualThreadPerTaskExecutor(), initialBlockToStream, clientFactory); diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java index b68c07b011d5..ce824fe804d2 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java @@ -61,6 +61,7 @@ public class BlockStreamMetrics { private Counter conn_endOfStreamLimitCounter; private DoubleGauge conn_ackLatencyGauge; private Counter conn_highLatencyCounter; + private Counter conn_pipelineOperationTimeoutCounter; // buffer metrics private static final long BACK_PRESSURE_ACTIVE = 3; @@ -282,6 +283,11 @@ private void registerConnectivityMetrics() { final Counter.Config highLatencyCfg = newCounter(GROUP_CONN, "highLatencyEvents") .withDescription("Count of high latency events from the active block node connection"); conn_highLatencyCounter = metrics.getOrCreate(highLatencyCfg); + + final Counter.Config pipelineTimeoutCfg = newCounter(GROUP_CONN, "pipelineOperationTimeout") + .withDescription( + "Number of times a pipeline onNext() or onComplete() operation timed out on a block node connection"); + conn_pipelineOperationTimeoutCounter = metrics.getOrCreate(pipelineTimeoutCfg); } /** @@ -357,6 +363,13 @@ public void recordHighLatencyEvent() { 
conn_highLatencyCounter.increment(); } + /** + * Record that a pipeline onNext() or onComplete() operation timed out. + */ + public void recordPipelineOperationTimeout() { + conn_pipelineOperationTimeoutCounter.increment(); + } + // Connection RECV metrics ----------------------------------------------------------------------------------------- private void registerConnectionRecvMetrics() { diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 561b41f16d47..f37c7394cb41 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -42,10 +42,15 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Flow; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -98,12 +103,13 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { private BlockStreamMetrics metrics; private Pipeline requestPipeline; private ScheduledExecutorService executorService; + private ExecutorService pipelineExecutor; private BlockNodeStats.HighLatencyResult latencyResult; private BlockNodeClientFactory clientFactory; @BeforeEach @SuppressWarnings("unchecked") - void beforeEach() { + void beforeEach() throws Exception { final 
ConfigProvider configProvider = createConfigProvider(createDefaultConfigProvider()); nodeConfig = newBlockNodeConfig(8080, 1); connectionManager = mock(BlockNodeConnectionManager.class); @@ -112,14 +118,44 @@ void beforeEach() { metrics = mock(BlockStreamMetrics.class); requestPipeline = mock(Pipeline.class); executorService = mock(ScheduledExecutorService.class); + pipelineExecutor = mock(ExecutorService.class); latencyResult = mock(BlockNodeStats.HighLatencyResult.class); + // Set up default behavior for pipelineExecutor using a real executor + // This ensures proper Future semantics while still being fast for tests + // Individual tests can override this with their own specific mocks for timeout scenarios + final ExecutorService realExecutor = Executors.newCachedThreadPool(); + lenient() + .doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + return realExecutor.submit(runnable); + }) + .when(pipelineExecutor) + .submit(any(Runnable.class)); + + // Also handle shutdown for cleanup + lenient() + .doAnswer(invocation -> { + realExecutor.shutdown(); + return null; + }) + .when(pipelineExecutor) + .shutdown(); + + lenient() + .doAnswer(invocation -> { + long timeout = invocation.getArgument(0); + TimeUnit unit = invocation.getArgument(1); + return realExecutor.awaitTermination(timeout, unit); + }) + .when(pipelineExecutor) + .awaitTermination(anyLong(), any(TimeUnit.class)); + clientFactory = mock(BlockNodeClientFactory.class); lenient() .doReturn(grpcServiceClient) .when(clientFactory) .createClient(any(WebClient.class), any(PbjGrpcClientConfig.class), any(RequestOptions.class)); - connection = new BlockNodeConnection( configProvider, nodeConfig, @@ -127,6 +163,7 @@ void beforeEach() { bufferService, metrics, executorService, + pipelineExecutor, null, clientFactory); @@ -192,6 +229,7 @@ void testConstructorWithInitialBlock() { bufferService, metrics, executorService, + pipelineExecutor, 100L, clientFactory); @@ -200,6 +238,96 @@ void 
testConstructorWithInitialBlock() { assertThat(streamingBlockNumber).hasValue(100L); } + /** + * Tests TimeoutException handling during pipeline creation. + * Uses mocks to simulate a timeout without actually waiting, making the test fast. + */ + @Test + void testCreateRequestPipeline_timeoutException() throws Exception { + // Create a mock Future that will throw TimeoutException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Simulated timeout")); + + // Set up the pipelineExecutor to return mock future + doReturn(mockFuture).when(pipelineExecutor).submit(any(Runnable.class)); + + // Attempt to create pipeline - should timeout and throw + final RuntimeException exception = catchRuntimeException(() -> connection.createRequestPipeline()); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Pipeline creation timed out"); + assertThat(exception.getCause()).isInstanceOf(TimeoutException.class); + + // Verify timeout was detected and recorded + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + verify(mockFuture).cancel(true); + verify(metrics).recordPipelineOperationTimeout(); + + // Connection should still be UNINITIALIZED since pipeline creation failed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.UNINITIALIZED); + } + + /** + * Tests InterruptedException handling during pipeline creation. + * Uses mocks to simulate an interruption without actually waiting, making the test fast. 
+ */ + @Test + void testCreateRequestPipeline_interruptedException() throws Exception { + // Create a mock Future that will throw InterruptedException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException("Simulated interruption")); + + // Set up the pipelineExecutor to return mock future + doReturn(mockFuture).when(pipelineExecutor).submit(any(Runnable.class)); + + // Attempt to create pipeline - should handle interruption and throw + final RuntimeException exception = catchRuntimeException(() -> connection.createRequestPipeline()); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Interrupted while creating pipeline"); + assertThat(exception.getCause()).isInstanceOf(InterruptedException.class); + + // Verify interruption was handled + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + + // Connection should still be UNINITIALIZED since pipeline creation failed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.UNINITIALIZED); + } + + /** + * Tests ExecutionException handling during pipeline creation. + * Uses mocks to simulate an execution error without actually waiting, making the test fast. 
+ */ + @Test + void testCreateRequestPipeline_executionException() throws Exception { + // Create a mock Future that will throw ExecutionException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new java.util.concurrent.ExecutionException( + "Simulated execution error", new RuntimeException("Underlying cause"))); + + // Set up the pipelineExecutor to return mock future + doReturn(mockFuture).when(pipelineExecutor).submit(any(Runnable.class)); + + // Attempt to create pipeline - should handle execution exception and throw + final RuntimeException exception = catchRuntimeException(() -> connection.createRequestPipeline()); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Error creating pipeline"); + assertThat(exception.getCause()).isInstanceOf(RuntimeException.class); + assertThat(exception.getCause().getMessage()).isEqualTo("Underlying cause"); + + // Verify execution exception was handled + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + + // Connection should still be UNINITIALIZED since pipeline creation failed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.UNINITIALIZED); + } + @Test void testUpdatingConnectionState() { final ConnectionState preUpdateState = connection.getConnectionState(); @@ -722,7 +850,11 @@ void testSendRequest_errorWhileActive() { final PublishStreamRequest request = createRequest(newBlockHeaderItem()); final RuntimeException e = catchRuntimeException(() -> connection.sendRequest(request)); - assertThat(e).isInstanceOf(RuntimeException.class).hasMessage("kaboom!"); + assertThat(e).isInstanceOf(RuntimeException.class); + // Exception gets wrapped when executed in virtual thread executor + assertThat(e.getMessage()).contains("Error executing pipeline.onNext()"); + assertThat(e.getCause()).isInstanceOf(RuntimeException.class); + 
assertThat(e.getCause().getMessage()).isEqualTo("kaboom!"); verify(metrics).recordRequestSendFailure(); verifyNoMoreInteractions(metrics); @@ -1346,6 +1478,7 @@ void testConnectionWorker_blockNodeTooFarBehind() throws Exception { verify(metrics).recordActiveConnectionIp(-1L); verify(bufferService, times(2)).getEarliestAvailableBlockNumber(); verify(bufferService).getHighestAckedBlockNumber(); + verify(connectionManager).notifyConnectionClosed(connection); verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); @@ -1874,6 +2007,405 @@ void testPeriodicStreamReset_connectionNotActive() { verifyNoInteractions(requestPipeline); } + // Pipeline operation timeout tests + + /** + * Tests onNext() normal (non-timeout) path. + */ + @Test + void testSendRequest_onNextCompletesSuccessfully() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + connection.sendRequest(request); + + // Verify the request was sent successfully + verify(requestPipeline).onNext(request); + verify(metrics).recordRequestSent(RequestOneOfType.BLOCK_ITEMS); + verify(metrics).recordBlockItemsSent(1); + verify(metrics).recordRequestLatency(anyLong()); + + // Verify no timeout was recorded + verify(metrics, times(0)).recordPipelineOperationTimeout(); + + // Connection should still be ACTIVE + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.ACTIVE); + } + + /** + * Tests that sendRequest does not execute if connection is no longer ACTIVE. 
+ */ + @Test + void testSendRequest_connectionNotActive() { + openConnectionAndResetMocks(); + // Start in CLOSING state + connection.updateConnectionState(ConnectionState.CLOSING); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Since connection is not ACTIVE, sendRequest should not do anything + connection.sendRequest(request); + + // Verify no interactions since connection is not ACTIVE + verifyNoInteractions(requestPipeline); + verifyNoInteractions(metrics); + verifyNoInteractions(connectionManager); + } + + /** + * Tests that close operation completes successfully. + */ + @Test + void testClose_completesSuccessfully() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close connection normally + connection.close(true); + + // Verify close completed successfully + verify(requestPipeline).onComplete(); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + // Connection should be CLOSED + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests that closing without calling onComplete does not call onComplete on pipeline. + * This covers the case where callOnComplete=false. + */ + @Test + void testClose_withoutOnCompleteDoesNotCallOnComplete() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close without calling onComplete + connection.close(false); + + // Verify onComplete was not called on pipeline + verifyNoInteractions(requestPipeline); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + // Connection should be CLOSED + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests that error during pipeline operation is handled properly. + * This tests the exception handling in sendRequest when pipeline.onNext throws. 
+ */ + @Test + void testSendRequest_pipelineThrowsException() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Mock requestPipeline.onNext() to throw an exception + doThrow(new RuntimeException("Pipeline error")).when(requestPipeline).onNext(any()); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Should throw RuntimeException wrapped by the executor + final RuntimeException exception = catchRuntimeException(() -> connection.sendRequest(request)); + + assertThat(exception).isNotNull(); + // Exception gets wrapped when executed in virtual thread executor + assertThat(exception.getMessage()).contains("Error executing pipeline.onNext()"); + assertThat(exception.getCause()).isInstanceOf(RuntimeException.class); + assertThat(exception.getCause().getMessage()).isEqualTo("Pipeline error"); + + // Verify error was recorded + verify(requestPipeline).onNext(request); + verify(metrics).recordRequestSendFailure(); + } + + /** + * Tests that the pipelineExecutor is properly shut down when the connection closes. + * This ensures no resource leaks and that the executor won't accept new tasks after close. 
+ */ + @Test + void testClose_pipelineExecutorShutdown() throws InterruptedException { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close the connection + connection.close(true); + + // Verify connection is closed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + // Give the executor a moment to shutdown + Thread.sleep(100); + + // Try to send a request after close - should be ignored since connection is CLOSED + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + connection.sendRequest(request); + + // Verify that the pipeline was NOT called (executor should be shut down and connection is CLOSED) + verify(requestPipeline, times(1)).onComplete(); // Only from the close() call + verify(requestPipeline, times(0)).onNext(any()); // sendRequest should not execute + + // Verify no additional interactions beyond the close operation + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + verifyNoMoreInteractions(requestPipeline); + } + + /** + * Tests TimeoutException handling when pipeline.onNext() times out. + * Uses mocks to simulate a timeout without actually waiting, making the test fast. 
+ */ + @Test + void testSendRequest_timeoutException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a mock Future that will throw TimeoutException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Simulated timeout")); + + // Set up the pipelineExecutor to return mock future + doReturn(mockFuture).when(pipelineExecutor).submit(any(Runnable.class)); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Send request - should trigger timeout handling immediately + connection.sendRequest(request); + + // Verify timeout was detected and handled + // Note: future.get() is called twice - once for sendRequest (times out) + // and once for closePipeline/onComplete (also times out during cleanup) + verify(mockFuture, times(2)).get(anyLong(), any(TimeUnit.class)); + verify(mockFuture, times(2)).cancel(true); // Future should be cancelled both times + + // Timeout metric is recorded twice - once for sendRequest, once for onComplete during close + verify(metrics, times(2)).recordPipelineOperationTimeout(); + verify(metrics).recordConnectionClosed(); + verify(connectionManager).rescheduleConnection(eq(connection), eq(Duration.ofSeconds(30)), eq(null), eq(true)); + + // Connection should be CLOSED after timeout + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests TimeoutException handling when pipeline.onComplete() times out during close. + * Uses mocks to simulate a timeout without actually waiting, making the test fast. 
+ */ + @Test + void testClose_onCompleteTimeoutException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a mock Future that will throw TimeoutException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Simulated timeout")); + + // Set up the pipelineExecutor to return mock future + doReturn(mockFuture).when(pipelineExecutor).submit(any(Runnable.class)); + + // Close connection - should trigger timeout during onComplete + connection.close(true); + + // Verify timeout was detected during onComplete + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + verify(mockFuture).cancel(true); + verify(metrics).recordPipelineOperationTimeout(); + verify(metrics).recordConnectionClosed(); + + // Connection should still be CLOSED despite timeout + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests InterruptedException handling when pipeline.onComplete() is interrupted during close. + * Uses mocks to simulate an interruption without actually waiting, making the test fast. 
+ */ + @Test + void testClose_onCompleteInterruptedException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a mock Future that will throw InterruptedException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException("Simulated interruption")); + + // Set up the pipelineExecutor to return mock future + doReturn(mockFuture).when(pipelineExecutor).submit(any(Runnable.class)); + + // Close connection in a separate thread to verify interrupt status is restored + final AtomicBoolean isInterrupted = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + Thread.ofVirtual().start(() -> { + try { + connection.close(true); + } finally { + isInterrupted.set(Thread.currentThread().isInterrupted()); + latch.countDown(); + } + }); + + // Wait for the close operation to complete + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + + // Verify interruption was handled gracefully + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + verify(metrics).recordConnectionClosed(); + + // Connection should still be CLOSED despite interruption + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + assertThat(isInterrupted.get()).isTrue(); + } + + /** + * Tests InterruptedException handling during pipelineExecutor.awaitTermination() in close(). + * This covers the exception handling when shutting down the executor is interrupted. 
+ */ + @Test + void testClose_executorShutdownInterruptedException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Set up the pipelineExecutor to throw InterruptedException during awaitTermination + when(pipelineExecutor.awaitTermination(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException("Simulated shutdown interruption")); + + // Close connection - should handle interruption during executor shutdown + connection.close(true); + + // Verify executor shutdown was attempted + verify(pipelineExecutor).shutdown(); + verify(pipelineExecutor).awaitTermination(5, TimeUnit.SECONDS); + + // Verify shutdownNow was called after interruption + verify(pipelineExecutor).shutdownNow(); + + verify(metrics).recordConnectionClosed(); + + // Connection should still be CLOSED despite interruption + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests InterruptedException handling during pipeline operation. 
+ */ + @Test + void testSendRequest_interruptedException() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a latch to coordinate the test + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference exceptionRef = new AtomicReference<>(); + + // Make the pipeline block until interrupted + doAnswer(invocation -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted", e); + } + return null; + }) + .when(requestPipeline) + .onNext(any()); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Send request in a separate thread + final Thread testThread = Thread.ofVirtual().start(() -> { + try { + connection.sendRequest(request); + } catch (RuntimeException e) { + exceptionRef.set(e); + } + }); + + // Give the thread time to start and block + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Interrupt the thread + testThread.interrupt(); + + // Wait for thread to complete + try { + testThread.join(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Verify exception was thrown + assertThat(exceptionRef.get()).isNotNull(); + assertThat(exceptionRef.get().getMessage()).contains("Interrupted while waiting for pipeline.onNext()"); + assertThat(exceptionRef.get().getCause()).isInstanceOf(InterruptedException.class); + } + + /** + * Tests ExecutionException handling when pipeline.onNext() throws an exception. + * This is already covered by testSendRequest_pipelineThrowsException but included + * here for completeness of exception handling coverage. 
+ */ + @Test + void testSendRequest_executionException() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Mock requestPipeline.onNext() to throw an exception + doThrow(new RuntimeException("Execution failed")).when(requestPipeline).onNext(any()); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Should throw RuntimeException wrapping ExecutionException + final RuntimeException exception = catchRuntimeException(() -> connection.sendRequest(request)); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Error executing pipeline.onNext()"); + assertThat(exception.getCause()).isInstanceOf(RuntimeException.class); + assertThat(exception.getCause().getMessage()).isEqualTo("Execution failed"); + + verify(metrics).recordRequestSendFailure(); + } + + /** + * Tests that closing the connection multiple times doesn't cause issues with executor shutdown. + * The executor should only be shut down once, and subsequent closes should be idempotent. 
+ */ + @Test + void testClose_multipleCloseCallsHandleExecutorShutdownGracefully() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close the connection first time + connection.close(true); + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + // Reset mocks to verify second close behavior + reset(requestPipeline, metrics, connectionManager); + + // Close again - should be idempotent (no-op since already closed) + connection.close(true); + + // Verify no additional operations were performed + verifyNoInteractions(requestPipeline); + verifyNoInteractions(metrics); + verifyNoInteractions(connectionManager); + + // Connection should still be CLOSED + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + // Utilities private void openConnectionAndResetMocks() { diff --git a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java index f694796abdc6..6203c43ec071 100644 --- a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java +++ b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java @@ -24,6 +24,7 @@ * @param connectionWorkerSleepDuration the amount of time a connection worker will sleep between handling block items (should be less than {@link #maxRequestDelay}) * @param maxRequestDelay the maximum amount of time between sending a request to a block node * @param forcedSwitchRescheduleDelay the delay to reschedule a closed active connection after a forced switch + * @param pipelineOperationTimeout timeout for pipeline onNext() and onComplete() operations to detect unresponsive block nodes */ @ConfigData("blockNode") public record BlockNodeConnectionConfig( @@ -41,4 +42,5 @@ public record BlockNodeConnectionConfig( 
@ConfigProperty(defaultValue = "30s") @NodeProperty Duration grpcOverallTimeout, @ConfigProperty(defaultValue = "25ms") @NetworkProperty Duration connectionWorkerSleepDuration, @ConfigProperty(defaultValue = "200ms") @NetworkProperty Duration maxRequestDelay, - @ConfigProperty(defaultValue = "180s") @NodeProperty Duration forcedSwitchRescheduleDelay) {} + @ConfigProperty(defaultValue = "180s") @NodeProperty Duration forcedSwitchRescheduleDelay, + @ConfigProperty(defaultValue = "3s") @NodeProperty Duration pipelineOperationTimeout) {} diff --git a/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json b/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json index a8fc66a6d702..d0b789f151e0 100644 --- a/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json +++ b/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json @@ -19,7 +19,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 1, - "id": 10, + "id": 2, "links": [], "panels": [ { @@ -116,7 +116,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -230,7 +230,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -304,7 +304,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -422,7 +422,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -540,7 +540,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -658,7 +658,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -804,7 +804,7 @@ "sort": "none" } }, - 
"pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -906,7 +906,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1008,7 +1008,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1110,7 +1110,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1213,7 +1213,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1325,7 +1325,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1429,7 +1429,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1531,7 +1531,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1633,7 +1633,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1735,7 +1735,7 @@ "sort": "desc" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1837,7 +1837,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1939,7 +1939,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2041,7 +2041,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2143,7 +2143,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2164,6 +2164,109 @@ "title": "High latency events for the current active connection (1m delta)", "type": "timeseries" }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + 
"description": "Number of pipeline operation timeouts (onNext or onComplete)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 76 + }, + "id": 60, + "options": { + "legend": { + "calcs": [ + "last" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.0.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "max by(node) (delta(blockStream_conn_pipelineOperationTimeout_total{node=~\"$NodeId\"}[1m]))", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "node {{node}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Pipeline Operation Timeouts (1m delta)", + "type": "timeseries" + }, { "datasource": { "type": "prometheus", @@ -2200,8 +2303,8 @@ "gridPos": { "h": 4, "w": 8, - "x": 12, - "y": 76 + "x": 0, + "y": 84 }, "id": 57, "options": { @@ -2217,7 +2320,7 @@ "frameIndex": 0, "showHeader": true }, - 
"pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2318,8 +2421,8 @@ "gridPos": { "h": 4, "w": 8, - "x": 12, - "y": 80 + "x": 8, + "y": 84 }, "id": 59, "options": { @@ -2335,7 +2438,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2436,7 +2539,7 @@ "gridPos": { "h": 4, "w": 8, - "x": 12, + "x": 16, "y": 84 }, "id": 58, @@ -2453,7 +2556,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2591,7 +2694,7 @@ "h": 8, "w": 12, "x": 0, - "y": 97 + "y": 89 }, "id": 26, "options": { @@ -2609,7 +2712,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2693,7 +2796,7 @@ "h": 8, "w": 12, "x": 12, - "y": 97 + "y": 89 }, "id": 27, "options": { @@ -2711,7 +2814,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2795,7 +2898,7 @@ "h": 8, "w": 12, "x": 0, - "y": 105 + "y": 97 }, "id": 29, "options": { @@ -2813,7 +2916,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2897,7 +3000,7 @@ "h": 8, "w": 12, "x": 12, - "y": 105 + "y": 97 }, "id": 28, "options": { @@ -2915,7 +3018,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2999,7 +3102,7 @@ "h": 8, "w": 12, "x": 0, - "y": 113 + "y": 105 }, "id": 31, "options": { @@ -3017,7 +3120,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3101,7 +3204,7 @@ "h": 8, "w": 12, "x": 12, - "y": 113 + "y": 105 }, "id": 30, "options": { @@ -3119,7 +3222,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3203,7 +3306,7 @@ "h": 8, "w": 12, "x": 0, 
- "y": 121 + "y": 113 }, "id": 32, "options": { @@ -3221,7 +3324,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3305,7 +3408,7 @@ "h": 8, "w": 12, "x": 12, - "y": 121 + "y": 113 }, "id": 33, "options": { @@ -3323,7 +3426,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3407,7 +3510,7 @@ "h": 8, "w": 12, "x": 0, - "y": 129 + "y": 121 }, "id": 34, "options": { @@ -3425,7 +3528,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3509,7 +3612,7 @@ "h": 8, "w": 12, "x": 12, - "y": 129 + "y": 121 }, "id": 35, "options": { @@ -3527,7 +3630,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3611,7 +3714,7 @@ "h": 8, "w": 12, "x": 0, - "y": 137 + "y": 129 }, "id": 36, "options": { @@ -3629,7 +3732,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3713,7 +3816,7 @@ "h": 8, "w": 12, "x": 12, - "y": 137 + "y": 129 }, "id": 37, "options": { @@ -3731,7 +3834,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3815,7 +3918,7 @@ "h": 8, "w": 12, "x": 0, - "y": 145 + "y": 137 }, "id": 38, "options": { @@ -3833,7 +3936,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3931,7 +4034,7 @@ "h": 8, "w": 12, "x": 0, - "y": 360 + "y": 146 }, "id": 19, "options": { @@ -3949,7 +4052,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4030,7 +4133,7 @@ "h": 8, "w": 12, "x": 12, - "y": 360 + "y": 146 }, "id": 56, "options": { @@ -4048,7 +4151,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ 
-4134,7 +4237,7 @@ "h": 8, "w": 12, "x": 0, - "y": 368 + "y": 154 }, "id": 18, "options": { @@ -4152,7 +4255,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4236,7 +4339,7 @@ "h": 8, "w": 12, "x": 12, - "y": 368 + "y": 154 }, "id": 21, "options": { @@ -4254,7 +4357,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4344,7 +4447,7 @@ "h": 8, "w": 12, "x": 0, - "y": 376 + "y": 162 }, "id": 20, "options": { @@ -4362,7 +4465,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4446,7 +4549,7 @@ "h": 8, "w": 12, "x": 12, - "y": 376 + "y": 162 }, "id": 23, "options": { @@ -4464,7 +4567,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4554,7 +4657,7 @@ "h": 8, "w": 12, "x": 0, - "y": 384 + "y": 170 }, "id": 22, "options": { @@ -4572,7 +4675,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4661,8 +4764,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, - "y": 392 + "x": 12, + "y": 170 }, "id": 24, "options": { @@ -4680,7 +4783,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { diff --git a/hedera-node/log4j2.xml b/hedera-node/log4j2.xml index 9db29458ff8e..4377c427624e 100644 --- a/hedera-node/log4j2.xml +++ b/hedera-node/log4j2.xml @@ -73,8 +73,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-4L %c{1} - [%t] %m{nolookups}%n diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java index f68db0f49888..60d4d5efb713 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java +++ 
b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/AbstractNode.java @@ -24,7 +24,7 @@ public abstract class AbstractNode implements HederaNode { private static final String HGCAA_LOG = "hgcaa.log"; private static final String SWIRLDS_LOG = "swirlds.log"; private static final String LOG4J2_XML = "log4j2.xml"; - private static final String BLOCK_NODE_COMMS_LOG = "blocknode-comms.log"; + private static final String BLOCK_NODE_COMMS_LOG = "block-node-comms.log"; protected NodeMetadata metadata;