Skip to content

Commit eac9149

Browse files
committed
introduced executor service for pipeline operations using virtual threads
Signed-off-by: Alex Kehayov <aleks.kehayov@limechain.tech>
1 parent 582fc3b commit eac9149

File tree

3 files changed

+335
-167
lines changed

3 files changed

+335
-167
lines changed

hedera-node/docs/design/app/blocks/BlockNodeConnection.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,21 +173,23 @@ To detect unresponsive block nodes during message transmission, the connection i
173173

174174
#### Timeout Behavior
175175

176-
Pipeline operations (`onNext()` and `onComplete()`) are blocking I/O operations that are executed on a separate executor thread with timeout enforcement using `Future.get(timeout)`.
176+
Pipeline operations (`onNext()` and `onComplete()`) are blocking I/O operations that are executed on a dedicated virtual thread executor with timeout enforcement using `Future.get(timeout)`. Each connection instance creates its own virtual thread executor to isolate pipeline operations from other tasks and prevent blocking.
177177

178-
- **onNext() timeout**: When sending block items via `sendRequest()`, the operation is submitted to an executor and the calling thread blocks waiting for completion with a timeout. If the operation does not complete within the configured timeout period:
178+
- **onNext() timeout**: When sending block items via `sendRequest()`, the operation is submitted to the connection's dedicated executor and the calling thread blocks waiting for completion with a timeout. If the operation does not complete within the configured timeout period:
179179
- The Future is cancelled to interrupt the blocked operation
180180
- The timeout metric is incremented
181181
- `handleStreamFailure()` is triggered (only if connection is still ACTIVE)
182182
- The connection follows standard failure handling with exponential backoff retry
183183
- The connection manager will select a different block node for the next attempt if one is available
184184
- `TimeoutException` is caught and handled internally
185-
- **onComplete() timeout**: When closing the stream via `closePipeline()`, the operation is submitted to an executor with the same timeout mechanism. If the operation does not complete within the configured timeout period:
185+
- **onComplete() timeout**: When closing the stream via `closePipeline()`, the operation is submitted to the same dedicated executor with the same timeout mechanism. If the operation does not complete within the configured timeout period:
186186
- The Future is cancelled to interrupt the blocked operation
187187
- The timeout metric is incremented
188188
- Since the connection is already in CLOSING state, only the timeout is logged
189189
- The connection completes the close operation normally
190190

191+
**Note**: The dedicated virtual thread executor is shut down when the connection closes, ensuring no resource leaks.
192+
191193
#### Exception Handling
192194

193195
The implementation handles multiple exception scenarios:

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.util.Objects;
3232
import java.util.Optional;
3333
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
3436
import java.util.concurrent.Flow;
3537
import java.util.concurrent.Future;
3638
import java.util.concurrent.ScheduledExecutorService;
@@ -124,6 +126,13 @@ private record Options(Optional<String> authority, String contentType) implement
124126
* Scheduled executor service that is used to schedule periodic reset of the stream to help ensure stream health.
125127
*/
126128
private final ScheduledExecutorService executorService;
129+
/**
130+
* Dedicated executor service for pipeline operations using virtual threads.
131+
* This isolates pipeline operations from the shared scheduled executor to prevent
132+
* blocking when the connection manager is busy with other tasks. Virtual threads
133+
* make this approach lightweight despite creating one executor per connection.
134+
*/
135+
private final ExecutorService pipelineExecutor;
127136
/**
128137
* This task runs every 24 hours (initial delay of 24 hours) when a connection is active.
129138
* The task helps maintain stream stability by forcing periodic reconnections.
@@ -204,6 +213,7 @@ public BlockNodeConnection(
204213
this.blockStreamMetrics = requireNonNull(blockStreamMetrics, "blockStreamMetrics must not be null");
205214
this.connectionState = new AtomicReference<>(ConnectionState.UNINITIALIZED);
206215
this.executorService = requireNonNull(executorService, "executorService must not be null");
216+
this.pipelineExecutor = Executors.newVirtualThreadPerTaskExecutor();
207217
final var blockNodeConnectionConfig =
208218
configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class);
209219
this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod();
@@ -646,7 +656,7 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) {
646656
highestAckedBlockNumber);
647657
try {
648658
sendRequest(endStream);
649-
} catch (RuntimeException e) {
659+
} catch (final RuntimeException e) {
650660
logger.warn(formatLogMessage("Error sending EndStream request", this), e);
651661
}
652662
close(true);
@@ -686,10 +696,10 @@ public void sendRequest(@NonNull final PublishStreamRequest request) {
686696
}
687697
final long startMs = System.currentTimeMillis();
688698

689-
Future<?> future = executorService.submit(() -> pipeline.onNext(request));
699+
final Future<?> future = pipelineExecutor.submit(() -> pipeline.onNext(request));
690700
try {
691701
future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
692-
} catch (TimeoutException e) {
702+
} catch (final TimeoutException e) {
693703
future.cancel(true); // Cancel the task if it times out
694704
if (getConnectionState() == ConnectionState.ACTIVE) {
695705
logWithContext(
@@ -701,10 +711,10 @@ public void sendRequest(@NonNull final PublishStreamRequest request) {
701711
blockStreamMetrics.recordPipelineOperationTimeout();
702712
handleStreamFailure();
703713
}
704-
} catch (InterruptedException e) {
714+
} catch (final InterruptedException e) {
705715
Thread.currentThread().interrupt(); // Restore interrupt status
706716
throw new RuntimeException("Interrupted while waiting for pipeline.onNext()", e);
707-
} catch (ExecutionException e) {
717+
} catch (final ExecutionException e) {
708718
throw new RuntimeException("Error executing pipeline.onNext()", e.getCause());
709719
}
710720

@@ -783,6 +793,16 @@ public void close(final boolean callOnComplete) {
783793
} catch (final Exception e) {
784794
logger.error(formatLogMessage("Error occurred while closing gRPC client.", this), e);
785795
}
796+
try {
797+
pipelineExecutor.shutdown();
798+
if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
799+
pipelineExecutor.shutdownNow();
800+
}
801+
} catch (final InterruptedException e) {
802+
Thread.currentThread().interrupt();
803+
pipelineExecutor.shutdownNow();
804+
logger.error(formatLogMessage("Error occurred while shutting down pipeline executor.", this), e);
805+
}
786806
blockStreamMetrics.recordConnectionClosed();
787807
blockStreamMetrics.recordActiveConnectionIp(-1L);
788808
// regardless of outcome, mark the connection as closed
@@ -800,11 +820,11 @@ private void closePipeline(final boolean callOnComplete) {
800820
try {
801821
final ConnectionState state = getConnectionState();
802822
if (state == ConnectionState.CLOSING && callOnComplete) {
803-
Future<?> future = executorService.submit(pipeline::onComplete);
823+
final Future<?> future = pipelineExecutor.submit(pipeline::onComplete);
804824
try {
805825
future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
806826
logWithContext(logger, DEBUG, this, "Request pipeline successfully closed.");
807-
} catch (TimeoutException e) {
827+
} catch (final TimeoutException e) {
808828
future.cancel(true); // Cancel the task if it times out
809829
logWithContext(
810830
logger,
@@ -814,10 +834,10 @@ private void closePipeline(final boolean callOnComplete) {
814834
pipelineOperationTimeout.toMillis());
815835
blockStreamMetrics.recordPipelineOperationTimeout();
816836
// Connection is already closing, just log the timeout
817-
} catch (InterruptedException e) {
837+
} catch (final InterruptedException e) {
818838
Thread.currentThread().interrupt(); // Restore interrupt status
819839
logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onComplete()");
820-
} catch (ExecutionException e) {
840+
} catch (final ExecutionException e) {
821841
logWithContext(logger, DEBUG, this, "Error executing pipeline.onComplete()", e.getCause());
822842
}
823843
}

0 commit comments

Comments
 (0)