From 72c13eb8a490021b8524ff3b0fb836ef8fb0b2ab Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 24 Dec 2025 16:40:22 -0500 Subject: [PATCH 1/2] HDDS-14238. Move RDBBatchOperation Byte comparison to native comparison for optimization Change-Id: Ia7655ff5148197be488a2c1151ec7fd1d6f9d452 --- .../hdds/utils/db/RDBBatchOperation.java | 71 ++++++++++++------- .../hadoop/hdds/utils/db/TestCodec.java | 18 ++--- pom.xml | 1 + 3 files changed, 56 insertions(+), 34 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 3681959b4b45..f786ebfa56d6 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,8 +17,6 @@ package org.apache.hadoop.hdds.utils.db; -import static org.apache.hadoop.hdds.StringUtils.bytes2String; - import com.google.common.base.Preconditions; import java.io.Closeable; import java.nio.ByteBuffer; @@ -30,10 +28,13 @@ import java.util.function.Supplier; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; import org.apache.ratis.util.TraditionalBinaryPrefix; import org.apache.ratis.util.UncheckedAutoCloseable; +import org.rocksdb.AbstractSlice; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,26 +81,33 @@ private static String countSize2String(int count, long size) { * To implement {@link #equals(Object)} and {@link #hashCode()} * based on the contents of the bytes. */ - static final class Bytes { - private final byte[] array; - private final CodecBuffer buffer; + static final class Bytes implements Closeable { + private AbstractSlice slice; /** Cache the hash value. */ - private final int hash; + private int hash; Bytes(CodecBuffer buffer) { - this.array = null; - this.buffer = Objects.requireNonNull(buffer, "buffer == null"); - this.hash = buffer.asReadOnlyByteBuffer().hashCode(); + Objects.requireNonNull(buffer, "buffer == null"); + if (buffer.isDirect()) { + initWithDirectByteBuffer(buffer.asReadOnlyByteBuffer()); + } else { + initWithByteArray(buffer.getArray()); + } } Bytes(byte[] array) { - this.array = array; - this.buffer = null; + Objects.requireNonNull(array, "array == null"); + initWithByteArray(array); + } + + private void initWithByteArray(byte[] array) { + this.slice = new ManagedSlice(array); this.hash = ByteBuffer.wrap(array).hashCode(); } - ByteBuffer asReadOnlyByteBuffer() { - return buffer.asReadOnlyByteBuffer(); + private void initWithDirectByteBuffer(ByteBuffer byteBuffer) { + this.slice = new ManagedDirectSlice(byteBuffer); + this.hash = byteBuffer.hashCode(); } @Override @@ -113,11 +121,7 @@ public boolean equals(Object obj) { if (this.hash != that.hash) { return false; } - final ByteBuffer thisBuf = this.array != null ? - ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? 
- ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - return thisBuf.equals(thatBuf); + return slice.equals(that.slice); } @Override @@ -127,12 +131,21 @@ public int hashCode() { @Override public String toString() { - return array != null ? bytes2String(array) - : bytes2String(asReadOnlyByteBuffer()); + return slice.toString(); + } + + @Override + public void close() { + slice.close(); } } private abstract static class Op implements Closeable { + private final Bytes keyBytes; + + private Op(Bytes keyBytes) { + this.keyBytes = keyBytes; + } abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; @@ -148,6 +161,9 @@ int totalLength() { @Override public void close() { + if (keyBytes != null) { + keyBytes.close(); + } } } @@ -157,7 +173,8 @@ public void close() { private static final class DeleteOp extends Op { private final byte[] key; - private DeleteOp(byte[] key) { + private DeleteOp(byte[] key, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); this.key = Objects.requireNonNull(key, "key == null"); } @@ -180,7 +197,8 @@ private final class PutOp extends Op { private final CodecBuffer value; private final AtomicBoolean closed = new AtomicBoolean(false); - private PutOp(CodecBuffer key, CodecBuffer value) { + private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { + super(keyBytes); this.key = key; this.value = value; } @@ -217,7 +235,8 @@ private static final class ByteArrayPutOp extends Op { private final byte[] key; private final byte[] value; - private ByteArrayPutOp(byte[] key, byte[] value) { + private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) { + super(keyBytes); this.key = Objects.requireNonNull(key, "key == null"); this.value = Objects.requireNonNull(value, "value == null"); } @@ -324,19 +343,19 @@ void put(CodecBuffer key, CodecBuffer value) { putCount++; // always release the key with the value Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new PutOp(key, value)); + overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes)); } void put(byte[] key, byte[] value) { putCount++; Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value)); + overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes)); } void delete(byte[] key) { delCount++; Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new DeleteOp(key)); + overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes)); } String putString(int keySize, int valueSize) { diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java index f695f2864055..88eca2b02f6b 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.common.primitives.Shorts; @@ -35,6 +36,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; import org.junit.jupiter.api.Test; import 
org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; @@ -49,6 +51,7 @@ public final class TestCodec { static { CodecBuffer.enableLeakDetection(); + ManagedRocksObjectUtils.loadRocksDBLibrary(); } @Test @@ -295,14 +298,13 @@ public static void runTest(Codec codec, T original, static void runTestBytes(T object, Codec codec) throws IOException { final byte[] array = codec.toPersistedFormat(object); final Bytes fromArray = new Bytes(array); - - try (CodecBuffer buffer = codec.toCodecBuffer(object, - CodecBuffer.Allocator.HEAP)) { - final Bytes fromBuffer = new Bytes(buffer); - - assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); - assertEquals(fromArray, fromBuffer); - assertEquals(fromBuffer, fromArray); + for (CodecBuffer.Allocator allocator : ImmutableList.of(CodecBuffer.Allocator.HEAP, CodecBuffer.Allocator.DIRECT)) { + try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) { + final Bytes fromBuffer = new Bytes(buffer); + assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); + assertEquals(fromArray, fromBuffer); + assertEquals(fromBuffer, fromArray); + } } } } diff --git a/pom.xml b/pom.xml index 88c60ed8a316..813226c07b05 100644 --- a/pom.xml +++ b/pom.xml @@ -1959,6 +1959,7 @@ org.rocksdb.ColumnFamilyHandle org.rocksdb.Env org.rocksdb.Statistics + org.rocksdb.AbstractSlice org.rocksdb.RocksDB.* From 159d35b80dba824888cb50493b6c1af8422a9da2 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Sun, 28 Dec 2025 18:28:28 -0500 Subject: [PATCH 2/2] HDDS-14238. Address review comments Change-Id: I913be571b87e318abc798c7396ca072de23b01e8 --- .../hdds/utils/db/RDBBatchOperation.java | 31 ++++++++----------- .../hadoop/hdds/utils/db/TestCodec.java | 19 ++++++------ 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index f786ebfa56d6..de181ae0c8d9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,6 +17,8 @@ package org.apache.hadoop.hdds.utils.db; +import static org.apache.ratis.util.Preconditions.assertTrue; + import com.google.common.base.Preconditions; import java.io.Closeable; import java.nio.ByteBuffer; @@ -82,34 +84,27 @@ private static String countSize2String(int count, long size) { * based on the contents of the bytes. */ static final class Bytes implements Closeable { - private AbstractSlice slice; + private final AbstractSlice slice; /** Cache the hash value. */ - private int hash; + private final int hash; + + static Bytes newBytes(CodecBuffer buffer) { + return buffer.isDirect() ? 
new Bytes(buffer.asReadOnlyByteBuffer()) : new Bytes(buffer.getArray()); + } - Bytes(CodecBuffer buffer) { + Bytes(ByteBuffer buffer) { Objects.requireNonNull(buffer, "buffer == null"); - if (buffer.isDirect()) { - initWithDirectByteBuffer(buffer.asReadOnlyByteBuffer()); - } else { - initWithByteArray(buffer.getArray()); - } + assertTrue(buffer.isDirect(), "buffer must be direct"); + this.slice = new ManagedDirectSlice(buffer); + this.hash = buffer.hashCode(); } Bytes(byte[] array) { Objects.requireNonNull(array, "array == null"); - initWithByteArray(array); - } - - private void initWithByteArray(byte[] array) { this.slice = new ManagedSlice(array); this.hash = ByteBuffer.wrap(array).hashCode(); } - private void initWithDirectByteBuffer(ByteBuffer byteBuffer) { - this.slice = new ManagedDirectSlice(byteBuffer); - this.hash = byteBuffer.hashCode(); - } - @Override public boolean equals(Object obj) { if (this == obj) { @@ -342,7 +337,7 @@ void overwriteIfExists(Bytes key, Op op) { void put(CodecBuffer key, CodecBuffer value) { putCount++; // always release the key with the value - Bytes keyBytes = new Bytes(key); + Bytes keyBytes = Bytes.newBytes(key); overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes)); } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java index 88eca2b02f6b..4ce46b97cf8d 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java @@ -19,6 +19,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.utils.db.CodecTestUtil.gc; +import static org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes.newBytes; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -26,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.common.primitives.Shorts; @@ -292,19 +292,18 @@ public void testUuidCodec() throws Exception { public static void runTest(Codec codec, T original, Integer serializedSize) throws Exception { CodecTestUtil.runTest(codec, original, serializedSize, null); - runTestBytes(original, codec); + runTestBytes(original, codec, CodecBuffer.Allocator.HEAP); + runTestBytes(original, codec, CodecBuffer.Allocator.DIRECT); } - static void runTestBytes(T object, Codec codec) throws IOException { + static void runTestBytes(T object, Codec codec, CodecBuffer.Allocator allocator) throws IOException { final byte[] array = codec.toPersistedFormat(object); final Bytes fromArray = new Bytes(array); - for (CodecBuffer.Allocator allocator : ImmutableList.of(CodecBuffer.Allocator.HEAP, CodecBuffer.Allocator.DIRECT)) { - try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) { - final Bytes fromBuffer = new Bytes(buffer); - assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); - assertEquals(fromArray, fromBuffer); - assertEquals(fromBuffer, fromArray); - } + try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) { + final Bytes fromBuffer = newBytes(buffer); + assertEquals(fromArray.hashCode(), 
fromBuffer.hashCode()); + assertEquals(fromArray, fromBuffer); + assertEquals(fromBuffer, fromArray); } } }
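
Note (not part of the patches): a minimal standalone sketch of the RocksDB JNI comparison this change leans on. org.rocksdb.Slice and org.rocksdb.DirectSlice both extend AbstractSlice, whose equals() delegates to a native byte comparison, so a heap-backed key and a direct-buffer-backed key can be checked for equality without wrapping or copying into Java ByteBuffers. The demo class name and key string below are illustrative only, and the Managed* wrappers used in the patch are assumed to be thin resource-tracking wrappers around these RocksDB types.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.rocksdb.DirectSlice;
import org.rocksdb.RocksDB;
import org.rocksdb.Slice;

public class SliceEqualityDemo {
  public static void main(String[] args) {
    RocksDB.loadLibrary(); // the native library must be loaded before creating slices

    byte[] key = "vol1/bucket1/key-0001".getBytes(StandardCharsets.UTF_8);
    ByteBuffer direct = ByteBuffer.allocateDirect(key.length);
    direct.put(key).flip();

    // A heap-backed slice and a direct-buffer-backed slice over the same bytes.
    try (Slice heapSlice = new Slice(key);
         DirectSlice directSlice = new DirectSlice(direct, key.length)) {
      // AbstractSlice#equals delegates to the native comparison, so no
      // Java-side ByteBuffer wrapping is needed for the equality check.
      System.out.println(heapSlice.equals(directSlice)); // prints: true
    }
  }
}

This mirrors the design choice in Bytes: the hash is still computed and cached on the Java side, so the map lookup can reject most non-matching keys before crossing the JNI boundary, and the native slice comparison is only reached when the hashes collide.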