diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml
index ea7f08edd82..27654930c41 100644
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@ -307,6 +307,12 @@
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-managed-rocksdb</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.apache.ozone</groupId>
      <artifactId>hdds-test-utils</artifactId>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
index 9d2944fab66..416cc8bb9c1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
@@ -39,12 +39,12 @@
*/
public final class CodecBufferCodec implements Codec<CodecBuffer> {
- private static final Codec<CodecBuffer> DIRECT_INSTANCE = new CodecBufferCodec(true);
- private static final Codec<CodecBuffer> NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
+ private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true);
+ private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
private final CodecBuffer.Allocator allocator;
- public static Codec<CodecBuffer> get(boolean direct) {
+ public static CodecBufferCodec get(boolean direct) {
return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE;
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index de181ae0c8d..314920c5445 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation {
static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class);
private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
+ private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);
private final String name = "Batch-" + BATCH_COUNT.getAndIncrement();
@@ -136,15 +137,29 @@ public void close() {
}
private abstract static class Op implements Closeable {
+ private final CodecBuffer keyBuffer;
private final Bytes keyBytes;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private Op(CodecBuffer keyBuffer) {
+ this.keyBuffer = keyBuffer;
+ this.keyBytes = keyBuffer == null ? null : Bytes.newBytes(keyBuffer);
+ }
- private Op(Bytes keyBytes) {
- this.keyBytes = keyBytes;
+ CodecBuffer getKeyBuffer() {
+ return keyBuffer;
+ }
+
+ Bytes getKeyBytes() {
+ return keyBytes;
}
abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
- abstract int keyLen();
+ int keyLen() {
+ CodecBuffer key = getKeyBuffer();
+ return key == null ? 0 : key.readableBytes();
+ }
int valLen() {
return 0;
@@ -154,11 +169,17 @@ int totalLength() {
return keyLen() + valLen();
}
- @Override
- public void close() {
- if (keyBytes != null) {
- keyBytes.close();
+ boolean closeImpl() {
+ if (closed.compareAndSet(false, true)) {
+ IOUtils.close(LOG, keyBuffer, keyBytes);
+ return true;
}
+ return false;
+ }
+
+ @Override
+ public final void close() {
+ closeImpl();
}
}
@@ -166,21 +187,14 @@ public void close() {
* Delete operation to be applied to a {@link ColumnFamily} batch.
*/
private static final class DeleteOp extends Op {
- private final byte[] key;
- private DeleteOp(byte[] key, Bytes keyBytes) {
- super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
- this.key = Objects.requireNonNull(key, "key == null");
+ private DeleteOp(CodecBuffer key) {
+ super(Objects.requireNonNull(key, "key == null"));
}
@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchDelete(batch, this.key);
- }
-
- @Override
- public int keyLen() {
- return key.length;
+ family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer());
}
}
@@ -188,24 +202,16 @@ public int keyLen() {
* Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
*/
private final class PutOp extends Op {
- private final CodecBuffer key;
private final CodecBuffer value;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
- super(keyBytes);
- this.key = key;
- this.value = value;
+ private PutOp(CodecBuffer key, CodecBuffer value) {
+ super(Objects.requireNonNull(key, "key == null"));
+ this.value = Objects.requireNonNull(value, "value == null");
}
@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
- }
-
- @Override
- public int keyLen() {
- return key.readableBytes();
+ family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
}
@Override
@@ -214,41 +220,12 @@ public int valLen() {
}
@Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- key.release();
- value.release();
+ boolean closeImpl() {
+ if (super.closeImpl()) {
+ IOUtils.close(LOG, value);
+ return true;
}
- super.close();
- }
- }
-
- /**
- * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
- */
- private static final class ByteArrayPutOp extends Op {
- private final byte[] key;
- private final byte[] value;
-
- private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
- super(keyBytes);
- this.key = Objects.requireNonNull(key, "key == null");
- this.value = Objects.requireNonNull(value, "value == null");
- }
-
- @Override
- public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchPut(batch, key, value);
- }
-
- @Override
- public int keyLen() {
- return key.length;
- }
-
- @Override
- public int valLen() {
- return value.length;
+ return false;
}
}
@@ -322,8 +299,9 @@ private void deleteIfExist(Bytes key) {
}
}
- void overwriteIfExists(Bytes key, Op op) {
+ void overwriteIfExists(Op op) {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
+ Bytes key = op.getKeyBytes();
deleteIfExist(key);
batchSize += op.totalLength();
Op overwritten = ops.put(key, op);
@@ -336,21 +314,25 @@ void overwriteIfExists(Bytes key, Op op) {
void put(CodecBuffer key, CodecBuffer value) {
putCount++;
- // always release the key with the value
- Bytes keyBytes = Bytes.newBytes(key);
- overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes));
+ overwriteIfExists(new PutOp(key, value));
}
void put(byte[] key, byte[] value) {
putCount++;
- Bytes keyBytes = new Bytes(key);
- overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
+ CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
+ CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value);
+ overwriteIfExists(new PutOp(keyBuffer, valueBuffer));
}
void delete(byte[] key) {
delCount++;
- Bytes keyBytes = new Bytes(key);
- overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
+ CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
+ overwriteIfExists(new DeleteOp(keyBuffer));
+ }
+
+ void delete(CodecBuffer key) {
+ delCount++;
+ overwriteIfExists(new DeleteOp(key));
}
String putString(int keySize, int valueSize) {
@@ -388,6 +370,10 @@ void delete(ColumnFamily family, byte[] key) {
.delete(key);
}
+ void delete(ColumnFamily family, CodecBuffer key) {
+ name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
+ }
+
/** Prepare batch write for the entire cache. */
UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -464,6 +450,10 @@ public void delete(ColumnFamily family, byte[] key) {
opCache.delete(family, key);
}
+ public void delete(ColumnFamily family, CodecBuffer key) {
+ opCache.delete(family, key);
+ }
+
public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
opCache.put(family, key, value);
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index f732735cbe3..2aef5daa3c9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce
db.deleteRange(family, beginKey, endKey);
}
+ void deleteWithBatch(BatchOperation batch, CodecBuffer key) {
+ if (batch instanceof RDBBatchOperation) {
+ ((RDBBatchOperation) batch).delete(family, key);
+ } else {
+ throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName());
+ }
+ }
+
@Override
public void deleteWithBatch(BatchOperation batch, byte[] key) {
if (batch instanceof RDBBatchOperation) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 659954a861b..5aff9351804 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() {
return handle;
}
- public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
+ public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
writeBatch.delete(getHandle(), key);
@@ -308,20 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
}
}
- public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
- throws RocksDatabaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("batchPut array key {}", bytes2String(key));
- LOG.debug("batchPut array value {}", bytes2String(value));
- }
-
- try (UncheckedAutoCloseable ignored = acquire()) {
- writeBatch.put(getHandle(), key, value);
- } catch (RocksDBException e) {
- throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e);
- }
- }
-
public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
ByteBuffer value) throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 8000d48c618..59e924529ce 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {
@Override
public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
- rawTable.deleteWithBatch(batch, encodeKey(key));
+ if (supportCodecBuffer) {
+ CodecBuffer keyBuffer = null;
+ try {
+ keyBuffer = keyCodec.toDirectCodecBuffer(key);
+ // The buffers will be released after commit.
+ rawTable.deleteWithBatch(batch, keyBuffer);
+ } catch (Exception e) {
+ IOUtils.closeQuietly(keyBuffer);
+ throw e;
+ }
+ } else {
+ rawTable.deleteWithBatch(batch, encodeKey(key));
+ }
}
@Override
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
new file mode 100644
index 00000000000..09ce31c72fa
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.OpType;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.Operation;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Test class for verifying batch operations using RDBBatchOperation backed by
+ * a tracking ManagedWriteBatch.
+ *
+ * This test class includes:
+ * - Mocking and tracking of put and delete operations within a batch operation.
+ * - Validation of committed operations using assertions on the collected data.
+ * - Ensuring that the batch operation interacts correctly with the
+ * RocksDatabase and ColumnFamilyHandle components.
+ *
+ * The test method includes:
+ * 1. Setup of mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily.
+ * 2. Mocking of methods to track operations performed on the write batch.
+ */
+public class TestRDBBatchOperation {
+
+ static {
+ ManagedRocksObjectUtils.loadRocksDBLibrary();
+ }
+
+ private static Operation getOperation(String key, String value, OpType opType) {
+ return new Operation(string2Bytes(key), value == null ? null : string2Bytes(value), opType);
+ }
+
+ @Test
+ public void testBatchOperation() throws RocksDatabaseException, CodecException, RocksDBException {
+ try (TrackingUtilManagedWriteBatch writeBatch = new TrackingUtilManagedWriteBatch();
+ RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) {
+ ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
+ RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
+ doAnswer((i) -> {
+ ((ManagedWriteBatch)i.getArgument(0))
+ .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2));
+ return null;
+ }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class));
+
+ doAnswer((i) -> {
+ ((ManagedWriteBatch)i.getArgument(0))
+ .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1));
+ return null;
+ }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class));
+
+ when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
+ when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test"));
+ when(columnFamily.getName()).thenReturn("test");
+ Codec<String> codec = StringCodec.get();
+ // OP1: This should be skipped in favor of OP9.
+ batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
+ // OP2
+ batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02"));
+ // OP3: This should be skipped in favor of OP4.
+ batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03"));
+ // OP4
+ batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04"));
+ // OP5
+ batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05"));
+ // OP6
+ batchOperation.delete(columnFamily, codec.toPersistedFormat("key10"));
+ // OP7
+ batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04"));
+ // OP8
+ batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05"));
+ //OP9
+ batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011"));
+
+
+ RocksDatabase db = Mockito.mock(RocksDatabase.class);
+ doNothing().when(db).batchWrite(any());
+ batchOperation.commit(db);
+ Set<Operation> expectedOps = ImmutableSet.of(
+ getOperation("key01", "value011", OpType.PUT_DIRECT),
+ getOperation("key02", "value02", OpType.PUT_DIRECT),
+ getOperation("key03", "value04", OpType.PUT_DIRECT),
+ getOperation("key05", null, OpType.DELETE_DIRECT),
+ getOperation("key10", null, OpType.DELETE_DIRECT),
+ getOperation("key04", "value04", OpType.PUT_DIRECT),
+ getOperation("key06", "value05", OpType.PUT_DIRECT));
+ assertEquals(Collections.singleton("test"), writeBatch.getOperations().keySet());
+ assertEquals(expectedOps, new HashSet<>(writeBatch.getOperations().get("test")));
+ }
+ }
+}
diff --git a/hadoop-hdds/managed-rocksdb/pom.xml b/hadoop-hdds/managed-rocksdb/pom.xml
index 5e6976500f9..1a1fb3a82be 100644
--- a/hadoop-hdds/managed-rocksdb/pom.xml
+++ b/hadoop-hdds/managed-rocksdb/pom.xml
@@ -25,11 +25,6 @@
  <name>Apache Ozone HDDS Managed RocksDB</name>
  <description>Apache Ozone Managed RocksDB library</description>
-  <properties>
-    <!-- no tests in this module so far -->
-    <maven.test.skip>true</maven.test.skip>
-  </properties>
-
  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
@@ -63,6 +58,11 @@
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
  </dependencies>
@@ -74,6 +74,18 @@
none
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-jar</id>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java
new file mode 100644
index 00000000000..eca3977d1ef
--- /dev/null
+++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import static org.apache.hadoop.hdds.StringUtils.bytes2String;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/**
+ * The TrackingUtilManagedWriteBatch class extends ManagedWriteBatch to provide functionality
+ * for tracking operations in a managed write batch context. Operations such as put, delete,
+ * merge, and delete range are managed and tracked, along with their corresponding operation types.
+ *
+ * This class supports direct and indirect operation types, delineated in the OpType enumeration.
+ * Direct operations are created using ByteBuffers while indirect operations are created using
+ * byte arrays.
+ */
+public class TrackingUtilManagedWriteBatch extends ManagedWriteBatch {
+
+ private final Map<String, List<Operation>> operations = new HashMap<>();
+
+ /**
+ * The OpType enumeration defines the different types of operations performed in a batch.
+ */
+ public enum OpType {
+ PUT_DIRECT,
+ DELETE_DIRECT,
+ MERGE_DIRECT,
+ DELETE_RANGE_INDIRECT,
+ PUT_INDIRECT,
+ DELETE_INDIRECT,
+ MERGE_INDIRECT,
+ }
+
+ /**
+ * The Operation class represents an individual operation to be performed in the context of
+ * a batch operation, such as a database write, delete, or merge. Each operation is characterized
+ * by a key, value, and an operation type (OpType).
+ *
+ * Operations can be of different types, as defined in the OpType enumeration, which include
+ * actions such as put, delete, merge, and delete range, either direct or indirect.
+ */
+ public static class Operation {
+ private final byte[] key;
+ private final byte[] value;
+ private final OpType opType;
+
+ public Operation(byte[] key, byte[] value, OpType opType) {
+ this.key = Arrays.copyOf(key, key.length);
+ this.value = value == null ? null : Arrays.copyOf(value, value.length);
+ this.opType = opType;
+ }
+
+ public Operation(byte[] key, OpType opType) {
+ this(key, null, opType);
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (!(o instanceof Operation)) {
+ return false;
+ }
+
+ Operation operation = (Operation) o;
+ return Arrays.equals(key, operation.key) && Arrays.equals(value, operation.value) &&
+ opType == operation.opType;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Arrays.hashCode(key) + Arrays.hashCode(value) + opType.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "Operation{" +
+ "key=" + bytes2String(key) +
+ ", value=" + (value == null ? null : bytes2String(value)) +
+ ", opType=" + opType +
+ '}';
+ }
+ }
+
+ public Map<String, List<Operation>> getOperations() {
+ return operations;
+ }
+
+ public TrackingUtilManagedWriteBatch() {
+ super();
+ }
+
+ private byte[] convert(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return bytes;
+ }
+
+ @Override
+ public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
+ operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(key, OpType.DELETE_INDIRECT));
+ }
+
+ @Override
+ public void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException {
+ operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(convert(key), OpType.DELETE_DIRECT));
+ }
+
+ @Override
+ public void delete(byte[] key) throws RocksDBException {
+ operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, OpType.DELETE_INDIRECT));
+ }
+
+ @Override
+ public void delete(ByteBuffer key) throws RocksDBException {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(convert(key), OpType.DELETE_DIRECT));
+ }
+
+ @Override
+ public void deleteRange(byte[] beginKey, byte[] endKey) {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT));
+ }
+
+ @Override
+ public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey)
+ throws RocksDBException {
+ operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT));
+ }
+
+ @Override
+ public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
+ operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(key, value, OpType.MERGE_INDIRECT));
+ }
+
+ @Override
+ public void merge(byte[] key, byte[] value) {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(key, value, OpType.MERGE_INDIRECT));
+ }
+
+ @Override
+ public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
+ operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(key, value, OpType.PUT_INDIRECT));
+ }
+
+ @Override
+ public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException {
+ operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT));
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value) throws RocksDBException {
+ operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, value, OpType.PUT_INDIRECT));
+ }
+
+ @Override
+ public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT));
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 813226c07b0..32bea5743c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1069,6 +1069,12 @@
        <artifactId>hdds-managed-rocksdb</artifactId>
        <version>${hdds.version}</version>
      </dependency>
+      <dependency>
+        <groupId>org.apache.ozone</groupId>
+        <artifactId>hdds-managed-rocksdb</artifactId>
+        <version>${hdds.version}</version>
+        <type>test-jar</type>
+      </dependency>
      <dependency>
        <groupId>org.apache.ozone</groupId>
        <artifactId>hdds-rocks-native</artifactId>