Skip to content

Commit ec0f974

Browse files
committed
Remove LiveStreamPublisherManager and make Autonomous greedy configurable
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
1 parent a0bb7d3 commit ec0f974

File tree

9 files changed

+182
-123
lines changed

9 files changed

+182
-123
lines changed

block-node/app/src/main/java/org/hiero/block/node/app/DefaultThreadPoolManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ public ExecutorService createSingleThreadExecutor(
102102
*/
103103
@NonNull
104104
@Override
105-
public ScheduledExecutorService createSingleThreadScheduledExecutor(
105+
public ScheduledExecutorService createVirtualThreadScheduledExecutor(
106+
int corePoolSize,
106107
@Nullable final String threadName,
107108
@Nullable final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
108109
Thread.Builder factoryBuilder = Thread.ofVirtual();
@@ -111,10 +112,14 @@ public ScheduledExecutorService createSingleThreadScheduledExecutor(
111112
}
112113
if (threadName != null) {
113114
factoryBuilder.name(threadName, 0);
114-
return Executors.newSingleThreadScheduledExecutor(factoryBuilder.factory());
115+
return corePoolSize <= 1
116+
? Executors.newSingleThreadScheduledExecutor(factoryBuilder.factory())
117+
: Executors.newScheduledThreadPool(corePoolSize, factoryBuilder.factory());
115118
} else {
116119
final ThreadFactory factory = factoryBuilder.factory();
117-
return Executors.newSingleThreadScheduledExecutor(factory);
120+
return corePoolSize <= 1
121+
? Executors.newSingleThreadScheduledExecutor(factory)
122+
: Executors.newScheduledThreadPool(corePoolSize, factory);
118123
}
119124
}
120125
}

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/async/ScheduledBlockingExecutor.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,20 @@
33

44
import edu.umd.cs.findbugs.annotations.NonNull;
55
import java.util.concurrent.BlockingQueue;
6-
import java.util.concurrent.Executors;
76
import java.util.concurrent.ScheduledThreadPoolExecutor;
87

98
public class ScheduledBlockingExecutor extends ScheduledThreadPoolExecutor {
109
/** The work queue that will be used to hold the tasks. */
1110
private final BlockingQueue<Runnable> workQueue;
1211

1312
public ScheduledBlockingExecutor(@NonNull final BlockingQueue<Runnable> workQueue) {
14-
super(1, Executors.defaultThreadFactory(), new AbortPolicy());
13+
super(1, Thread.ofVirtual().factory(), new AbortPolicy());
14+
15+
this.workQueue = workQueue; // actual work queue
16+
}
17+
18+
public ScheduledBlockingExecutor(int corePoolSize, @NonNull final BlockingQueue<Runnable> workQueue) {
19+
super(corePoolSize, Thread.ofVirtual().factory(), new AbortPolicy());
1520

1621
this.workQueue = workQueue; // actual work queue
1722
}

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/async/TestThreadPoolManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ public T createSingleThreadExecutor(
6060

6161
@NonNull
6262
@Override
63-
public ScheduledExecutorService createSingleThreadScheduledExecutor(
64-
@Nullable String threadName, @Nullable UncaughtExceptionHandler uncaughtExceptionHandler) {
63+
public ScheduledExecutorService createVirtualThreadScheduledExecutor(
64+
int corePoolSize,
65+
@Nullable String threadName,
66+
@Nullable UncaughtExceptionHandler uncaughtExceptionHandler) {
6567
return scheduledExecutor;
6668
}
6769

block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* process indefinitely in case something unexpected happens, this would allow for self-recovery
2424
* @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime
2525
* @param enableTLS if enabled will assume block-node client supports tls connection.
26+
* @param greedy if enabled will search and retrieve blocks beyond latestAcknowledged to ensure BN doesn't fall too far behind.
2627
*/
2728
@ConfigData("backfill")
2829
public record BackfillConfiguration(
@@ -37,4 +38,5 @@ public record BackfillConfiguration(
3738
@Loggable @ConfigProperty(defaultValue = "15000") @Min(5) int initialDelay,
3839
@Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout,
3940
@Loggable @ConfigProperty(defaultValue = "60000") @Min(10000) int grpcOverallTimeout,
40-
@Loggable @ConfigProperty(defaultValue = "false") boolean enableTLS) {}
41+
@Loggable @ConfigProperty(defaultValue = "false") boolean enableTLS,
42+
@Loggable @ConfigProperty(defaultValue = "true") boolean greedy) {}

block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java

Lines changed: 85 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class BackfillPlugin implements BlockNodePlugin, BlockNotificationHandler
6161
private volatile boolean onDemandError = false;
6262
private final AtomicLong autonomousBackfillEndBlock = new AtomicLong(-1);
6363
private final AtomicLong onDemandBackfillStartBlock = new AtomicLong(-1);
64+
private final AtomicLong lastAcknowledgedBlockObserved = new AtomicLong(-1);
6465

6566
// Metrics
6667
private Counter backfillGapsDetected;
@@ -221,7 +222,13 @@ public void start() {
221222
"Scheduling backfill process to start in {0} milliseconds",
222223
backfillConfiguration.initialDelay());
223224
scheduler = Executors.newScheduledThreadPool(
224-
2); // Two threads: one for autonomous backfill, one for on-demand backfill
225+
2,
226+
Thread.ofVirtual().factory()); // Two threads: one for autonomous backfill, one for on-demand backfill
227+
// scheduler = context.threadPoolManager().createVirtualThreadScheduledExecutor(
228+
// 2, // Two threads: one for autonomous backfill, one for on-demand backfill
229+
// "BackfillPluginRunner",
230+
// (t, e) -> LOGGER.log(ERROR, "Uncaught exception in thread: " + t.getName(), e));
231+
225232
scheduler.scheduleAtFixedRate(
226233
this::detectGaps,
227234
backfillConfiguration.initialDelay(),
@@ -257,45 +264,42 @@ private boolean isOnDemandBackfillRunning() {
257264
return onDemandBackfillStartBlock.get() != -1;
258265
}
259266

260-
private void detectGaps() {
261-
// Skip if already running
262-
if (isAutonomousBackfillRunning()) {
263-
LOGGER.log(TRACE, "Gap detection already in progress, skipping this execution");
267+
private void greedyBackfillRecentBlocks(long lastAcknowledgedBlockObserved, long lastAcknowledgedBlock) {
268+
if (!backfillConfiguration.greedy()) {
264269
return;
265270
}
266271

267-
// Skip if OnDemand backfill is running
268-
if (isOnDemandBackfillRunning()) {
269-
LOGGER.log(TRACE, "On-Demand backfill is running, skipping autonomous gap detection");
272+
// Skip if the last acknowledged block observed has increased
273+
if (lastAcknowledgedBlockObserved > 0 && lastAcknowledgedBlock > lastAcknowledgedBlockObserved) {
274+
LOGGER.log(
275+
TRACE,
276+
"Last acknowledged block observed has increased from {0} to {1}, skipping greedy backfill",
277+
lastAcknowledgedBlockObserved,
278+
lastAcknowledgedBlock);
270279
return;
271280
}
272281

273282
try {
274-
LOGGER.log(TRACE, "Detecting gaps in historical blocks");
275-
276-
// Calculate total missing blocks
277-
long pendingBlocks = 0;
278-
279-
// Check for gaps between ranges
280-
List<LongRange> blockRanges = context.historicalBlockProvider()
281-
.availableBlocks()
282-
.streamRanges()
283-
.toList();
283+
LOGGER.log(TRACE, "Greedy backfilling recent blocks to stay close to network");
284284

285285
// greedy backfill newer blocks available from peer BN sources
286286
detectedGaps = new ArrayList<>();
287-
LongRange detectedRecentGapRange = backfillGrpcClientAutonomous.getNewAvailableRange(
288-
blockRanges.getLast().end());
287+
LongRange detectedRecentGapRange = backfillGrpcClientAutonomous.getNewAvailableRange(lastAcknowledgedBlock);
288+
289+
// to-do: consider on-demand backfill range and remove from detectedRecentGapRange if overlapping
290+
if (isOnDemandBackfillRunning()) {
291+
// if on-demand backfill is running, we need to adjust the detected recent gap range
292+
}
293+
289294
if (detectedRecentGapRange.size() > 0 && detectedRecentGapRange.start() >= 0) {
290295
detectedGaps.add(detectedRecentGapRange);
291-
pendingBlocks += detectedRecentGapRange.size();
292296

293297
// backfill recent gaps first to prioritize staying close to the network
294298
LOGGER.log(
295299
TRACE,
296300
"Detected recent gaps, numGaps={0} totalMissingBlocks={1}",
297301
detectedGaps.size(),
298-
pendingBlocks);
302+
detectedRecentGapRange.size());
299303

300304
autonomousBackfillEndBlock.set(detectedRecentGapRange.end());
301305
processDetectedGaps();
@@ -304,6 +308,51 @@ private void detectGaps() {
304308
LOGGER.log(TRACE, "No recent gaps detected from other block node sources");
305309
}
306310

311+
autonomousError = false;
312+
} catch (Exception e) {
313+
LOGGER.log(TRACE, "Error during backfill autonomous process: {0}", e);
314+
autonomousError = true;
315+
autonomousBackfillEndBlock.set(-1);
316+
}
317+
}
318+
319+
private void detectGaps() {
320+
// Skip if already running
321+
if (isAutonomousBackfillRunning()) {
322+
LOGGER.log(
323+
TRACE,
324+
"Autonomous backfill is already running up to {0}, skipping autonomous gap detection",
325+
autonomousBackfillEndBlock.get());
326+
return;
327+
}
328+
329+
// Skip if on-demand backfill is running and greedy mode is disabled
330+
if (isOnDemandBackfillRunning() && !backfillConfiguration.greedy()) {
331+
LOGGER.log(
332+
TRACE,
333+
"On-Demand backfill is running starting from block {0}, skipping autonomous gap detection",
334+
onDemandBackfillStartBlock.get());
335+
return;
336+
}
337+
338+
try {
339+
LOGGER.log(TRACE, "Detecting gaps in blocks");
340+
341+
// Calculate total missing blocks
342+
long pendingBlocks = 0;
343+
344+
// Check for gaps between ranges
345+
List<LongRange> blockRanges = context.historicalBlockProvider()
346+
.availableBlocks()
347+
.streamRanges()
348+
.toList();
349+
350+
// greedy backfill newer blocks available from peer BN sources
351+
greedyBackfillRecentBlocks(
352+
lastAcknowledgedBlockObserved.get(), blockRanges.getLast().end());
353+
lastAcknowledgedBlockObserved.set(
354+
blockRanges.getLast().end()); // update the last observed acknowledged block
355+
307356
// backfill missing historical blocks from peer BN sources
308357
detectedGaps = new ArrayList<>();
309358
long expectedFirstBlock = backfillConfiguration.startBlock();
@@ -512,24 +561,24 @@ public void handleNewestBlockKnownToNetwork(NewestBlockKnownToNetworkNotificatio
512561
return;
513562
}
514563

515-
// we should create new Gap and a new task to backfill it
564+
// we should create new Gap and a new task to backfill it
516565
long lastPersistedBlock =
517566
context.historicalBlockProvider().availableBlocks().max();
518567
long newestBlockKnown = notification.blockNumber();
519-
final LongRange gap;
520-
if (newestBlockKnown == UNKNOWN_BLOCK_NUMBER) {
521-
// proactively get the new available range from other BN sources
522-
gap = backfillGrpcClientOnDemand.getNewAvailableRange(lastPersistedBlock);
523-
} else {
524-
// create gap from last persisted + 1 to block number from notification
525-
gap = new LongRange(lastPersistedBlock + 1, newestBlockKnown);
526-
}
568+
LongRange gap = new LongRange(lastPersistedBlock + 1, newestBlockKnown);
569+
LOGGER.log(
570+
TRACE,
571+
"Detected new block known to network: {0,number,#}, starting backfill task for gap: {1}",
572+
newestBlockKnown,
573+
gap);
574+
575+
lastAcknowledgedBlockObserved.set(lastPersistedBlock); // update the last observed acknowledged block
527576

528577
// if the gap is not empty, we can backfill it
529578
if (gap.size() > 0) {
530579
try {
531-
// Skip if autonomous backfill is running
532-
if (isAutonomousBackfillRunning()) {
580+
// Skip if autonomous backfill is running in greedy mode; greedy is more aggressive and will likely cover more blocks
581+
if (isAutonomousBackfillRunning() && backfillConfiguration.greedy()) {
533582
LOGGER.log(TRACE, "Autonomous backfill is running, skipping on-demand gap detection");
534583
return;
535584
}
@@ -543,6 +592,9 @@ public void handleNewestBlockKnownToNetwork(NewestBlockKnownToNetworkNotificatio
543592
try {
544593
LOGGER.log(TRACE, "Starting on-demand backfill for gap: {0}", gap);
545594
backfillGap(gap, BackfillType.ON_DEMAND);
595+
lastAcknowledgedBlockObserved.set(context.historicalBlockProvider()
596+
.availableBlocks()
597+
.max()); // update the last observed acknowledged block
546598
} catch (ParseException | InterruptedException | RuntimeException e) {
547599
LOGGER.log(TRACE, "Error backfilling gap {0}: {1}", gap, e);
548600
backfillFetchErrors.add(1);

0 commit comments

Comments
 (0)