From 18bd2dc90d11febb2a61e31b418d880c8bba21d7 Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze <szetszwo@apache.org>
Date: Tue, 23 Dec 2025 15:31:02 -0800
Subject: [PATCH] HDDS-14235. Cleanup RDBBatchOperation.

---
 .../hdds/utils/db/RDBBatchOperation.java      | 301 ++++++++----------
 .../hadoop/hdds/utils/db/TestCodec.java       |  11 +-
 2 files changed, 145 insertions(+), 167 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index 49693bd29674..ab8132042093 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.utils.db;
 
 import static org.apache.hadoop.hdds.StringUtils.bytes2String;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedBytes;
@@ -34,7 +35,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
@@ -60,8 +60,6 @@ public final class RDBBatchOperation implements BatchOperation {
 
   private final OpCache opCache = new OpCache();
 
-  private enum Op { DELETE, PUT, DELETE_RANGE }
-
   public static RDBBatchOperation newAtomicOperation() {
     return newAtomicOperation(new ManagedWriteBatch());
   }
@@ -84,32 +82,60 @@ private static String countSize2String(int count, long size) {
     return count + " (" + byteSize2String(size) + ")";
   }
 
+  static final class ByteArray extends Bytes<byte[]> {
+    ByteArray(byte[] array) {
+      super(array);
+    }
+
+    @Override
+    int length() {
+      return getBytes().length;
+    }
+
+    @Override
+    ByteBuffer asReadOnlyByteBuffer() {
+      return ByteBuffer.wrap(getBytes());
+    }
+  }
+
+  static final class CodecBufferBytes extends Bytes<CodecBuffer> {
+    CodecBufferBytes(CodecBuffer buffer) {
+      super(buffer);
+    }
+
+    @Override
+    int length() {
+      return getBytes().readableBytes();
+    }
+
+    @Override
+    ByteBuffer asReadOnlyByteBuffer() {
+      return getBytes().asReadOnlyByteBuffer();
+    }
+  }
+
   /**
    * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#opsKeys}.
    * To implement {@link #equals(Object)} and {@link #hashCode()}
    * based on the contents of the bytes.
    */
-  static final class Bytes implements Comparable<Bytes> {
-    private final byte[] array;
-    private final CodecBuffer buffer;
+  abstract static class Bytes<T> {
+    private final T byteObject;
     /** Cache the hash value. */
     private final int hash;
 
-    Bytes(CodecBuffer buffer) {
-      this.array = null;
-      this.buffer = Objects.requireNonNull(buffer, "buffer == null");
-      this.hash = buffer.asReadOnlyByteBuffer().hashCode();
+    Bytes(T byteObject) {
+      this.byteObject = Objects.requireNonNull(byteObject, "byteObject == null");
+      this.hash = asReadOnlyByteBuffer().hashCode();
     }
 
-    Bytes(byte[] array) {
-      this.array = array;
-      this.buffer = null;
-      this.hash = ByteBuffer.wrap(array).hashCode();
+    T getBytes() {
+      return byteObject;
     }
 
-    ByteBuffer asReadOnlyByteBuffer() {
-      return buffer.asReadOnlyByteBuffer();
-    }
+    abstract int length();
+
+    abstract ByteBuffer asReadOnlyByteBuffer();
 
     @Override
     public boolean equals(Object obj) {
@@ -118,15 +144,11 @@ public boolean equals(Object obj) {
       } else if (!(obj instanceof Bytes)) {
         return false;
       }
-      final Bytes that = (Bytes) obj;
+      final Bytes<?> that = (Bytes<?>) obj;
       if (this.hash != that.hash) {
         return false;
       }
-      final ByteBuffer thisBuf = this.array != null ?
-          ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer();
-      final ByteBuffer thatBuf = that.array != null ?
-          ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer();
-      return thisBuf.equals(thatBuf);
+      return this.asReadOnlyByteBuffer().equals(that.asReadOnlyByteBuffer());
     }
 
     @Override
@@ -136,17 +158,13 @@ public int hashCode() {
 
     @Override
     public String toString() {
-      return array != null ? bytes2String(array)
-          : bytes2String(asReadOnlyByteBuffer());
+      return bytes2String(asReadOnlyByteBuffer());
     }
 
+    // TODO: use ByteWiseComparator directly and do not reimplement it.
     // This method mimics the ByteWiseComparator in RocksDB.
-    @Override
-    public int compareTo(RDBBatchOperation.Bytes that) {
-      final ByteBuffer thisBuf = this.array != null ?
-          ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer();
-      final ByteBuffer thatBuf = that.array != null ?
-          ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer();
+    int compareTo(ByteBuffer thatBuf) {
+      final ByteBuffer thisBuf = asReadOnlyByteBuffer();
       for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) {
         int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i));
         if (cmp != 0) {
@@ -158,20 +176,24 @@ public int compareTo(RDBBatchOperation.Bytes that) {
     }
   }
 
-  private abstract class Operation implements Closeable {
-    private Bytes keyBytes;
+  abstract static class Op<T extends Bytes<?>> implements Closeable {
+    private final T keyBytes;
 
-    private Operation(Bytes keyBytes) {
-      this.keyBytes = keyBytes;
+    private Op(T keyBytes) {
+      this.keyBytes = Objects.requireNonNull(keyBytes, "keyBytes == null");
     }
 
     abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
 
-    abstract int keyLen();
+    int keyLen() {
+      return getKey().length();
+    }
 
-    abstract int valLen();
+    int valLen() {
+      return 0;
+    }
 
-    Bytes getKey() {
+    T getKey() {
       return keyBytes;
     }
 
@@ -179,8 +201,6 @@ int totalLength() {
       return keyLen() + valLen();
     }
 
-    abstract Op getOpType();
-
     @Override
     public void close() {
     }
@@ -189,144 +209,116 @@ public void close() {
   /**
   * Delete operation to be applied to a {@link ColumnFamily} batch.
   */
-  private final class DeleteOperation extends Operation {
-    private final byte[] key;
-
-    private DeleteOperation(byte[] key, Bytes keyBytes) {
-      super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
-      this.key = Objects.requireNonNull(key, "key == null");
+  static final class DeleteOp extends Op<ByteArray> {
+    private DeleteOp(ByteArray key) {
+      super(key);
     }
 
     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchDelete(batch, this.key);
-    }
-
-    @Override
-    public int keyLen() {
-      return key.length;
-    }
-
-    @Override
-    public int valLen() {
-      return 0;
-    }
-
-    @Override
-    public Op getOpType() {
-      return Op.DELETE;
+      family.batchDelete(batch, getKey().getBytes());
     }
   }
 
   /**
   * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
   */
-  private final class CodecBufferPutOperation extends Operation {
-    private final CodecBuffer key;
-    private final CodecBuffer value;
+  static final class CodecBufferPutOp extends Op<CodecBufferBytes> {
+    private final CodecBufferBytes value;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
-      super(keyBytes);
-      this.key = key;
+    private CodecBufferPutOp(CodecBufferBytes key, CodecBufferBytes value) {
+      super(key);
       this.value = value;
     }
 
     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
-    }
-
-    @Override
-    public int keyLen() {
-      return key.readableBytes();
+      family.batchPut(batch, getKey().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
    }
 
     @Override
     public int valLen() {
-      return value.readableBytes();
-    }
-
-    @Override
-    public Op getOpType() {
-      return Op.PUT;
+      return value.length();
     }
 
     @Override
     public void close() {
       if (closed.compareAndSet(false, true)) {
-        key.release();
-        value.release();
+        getKey().getBytes().release();
+        value.getBytes().release();
       }
-      super.close();
     }
   }
 
   /**
   * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
   */
-  private final class ByteArrayPutOperation extends Operation {
-    private final byte[] key;
+  static final class ByteArrayPutOp extends Op<ByteArray> {
     private final byte[] value;
 
-    private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) {
-      super(Objects.requireNonNull(keyBytes));
-      this.key = Objects.requireNonNull(key, "key == null");
+    private ByteArrayPutOp(ByteArray key, byte[] value) {
+      super(key);
      this.value = Objects.requireNonNull(value, "value == null");
     }
 
     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key, value);
-    }
-
-    @Override
-    public int keyLen() {
-      return key.length;
+      family.batchPut(batch, getKey().getBytes(), value);
     }
 
     @Override
     public int valLen() {
       return value.length;
     }
+  }
 
-    @Override
-    public Op getOpType() {
-      return Op.PUT;
+  static DeleteRangeOp findContainingRange(Bytes<?> key, Iterable<DeleteRangeOp> deleteRanges) {
+    for (DeleteRangeOp op: deleteRanges) {
+      if (op.containKey(key)) {
+        return op;
+      }
     }
+    return null;
   }
 
   /**
   * Delete range operation to be applied to a {@link ColumnFamily} batch.
   */
-  private final class DeleteRangeOperation extends Operation {
-    private final byte[] startKey;
-    private final byte[] endKey;
+  static final class DeleteRangeOp extends Op<ByteArray> {
+    private final ByteArray endKey;
 
-    private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
-      super(null);
-      this.startKey = Objects.requireNonNull(startKey, "startKey == null");
+    private DeleteRangeOp(ByteArray startKey, ByteArray endKey) {
+      super(startKey);
       this.endKey = Objects.requireNonNull(endKey, "endKey == null");
     }
 
-    @Override
-    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchDeleteRange(batch, startKey, endKey);
+    boolean containKey(Bytes<?> key) {
+      final ByteBuffer keyBuf = ByteBuffer.wrap(assertInstanceOf(key.getBytes(), byte[].class));
+      return getKey().compareTo(keyBuf) <= 0 && endKey.compareTo(keyBuf) > 0;
+    }
+
+    ByteBuffer getStartKey() {
+      return getKey().asReadOnlyByteBuffer();
+    }
+
+    ByteBuffer getEndKey() {
+      return endKey.asReadOnlyByteBuffer();
     }
 
     @Override
-    public int keyLen() {
-      return startKey.length + endKey.length;
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchDeleteRange(batch, getKey().getBytes(), endKey.getBytes());
     }
 
     @Override
-    public int valLen() {
-      return 0;
+    public int keyLen() {
+      return getKey().length() + endKey.length();
     }
 
     @Override
-    public Op getOpType() {
-      return Op.DELETE_RANGE;
+    public String toString() {
+      return String.format("deleteRange[%s, %s)", bytes2String(getStartKey()), bytes2String(getEndKey()));
     }
   }
@@ -353,7 +345,7 @@ private class FamilyCache {
      * This field plays a critical role in managing the logical consistency and proper execution
      * order of operations stored in the batch when interacting with a RocksDB-backed system.
      */
-    private final Map<Bytes, Integer> opsKeys = new HashMap<>();
+    private final Map<Bytes<?>, Integer> opsKeys = new HashMap<>();
     /**
      * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances.
      *
@@ -372,7 +364,7 @@ private class FamilyCache {
      * - Operations stored in this map are expected to define specific actions (e.g., put, delete,
      *   delete range) and their associated data (e.g., keys, values).
      */
-    private final Map<Integer, Operation> batchOps = new HashMap<>();
+    private final Map<Integer, Op<?>> batchOps = new HashMap<>();
     private boolean isCommit;
 
     private long batchSize;
@@ -417,13 +409,15 @@ void prepareBatchWrite() throws RocksDatabaseException {
       Preconditions.checkState(!isCommit, "%s is already committed.", this);
       isCommit = true;
       // Sort Entries based on opIndex and flush the operation to the batch in the same order.
-      List<Operation> ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
+      // TODO: use sorted map
+      final List<Op<?>> ops = batchOps.entrySet().stream()
+          .sorted(Comparator.comparingInt(Map.Entry::getKey))
           .map(Map.Entry::getValue).collect(Collectors.toList());
       List<List<Integer>> deleteRangeIndices = new ArrayList<>();
       int index = 0;
       int prevIndex = -2;
-      for (Operation op : ops) {
-        if (Op.DELETE_RANGE == op.getOpType()) {
+      for (Op<?> op : ops) {
+        if (op instanceof DeleteRangeOp) {
           if (index - prevIndex > 1) {
             deleteRangeIndices.add(new ArrayList<>());
           }
@@ -437,42 +431,26 @@ void prepareBatchWrite() throws RocksDatabaseException {
       deleteRangeIndices.add(Collections.emptyList());
       int startIndex = 0;
       for (List<Integer> continuousDeleteRangeIndices : deleteRangeIndices) {
-        List<DeleteRangeOperation> deleteRangeOps = continuousDeleteRangeIndices.stream()
-            .map(i -> (DeleteRangeOperation)ops.get(i))
-            .collect(Collectors.toList());
-        List<Pair<Bytes, Bytes>> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream()
-            .map(i -> (DeleteRangeOperation)ops.get(i))
-            .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey)))
+        final List<DeleteRangeOp> deleteRangeOps = continuousDeleteRangeIndices.stream()
+            .map(i -> (DeleteRangeOp)ops.get(i))
            .collect(Collectors.toList());
         int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0);
         for (int i = startIndex; i < firstOpIndex; i++) {
-          Operation op = ops.get(i);
-          Bytes key = op.getKey();
-          // Compare the key with the startKey and endKey of the delete range operation. Add to Batch if key
-          // doesn't fall [startKey, endKey) range.
-          boolean keyInRange = false;
-          Pair<Bytes, Bytes> deleteRange = null;
-          for (Pair<Bytes, Bytes> deleteRangeOp : deleteRangeOpsRanges) {
-            if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) {
-              keyInRange = true;
-              deleteRange = deleteRangeOp;
-              break;
-            }
-          }
-          if (!keyInRange) {
+          final Op<?> op = ops.get(i);
+          final Bytes<?> key = op.getKey();
+          // Add to Batch if the key is not contained in a deleteRange.
+          final DeleteRangeOp containingRange = findContainingRange(key, deleteRangeOps);
+          if (containingRange == null) {
            op.apply(family, writeBatch);
           } else {
-            Pair<Bytes, Bytes> finalDeleteRange = deleteRange;
-            debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)",
-                bytes2String(key.asReadOnlyByteBuffer()),
-                bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()),
-                bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer())));
+            debug(() -> String.format("Discarding Operation with Key: %s as it falls within %s",
+                bytes2String(key.asReadOnlyByteBuffer()), containingRange));
             discardedCount++;
             discardedSize += op.totalLength();
           }
         }
-        for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) {
+        for (DeleteRangeOp deleteRangeOp : deleteRangeOps) {
           // Apply the delete range operation to the batch.
           deleteRangeOp.apply(family, writeBatch);
         }
@@ -499,30 +477,29 @@ void clear() {
       }
     }
 
-    private void deleteIfExist(Bytes key, boolean removeFromIndexMap) {
+    private void deleteIfExist(Bytes<?> key) {
       // remove previous first in order to call release()
-      if (opsKeys.containsKey(key)) {
-        int previousIndex = removeFromIndexMap ? opsKeys.remove(key) : opsKeys.get(key);
-        final Operation previous = batchOps.remove(previousIndex);
+      final Integer previousIndex = opsKeys.remove(key);
+      if (previousIndex != null) {
+        final Op<?> previous = batchOps.remove(previousIndex);
         previous.close();
         discardedSize += previous.totalLength();
         discardedCount++;
-        debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(),
-            previous.valLen()));
+        debug(() -> String.format("%s overwriting a previous %s[valLen=%s]",
+            this, previous.getClass().getSimpleName(), previous.valLen()));
       }
     }
 
-    void overWriteOpIfExist(Bytes key, Operation operation) {
+    void overwriteIfExist(Bytes<?> key, Op<?> op) {
      Preconditions.checkState(!isCommit, "%s is already committed.", this);
-      deleteIfExist(key, true);
-      batchSize += operation.totalLength();
+      deleteIfExist(key);
+      batchSize += op.totalLength();
       int newIndex = opIndex.getAndIncrement();
       final Integer overwritten = opsKeys.put(key, newIndex);
-      batchOps.put(newIndex, operation);
+      batchOps.put(newIndex, op);
       Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten));
       debug(() -> String.format("%s %s, %s; key=%s", this,
-          Op.DELETE == operation.getOpType() ? delString(operation.totalLength()) : putString(operation.keyLen(),
-              operation.valLen()),
+          op instanceof DeleteOp ? delString(op.totalLength()) : putString(op.keyLen(), op.valLen()),
          batchSizeDiscardedString(), key));
     }
 
@@ -530,25 +507,25 @@ void put(CodecBuffer key, CodecBuffer value) {
       putCount++;
       // always release the key with the value
-      Bytes keyBytes = new Bytes(key);
-      overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes));
+      final CodecBufferBytes keyBytes = new CodecBufferBytes(key);
+      overwriteIfExist(keyBytes, new CodecBufferPutOp(keyBytes, new CodecBufferBytes(value)));
     }
 
     void put(byte[] key, byte[] value) {
       putCount++;
-      Bytes keyBytes = new Bytes(key);
-      overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes));
+      final ByteArray keyBytes = new ByteArray(key);
+      overwriteIfExist(keyBytes, new ByteArrayPutOp(keyBytes, value));
     }
 
     void delete(byte[] key) {
       delCount++;
-      Bytes keyBytes = new Bytes(key);
-      overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes));
+      final ByteArray keyBytes = new ByteArray(key);
+      overwriteIfExist(keyBytes, new DeleteOp(keyBytes));
     }
 
     void deleteRange(byte[] startKey, byte[] endKey) {
       delRangeCount++;
-      batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
+      batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOp(new ByteArray(startKey), new ByteArray(endKey)));
     }
 
     String putString(int keySize, int valueSize) {
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
index f695f2864055..18297a01f220 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
@@ -34,7 +34,8 @@
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
-import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.ByteArray;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.CodecBufferBytes;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.slf4j.Logger;
@@ -294,15 +295,15 @@ public static <T> void runTest(Codec<T> codec, T original,
   static <T> void runTestBytes(T object, Codec<T> codec) throws IOException {
     final byte[] array = codec.toPersistedFormat(object);
-    final Bytes fromArray = new Bytes(array);
+    final ByteArray fromArray = new ByteArray(array);
 
     try (CodecBuffer buffer = codec.toCodecBuffer(object, CodecBuffer.Allocator.HEAP)) {
-      final Bytes fromBuffer = new Bytes(buffer);
+      final CodecBufferBytes fromBuffer = new CodecBufferBytes(buffer);
       assertEquals(fromArray.hashCode(), fromBuffer.hashCode());
-      assertEquals(fromArray, fromBuffer);
-      assertEquals(fromBuffer, fromArray);
+      assertEquals(fromArray.asReadOnlyByteBuffer(), fromBuffer.asReadOnlyByteBuffer());
+      assertEquals(fromBuffer.asReadOnlyByteBuffer(), fromArray.asReadOnlyByteBuffer());
     }
   }
 }