Skip to content

Commit 15aa286

Browse files
committed
Streamline logic and Fix tests
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
1 parent 7523ecb commit 15aa286

File tree

20 files changed

+191
-63
lines changed

20 files changed

+191
-63
lines changed

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/TestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public static ConfigurationBuilder createTestConfiguration() {
3737
.withConfigDataType(com.swirlds.common.metrics.config.MetricsConfig.class)
3838
.withConfigDataType(com.swirlds.common.metrics.platform.prometheus.PrometheusConfig.class)
3939
.withConfigDataType(org.hiero.block.node.app.config.ServerConfig.class)
40-
.withConfigDataType(org.hiero.block.node.app.config.node.NodeConfig.class);
40+
.withConfigDataType(org.hiero.block.node.app.config.node.NodeConfig.class)
41+
.withValue("prometheus.endpointEnabled", "false");
4142
return configurationBuilder;
4243
}
4344
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.node.app.fixtures.async;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import java.util.concurrent.BlockingQueue;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.ScheduledThreadPoolExecutor;
8+
9+
public class ScheduledBlockingExecutor extends ScheduledThreadPoolExecutor {
10+
/** The work queue that will be used to hold the tasks. */
11+
private final BlockingQueue<Runnable> workQueue;
12+
13+
public ScheduledBlockingExecutor(@NonNull final BlockingQueue<Runnable> workQueue) {
14+
super(1, Executors.defaultThreadFactory(), new AbortPolicy());
15+
16+
this.workQueue = workQueue; // actual work queue
17+
}
18+
19+
@Override
20+
@SuppressWarnings("all")
21+
public void execute(@NonNull final Runnable command) {
22+
try {
23+
workQueue.put(command);
24+
} catch (final InterruptedException e) {
25+
Thread.currentThread().interrupt();
26+
throw new RuntimeException("Thread was interrupted while trying to put a task into the work queue", e);
27+
}
28+
}
29+
}

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/async/TestThreadPoolManager.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
*
1818
* @param <T> the type of executor service
1919
*/
20-
public class TestThreadPoolManager<T extends ExecutorService> implements ThreadPoolManager {
20+
public class TestThreadPoolManager<T extends ExecutorService, S extends ScheduledExecutorService>
21+
implements ThreadPoolManager {
2122
private final T executor;
23+
private final S scheduledExecutor;
2224

23-
public TestThreadPoolManager(@NonNull T executor) {
25+
public TestThreadPoolManager(@NonNull T executor, S scheduledExecutorService) {
2426
this.executor = Objects.requireNonNull(executor);
27+
this.scheduledExecutor = scheduledExecutorService;
2528
}
2629

2730
/**
@@ -59,14 +62,19 @@ public T createSingleThreadExecutor(
5962
@Override
6063
public ScheduledExecutorService createSingleThreadScheduledExecutor(
6164
@Nullable String threadName, @Nullable UncaughtExceptionHandler uncaughtExceptionHandler) {
62-
return (ScheduledExecutorService) executor;
65+
return scheduledExecutor;
6366
}
6467

6568
@NonNull
6669
public final T executor() {
6770
return executor;
6871
}
6972

73+
@NonNull
74+
public final S scheduleExecutor() {
75+
return scheduledExecutor;
76+
}
77+
7078
public void shutdownNow() {
7179
executor.shutdownNow();
7280
}

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/GrpcPluginTestBase.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Optional;
1919
import java.util.concurrent.ExecutorService;
2020
import java.util.concurrent.Flow.Subscription;
21+
import java.util.concurrent.ScheduledExecutorService;
2122
import org.hiero.block.node.spi.BlockNodePlugin;
2223
import org.hiero.block.node.spi.ServiceBuilder;
2324
import org.hiero.block.node.spi.historicalblocks.HistoricalBlockFacility;
@@ -33,8 +34,9 @@
3334
* Implementations of this class should call one of the start() methods. This will start the plugin and initialize the
3435
* test fixture.
3536
*/
36-
public abstract class GrpcPluginTestBase<P extends BlockNodePlugin, E extends ExecutorService>
37-
extends PluginTestBase<P, E> implements ServiceBuilder {
37+
public abstract class GrpcPluginTestBase<
38+
P extends BlockNodePlugin, E extends ExecutorService, S extends ScheduledExecutorService>
39+
extends PluginTestBase<P, E, S> implements ServiceBuilder {
3840
private record ReqOptions(Optional<String> authority, boolean isProtobuf, boolean isJson, String contentType)
3941
implements ServiceInterface.RequestOptions {}
4042
/** The GRPC bytes received from the plugin. */
@@ -46,8 +48,8 @@ private record ReqOptions(Optional<String> authority, boolean isProtobuf, boolea
4648
/** The GRPC service interface for the plugin. */
4749
protected ServiceInterface serviceInterface;
4850

49-
protected GrpcPluginTestBase(@NonNull final E executorService) {
50-
super(executorService);
51+
protected GrpcPluginTestBase(@NonNull final E executorService, @NonNull final S scheduledExecutorService) {
52+
super(executorService, scheduledExecutorService);
5153
}
5254

5355
/**

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/PluginTestBase.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Map;
1212
import java.util.Map.Entry;
1313
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.ScheduledExecutorService;
1415
import org.hiero.block.node.app.fixtures.async.TestThreadPoolManager;
1516
import org.hiero.block.node.spi.BlockNodeContext;
1617
import org.hiero.block.node.spi.BlockNodePlugin;
@@ -39,11 +40,12 @@
3940
*
4041
* @param <P> the type of plugin being tested
4142
*/
42-
public abstract class PluginTestBase<P extends BlockNodePlugin, E extends ExecutorService> {
43+
public abstract class PluginTestBase<
44+
P extends BlockNodePlugin, E extends ExecutorService, S extends ScheduledExecutorService> {
4345
/** The logger for this class. */
4446
protected final System.Logger LOGGER = System.getLogger(getClass().getName());
4547
/** The test thread pool manager */
46-
protected final TestThreadPoolManager<E> testThreadPoolManager;
48+
protected final TestThreadPoolManager<E, S> testThreadPoolManager;
4749
/** The metrics provider for the test. */
4850
private DefaultMetricsProvider metricsProvider;
4951
/** The block node context, for access to core facilities. */
@@ -53,8 +55,8 @@ public abstract class PluginTestBase<P extends BlockNodePlugin, E extends Execut
5355
/** The plugin to be tested */
5456
protected P plugin;
5557

56-
protected PluginTestBase(@NonNull final E executorService) {
57-
testThreadPoolManager = new TestThreadPoolManager<>(executorService);
58+
protected PluginTestBase(@NonNull final E executorService, @NonNull final S scheduledExecutorService) {
59+
testThreadPoolManager = new TestThreadPoolManager<>(executorService, scheduledExecutorService);
5860
}
5961

6062
/**

block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,39 @@ private LongRange getAvailableRangeInNode(BlockNodeClient node, LongRange blockR
126126
* @return a LongRange representing the new available block range
127127
*/
128128
public LongRange getNewAvailableRange(long latestStoredBlockNumber) {
129-
long end = Long.MIN_VALUE;
129+
long earliestPeerBlock = Long.MAX_VALUE;
130+
long latestPeerBlock = Long.MIN_VALUE;
130131

131132
for (BackfillSourceConfig node : blockNodeSource.nodes()) {
132133
BlockNodeClient currentNodeClient = getNodeClient(node);
133134

134135
final ServerStatusResponse nodeStatus =
135136
currentNodeClient.getBlockNodeServiceClient().serverStatus(new ServerStatusRequest());
137+
long firstAvailableBlock = nodeStatus.firstAvailableBlock();
136138
long lastAvailableBlock = nodeStatus.lastAvailableBlock();
137139

138-
// update the end to the max lastAvailableBlock
139-
end = Math.max(end, lastAvailableBlock);
140+
// update the earliestPeerBlock to the max lastAvailableBlock
141+
latestPeerBlock = Math.max(latestPeerBlock, lastAvailableBlock);
142+
earliestPeerBlock = Math.min(earliestPeerBlock, firstAvailableBlock);
140143
}
141144

142-
return new LongRange(latestStoredBlockNumber + 1, end);
145+
LOGGER.log(
146+
INFO,
147+
"Determined block range from peer blocks nodes earliestPeerBlock={0,number,#} to latestStoredBlockNumber={1,number,#}",
148+
earliestPeerBlock,
149+
latestPeerBlock);
150+
151+
// confirm next block is available if not we still can't backfill
152+
if (latestStoredBlockNumber + 1 < earliestPeerBlock || latestStoredBlockNumber > latestPeerBlock) {
153+
return new LongRange(-1, -1);
154+
}
155+
156+
LOGGER.log(
157+
INFO,
158+
"Determined available range from peer blocks nodes start={0,number,#} to end={1,number,#}",
159+
latestStoredBlockNumber + 1,
160+
latestPeerBlock);
161+
return new LongRange(latestStoredBlockNumber + 1, latestPeerBlock);
143162
}
144163

145164
/**

block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -271,21 +271,39 @@ private void detectGaps() {
271271

272272
try {
273273
LOGGER.log(TRACE, "Detecting gaps in historical blocks");
274-
detectedGaps = new ArrayList<>();
275274

276-
// Get the configured first block available
277-
long expectedFirstBlock = backfillConfiguration.startBlock();
278-
long previousRangeEnd = expectedFirstBlock - 1;
275+
// Calculate total missing blocks
276+
long pendingBlocks = 0;
279277

280278
// Check for gaps between ranges
281279
List<LongRange> blockRanges = context.historicalBlockProvider()
282280
.availableBlocks()
283281
.streamRanges()
284282
.toList();
285283

286-
// Calculate total missing blocks
287-
long pendingBlocks = 0;
284+
// add block range available from other BN sources
285+
LongRange detectedRecentGapRange = backfillGrpcClientAutonomous.getNewAvailableRange(
286+
blockRanges.getLast().end());
287+
if (detectedRecentGapRange.size() > 0 && detectedRecentGapRange.start() >= 0) {
288+
detectedGaps = new ArrayList<>();
289+
detectedGaps.add(detectedRecentGapRange);
290+
291+
// backfile recent gaps first to prioritize staying close to the network
292+
LOGGER.log(
293+
TRACE,
294+
"Detected recent gaps, numGaps={0} totalMissingBlocks={1}",
295+
detectedGaps.size(),
296+
pendingBlocks);
297+
processDetectedGaps();
298+
} else {
299+
LOGGER.log(TRACE, "No recent gaps detected from other block node sources");
300+
}
301+
302+
// Get the configured first block available
303+
long expectedFirstBlock = backfillConfiguration.startBlock();
304+
long previousRangeEnd = expectedFirstBlock - 1;
288305

306+
detectedGaps = new ArrayList<>();
289307
for (LongRange range : blockRanges) {
290308
if (range.start() > previousRangeEnd + 1) {
291309
LongRange gap = new LongRange(previousRangeEnd + 1, range.start() - 1);
@@ -300,21 +318,19 @@ private void detectGaps() {
300318
previousRangeEnd = range.end();
301319
}
302320

303-
// add block range available from other BN sources
304-
detectedGaps.add(backfillGrpcClientAutonomous.getNewAvailableRange(
305-
blockRanges.getLast().end()));
306-
307321
// increase only if detectedGaps is not empty
308-
if (!detectedGaps.isEmpty()) backfillGapsDetected.add(detectedGaps.size());
309-
310-
LOGGER.log(TRACE, "Detected gaps, numGaps={0} totalMissingBlocks={1}", detectedGaps.size(), pendingBlocks);
311-
312-
// Process detected gaps
313322
if (!detectedGaps.isEmpty()) {
323+
backfillGapsDetected.add(detectedGaps.size());
324+
LOGGER.log(
325+
TRACE,
326+
"Detected historical gaps, numGaps={0} totalMissingBlocks={1}",
327+
detectedGaps.size(),
328+
pendingBlocks);
314329
processDetectedGaps();
315330
} else {
316331
LOGGER.log(TRACE, "No gaps detected in historical blocks");
317332
}
333+
318334
autonomousError = false;
319335
} catch (Exception e) {
320336
LOGGER.log(TRACE, "Error during backfill autonomous process: {0}", e);

block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.concurrent.TimeUnit;
1515
import org.hiero.block.internal.BlockItemUnparsed;
1616
import org.hiero.block.node.app.fixtures.async.BlockingExecutor;
17+
import org.hiero.block.node.app.fixtures.async.ScheduledBlockingExecutor;
1718
import org.hiero.block.node.app.fixtures.blocks.SimpleTestBlockItemBuilder;
1819
import org.hiero.block.node.app.fixtures.plugintest.PluginTestBase;
1920
import org.hiero.block.node.app.fixtures.plugintest.SimpleBlockRangeSet;
@@ -33,13 +34,15 @@
3334
import org.junit.jupiter.api.Test;
3435
import org.junit.jupiter.api.io.TempDir;
3536

36-
class BackfillPluginTest extends PluginTestBase<BackfillPlugin, BlockingExecutor> {
37+
class BackfillPluginTest extends PluginTestBase<BackfillPlugin, BlockingExecutor, ScheduledBlockingExecutor> {
3738

3839
/** TempDir for the current test */
3940
private final Path testTempDir;
4041

4142
public BackfillPluginTest(@TempDir final Path tempDir) {
42-
super(new BlockingExecutor(new LinkedBlockingQueue<>()));
43+
super(
44+
new BlockingExecutor(new LinkedBlockingQueue<>()),
45+
new ScheduledBlockingExecutor(new LinkedBlockingQueue<>()));
4346
this.testTempDir = Objects.requireNonNull(tempDir);
4447
}
4548

block-node/block-access/src/test/java/org/hiero/block/node/access/service/BlockAccessServicePluginTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,27 @@
1414
import com.hedera.pbj.runtime.grpc.ServiceInterface;
1515
import java.util.List;
1616
import java.util.concurrent.LinkedBlockingQueue;
17+
import java.util.concurrent.ScheduledExecutorService;
1718
import org.hiero.block.api.BlockRequest;
1819
import org.hiero.block.api.BlockResponse;
1920
import org.hiero.block.api.BlockResponse.Code;
2021
import org.hiero.block.node.app.fixtures.async.BlockingExecutor;
22+
import org.hiero.block.node.app.fixtures.async.ScheduledBlockingExecutor;
2123
import org.hiero.block.node.app.fixtures.plugintest.GrpcPluginTestBase;
2224
import org.hiero.block.node.app.fixtures.plugintest.SimpleInMemoryHistoricalBlockFacility;
2325
import org.hiero.block.node.spi.blockmessaging.BlockItems;
2426
import org.junit.jupiter.api.BeforeEach;
2527
import org.junit.jupiter.api.DisplayName;
2628
import org.junit.jupiter.api.Test;
2729

28-
public class BlockAccessServicePluginTest extends GrpcPluginTestBase<BlockAccessServicePlugin, BlockingExecutor> {
30+
public class BlockAccessServicePluginTest
31+
extends GrpcPluginTestBase<BlockAccessServicePlugin, BlockingExecutor, ScheduledExecutorService> {
2932
private final BlockAccessServicePlugin plugin = new BlockAccessServicePlugin();
3033

3134
public BlockAccessServicePluginTest() {
32-
super(new BlockingExecutor(new LinkedBlockingQueue<>()));
35+
super(
36+
new BlockingExecutor(new LinkedBlockingQueue<>()),
37+
new ScheduledBlockingExecutor(new LinkedBlockingQueue<>()));
3338
start(plugin, plugin.methods().getFirst(), new SimpleInMemoryHistoricalBlockFacility());
3439
}
3540

block-node/block-providers/files.historic/src/test/java/org/hiero/block/node/blocks/files/historic/BlockFileHistoricPluginTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import java.util.Map.Entry;
2424
import java.util.Objects;
2525
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.ScheduledExecutorService;
2627
import org.hiero.block.internal.BlockItemUnparsed;
2728
import org.hiero.block.internal.BlockUnparsed;
2829
import org.hiero.block.node.app.fixtures.async.BlockingExecutor;
30+
import org.hiero.block.node.app.fixtures.async.ScheduledBlockingExecutor;
2931
import org.hiero.block.node.app.fixtures.async.TestThreadPoolManager;
3032
import org.hiero.block.node.app.fixtures.blocks.SimpleTestBlockItemBuilder;
3133
import org.hiero.block.node.app.fixtures.plugintest.NoOpServiceBuilder;
@@ -127,7 +129,9 @@ void testInitNullServiceBuilder(@TempDir final Path tempDir) {
127129
new TestBlockMessagingFacility(),
128130
historicalBlockProvider,
129131
null,
130-
new TestThreadPoolManager<>(new BlockingExecutor(new LinkedBlockingQueue<>())));
132+
new TestThreadPoolManager<>(
133+
new BlockingExecutor(new LinkedBlockingQueue<>()),
134+
new ScheduledBlockingExecutor(new LinkedBlockingQueue<>())));
131135
// call
132136
final BlockFileHistoricPlugin toTest = new BlockFileHistoricPlugin();
133137
assertThatNoException().isThrownBy(() -> toTest.init(testContext, null));
@@ -139,15 +143,18 @@ void testInitNullServiceBuilder(@TempDir final Path tempDir) {
139143
*/
140144
@Nested
141145
@DisplayName("Plugin Tests")
142-
final class PluginTests extends PluginTestBase<BlockFileHistoricPlugin, BlockingExecutor> {
146+
final class PluginTests
147+
extends PluginTestBase<BlockFileHistoricPlugin, BlockingExecutor, ScheduledBlockingExecutor> {
143148
/** The test block serial executor service to use for the plugin. */
144149
private final BlockingExecutor pluginExecutor;
145150

146151
/**
147152
* Construct plugin base.
148153
*/
149154
PluginTests() {
150-
super(new BlockingExecutor(new LinkedBlockingQueue<>()));
155+
super(
156+
new BlockingExecutor(new LinkedBlockingQueue<>()),
157+
new ScheduledBlockingExecutor(new LinkedBlockingQueue<>()));
151158
// match overrides to the test config
152159
final Map<String, String> configOverrides = getConfigOverrides();
153160
pluginExecutor = testThreadPoolManager.executor();
@@ -917,13 +924,16 @@ void testRetentionPolicyThresholdDisabled() throws IOException {
917924
*/
918925
@Nested
919926
@DisplayName("Regression Tests")
920-
final class RegressionTests extends PluginTestBase<BlockFileHistoricPlugin, BlockingExecutor> {
927+
final class RegressionTests
928+
extends PluginTestBase<BlockFileHistoricPlugin, BlockingExecutor, ScheduledExecutorService> {
921929
/** The test historical block facility scoped to this regression scenario. */
922930
private final SimpleInMemoryHistoricalBlockFacility regressionHistoricalBlockFacility =
923931
new SimpleInMemoryHistoricalBlockFacility();
924932

925933
RegressionTests() {
926-
super(new BlockingExecutor(new LinkedBlockingQueue<>()));
934+
super(
935+
new BlockingExecutor(new LinkedBlockingQueue<>()),
936+
new ScheduledBlockingExecutor(new LinkedBlockingQueue<>()));
927937
}
928938

929939
private Map<String, String> buildConfigOverrides(final FilesHistoricConfig config) {

0 commit comments

Comments
 (0)