Skip to content

Commit e75166d

Browse files
committed
chore: BlockAccessors Hardening & Improved Error Handling
* BlockAccessors are now Autocloseable * BlockAccessors now create hard links for the duration of their lifespan * This allows us to ensure data is accessible for the duration of the accessor's life * Tests are updated and migrated * Deleted redundant configuration tests * Added tests: * Tests for subsequent reads to ensure that closing an accessor does not affect the data or the ability for a new accessor to access it Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
1 parent b7b995a commit e75166d

File tree

24 files changed

+675
-576
lines changed

24 files changed

+675
-576
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# SPDX-License-Identifier: Apache-2.0
2-
32
# Test Properties File
43
prometheus.endpointEnabled=false
54
mediator.ringBufferSize=1024
65
notifier.ringBufferSize=1024
76
files.recent.liveRootPath=build/tmp/data/liveRootPath
7+
files.recent.linksRootPath=build/tmp/data/liveLinks
88
files.historic.rootPath=build/tmp/data/historic
9+
files.historic.linksRootPath=build/tmp/data/historicLinks

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/blocks/InMemoryBlockAccessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,7 @@ public Bytes blockBytes(final Format format) {
8585
private Bytes zstdCompressBytes(final Bytes bytes) {
8686
return Bytes.wrap(Zstd.compress(bytes.toByteArray()));
8787
}
88+
89+
@Override
90+
public void close() {}
8891
}

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/blocks/MinimalBlockAccessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ public Bytes blockBytes(final Format format) {
3232
private Bytes zstdCompressBytes(final Bytes bytes) {
3333
return Bytes.wrap(Zstd.compress(bytes.toByteArray()));
3434
}
35+
36+
@Override
37+
public void close() {}
3538
}

block-node/base/src/main/java/org/hiero/block/node/base/tar/TaredBlockIterator.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.node.base.tar;
33

4+
import edu.umd.cs.findbugs.annotations.NonNull;
45
import java.nio.charset.StandardCharsets;
56
import java.text.DecimalFormat;
67
import java.text.NumberFormat;
78
import java.util.Arrays;
89
import java.util.Iterator;
910
import java.util.NoSuchElementException;
11+
import java.util.Objects;
1012
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
1113
import org.hiero.block.node.spi.historicalblocks.BlockAccessor.Format;
1214

@@ -59,13 +61,14 @@ enum State {
5961
* @param format the format of the block data files written to the tar file
6062
* @param blockIterator the iterator over the blocks to be written into the tar file
6163
*/
62-
public TaredBlockIterator(Format format, Iterator<BlockAccessor> blockIterator) {
63-
this.format = format;
64+
public TaredBlockIterator(@NonNull final Format format, @NonNull final Iterator<BlockAccessor> blockIterator) {
65+
this.format = Objects.requireNonNull(format);
6466
this.extension = switch (format) {
6567
case Format.JSON -> ".json";
6668
case Format.PROTOBUF -> ".blk";
67-
case Format.ZSTD_PROTOBUF -> ".blk.zstd";};
68-
this.blockIterator = blockIterator;
69+
case Format.ZSTD_PROTOBUF -> ".blk.zstd";
70+
};
71+
this.blockIterator = Objects.requireNonNull(blockIterator);
6972
this.currentBuffer = new byte[CHUNK_SIZE];
7073
this.bufferPosition = 0;
7174
}
@@ -98,9 +101,13 @@ public byte[] next() {
98101
case NEXT_FILE:
99102
if (blockIterator.hasNext()) {
100103
// get the next block from the iterator
101-
final BlockAccessor currentBlock = blockIterator.next();
102-
currentBlockBytes = currentBlock.blockBytes(format).toByteArray();
103-
currentBlockName = BLOCK_NUMBER_FORMAT.format(currentBlock.blockNumber()) + extension;
104+
final BlockAccessor currentBlockAccessor = blockIterator.next();
105+
currentBlockName = BLOCK_NUMBER_FORMAT.format(currentBlockAccessor.blockNumber()) + extension;
106+
currentBlockBytes =
107+
currentBlockAccessor.blockBytes(format).toByteArray();
108+
// close the accessor as we are done with it and we need
109+
// to free resources
110+
currentBlockAccessor.close();
104111
filePosition = 0;
105112
state = State.WRITE_HEADER;
106113
} else {

block-node/block-access/src/main/java/org/hiero/block/node/access/service/BlockAccessServicePlugin.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static java.lang.System.Logger.Level.INFO;
66
import static java.lang.System.Logger.Level.TRACE;
77

8-
import com.hedera.hapi.block.stream.Block;
98
import com.swirlds.metrics.api.Counter;
109
import org.hiero.block.api.BlockAccessServiceInterface;
1110
import org.hiero.block.api.BlockRequest;
@@ -14,6 +13,7 @@
1413
import org.hiero.block.node.spi.BlockNodeContext;
1514
import org.hiero.block.node.spi.BlockNodePlugin;
1615
import org.hiero.block.node.spi.ServiceBuilder;
16+
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
1717
import org.hiero.block.node.spi.historicalblocks.HistoricalBlockFacility;
1818

1919
/**
@@ -44,12 +44,10 @@ public class BlockAccessServicePlugin implements BlockNodePlugin, BlockAccessSer
4444
*/
4545
public BlockResponse getBlock(BlockRequest request) {
4646
requestCounter.increment();
47-
4847
try {
4948
long blockNumberToRetrieve;
5049
// The block number and retrieve latest are mutually exclusive in
5150
// the proto definition, so no need to check for that here.
52-
5351
// if retrieveLatest is set, or the request is for the largest possible block number, get the latest block.
5452
if (request.hasBlockNumber() && request.blockNumber() >= 0) {
5553
blockNumberToRetrieve = request.blockNumber();
@@ -63,7 +61,6 @@ public BlockResponse getBlock(BlockRequest request) {
6361
LOGGER.log(INFO, "Invalid request, 'retrieve_latest' or a valid 'block number' is required.");
6462
return new BlockResponse(Code.INVALID_REQUEST, null);
6563
}
66-
6764
// Check if block is within the available range
6865
if (!blockProvider.availableBlocks().contains(blockNumberToRetrieve)) {
6966
long lowestBlockNumber = blockProvider.availableBlocks().min();
@@ -77,14 +74,21 @@ public BlockResponse getBlock(BlockRequest request) {
7774
responseCounterNotAvailable.increment();
7875
return new BlockResponse(Code.NOT_AVAILABLE, null);
7976
}
80-
8177
// Retrieve the block
82-
Block block = blockProvider.block(blockNumberToRetrieve).block();
83-
responseCounterSuccess.increment();
84-
return new BlockResponse(Code.SUCCESS, block);
85-
86-
} catch (RuntimeException e) {
87-
String message = "Failed to retrieve block number %d.".formatted(request.blockNumber());
78+
try (final BlockAccessor accessor = blockProvider.block(blockNumberToRetrieve)) {
79+
if (accessor != null) {
80+
// even though we have checked for the existence of the block
81+
// it may not be available anymore, so we have to ensure the accessor
82+
// still exists
83+
final BlockResponse response = new BlockResponse(Code.SUCCESS, accessor.block());
84+
responseCounterSuccess.increment();
85+
return response;
86+
} else {
87+
return new BlockResponse(Code.NOT_FOUND, null);
88+
}
89+
}
90+
} catch (final RuntimeException e) {
91+
final String message = "Failed to retrieve block number %d.".formatted(request.blockNumber());
8892
LOGGER.log(ERROR, message, e);
8993
responseCounterNotFound.increment(); // Should this be a failure counter?
9094
return new BlockResponse(Code.NOT_FOUND, null);

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/FilesHistoricConfig.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
import com.swirlds.config.api.validation.annotation.Max;
77
import com.swirlds.config.api.validation.annotation.Min;
88
import java.nio.file.Path;
9-
import java.util.Objects;
10-
import org.hiero.block.common.utils.Preconditions;
119
import org.hiero.block.node.base.CompressionType;
1210
import org.hiero.block.node.base.Loggable;
1311

1412
/**
1513
* Use this configuration across the files recent plugin.
1614
*
1715
* @param rootPath provides the root path for saving historic blocks
16+
* @param linksRootPath provides the root path for saving hard links when issuing
17+
* accessors
1818
* @param compression compression type to use for the storage. It is assumed this never changes while a node is running
1919
* and has existing files.
2020
* @param powersOfTenPerZipFileContents the number files in a zip file specified in powers of ten. Can can be one of
@@ -30,16 +30,7 @@
3030
@ConfigData("files.historic")
3131
public record FilesHistoricConfig(
3232
@Loggable @ConfigProperty(defaultValue = "/opt/hiero/block-node/data/historic") Path rootPath,
33+
@Loggable @ConfigProperty(defaultValue = "/opt/hiero/block-node/data/links") Path linksRootPath,
3334
@Loggable @ConfigProperty(defaultValue = "ZSTD") CompressionType compression,
3435
@Loggable @ConfigProperty(defaultValue = "4") @Min(1) @Max(6) int powersOfTenPerZipFileContents,
35-
@Loggable @ConfigProperty(defaultValue = "0") long blockRetentionThreshold) {
36-
/**
37-
* Constructor.
38-
*/
39-
public FilesHistoricConfig {
40-
Objects.requireNonNull(rootPath);
41-
Objects.requireNonNull(compression);
42-
Preconditions.requireInRange(powersOfTenPerZipFileContents, 1, 6);
43-
Preconditions.requireGreaterOrEqual(blockRetentionThreshold, 0L);
44-
}
45-
}
36+
@Loggable @ConfigProperty(defaultValue = "0") long blockRetentionThreshold) {}

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/ZipBlockAccessor.java

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.node.blocks.files.historic;
33

4+
import static java.lang.System.Logger.Level.INFO;
45
import static java.lang.System.Logger.Level.WARNING;
56

67
import com.hedera.hapi.block.stream.Block;
@@ -15,6 +16,7 @@
1516
import java.nio.file.Files;
1617
import java.nio.file.Path;
1718
import java.util.Objects;
19+
import java.util.UUID;
1820
import org.hiero.block.node.base.CompressionType;
1921
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
2022

@@ -28,25 +30,44 @@ final class ZipBlockAccessor implements BlockAccessor {
2830
private static final String FAILED_TO_PARSE_MESSAGE = "Failed to parse block from file %s.";
2931
/** Message logged when data cannot be read from a block file */
3032
private static final String FAILED_TO_READ_MESSAGE = "Failed to read block from file %s.";
31-
/** The block path. */
32-
private final BlockPath blockPath;
33+
/** Message logged when the provided path to a zip file is not a regular file or does not exist. */
34+
private static final String INVALID_ZIP_FILE_PATH_MESSAGE =
35+
"Provided path to zip file is not a regular file or does not exist: %s";
36+
/** The block number. */
37+
private final long blockNumber;
38+
/** The compression type. */
39+
private final CompressionType compressionType;
40+
/** The zip entry name. */
41+
private final String zipEntryName;
42+
/** The zip path. */
43+
private final Path zipFileLink;
3344

3445
/**
3546
* Constructs a ZipBlockAccessor with the specified block path.
3647
*
3748
* @param blockPath the block path
49+
* @param config the configuration for the block provider
3850
*/
39-
ZipBlockAccessor(@NonNull final BlockPath blockPath) {
40-
this.blockPath = Objects.requireNonNull(blockPath);
51+
ZipBlockAccessor(@NonNull final BlockPath blockPath, @NonNull final FilesHistoricConfig config) throws IOException {
52+
this.blockNumber = Long.parseLong(blockPath.blockNumStr());
53+
this.compressionType = blockPath.compressionType();
54+
this.zipEntryName = blockPath.blockFileName();
55+
final Path zipFilePath = blockPath.zipFilePath();
56+
if (!Files.isRegularFile(zipFilePath)) {
57+
final String msg = INVALID_ZIP_FILE_PATH_MESSAGE.formatted(zipFilePath);
58+
throw new IOException(msg);
59+
}
60+
// create a hard link to the block file for the duration of the accessor's life
61+
final Path link = config.linksRootPath().resolve(UUID.randomUUID().toString());
62+
this.zipFileLink = Files.createLink(link, zipFilePath);
4163
}
4264

4365
/**
4466
* {@inheritDoc}
4567
*/
4668
@Override
4769
public long blockNumber() {
48-
// TODO maybe there is nice option here than having to parse the string
49-
return Long.parseLong(blockPath.blockNumStr());
70+
return blockNumber;
5071
}
5172

5273
/**
@@ -55,12 +76,11 @@ public long blockNumber() {
5576
@Override
5677
public Bytes blockBytes(@NonNull final Format format) {
5778
Objects.requireNonNull(format);
58-
CompressionType compressionType = blockPath.compressionType();
59-
try (final FileSystem zipFs = FileSystems.newFileSystem(blockPath.zipFilePath())) {
60-
final Path entry = zipFs.getPath(blockPath.blockFileName());
79+
try (final FileSystem zipFs = FileSystems.newFileSystem(zipFileLink)) {
80+
final Path entry = zipFs.getPath(zipEntryName);
6181
return getBytesFromPath(format, entry, compressionType);
6282
} catch (final UncheckedIOException | IOException e) {
63-
LOGGER.log(WARNING, FAILED_TO_READ_MESSAGE.formatted(blockPath), e);
83+
LOGGER.log(WARNING, FAILED_TO_READ_MESSAGE.formatted(zipFileLink), e);
6484
return null;
6585
}
6686
}
@@ -112,12 +132,24 @@ private Bytes getJsonBytesFromProtobufBytes(final Bytes sourceData) {
112132
try {
113133
return Block.JSON.toBytes(Block.PROTOBUF.parse(sourceData));
114134
} catch (final UncheckedIOException | ParseException e) {
115-
final String message = FAILED_TO_PARSE_MESSAGE.formatted(blockPath);
135+
final String message = FAILED_TO_PARSE_MESSAGE.formatted(zipFileLink);
116136
LOGGER.log(WARNING, message, e);
117137
return null;
118138
}
119139
} else {
120140
return null;
121141
}
122142
}
143+
144+
/**
145+
* This method deletes the link to the zip file.
146+
*/
147+
@Override
148+
public void close() {
149+
try {
150+
Files.delete(zipFileLink);
151+
} catch (final IOException e) {
152+
LOGGER.log(INFO, "Failed to delete accessor link for block: {0}, path: {0}", blockNumber, zipFileLink);
153+
}
154+
}
123155
}

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/ZipBlockArchive.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package org.hiero.block.node.blocks.files.historic;
33

44
import static java.lang.System.Logger.Level.ERROR;
5+
import static java.lang.System.Logger.Level.INFO;
56
import static java.lang.System.Logger.Level.WARNING;
67
import static org.hiero.block.node.base.BlockFile.blockNumberFromFile;
78
import static org.hiero.block.node.blocks.files.historic.BlockPath.computeBlockPath;
@@ -86,6 +87,9 @@ long writeNewZipFile(List<BlockAccessor> batch) throws IOException {
8687
// it is here possible that the accessor will no longer be able to access the block
8788
// because it is possible that the block has been deleted or has been moved
8889
final Bytes bytes = blockAccessor.blockBytes(format);
90+
// close the accessor as we are done with it and we need to free
91+
// resources
92+
blockAccessor.close();
8993
// calculate CRC-32 checksum
9094
final CRC32 crc = new CRC32();
9195
crc.update(bytes.toByteArray());
@@ -121,11 +125,12 @@ long writeNewZipFile(List<BlockAccessor> batch) throws IOException {
121125
*/
122126
BlockAccessor blockAccessor(long blockNumber) {
123127
try {
124-
// get existing block path or null if we cannot find it
128+
// get existing block path or null if we cannot find it or create accessor for
125129
final BlockPath blockPath = computeExistingBlockPath(config, blockNumber);
126-
return blockPath == null ? null : new ZipBlockAccessor(blockPath);
130+
return blockPath == null ? null : new ZipBlockAccessor(blockPath, config);
127131
} catch (final IOException e) {
128-
throw new UncheckedIOException(e);
132+
LOGGER.log(INFO, "Could not create zip block accessor", e);
133+
return null;
129134
}
130135
}
131136

block-node/block-providers/files.historic/src/test/java/org/hiero/block/node/blocks/files/historic/BlockFileHistoricPluginTest.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@
5353
*/
5454
@DisplayName("BlockFileHistoricPlugin Tests")
5555
class BlockFileHistoricPluginTest {
56-
/** TempDir for the current test */
57-
private final Path testTempDir;
56+
/** Data TempDir for the current test */
57+
private final Path dataRoot;
58+
/** Links TempDir for the current test */
59+
private final Path linksRoot;
5860
/** The test block messaging facility to use for testing. */
5961
private final SimpleInMemoryHistoricalBlockFacility testHistoricalBlockFacility;
6062
/** The test config to use for the plugin, overridable. */
@@ -65,13 +67,14 @@ class BlockFileHistoricPluginTest {
6567
/**
6668
* Construct test environment.
6769
*/
68-
BlockFileHistoricPluginTest(@TempDir final Path tempDir) {
69-
this.testTempDir = Objects.requireNonNull(tempDir);
70+
BlockFileHistoricPluginTest(@TempDir final Path dataRoot, @TempDir final Path linksRoot) {
71+
this.dataRoot = Objects.requireNonNull(dataRoot);
72+
this.linksRoot = Objects.requireNonNull(linksRoot);
7073
// generate test config, for the purposes of this test, we will always
7174
// use 10 blocks per zip, assuming that the first zip file will contain
7275
// for example blocks 0-9, the second zip file will contain blocks 10-19
7376
// also we will not use compression, and we will use the jUnit temp dir
74-
testConfig = new FilesHistoricConfig(this.testTempDir, CompressionType.NONE, 1, 10L);
77+
testConfig = new FilesHistoricConfig(this.dataRoot, this.linksRoot, CompressionType.NONE, 1, 10L);
7578
// build the plugin using the test environment
7679
toTest = new BlockFileHistoricPlugin();
7780
// initialize an in memory historical block facility to use for testing
@@ -159,14 +162,17 @@ final class PluginTests extends PluginTestBase<BlockFileHistoricPlugin, Blocking
159162
private Map<String, String> getConfigOverrides() {
160163
final Entry<String, String> rootPath =
161164
Map.entry("files.historic.rootPath", testConfig.rootPath().toString());
165+
final Entry<String, String> linksRootPath = Map.entry(
166+
"files.historic.linksRootPath", testConfig.linksRootPath().toString());
162167
final Entry<String, String> compression = Map.entry(
163168
"files.historic.compression", testConfig.compression().name());
164169
final Entry<String, String> powersOfTenPerZipFileContents = Map.entry(
165170
"files.historic.powersOfTenPerZipFileContents",
166171
String.valueOf(testConfig.powersOfTenPerZipFileContents()));
167172
final Entry<String, String> blockRetentionThreshold = Map.entry(
168173
"files.historic.blockRetentionThreshold", String.valueOf(testConfig.blockRetentionThreshold()));
169-
return Map.ofEntries(rootPath, compression, powersOfTenPerZipFileContents, blockRetentionThreshold);
174+
return Map.ofEntries(
175+
rootPath, linksRootPath, compression, powersOfTenPerZipFileContents, blockRetentionThreshold);
170176
}
171177

172178
/**
@@ -942,7 +948,7 @@ void testRetentionPolicyThresholdHappyPath() throws IOException {
942948
@DisplayName("Test retention policy threshold disabled")
943949
void testRetentionPolicyThresholdDisabled() throws IOException {
944950
// change the retention policy to be disabled
945-
testConfig = new FilesHistoricConfig(testTempDir, CompressionType.NONE, 1, 0L);
951+
testConfig = new FilesHistoricConfig(dataRoot, linksRoot, CompressionType.NONE, 1, 0L);
946952
// override the config in the plugin
947953
start(toTest, testHistoricalBlockFacility, getConfigOverrides());
948954
// generate first 150 blocks from numbers 0-149 and add them to the

0 commit comments

Comments
 (0)