@@ -17,7 +17,7 @@

package org.apache.hadoop.hdds.utils.db;

import static org.apache.hadoop.hdds.StringUtils.bytes2String;
import static org.apache.ratis.util.Preconditions.assertTrue;

import com.google.common.base.Preconditions;
import java.io.Closeable;
@@ -30,10 +30,13 @@
import java.util.function.Supplier;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
import org.apache.ratis.util.TraditionalBinaryPrefix;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.rocksdb.AbstractSlice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -80,26 +83,26 @@ private static String countSize2String(int count, long size) {
* To implement {@link #equals(Object)} and {@link #hashCode()}
* based on the contents of the bytes.
*/
static final class Bytes {
private final byte[] array;
private final CodecBuffer buffer;
static final class Bytes implements Closeable {
private final AbstractSlice<?> slice;
/** Cache the hash value. */
private final int hash;

Bytes(CodecBuffer buffer) {
this.array = null;
this.buffer = Objects.requireNonNull(buffer, "buffer == null");
this.hash = buffer.asReadOnlyByteBuffer().hashCode();
static Bytes newBytes(CodecBuffer buffer) {
return buffer.isDirect() ? new Bytes(buffer.asReadOnlyByteBuffer()) : new Bytes(buffer.getArray());
}

Bytes(byte[] array) {
this.array = array;
this.buffer = null;
this.hash = ByteBuffer.wrap(array).hashCode();
Bytes(ByteBuffer buffer) {
Objects.requireNonNull(buffer, "buffer == null");
assertTrue(buffer.isDirect(), "buffer must be direct");
this.slice = new ManagedDirectSlice(buffer);
this.hash = buffer.hashCode();
}

ByteBuffer asReadOnlyByteBuffer() {
return buffer.asReadOnlyByteBuffer();
Bytes(byte[] array) {
Objects.requireNonNull(array, "array == null");
this.slice = new ManagedSlice(array);
this.hash = ByteBuffer.wrap(array).hashCode();
}

@Override
@@ -113,11 +116,7 @@ public boolean equals(Object obj) {
if (this.hash != that.hash) {
return false;
}
final ByteBuffer thisBuf = this.array != null ?
ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer();
final ByteBuffer thatBuf = that.array != null ?
ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer();
return thisBuf.equals(thatBuf);
return slice.equals(that.slice);
}

@Override
@@ -127,12 +126,21 @@ public int hashCode() {

@Override
public String toString() {
return array != null ? bytes2String(array)
: bytes2String(asReadOnlyByteBuffer());
return slice.toString();
Contributor
Have you checked what it will return? Could you show an example output?

Contributor Author
public static void main(String[] args) throws CodecException {
    String value = "test";
    Bytes bytes = new Bytes(value.getBytes(UTF_8));
    System.out.println("To String Heap Value: " + bytes);
    Bytes dbytes = newBytes(CodecBufferCodec.get(true).fromPersistedFormat(value.getBytes(UTF_8)));
    System.out.println("To String Direct Value: " + dbytes);
  }
2025-12-28 18:25:34,786 [main] INFO  db.CodecBuffer (CodecBuffer.java:set(85)) - Successfully set constructor to LeakDetector::newCodecBuffer: org.apache.hadoop.hdds.utils.db.CodecBuffer$$Lambda$4/1694556038@7085bdee
To String Heap Value: test
To String Direct Value: test

Contributor
@szetszwo szetszwo Dec 29, 2025
To String Heap Value: test
To String Direct Value: test

Isn't the data supposed to be binary? This doesn't seem to be working.

Contributor Author
@swamirishi swamirishi Dec 29, 2025
When you say it is not working: maybe you are not loading the RocksDB libs before this. I forgot there was a static code block loading the RocksDB lib. This should work:

public static void main(String[] args) throws CodecException {
    ManagedRocksObjectUtils.loadRocksDBLibrary();
    String value = "test";
    Bytes bytes = new Bytes(value.getBytes(UTF_8));
    System.out.println("To String Heap Value: " + bytes);
    Bytes dbytes = newBytes(CodecBufferCodec.get(true).fromPersistedFormat(value.getBytes(UTF_8)));
    System.out.println("To String Direct Value: " + dbytes);
  }

Contributor Author
@swamirishi swamirishi Dec 29, 2025
The output before and after the change will be the same, since StringUtils.bytes2String() also decodes the byte value to a String value.
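To illustrate, a minimal self-contained sketch (not part of this PR; it assumes the ManagedSlice(byte[]) constructor and the StringUtils.bytes2String helper that appear elsewhere in this diff). Both println calls are expected to print "test":

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;

public class ToStringComparisonSketch {
  public static void main(String[] args) {
    ManagedRocksObjectUtils.loadRocksDBLibrary();
    byte[] raw = "test".getBytes(UTF_8);
    // Old path: StringUtils.bytes2String(byte[]) decodes the array as UTF-8.
    System.out.println("bytes2String: " + StringUtils.bytes2String(raw));
    // New path: AbstractSlice#toString() decodes the slice contents as well.
    try (ManagedSlice slice = new ManagedSlice(raw)) {
      System.out.println("slice.toString(): " + slice);
    }
  }
}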

Contributor Author
If we want the binary value, then we have to print the hex value; slice.toString(true) would do that.
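A small sketch of that variant (again not part of the PR; the exact hex formatting comes from RocksDB, so the inline comments only indicate the kind of output):

import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;

public class SliceHexSketch {
  public static void main(String[] args) {
    ManagedRocksObjectUtils.loadRocksDBLibrary();
    // Binary key bytes that are not printable text.
    byte[] raw = {0x00, 0x7F, (byte) 0xFF};
    try (ManagedSlice slice = new ManagedSlice(raw)) {
      System.out.println(slice.toString(true));   // hex form of the bytes
      System.out.println(slice.toString(false));  // plain decoding, garbled for binary data
    }
  }
}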

Contributor Author
I did the above to keep the behaviour the same as before.

Contributor
You are right -- it is the same as before.

}

@Override
public void close() {
slice.close();
}
}

private abstract static class Op implements Closeable {
private final Bytes keyBytes;

private Op(Bytes keyBytes) {
this.keyBytes = keyBytes;
}

abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;

@@ -148,6 +156,9 @@ int totalLength() {

@Override
public void close() {
if (keyBytes != null) {
keyBytes.close();
}
}
}

@@ -157,7 +168,8 @@ public void close() {
private static final class DeleteOp extends Op {
private final byte[] key;

private DeleteOp(byte[] key) {
private DeleteOp(byte[] key, Bytes keyBytes) {
super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
this.key = Objects.requireNonNull(key, "key == null");
}

@@ -180,7 +192,8 @@ private final class PutOp extends Op {
private final CodecBuffer value;
private final AtomicBoolean closed = new AtomicBoolean(false);

private PutOp(CodecBuffer key, CodecBuffer value) {
private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
super(keyBytes);
this.key = key;
this.value = value;
}
@@ -217,7 +230,8 @@ private static final class ByteArrayPutOp extends Op {
private final byte[] key;
private final byte[] value;

private ByteArrayPutOp(byte[] key, byte[] value) {
private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
super(keyBytes);
this.key = Objects.requireNonNull(key, "key == null");
this.value = Objects.requireNonNull(value, "value == null");
}
@@ -323,20 +337,20 @@ void overwriteIfExists(Bytes key, Op op) {
void put(CodecBuffer key, CodecBuffer value) {
putCount++;
// always release the key with the value
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new PutOp(key, value));
Bytes keyBytes = Bytes.newBytes(key);
overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes));
}

void put(byte[] key, byte[] value) {
putCount++;
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value));
overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
}

void delete(byte[] key) {
delCount++;
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new DeleteOp(key));
overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
}

String putString(int keySize, int valueSize) {
@@ -19,6 +19,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.utils.db.CodecTestUtil.gc;
import static org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes.newBytes;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,6 +36,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
@@ -49,6 +51,7 @@ public final class TestCodec {

static {
CodecBuffer.enableLeakDetection();
ManagedRocksObjectUtils.loadRocksDBLibrary();
}

@Test
@@ -289,17 +292,15 @@ public void testUuidCodec() throws Exception {
public static <T> void runTest(Codec<T> codec, T original,
Integer serializedSize) throws Exception {
CodecTestUtil.runTest(codec, original, serializedSize, null);
runTestBytes(original, codec);
runTestBytes(original, codec, CodecBuffer.Allocator.HEAP);
runTestBytes(original, codec, CodecBuffer.Allocator.DIRECT);
}

static <T> void runTestBytes(T object, Codec<T> codec) throws IOException {
static <T> void runTestBytes(T object, Codec<T> codec, CodecBuffer.Allocator allocator) throws IOException {
final byte[] array = codec.toPersistedFormat(object);
final Bytes fromArray = new Bytes(array);

try (CodecBuffer buffer = codec.toCodecBuffer(object,
CodecBuffer.Allocator.HEAP)) {
final Bytes fromBuffer = new Bytes(buffer);

try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) {
final Bytes fromBuffer = newBytes(buffer);
assertEquals(fromArray.hashCode(), fromBuffer.hashCode());
assertEquals(fromArray, fromBuffer);
assertEquals(fromBuffer, fromArray);
1 change: 1 addition & 0 deletions pom.xml
@@ -1959,6 +1959,7 @@
<allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
<allowedImport>org.rocksdb.Env</allowedImport>
<allowedImport>org.rocksdb.Statistics</allowedImport>
<allowedImport>org.rocksdb.AbstractSlice</allowedImport>
<!-- Allow RocksDB constants and static methods to be used. -->
<allowedImport>org.rocksdb.RocksDB.*</allowedImport>
</allowedImports>