47 changes: 47 additions & 0 deletions hedera-node/docs/design/app/blocks/BlockNodeConnection.md
@@ -29,6 +29,7 @@ It manages connection state, handles communication, and reports errors to the `BlockNodeConnectionManager`.

- Establish and maintain the connection transport.
- Handle incoming and outgoing message flow.
- Detect unresponsive block nodes via configurable timeouts on pipeline operations.
- Report connection errors promptly.
- Coordinate with `BlockNodeConnectionManager` on lifecycle events.
- Notify the block buffer (via connection manager) when a block has been acknowledged and therefore eligible to be
@@ -68,6 +69,7 @@ stateDiagram-v2
ACTIVE --> CLOSING : ResendBlock unavailable
ACTIVE --> CLOSING : gRPC onError
ACTIVE --> CLOSING : Stream failure
ACTIVE --> CLOSING : Pipeline operation timeout
ACTIVE --> CLOSING : Manual close
ACTIVE --> ACTIVE : BlockAcknowledgement
ACTIVE --> ACTIVE : SkipBlock
@@ -160,4 +162,49 @@ The connection implements a configurable rate limiting mechanism for EndOfStream

<dt>endOfStreamScheduleDelay</dt>
<dd>The delay duration before attempting reconnection when the rate limit is exceeded.</dd>

<dt>pipelineOperationTimeout</dt>
<dd>The maximum duration allowed for pipeline creation, onNext(), and onComplete() operations before the block node is considered unresponsive. Default: 30 seconds.</dd>
</dl>

### Pipeline Operation Timeout

To detect unresponsive block nodes during message transmission and connection establishment, the connection applies a configurable timeout to pipeline operations.

#### Timeout Behavior

Pipeline operations (`onNext()`, `onComplete()`, and pipeline creation) are potentially blocking I/O calls. They are executed on a dedicated virtual thread executor and bounded with `Future.get(timeout)`. Each connection instance creates its own virtual thread executor so that pipeline operations are isolated from other tasks and cannot block the shared scheduled executor. The sketch after the list below illustrates the `onNext()` case.

- **Pipeline creation timeout**: When establishing the gRPC connection via `createRequestPipeline()`, both the gRPC client creation and bidirectional stream setup are executed with timeout protection. If the operation does not complete within the configured timeout period:
- The Future is cancelled to interrupt the blocked operation
- The timeout metric is incremented
- A `RuntimeException` is thrown with the underlying `TimeoutException` as its cause
- The connection remains in the UNINITIALIZED state
- The connection manager's error handling will schedule a retry with exponential backoff
- **onNext() timeout**: When sending block items via `sendRequest()`, the operation is submitted to the connection's dedicated executor and the calling thread waits for completion up to the configured timeout. If the operation does not complete within that period:
- The Future is cancelled to interrupt the blocked operation
- The timeout metric is incremented
- `handleStreamFailure()` is triggered (only if the connection is still ACTIVE)
- The connection follows standard failure handling with exponential backoff retry
- The connection manager will select a different block node for the next attempt if one is available
- `TimeoutException` is caught and handled internally
- **onComplete() timeout**: When closing the stream via `closePipeline()`, the operation is submitted to the same dedicated executor with the same timeout mechanism. If the operation does not complete within the configured timeout period:
- The Future is cancelled to interrupt the blocked operation
- The timeout metric is incremented
- Since the connection is already in CLOSING state, only the timeout is logged
- The connection completes the close operation normally
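
The following is a minimal, self-contained sketch of the `onNext()` timeout path described above. The class and the stub method bodies are illustrative placeholders rather than the actual `BlockNodeConnection` implementation; the per-connection virtual thread executor and the `Future.get(timeout)` usage follow the approach in this PR.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch of the onNext() timeout enforcement; not the actual BlockNodeConnection class.
class OnNextTimeoutSketch {
    // One virtual-thread executor per connection keeps blocking I/O off shared executors.
    private final ExecutorService pipelineExecutor = Executors.newVirtualThreadPerTaskExecutor();
    private final Duration pipelineOperationTimeout = Duration.ofSeconds(30); // configured default

    void sendWithTimeout(final Runnable sendCall) throws ExecutionException, InterruptedException {
        // Submit the potentially blocking onNext() call and wait for it with a bounded timeout.
        final Future<?> future = pipelineExecutor.submit(sendCall);
        try {
            future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final TimeoutException e) {
            future.cancel(true);   // interrupt the blocked operation
            recordTimeoutMetric(); // increment conn_pipelineOperationTimeout
            handleStreamFailure(); // only invoked when the connection is still ACTIVE
        }
    }

    private void recordTimeoutMetric() { /* placeholder: increment the timeout counter */ }

    private void handleStreamFailure() { /* placeholder: close and let the manager retry with backoff */ }
}
```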

**Note**: The dedicated virtual thread executor is shut down when the connection closes, with a 5-second grace period for termination to ensure no resource leaks. If tasks do not complete within the grace period, `shutdownNow()` is called to forcefully terminate them. A minimal sketch of this shutdown sequence follows.
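
A minimal sketch of that shutdown sequence, using only standard `java.util.concurrent` APIs (the wrapper class is illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the executor shutdown performed when the connection closes.
class ExecutorShutdownSketch {
    private final ExecutorService pipelineExecutor = Executors.newVirtualThreadPerTaskExecutor();

    void shutdownPipelineExecutor() {
        try {
            pipelineExecutor.shutdown();
            // Give in-flight pipeline operations a 5-second grace period to finish.
            if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                pipelineExecutor.shutdownNow(); // force termination if they do not
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            pipelineExecutor.shutdownNow();
        }
    }
}
```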

#### Exception Handling

The implementation handles multiple exception scenarios across all timeout-protected operations:
- **TimeoutException**: The pipeline operation exceeded the timeout - triggers failure handling for `onNext()`, causes pipeline creation to fail with a `RuntimeException`, and is only logged for `onComplete()`
- **InterruptedException**: The waiting thread was interrupted - the interrupt status is restored via `Thread.currentThread().interrupt()` before the exception is propagated (for `onNext()` and pipeline creation) or logged (for `onComplete()` and executor shutdown)
- **ExecutionException**: An error occurred while executing the pipeline operation - the underlying cause is unwrapped and rethrown (for `onNext()` and pipeline creation) or logged (for `onComplete()`)

All exception scenarios include appropriate DEBUG-level logging with context information to aid in troubleshooting.
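
Condensed into one place, the triage around `Future.get(timeout)` looks roughly like the sketch below; the class, method name, and `rethrow` flag are illustrative, and whether each case is rethrown or only logged depends on the operation as described above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative exception triage for a timeout-protected pipeline operation.
class ExceptionTriageSketch {
    void awaitPipelineOperation(final Future<?> future, final long timeoutMs, final boolean rethrow) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException e) {
            future.cancel(true);
            // onNext(): trigger failure handling; pipeline creation: rethrow; onComplete(): log only.
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // always restore the interrupt status first
            if (rethrow) {
                throw new RuntimeException("Interrupted while waiting for pipeline operation", e);
            }
        } catch (final ExecutionException e) {
            // Unwrap the underlying cause; rethrown for onNext() and creation, logged for onComplete().
            if (rethrow) {
                throw new RuntimeException("Error executing pipeline operation", e.getCause());
            }
        }
    }
}
```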

#### Metrics

A new metric `conn_pipelineOperationTimeout` tracks the total number of timeout events for pipeline creation, `onNext()`, and `onComplete()` operations, enabling operators to monitor block node responsiveness and connection establishment issues.
hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java
@@ -30,10 +30,15 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -99,6 +104,10 @@
* The reset period for the stream. This is used to periodically reset the stream to ensure increased stability and reliability.
*/
private final Duration streamResetPeriod;
/**
* Timeout for pipeline onNext() and onComplete() operations to detect unresponsive block nodes.
*/
private final Duration pipelineOperationTimeout;
/**
* Flag that indicates if this stream is currently shutting down, as initiated by this consensus node.
*/
@@ -117,6 +126,13 @@
* Scheduled executor service that is used to schedule periodic reset of the stream to help ensure stream health.
*/
private final ScheduledExecutorService executorService;
/**
* Dedicated executor service for pipeline operations using virtual threads.
* This isolates pipeline operations from the shared scheduled executor to prevent
* blocking when the connection manager is busy with other tasks. Virtual threads
* make this approach lightweight despite creating one executor per connection.
*/
private final ExecutorService pipelineExecutor;
/**
* This task runs every 24 hours (initial delay of 24 hours) when a connection is active.
* The task helps maintain stream stability by forcing periodic reconnections.
@@ -197,10 +213,12 @@
this.blockStreamMetrics = requireNonNull(blockStreamMetrics, "blockStreamMetrics must not be null");
this.connectionState = new AtomicReference<>(ConnectionState.UNINITIALIZED);
this.executorService = requireNonNull(executorService, "executorService must not be null");
this.pipelineExecutor = Executors.newVirtualThreadPerTaskExecutor();
final var blockNodeConnectionConfig =
configProvider.getConfiguration().getConfigData(BlockNodeConnectionConfig.class);
this.streamResetPeriod = blockNodeConnectionConfig.streamResetPeriod();
this.clientFactory = requireNonNull(clientFactory, "clientFactory must not be null");
this.pipelineOperationTimeout = blockNodeConnectionConfig.pipelineOperationTimeout();
connectionId = String.format("%04d", connectionIdCounter.incrementAndGet());
}

@@ -209,13 +227,38 @@
*/
public synchronized void createRequestPipeline() {
if (requestPipelineRef.get() == null) {
blockStreamPublishServiceClient = createNewGrpcClient();
final Pipeline<? super PublishStreamRequest> pipeline =
blockStreamPublishServiceClient.publishBlockStream(this);
requestPipelineRef.set(pipeline);
logWithContext(logger, DEBUG, this, "Request pipeline initialized.");
updateConnectionState(ConnectionState.PENDING);
blockStreamMetrics.recordConnectionOpened();
// Execute entire pipeline creation (including gRPC client creation) with timeout
// to prevent blocking on network operations
final Future<?> future = pipelineExecutor.submit(() -> {
blockStreamPublishServiceClient = createNewGrpcClient();
final Pipeline<? super PublishStreamRequest> pipeline =
blockStreamPublishServiceClient.publishBlockStream(this);
requestPipelineRef.set(pipeline);
});

try {
future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
logWithContext(logger, DEBUG, this, "Request pipeline initialized.");
updateConnectionState(ConnectionState.PENDING);
blockStreamMetrics.recordConnectionOpened();
} catch (final TimeoutException e) {
future.cancel(true);
logWithContext(
logger,
DEBUG,
this,
"Pipeline creation timed out after {}ms",
pipelineOperationTimeout.toMillis());
blockStreamMetrics.recordPipelineOperationTimeout();
throw new RuntimeException("Pipeline creation timed out", e);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
logWithContext(logger, DEBUG, this, "Interrupted while creating pipeline", e);
throw new RuntimeException("Interrupted while creating pipeline", e);
} catch (final ExecutionException e) {
logWithContext(logger, DEBUG, this, "Error creating pipeline", e.getCause());
throw new RuntimeException("Error creating pipeline", e.getCause());
}
} else {
logWithContext(logger, DEBUG, this, "Request pipeline already available.");
}
@@ -638,7 +681,7 @@
highestAckedBlockNumber);
try {
sendRequest(endStream);
} catch (RuntimeException e) {
} catch (final RuntimeException e) {
logger.warn(formatLogMessage("Error sending EndStream request", this), e);
}
close(true);
@@ -677,7 +720,31 @@
request.protobufSize());
}
final long startMs = System.currentTimeMillis();
pipeline.onNext(request);

final Future<?> future = pipelineExecutor.submit(() -> pipeline.onNext(request));
try {
future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (final TimeoutException e) {
future.cancel(true); // Cancel the task if it times out
if (getConnectionState() == ConnectionState.ACTIVE) {
logWithContext(
logger,
DEBUG,
this,
"Pipeline onNext() timed out after {}ms",
pipelineOperationTimeout.toMillis());
blockStreamMetrics.recordPipelineOperationTimeout();
handleStreamFailure();
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt status
logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onNext()", e);
throw new RuntimeException("Interrupted while waiting for pipeline.onNext()", e);
} catch (final ExecutionException e) {
logWithContext(logger, DEBUG, this, "Error executing pipeline.onNext()", e.getCause());
throw new RuntimeException("Error executing pipeline.onNext()", e.getCause());
}

final long durationMs = System.currentTimeMillis() - startMs;
blockStreamMetrics.recordRequestLatency(durationMs);
logWithContext(logger, TRACE, this, "Request took {}ms to send", this, durationMs);
@@ -753,6 +820,16 @@
} catch (final Exception e) {
logger.error(formatLogMessage("Error occurred while closing gRPC client.", this), e);
}
try {
pipelineExecutor.shutdown();
if (!pipelineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
pipelineExecutor.shutdownNow();
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
pipelineExecutor.shutdownNow();
logger.error(formatLogMessage("Error occurred while shutting down pipeline executor.", this), e);
}
blockStreamMetrics.recordConnectionClosed();
blockStreamMetrics.recordActiveConnectionIp(-1L);
// regardless of outcome, mark the connection as closed
@@ -770,8 +847,26 @@
try {
final ConnectionState state = getConnectionState();
if (state == ConnectionState.CLOSING && callOnComplete) {
pipeline.onComplete();
logWithContext(logger, DEBUG, this, "Request pipeline successfully closed.");
final Future<?> future = pipelineExecutor.submit(pipeline::onComplete);
try {
future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS);
logWithContext(logger, DEBUG, this, "Request pipeline successfully closed.");
} catch (final TimeoutException e) {
future.cancel(true); // Cancel the task if it times out
logWithContext(
logger,
DEBUG,
this,
"Pipeline onComplete() timed out after {}ms",
pipelineOperationTimeout.toMillis());
blockStreamMetrics.recordPipelineOperationTimeout();
// Connection is already closing, just log the timeout
} catch (final InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt status
logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onComplete()");
} catch (final ExecutionException e) {
logWithContext(logger, DEBUG, this, "Error executing pipeline.onComplete()", e.getCause());
}
}
} catch (final Exception e) {
logger.warn(formatLogMessage("Error while completing request pipeline.", this), e);
hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java
@@ -61,6 +61,7 @@
private Counter conn_endOfStreamLimitCounter;
private DoubleGauge conn_ackLatencyGauge;
private Counter conn_highLatencyCounter;
private Counter conn_pipelineOperationTimeoutCounter;

// buffer metrics
private static final long BACK_PRESSURE_ACTIVE = 3;
@@ -282,6 +283,11 @@
final Counter.Config highLatencyCfg = newCounter(GROUP_CONN, "highLatencyEvents")
.withDescription("Count of high latency events from the active block node connection");
conn_highLatencyCounter = metrics.getOrCreate(highLatencyCfg);

final Counter.Config pipelineTimeoutCfg = newCounter(GROUP_CONN, "pipelineOperationTimeout")
.withDescription(
"Number of times a pipeline onNext() or onComplete() operation timed out on a block node connection");
conn_pipelineOperationTimeoutCounter = metrics.getOrCreate(pipelineTimeoutCfg);
}

/**
@@ -357,6 +363,13 @@
conn_highLatencyCounter.increment();
}

/**
* Record that a pipeline onNext() or onComplete() operation timed out.
*/
public void recordPipelineOperationTimeout() {
conn_pipelineOperationTimeoutCounter.increment();
}

// Connection RECV metrics -----------------------------------------------------------------------------------------

private void registerConnectionRecvMetrics() {
Expand Down