diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 3681959b4b4..de181ae0c8d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,7 +17,7 @@ package org.apache.hadoop.hdds.utils.db; -import static org.apache.hadoop.hdds.StringUtils.bytes2String; +import static org.apache.ratis.util.Preconditions.assertTrue; import com.google.common.base.Preconditions; import java.io.Closeable; @@ -30,10 +30,13 @@ import java.util.function.Supplier; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; import org.apache.ratis.util.TraditionalBinaryPrefix; import org.apache.ratis.util.UncheckedAutoCloseable; +import org.rocksdb.AbstractSlice; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,26 +83,26 @@ private static String countSize2String(int count, long size) { * To implement {@link #equals(Object)} and {@link #hashCode()} * based on the contents of the bytes. */ - static final class Bytes { - private final byte[] array; - private final CodecBuffer buffer; + static final class Bytes implements Closeable { + private final AbstractSlice slice; /** Cache the hash value. */ private final int hash; - Bytes(CodecBuffer buffer) { - this.array = null; - this.buffer = Objects.requireNonNull(buffer, "buffer == null"); - this.hash = buffer.asReadOnlyByteBuffer().hashCode(); + static Bytes newBytes(CodecBuffer buffer) { + return buffer.isDirect() ? new Bytes(buffer.asReadOnlyByteBuffer()) : new Bytes(buffer.getArray()); } - Bytes(byte[] array) { - this.array = array; - this.buffer = null; - this.hash = ByteBuffer.wrap(array).hashCode(); + Bytes(ByteBuffer buffer) { + Objects.requireNonNull(buffer, "buffer == null"); + assertTrue(buffer.isDirect(), "buffer must be direct"); + this.slice = new ManagedDirectSlice(buffer); + this.hash = buffer.hashCode(); } - ByteBuffer asReadOnlyByteBuffer() { - return buffer.asReadOnlyByteBuffer(); + Bytes(byte[] array) { + Objects.requireNonNull(array, "array == null"); + this.slice = new ManagedSlice(array); + this.hash = ByteBuffer.wrap(array).hashCode(); } @Override @@ -113,11 +116,7 @@ public boolean equals(Object obj) { if (this.hash != that.hash) { return false; } - final ByteBuffer thisBuf = this.array != null ? - ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? - ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - return thisBuf.equals(thatBuf); + return slice.equals(that.slice); } @Override @@ -127,12 +126,21 @@ public int hashCode() { @Override public String toString() { - return array != null ? bytes2String(array) - : bytes2String(asReadOnlyByteBuffer()); + return slice.toString(); + } + + @Override + public void close() { + slice.close(); } } private abstract static class Op implements Closeable { + private final Bytes keyBytes; + + private Op(Bytes keyBytes) { + this.keyBytes = keyBytes; + } abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; @@ -148,6 +156,9 @@ int totalLength() { @Override public void close() { + if (keyBytes != null) { + keyBytes.close(); + } } } @@ -157,7 +168,8 @@ public void close() { private static final class DeleteOp extends Op { private final byte[] key; - private DeleteOp(byte[] key) { + private DeleteOp(byte[] key, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); this.key = Objects.requireNonNull(key, "key == null"); } @@ -180,7 +192,8 @@ private final class PutOp extends Op { private final CodecBuffer value; private final AtomicBoolean closed = new AtomicBoolean(false); - private PutOp(CodecBuffer key, CodecBuffer value) { + private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { + super(keyBytes); this.key = key; this.value = value; } @@ -217,7 +230,8 @@ private static final class ByteArrayPutOp extends Op { private final byte[] key; private final byte[] value; - private ByteArrayPutOp(byte[] key, byte[] value) { + private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) { + super(keyBytes); this.key = Objects.requireNonNull(key, "key == null"); this.value = Objects.requireNonNull(value, "value == null"); } @@ -323,20 +337,20 @@ void overwriteIfExists(Bytes key, Op op) { void put(CodecBuffer key, CodecBuffer value) { putCount++; // always release the key with the value - Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new PutOp(key, value)); + Bytes keyBytes = Bytes.newBytes(key); + overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes)); } void put(byte[] key, byte[] value) { putCount++; Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value)); + overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes)); } void delete(byte[] key) { delCount++; Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new DeleteOp(key)); + overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes)); } String putString(int keySize, int valueSize) { diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java index f695f286405..4ce46b97cf8 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java @@ -19,6 +19,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.utils.db.CodecTestUtil.gc; +import static org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes.newBytes; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -35,6 +36,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; @@ -49,6 +51,7 @@ public final class TestCodec { static { CodecBuffer.enableLeakDetection(); + ManagedRocksObjectUtils.loadRocksDBLibrary(); } @Test @@ -289,17 +292,15 @@ public void testUuidCodec() throws Exception { public static void runTest(Codec codec, T original, Integer serializedSize) throws Exception { CodecTestUtil.runTest(codec, original, serializedSize, null); - runTestBytes(original, codec); + runTestBytes(original, codec, CodecBuffer.Allocator.HEAP); + runTestBytes(original, codec, CodecBuffer.Allocator.DIRECT); } - static void runTestBytes(T object, Codec codec) throws IOException { + static void runTestBytes(T object, Codec codec, CodecBuffer.Allocator allocator) throws IOException { final byte[] array = codec.toPersistedFormat(object); final Bytes fromArray = new Bytes(array); - - try (CodecBuffer buffer = codec.toCodecBuffer(object, - CodecBuffer.Allocator.HEAP)) { - final Bytes fromBuffer = new Bytes(buffer); - + try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) { + final Bytes fromBuffer = newBytes(buffer); assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); assertEquals(fromArray, fromBuffer); assertEquals(fromBuffer, fromArray); diff --git a/pom.xml b/pom.xml index 88c60ed8a31..813226c07b0 100644 --- a/pom.xml +++ b/pom.xml @@ -1959,6 +1959,7 @@ org.rocksdb.ColumnFamilyHandle org.rocksdb.Env org.rocksdb.Statistics + org.rocksdb.AbstractSlice org.rocksdb.RocksDB.*