diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md index 6645844372a3..d2583c00323f 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md @@ -29,6 +29,7 @@ It manages connection state, handles communication, and reports errors to the `B - Establish and maintain the connection transport. - Handle incoming and outgoing message flow. +- Detect unresponsive block nodes via configurable timeouts on pipeline operations. - Report connection errors promptly. - Coordinate with `BlockNodeConnectionManager` on lifecycle events. - Notify the block buffer (via connection manager) when a block has been acknowledged and therefore eligible to be @@ -68,6 +69,7 @@ stateDiagram-v2 ACTIVE --> CLOSING : ResendBlock unavailable ACTIVE --> CLOSING : gRPC onError ACTIVE --> CLOSING : Stream failure + ACTIVE --> CLOSING : Pipeline operation timeout ACTIVE --> CLOSING : Manual close ACTIVE --> ACTIVE : BlockAcknowledgement ACTIVE --> ACTIVE : SkipBlock @@ -160,4 +162,49 @@ The connection implements a configurable rate limiting mechanism for EndOfStream
endOfStreamScheduleDelay
The delay duration before attempting reconnection when the rate limit is exceeded.
+ +
pipelineOperationTimeout
+
The maximum duration allowed for pipeline onNext() and onComplete() operations before considering the block node unresponsive. Default: 30 seconds.
+ +### Pipeline Operation Timeout + +To detect unresponsive block nodes during message transmission and connection establishment, the connection implements configurable timeouts for pipeline operations. + +#### Timeout Behavior + +Pipeline operations (`onNext()`, `onComplete()`, and pipeline creation) are potentially blocking I/O operations that are executed on a dedicated virtual thread executor with timeout enforcement using `Future.get(timeout)`. Each connection instance creates its own virtual thread executor to isolate pipeline operations from other tasks and prevent blocking. + +- **Pipeline creation timeout**: When establishing the gRPC connection via `createRequestPipeline()`, both the gRPC client creation and bidirectional stream setup are executed with timeout protection. If the operation does not complete within the configured timeout period: + - The Future is cancelled to interrupt the blocked operation + - The timeout metric is incremented + - A `RuntimeException` is thrown with the underlying `TimeoutException` + - The connection remains in UNINITIALIZED state + - The connection manager's error handling will schedule a retry with exponential backoff +- **onNext() timeout**: When sending block items via `sendRequest()`, the operation is submitted to the connection's dedicated executor and the calling thread blocks waiting for completion with a timeout. If the operation does not complete within the configured timeout period: + - The Future is cancelled to interrupt the blocked operation + - The timeout metric is incremented + - `handleStreamFailure()` is triggered (only if connection is still ACTIVE) + - The connection follows standard failure handling with exponential backoff retry + - The connection manager will select a different block node for the next attempt if one is available + - `TimeoutException` is caught and handled internally +- **onComplete() timeout**: When closing the stream via `closePipeline()`, the operation is submitted to the same dedicated executor with the same timeout mechanism. If the operation does not complete within the configured timeout period: + - The Future is cancelled to interrupt the blocked operation + - The timeout metric is incremented + - Since the connection is already in CLOSING state, only the timeout is logged + - The connection completes the close operation normally + +**Note**: The dedicated virtual thread executor is properly shut down when the connection closes with a 5-second grace period for termination, ensuring no resource leaks. If tasks don't complete within the grace period, `shutdownNow()` is called to forcefully terminate them. + +#### Exception Handling + +The implementation handles multiple exception scenarios across all timeout-protected operations: +- **TimeoutException**: Pipeline operation exceeded the timeout - triggers failure handling for `onNext()` and pipeline creation, logged for `onComplete()` +- **InterruptedException**: Thread was interrupted while waiting - interrupt status is restored via `Thread.currentThread().interrupt()` before propagating the exception (for `onNext()` and pipeline creation) or logging it (for `onComplete()` and executor shutdown) +- **ExecutionException**: Error occurred during pipeline operation execution - the underlying cause is unwrapped and re-thrown (for `onNext()` and pipeline creation) or logged (for `onComplete()`) + +All exception scenarios include appropriate DEBUG-level logging with context information to aid in troubleshooting. + +#### Metrics + +A new metric `conn_pipelineOperationTimeout` tracks the total number of timeout events for pipeline creation, `onNext()`, and `onComplete()` operations, enabling operators to monitor block node responsiveness and connection establishment issues. diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 21649010a6cd..986cce7aae4b 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -30,10 +30,15 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Flow; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -99,6 +104,10 @@ private record Options(Optional authority, String contentType) implement * The reset period for the stream. This is used to periodically reset the stream to ensure increased stability and reliability. */ private final Duration streamResetPeriod; + /** + * Timeout for pipeline onNext() and onComplete() operations to detect unresponsive block nodes. + */ + private final Duration pipelineOperationTimeout; /** * Flag that indicates if this stream is currently shutting down, as initiated by this consensus node. */ @@ -117,6 +126,13 @@ private record Options(Optional authority, String contentType) implement * Scheduled executor service that is used to schedule periodic reset of the stream to help ensure stream health. */ private final ScheduledExecutorService executorService; + /** + * Dedicated executor service for pipeline operations using virtual threads. + * This isolates pipeline operations from the shared scheduled executor to prevent + * blocking when the connection manager is busy with other tasks. Virtual threads + * make this approach lightweight despite creating one executor per connection. + */ + private final ExecutorService pipelineExecutor; /** * This task runs every 24 hours (initial delay of 24 hours) when a connection is active. * The task helps maintain stream stability by forcing periodic reconnections. @@ -197,10 +213,12 @@ public BlockNodeConnection( this.blockStreamMetrics = requireNonNull(blockStreamMetrics, "blockStreamMetrics must not be null"); this.connectionState = new AtomicReference<>(ConnectionState.UNINITIALIZED); this.executorService = requireNonNull(executorService, "executorService must not be null"); + this.pipelineExecutor = Executors.newVirtualThreadPerTaskExecutor(); final var blockNodeConnectionConfig = configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class); this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod(); this.clientFactory = requireNonNull(clientFactory, "clientFactory must not be null"); + this.pipelineOperationTimeout = blockNodeConnectionConfig.pipelineOperationTimeout(); connectionId = String.format("%04d", connectionIdCounter.incrementAndGet()); } @@ -209,13 +227,38 @@ public BlockNodeConnection( */ public synchronized void createRequestPipeline() { if (requestPipelineRef.get() == null) { - blockStreamPublishServiceClient = createNewGrpcClient(); - final Pipeline pipeline = - blockStreamPublishServiceClient.publishBlockStream(this); - requestPipelineRef.set(pipeline); - logWithContext(logger, DEBUG, this, "Request pipeline initialized."); - updateConnectionState(ConnectionState.PENDING); - blockStreamMetrics.recordConnectionOpened(); + // Execute entire pipeline creation (including gRPC client creation) with timeout + // to prevent blocking on network operations + final Future future = pipelineExecutor.submit(() -> { + blockStreamPublishServiceClient = createNewGrpcClient(); + final Pipeline pipeline = + blockStreamPublishServiceClient.publishBlockStream(this); + requestPipelineRef.set(pipeline); + }); + + try { + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); + logWithContext(logger, DEBUG, this, "Request pipeline initialized."); + updateConnectionState(ConnectionState.PENDING); + blockStreamMetrics.recordConnectionOpened(); + } catch (final TimeoutException e) { + future.cancel(true); + logWithContext( + logger, + DEBUG, + this, + "Pipeline creation timed out after {}ms", + pipelineOperationTimeout.toMillis()); + blockStreamMetrics.recordPipelineOperationTimeout(); + throw new RuntimeException("Pipeline creation timed out", e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + logWithContext(logger, DEBUG, this, "Interrupted while creating pipeline", e); + throw new RuntimeException("Interrupted while creating pipeline", e); + } catch (final ExecutionException e) { + logWithContext(logger, DEBUG, this, "Error creating pipeline", e.getCause()); + throw new RuntimeException("Error creating pipeline", e.getCause()); + } } else { logWithContext(logger, DEBUG, this, "Request pipeline already available."); } @@ -638,7 +681,7 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) { highestAckedBlockNumber); try { sendRequest(endStream); - } catch (RuntimeException e) { + } catch (final RuntimeException e) { logger.warn(formatLogMessage("Error sending EndStream request", this), e); } close(true); @@ -677,7 +720,31 @@ public void sendRequest(@NonNull final PublishStreamRequest request) { request.protobufSize()); } final long startMs = System.currentTimeMillis(); - pipeline.onNext(request); + + final Future future = pipelineExecutor.submit(() -> pipeline.onNext(request)); + try { + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { + future.cancel(true); // Cancel the task if it times out + if (getConnectionState() == ConnectionState.ACTIVE) { + logWithContext( + logger, + DEBUG, + this, + "Pipeline onNext() timed out after {}ms", + pipelineOperationTimeout.toMillis()); + blockStreamMetrics.recordPipelineOperationTimeout(); + handleStreamFailure(); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupt status + logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onNext()", e); + throw new RuntimeException("Interrupted while waiting for pipeline.onNext()", e); + } catch (final ExecutionException e) { + logWithContext(logger, DEBUG, this, "Error executing pipeline.onNext()", e.getCause()); + throw new RuntimeException("Error executing pipeline.onNext()", e.getCause()); + } + final long durationMs = System.currentTimeMillis() - startMs; blockStreamMetrics.recordRequestLatency(durationMs); logWithContext(logger, TRACE, this, "Request took {}ms to send", this, durationMs); @@ -753,6 +820,16 @@ public void close(final boolean callOnComplete) { } catch (final Exception e) { logger.error(formatLogMessage("Error occurred while closing gRPC client.", this), e); } + try { + pipelineExecutor.shutdown(); + if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + pipelineExecutor.shutdownNow(); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + pipelineExecutor.shutdownNow(); + logger.error(formatLogMessage("Error occurred while shutting down pipeline executor.", this), e); + } blockStreamMetrics.recordConnectionClosed(); blockStreamMetrics.recordActiveConnectionIp(-1L); // regardless of outcome, mark the connection as closed @@ -770,8 +847,26 @@ private void closePipeline(final boolean callOnComplete) { try { final ConnectionState state = getConnectionState(); if (state == ConnectionState.CLOSING && callOnComplete) { - pipeline.onComplete(); - logWithContext(logger, DEBUG, this, "Request pipeline successfully closed."); + final Future future = pipelineExecutor.submit(pipeline::onComplete); + try { + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); + logWithContext(logger, DEBUG, this, "Request pipeline successfully closed."); + } catch (final TimeoutException e) { + future.cancel(true); // Cancel the task if it times out + logWithContext( + logger, + DEBUG, + this, + "Pipeline onComplete() timed out after {}ms", + pipelineOperationTimeout.toMillis()); + blockStreamMetrics.recordPipelineOperationTimeout(); + // Connection is already closing, just log the timeout + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupt status + logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onComplete()"); + } catch (final ExecutionException e) { + logWithContext(logger, DEBUG, this, "Error executing pipeline.onComplete()", e.getCause()); + } } } catch (final Exception e) { logger.warn(formatLogMessage("Error while completing request pipeline.", this), e); diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java index b68c07b011d5..ce824fe804d2 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java @@ -61,6 +61,7 @@ public class BlockStreamMetrics { private Counter conn_endOfStreamLimitCounter; private DoubleGauge conn_ackLatencyGauge; private Counter conn_highLatencyCounter; + private Counter conn_pipelineOperationTimeoutCounter; // buffer metrics private static final long BACK_PRESSURE_ACTIVE = 3; @@ -282,6 +283,11 @@ private void registerConnectivityMetrics() { final Counter.Config highLatencyCfg = newCounter(GROUP_CONN, "highLatencyEvents") .withDescription("Count of high latency events from the active block node connection"); conn_highLatencyCounter = metrics.getOrCreate(highLatencyCfg); + + final Counter.Config pipelineTimeoutCfg = newCounter(GROUP_CONN, "pipelineOperationTimeout") + .withDescription( + "Number of times a pipeline onNext() or onComplete() operation timed out on a block node connection"); + conn_pipelineOperationTimeoutCounter = metrics.getOrCreate(pipelineTimeoutCfg); } /** @@ -357,6 +363,13 @@ public void recordHighLatencyEvent() { conn_highLatencyCounter.increment(); } + /** + * Record that a pipeline onNext() or onComplete() operation timed out. + */ + public void recordPipelineOperationTimeout() { + conn_pipelineOperationTimeoutCounter.increment(); + } + // Connection RECV metrics ----------------------------------------------------------------------------------------- private void registerConnectionRecvMetrics() { diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index fcd044020f00..4bffc792ddf0 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -6,6 +6,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; @@ -27,12 +28,18 @@ import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions; import io.helidon.webclient.api.WebClient; +import java.lang.reflect.Field; import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Flow; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.hiero.block.api.BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient; import org.hiero.block.api.PublishStreamRequest; import org.hiero.block.api.PublishStreamRequest.EndStream; @@ -62,6 +69,7 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { private Pipeline requestPipeline; private ScheduledExecutorService executorService; private BlockNodeStats.HighLatencyResult latencyResult; + private BlockNodeClientFactory clientFactory; @BeforeEach @SuppressWarnings("unchecked") @@ -76,12 +84,11 @@ void beforeEach() { executorService = mock(ScheduledExecutorService.class); latencyResult = mock(BlockNodeStats.HighLatencyResult.class); - final BlockNodeClientFactory clientFactory = mock(BlockNodeClientFactory.class); + clientFactory = mock(BlockNodeClientFactory.class); lenient() .doReturn(grpcServiceClient) .when(clientFactory) .createClient(any(WebClient.class), any(PbjGrpcClientConfig.class), any(RequestOptions.class)); - connection = new BlockNodeConnection( configProvider, nodeConfig, connectionManager, bufferService, metrics, executorService, clientFactory); @@ -107,6 +114,114 @@ void testCreateRequestPipeline_alreadyExists() { verifyNoMoreInteractions(grpcServiceClient); } + /** + * Tests TimeoutException handling during pipeline creation. + * Uses mocks to simulate a timeout without actually waiting, making the test fast. + */ + @Test + void testCreateRequestPipeline_timeoutException() throws Exception { + // Create a mock Future that will throw TimeoutException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Simulated timeout")); + + // Mock the executor to return our mock future + final ExecutorService mockPipelineExecutor = mock(ExecutorService.class); + doReturn(mockFuture).when(mockPipelineExecutor).submit(any(Runnable.class)); + + // Use reflection to replace the pipelineExecutor with the mock + final Field executorField = BlockNodeConnection.class.getDeclaredField("pipelineExecutor"); + executorField.setAccessible(true); + executorField.set(connection, mockPipelineExecutor); + + // Attempt to create pipeline - should timeout and throw + final RuntimeException exception = catchRuntimeException(() -> connection.createRequestPipeline()); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Pipeline creation timed out"); + assertThat(exception.getCause()).isInstanceOf(TimeoutException.class); + + // Verify timeout was detected and recorded + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + verify(mockFuture).cancel(true); + verify(metrics).recordPipelineOperationTimeout(); + + // Connection should still be UNINITIALIZED since pipeline creation failed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.UNINITIALIZED); + } + + /** + * Tests InterruptedException handling during pipeline creation. + * Uses mocks to simulate an interruption without actually waiting, making the test fast. + */ + @Test + void testCreateRequestPipeline_interruptedException() throws Exception { + // Create a mock Future that will throw InterruptedException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException("Simulated interruption")); + + // Mock the executor to return our mock future + final ExecutorService mockPipelineExecutor = mock(ExecutorService.class); + doReturn(mockFuture).when(mockPipelineExecutor).submit(any(Runnable.class)); + + // Use reflection to replace the pipelineExecutor with the mock + final Field executorField = BlockNodeConnection.class.getDeclaredField("pipelineExecutor"); + executorField.setAccessible(true); + executorField.set(connection, mockPipelineExecutor); + + // Attempt to create pipeline - should handle interruption and throw + final RuntimeException exception = catchRuntimeException(() -> connection.createRequestPipeline()); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Interrupted while creating pipeline"); + assertThat(exception.getCause()).isInstanceOf(InterruptedException.class); + + // Verify interruption was handled + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + + // Connection should still be UNINITIALIZED since pipeline creation failed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.UNINITIALIZED); + } + + /** + * Tests ExecutionException handling during pipeline creation. + * Uses mocks to simulate an execution error without actually waiting, making the test fast. + */ + @Test + void testCreateRequestPipeline_executionException() throws Exception { + // Create a mock Future that will throw ExecutionException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new java.util.concurrent.ExecutionException( + "Simulated execution error", new RuntimeException("Underlying cause"))); + + // Mock the executor to return our mock future + final ExecutorService mockPipelineExecutor = mock(ExecutorService.class); + doReturn(mockFuture).when(mockPipelineExecutor).submit(any(Runnable.class)); + + // Use reflection to replace the pipelineExecutor with the mock + final Field executorField = BlockNodeConnection.class.getDeclaredField("pipelineExecutor"); + executorField.setAccessible(true); + executorField.set(connection, mockPipelineExecutor); + + // Attempt to create pipeline - should handle execution exception and throw + final RuntimeException exception = catchRuntimeException(() -> connection.createRequestPipeline()); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Error creating pipeline"); + assertThat(exception.getCause()).isInstanceOf(RuntimeException.class); + assertThat(exception.getCause().getMessage()).isEqualTo("Underlying cause"); + + // Verify execution exception was handled + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + + // Connection should still be UNINITIALIZED since pipeline creation failed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.UNINITIALIZED); + } + @Test void testUpdatingConnectionState() { final ConnectionState preUpdateState = connection.getConnectionState(); @@ -566,7 +681,11 @@ void testSendRequest_errorWhileActive() { final PublishStreamRequest request = createRequest(newBlockHeaderItem()); final RuntimeException e = catchRuntimeException(() -> connection.sendRequest(request)); - assertThat(e).isInstanceOf(RuntimeException.class).hasMessage("kaboom!"); + assertThat(e).isInstanceOf(RuntimeException.class); + // Exception gets wrapped when executed in virtual thread executor + assertThat(e.getMessage()).contains("Error executing pipeline.onNext()"); + assertThat(e.getCause()).isInstanceOf(RuntimeException.class); + assertThat(e.getCause().getMessage()).isEqualTo("kaboom!"); verify(metrics).recordRequestSendFailure(); verifyNoMoreInteractions(metrics); @@ -1326,6 +1445,423 @@ void testPeriodicStreamReset_connectionNotActive() { verifyNoInteractions(requestPipeline); } + // Pipeline operation timeout tests + + /** + * Tests onNext() normal (non-timeout) path. + */ + @Test + void testSendRequest_onNextCompletesSuccessfully() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + connection.sendRequest(request); + + // Verify the request was sent successfully + verify(requestPipeline).onNext(request); + verify(metrics).recordRequestSent(RequestOneOfType.BLOCK_ITEMS); + verify(metrics).recordBlockItemsSent(1); + verify(metrics).recordRequestLatency(anyLong()); + + // Verify no timeout was recorded + verify(metrics, times(0)).recordPipelineOperationTimeout(); + + // Connection should still be ACTIVE + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.ACTIVE); + } + + /** + * Tests that sendRequest does not execute if connection is no longer ACTIVE. + */ + @Test + void testSendRequest_connectionNotActive() { + openConnectionAndResetMocks(); + // Start in CLOSING state + connection.updateConnectionState(ConnectionState.CLOSING); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Since connection is not ACTIVE, sendRequest should not do anything + connection.sendRequest(request); + + // Verify no interactions since connection is not ACTIVE + verifyNoInteractions(requestPipeline); + verifyNoInteractions(metrics); + verifyNoInteractions(connectionManager); + } + + /** + * Tests that close operation completes successfully. + */ + @Test + void testClose_completesSuccessfully() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close connection normally + connection.close(true); + + // Verify close completed successfully + verify(requestPipeline).onComplete(); + verify(connectionManager).jumpToBlock(-1L); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + // Connection should be CLOSED + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests that closing without calling onComplete does not call onComplete on pipeline. + * This covers the case where callOnComplete=false. + */ + @Test + void testClose_withoutOnCompleteDoesNotCallOnComplete() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close without calling onComplete + connection.close(false); + + // Verify onComplete was not called on pipeline + verifyNoInteractions(requestPipeline); + verify(connectionManager).jumpToBlock(-1L); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + // Connection should be CLOSED + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests that error during pipeline operation is handled properly. + * This tests the exception handling in sendRequest when pipeline.onNext throws. + */ + @Test + void testSendRequest_pipelineThrowsException() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Mock requestPipeline.onNext() to throw an exception + doThrow(new RuntimeException("Pipeline error")).when(requestPipeline).onNext(any()); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Should throw RuntimeException wrapped by the executor + final RuntimeException exception = catchRuntimeException(() -> connection.sendRequest(request)); + + assertThat(exception).isNotNull(); + // Exception gets wrapped when executed in virtual thread executor + assertThat(exception.getMessage()).contains("Error executing pipeline.onNext()"); + assertThat(exception.getCause()).isInstanceOf(RuntimeException.class); + assertThat(exception.getCause().getMessage()).isEqualTo("Pipeline error"); + + // Verify error was recorded + verify(requestPipeline).onNext(request); + verify(metrics).recordRequestSendFailure(); + } + + /** + * Tests that the pipelineExecutor is properly shut down when the connection closes. + * This ensures no resource leaks and that the executor won't accept new tasks after close. + */ + @Test + void testClose_pipelineExecutorShutdown() throws InterruptedException { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close the connection + connection.close(true); + + // Verify connection is closed + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + // Give the executor a moment to shutdown + Thread.sleep(100); + + // Try to send a request after close - should be ignored since connection is CLOSED + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + connection.sendRequest(request); + + // Verify that the pipeline was NOT called (executor should be shut down and connection is CLOSED) + verify(requestPipeline, times(1)).onComplete(); // Only from the close() call + verify(requestPipeline, times(0)).onNext(any()); // sendRequest should not execute + + // Verify no additional interactions beyond the close operation + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + verifyNoMoreInteractions(requestPipeline); + } + + /** + * Tests TimeoutException handling when pipeline.onNext() times out. + * Uses mocks to simulate a timeout without actually waiting, making the test fast. + */ + @Test + void testSendRequest_timeoutException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a mock Future that will throw TimeoutException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Simulated timeout")); + + // Mock the executor to return our mock future + final ExecutorService mockPipelineExecutor = mock(ExecutorService.class); + doReturn(mockFuture).when(mockPipelineExecutor).submit(any(Runnable.class)); + + // Use reflection to replace the pipelineExecutor with the mock + final Field executorField = BlockNodeConnection.class.getDeclaredField("pipelineExecutor"); + executorField.setAccessible(true); + executorField.set(connection, mockPipelineExecutor); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Send request - should trigger timeout handling immediately + connection.sendRequest(request); + + // Verify timeout was detected and handled + // Note: future.get() is called twice - once for sendRequest (times out) + // and once for closePipeline/onComplete (also times out during cleanup) + verify(mockFuture, times(2)).get(anyLong(), any(TimeUnit.class)); + verify(mockFuture, times(2)).cancel(true); // Future should be cancelled both times + + // Timeout metric is recorded twice - once for sendRequest, once for onComplete during close + verify(metrics, times(2)).recordPipelineOperationTimeout(); + verify(metrics).recordConnectionClosed(); + verify(connectionManager).rescheduleConnection(eq(connection), eq(Duration.ofSeconds(30)), eq(null), eq(true)); + + // Connection should be CLOSED after timeout + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests TimeoutException handling when pipeline.onComplete() times out during close. + * Uses mocks to simulate a timeout without actually waiting, making the test fast. + */ + @Test + void testClose_onCompleteTimeoutException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a mock Future that will throw TimeoutException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException("Simulated timeout")); + + // Mock the executor to return our mock future + final ExecutorService mockPipelineExecutor = mock(ExecutorService.class); + doReturn(mockFuture).when(mockPipelineExecutor).submit(any(Runnable.class)); + + // Use reflection to replace the pipelineExecutor with our mock + final Field executorField = BlockNodeConnection.class.getDeclaredField("pipelineExecutor"); + executorField.setAccessible(true); + executorField.set(connection, mockPipelineExecutor); + + // Close connection - should trigger timeout during onComplete + connection.close(true); + + // Verify timeout was detected during onComplete + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + verify(mockFuture).cancel(true); + verify(metrics).recordPipelineOperationTimeout(); + verify(metrics).recordConnectionClosed(); + verify(connectionManager).jumpToBlock(-1L); + + // Connection should still be CLOSED despite timeout + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests InterruptedException handling when pipeline.onComplete() is interrupted during close. + * Uses mocks to simulate an interruption without actually waiting, making the test fast. + */ + @Test + void testClose_onCompleteInterruptedException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a mock Future that will throw InterruptedException when get() is called + @SuppressWarnings("unchecked") + final Future mockFuture = mock(Future.class); + when(mockFuture.get(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException("Simulated interruption")); + + // Mock the executor to return our mock future + final ExecutorService mockPipelineExecutor = mock(ExecutorService.class); + doReturn(mockFuture).when(mockPipelineExecutor).submit(any(Runnable.class)); + + // Use reflection to replace the pipelineExecutor with our mock + final Field executorField = BlockNodeConnection.class.getDeclaredField("pipelineExecutor"); + executorField.setAccessible(true); + executorField.set(connection, mockPipelineExecutor); + + // Close connection - should handle interruption during onComplete + connection.close(true); + + // Verify interruption was handled gracefully + verify(mockFuture).get(anyLong(), any(TimeUnit.class)); + verify(metrics).recordConnectionClosed(); + verify(connectionManager).jumpToBlock(-1L); + + // Connection should still be CLOSED despite interruption + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + // Verify thread interrupt status was restored + // (Note: Can't directly test this in unit test, but the code does Thread.currentThread().interrupt()) + } + + /** + * Tests InterruptedException handling during pipelineExecutor.awaitTermination() in close(). + * This covers the exception handling when shutting down the executor is interrupted. + */ + @Test + void testClose_executorShutdownInterruptedException() throws Exception { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Mock the executor to throw InterruptedException during awaitTermination + final ExecutorService mockPipelineExecutor = mock(ExecutorService.class); + when(mockPipelineExecutor.awaitTermination(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException("Simulated shutdown interruption")); + + // Use reflection to replace the pipelineExecutor with our mock + final Field executorField = BlockNodeConnection.class.getDeclaredField("pipelineExecutor"); + executorField.setAccessible(true); + executorField.set(connection, mockPipelineExecutor); + + // Close connection - should handle interruption during executor shutdown + connection.close(true); + + // Verify executor shutdown was attempted + verify(mockPipelineExecutor).shutdown(); + verify(mockPipelineExecutor).awaitTermination(5, TimeUnit.SECONDS); + + // Verify shutdownNow was called after interruption + verify(mockPipelineExecutor).shutdownNow(); + + verify(metrics).recordConnectionClosed(); + verify(connectionManager).jumpToBlock(-1L); + + // Connection should still be CLOSED despite interruption + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + + /** + * Tests InterruptedException handling during pipeline operation. + */ + @Test + void testSendRequest_interruptedException() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Create a latch to coordinate the test + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference exceptionRef = new AtomicReference<>(); + + // Make the pipeline block until interrupted + doAnswer(invocation -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted", e); + } + return null; + }) + .when(requestPipeline) + .onNext(any()); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Send request in a separate thread + final Thread testThread = Thread.ofVirtual().start(() -> { + try { + connection.sendRequest(request); + } catch (RuntimeException e) { + exceptionRef.set(e); + } + }); + + // Give the thread time to start and block + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Interrupt the thread + testThread.interrupt(); + + // Wait for thread to complete + try { + testThread.join(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Verify exception was thrown + assertThat(exceptionRef.get()).isNotNull(); + assertThat(exceptionRef.get().getMessage()).contains("Interrupted while waiting for pipeline.onNext()"); + assertThat(exceptionRef.get().getCause()).isInstanceOf(InterruptedException.class); + } + + /** + * Tests ExecutionException handling when pipeline.onNext() throws an exception. + * This is already covered by testSendRequest_pipelineThrowsException but included + * here for completeness of exception handling coverage. + */ + @Test + void testSendRequest_executionException() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Mock requestPipeline.onNext() to throw an exception + doThrow(new RuntimeException("Execution failed")).when(requestPipeline).onNext(any()); + + final PublishStreamRequest request = createRequest(newBlockHeaderItem()); + + // Should throw RuntimeException wrapping ExecutionException + final RuntimeException exception = catchRuntimeException(() -> connection.sendRequest(request)); + + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("Error executing pipeline.onNext()"); + assertThat(exception.getCause()).isInstanceOf(RuntimeException.class); + assertThat(exception.getCause().getMessage()).isEqualTo("Execution failed"); + + verify(metrics).recordRequestSendFailure(); + } + + /** + * Tests that closing the connection multiple times doesn't cause issues with executor shutdown. + * The executor should only be shut down once, and subsequent closes should be idempotent. + */ + @Test + void testClose_multipleCloseCallsHandleExecutorShutdownGracefully() { + openConnectionAndResetMocks(); + connection.updateConnectionState(ConnectionState.ACTIVE); + + // Close the connection first time + connection.close(true); + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + // Reset mocks to verify second close behavior + reset(requestPipeline, metrics, connectionManager); + + // Close again - should be idempotent (no-op since already closed) + connection.close(true); + + // Verify no additional operations were performed + verifyNoInteractions(requestPipeline); + verifyNoInteractions(metrics); + verifyNoInteractions(connectionManager); + + // Connection should still be CLOSED + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + } + // Utilities private void openConnectionAndResetMocks() { diff --git a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java index c3597e7672bd..a83fe192ec45 100644 --- a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java +++ b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java @@ -20,6 +20,7 @@ * @param highLatencyEventsBeforeSwitching number of consecutive high-latency events before considering switching nodes * @param maxBackoffDelay the maximum backoff delay for exponential backoff * @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime + * @param pipelineOperationTimeout timeout for pipeline onNext() and onComplete() operations to detect unresponsive block nodes */ @ConfigData("blockNode") public record BlockNodeConnectionConfig( @@ -34,4 +35,5 @@ public record BlockNodeConnectionConfig( @ConfigProperty(defaultValue = "30s") @NodeProperty Duration highLatencyThreshold, @ConfigProperty(defaultValue = "5") @NodeProperty int highLatencyEventsBeforeSwitching, @ConfigProperty(defaultValue = "10s") @NodeProperty Duration maxBackoffDelay, - @ConfigProperty(defaultValue = "30s") @NodeProperty Duration grpcOverallTimeout) {} + @ConfigProperty(defaultValue = "30s") @NodeProperty Duration grpcOverallTimeout, + @ConfigProperty(defaultValue = "30s") @NodeProperty Duration pipelineOperationTimeout) {} diff --git a/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json b/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json index a8fc66a6d702..d0b789f151e0 100644 --- a/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json +++ b/hedera-node/infrastructure/grafana/dashboards/development/hedera-node/block_streaming.json @@ -19,7 +19,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 1, - "id": 10, + "id": 2, "links": [], "panels": [ { @@ -116,7 +116,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -230,7 +230,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -304,7 +304,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -422,7 +422,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -540,7 +540,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -658,7 +658,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -804,7 +804,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -906,7 +906,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1008,7 +1008,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1110,7 +1110,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1213,7 +1213,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1325,7 +1325,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1429,7 +1429,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1531,7 +1531,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1633,7 +1633,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1735,7 +1735,7 @@ "sort": "desc" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1837,7 +1837,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -1939,7 +1939,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2041,7 +2041,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2143,7 +2143,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2164,6 +2164,109 @@ "title": "High latency events for the current active connection (1m delta)", "type": "timeseries" }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Number of pipeline operation timeouts (onNext or onComplete)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 76 + }, + "id": 60, + "options": { + "legend": { + "calcs": [ + "last" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.0.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "max by(node) (delta(blockStream_conn_pipelineOperationTimeout_total{node=~\"$NodeId\"}[1m]))", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "node {{node}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Pipeline Operation Timeouts (1m delta)", + "type": "timeseries" + }, { "datasource": { "type": "prometheus", @@ -2200,8 +2303,8 @@ "gridPos": { "h": 4, "w": 8, - "x": 12, - "y": 76 + "x": 0, + "y": 84 }, "id": 57, "options": { @@ -2217,7 +2320,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2318,8 +2421,8 @@ "gridPos": { "h": 4, "w": 8, - "x": 12, - "y": 80 + "x": 8, + "y": 84 }, "id": 59, "options": { @@ -2335,7 +2438,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2436,7 +2539,7 @@ "gridPos": { "h": 4, "w": 8, - "x": 12, + "x": 16, "y": 84 }, "id": 58, @@ -2453,7 +2556,7 @@ "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2591,7 +2694,7 @@ "h": 8, "w": 12, "x": 0, - "y": 97 + "y": 89 }, "id": 26, "options": { @@ -2609,7 +2712,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2693,7 +2796,7 @@ "h": 8, "w": 12, "x": 12, - "y": 97 + "y": 89 }, "id": 27, "options": { @@ -2711,7 +2814,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2795,7 +2898,7 @@ "h": 8, "w": 12, "x": 0, - "y": 105 + "y": 97 }, "id": 29, "options": { @@ -2813,7 +2916,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2897,7 +3000,7 @@ "h": 8, "w": 12, "x": 12, - "y": 105 + "y": 97 }, "id": 28, "options": { @@ -2915,7 +3018,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -2999,7 +3102,7 @@ "h": 8, "w": 12, "x": 0, - "y": 113 + "y": 105 }, "id": 31, "options": { @@ -3017,7 +3120,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3101,7 +3204,7 @@ "h": 8, "w": 12, "x": 12, - "y": 113 + "y": 105 }, "id": 30, "options": { @@ -3119,7 +3222,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3203,7 +3306,7 @@ "h": 8, "w": 12, "x": 0, - "y": 121 + "y": 113 }, "id": 32, "options": { @@ -3221,7 +3324,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3305,7 +3408,7 @@ "h": 8, "w": 12, "x": 12, - "y": 121 + "y": 113 }, "id": 33, "options": { @@ -3323,7 +3426,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3407,7 +3510,7 @@ "h": 8, "w": 12, "x": 0, - "y": 129 + "y": 121 }, "id": 34, "options": { @@ -3425,7 +3528,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3509,7 +3612,7 @@ "h": 8, "w": 12, "x": 12, - "y": 129 + "y": 121 }, "id": 35, "options": { @@ -3527,7 +3630,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3611,7 +3714,7 @@ "h": 8, "w": 12, "x": 0, - "y": 137 + "y": 129 }, "id": 36, "options": { @@ -3629,7 +3732,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3713,7 +3816,7 @@ "h": 8, "w": 12, "x": 12, - "y": 137 + "y": 129 }, "id": 37, "options": { @@ -3731,7 +3834,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3815,7 +3918,7 @@ "h": 8, "w": 12, "x": 0, - "y": 145 + "y": 137 }, "id": 38, "options": { @@ -3833,7 +3936,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -3931,7 +4034,7 @@ "h": 8, "w": 12, "x": 0, - "y": 360 + "y": 146 }, "id": 19, "options": { @@ -3949,7 +4052,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4030,7 +4133,7 @@ "h": 8, "w": 12, "x": 12, - "y": 360 + "y": 146 }, "id": 56, "options": { @@ -4048,7 +4151,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4134,7 +4237,7 @@ "h": 8, "w": 12, "x": 0, - "y": 368 + "y": 154 }, "id": 18, "options": { @@ -4152,7 +4255,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4236,7 +4339,7 @@ "h": 8, "w": 12, "x": 12, - "y": 368 + "y": 154 }, "id": 21, "options": { @@ -4254,7 +4357,7 @@ "sort": "none" } }, - "pluginVersion": "12.0.1", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4344,7 +4447,7 @@ "h": 8, "w": 12, "x": 0, - "y": 376 + "y": 162 }, "id": 20, "options": { @@ -4362,7 +4465,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4446,7 +4549,7 @@ "h": 8, "w": 12, "x": 12, - "y": 376 + "y": 162 }, "id": 23, "options": { @@ -4464,7 +4567,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4554,7 +4657,7 @@ "h": 8, "w": 12, "x": 0, - "y": 384 + "y": 170 }, "id": 22, "options": { @@ -4572,7 +4675,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": { @@ -4661,8 +4764,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, - "y": 392 + "x": 12, + "y": 170 }, "id": 24, "options": { @@ -4680,7 +4783,7 @@ "sort": "none" } }, - "pluginVersion": "11.4.0", + "pluginVersion": "12.0.2", "targets": [ { "datasource": {