diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index 87d9ffc625c..3681959b4b4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -20,12 +20,15 @@ import static org.apache.hadoop.hdds.StringUtils.bytes2String;
 
 import com.google.common.base.Preconditions;
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
@@ -50,8 +53,6 @@ public final class RDBBatchOperation implements BatchOperation {
 
   private final OpCache opCache = new OpCache();
 
-  private enum Op { DELETE }
-
   public static RDBBatchOperation newAtomicOperation() {
     return newAtomicOperation(new ManagedWriteBatch());
   }
@@ -97,10 +98,6 @@ static final class Bytes {
       this.hash = ByteBuffer.wrap(array).hashCode();
     }
 
-    byte[] array() {
-      return array;
-    }
-
     ByteBuffer asReadOnlyByteBuffer() {
       return buffer.asReadOnlyByteBuffer();
     }
@@ -135,6 +132,112 @@ public String toString() {
     }
   }
 
+  private abstract static class Op implements Closeable {
+
+    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+
+    abstract int keyLen();
+
+    int valLen() {
+      return 0;
+    }
+
+    int totalLength() {
+      return keyLen() + valLen();
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  /**
+   * Delete operation to be applied to a {@link ColumnFamily} batch.
+   */
+  private static final class DeleteOp extends Op {
+    private final byte[] key;
+
+    private DeleteOp(byte[] key) {
+      this.key = Objects.requireNonNull(key, "key == null");
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchDelete(batch, this.key);
+    }
+
+    @Override
+    public int keyLen() {
+      return key.length;
+    }
+  }
+
+  /**
+   * Put operation to be applied to a {@link ColumnFamily} batch using the {@link CodecBuffer} API.
+   */
+  private final class PutOp extends Op {
+    private final CodecBuffer key;
+    private final CodecBuffer value;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private PutOp(CodecBuffer key, CodecBuffer value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
+    }
+
+    @Override
+    public int keyLen() {
+      return key.readableBytes();
+    }
+
+    @Override
+    public int valLen() {
+      return value.readableBytes();
+    }
+
+    @Override
+    public void close() {
+      if (closed.compareAndSet(false, true)) {
+        key.release();
+        value.release();
+      }
+      super.close();
+    }
+  }
+
+  /**
+   * Put operation to be applied to a {@link ColumnFamily} batch using the byte array API.
+   */
+  private static final class ByteArrayPutOp extends Op {
+    private final byte[] key;
+    private final byte[] value;
+
+    private ByteArrayPutOp(byte[] key, byte[] value) {
+      this.key = Objects.requireNonNull(key, "key == null");
+      this.value = Objects.requireNonNull(value, "value == null");
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchPut(batch, key, value);
+    }
+
+    @Override
+    public int keyLen() {
+      return key.length;
+    }
+
+    @Override
+    public int valLen() {
+      return value.length;
+    }
+  }
+
   /** Cache and deduplicate db ops (put/delete). */
   private class OpCache {
     /** A (family name -> {@link FamilyCache}) map. */
@@ -143,13 +246,18 @@ private class OpCache {
     /** A cache for a {@link ColumnFamily}. */
     private class FamilyCache {
      private final ColumnFamily family;
+
      /**
-       * A (dbKey -> dbValue) map, where the dbKey type is {@link Bytes}
-       * and the dbValue type is {@link Object}.
-       * When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op.
-       * Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}).
+       * A mapping of keys to operations for batch processing in this {@link FamilyCache}.
+       * The keys are {@link Bytes} objects, wrapping the underlying byte array or buffer
+       * for efficient equality and hashing. The values are {@link Op} instances, representing
+       * the different types of operations that can be applied to a {@link ColumnFamily}.
+       * <p>
+       * This field stores pending batch updates before they are written to the database.
+       * It supports put and delete operations; a later operation on the same key
+       * overwrites the earlier entry.
       */
-      private final Map<Bytes, Object> ops = new HashMap<>();
+      private final Map<Bytes, Op> ops = new HashMap<>();
      private boolean isCommit;
      private long batchSize;
@@ -166,22 +274,9 @@ private class FamilyCache {
      void prepareBatchWrite() throws RocksDatabaseException {
        Preconditions.checkState(!isCommit, "%s is already committed.", this);
        isCommit = true;
-        for (Map.Entry<Bytes, Object> op : ops.entrySet()) {
-          final Bytes key = op.getKey();
-          final Object value = op.getValue();
-          if (value instanceof byte[]) {
-            family.batchPut(writeBatch, key.array(), (byte[]) value);
-          } else if (value instanceof CodecBuffer) {
-            family.batchPut(writeBatch, key.asReadOnlyByteBuffer(),
-                ((CodecBuffer) value).asReadOnlyByteBuffer());
-          } else if (value == Op.DELETE) {
-            family.batchDelete(writeBatch, key.array());
-          } else {
-            throw new IllegalStateException("Unexpected value: " + value
-                + ", class=" + value.getClass().getSimpleName());
-          }
+        for (Op op : ops.values()) {
+          op.apply(family, writeBatch);
        }
-
        debug(this::summary);
      }
@@ -194,11 +289,7 @@ void clear() {
        final boolean warn = !isCommit && batchSize > 0;
        String details = warn ? summary() : null;
 
-        for (Object value : ops.values()) {
-          if (value instanceof CodecBuffer) {
-            ((CodecBuffer) value).release(); // the key will also be released
-          }
-        }
+        IOUtils.close(LOG, ops.values());
        ops.clear();
 
        if (warn) {
@@ -206,56 +297,46 @@ void clear() {
        }
      }
 
-      void putOrDelete(Bytes key, int keyLen, Object val, int valLen) {
-        Preconditions.checkState(!isCommit, "%s is already committed.", this);
-        batchSize += keyLen + valLen;
-        // remove previous first in order to call release()
-        final Object previous = ops.remove(key);
+      private void deleteIfExist(Bytes key) {
+        final Op previous = ops.remove(key);
        if (previous != null) {
-          final boolean isPut = previous != Op.DELETE;
-          final int preLen;
-          if (!isPut) {
-            preLen = 0;
-          } else if (previous instanceof CodecBuffer) {
-            final CodecBuffer previousValue = (CodecBuffer) previous;
-            preLen = previousValue.readableBytes();
-            previousValue.release(); // key will also be released
-          } else if (previous instanceof byte[]) {
-            preLen = ((byte[]) previous).length;
-          } else {
-            throw new IllegalStateException("Unexpected previous: " + previous
-                + ", class=" + previous.getClass().getSimpleName());
-          }
-          discardedSize += keyLen + preLen;
+          previous.close();
+          discardedSize += previous.totalLength();
          discardedCount++;
-          debug(() -> String.format("%s overwriting a previous %s", this,
-              isPut ? "put (value: " + byteSize2String(preLen) + ")" : "del"));
+          debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this,
+              previous.getClass().getName(), previous.valLen()));
        }
-        final Object overwritten = ops.put(key, val);
+      }
+
+      void overwriteIfExists(Bytes key, Op op) {
+        Preconditions.checkState(!isCommit, "%s is already committed.", this);
+        deleteIfExist(key);
+        batchSize += op.totalLength();
+        Op overwritten = ops.put(key, op);
        Preconditions.checkState(overwritten == null);
        debug(() -> String.format("%s %s, %s; key=%s", this,
-            valLen == 0 ? delString(keyLen) : putString(keyLen, valLen),
+            op instanceof DeleteOp ? delString(op.totalLength()) : putString(op.keyLen(), op.valLen()),
            batchSizeDiscardedString(), key));
      }
 
      void put(CodecBuffer key, CodecBuffer value) {
        putCount++;
-        // always release the key with the value
-        value.getReleaseFuture().thenAccept(v -> key.release());
-        putOrDelete(new Bytes(key), key.readableBytes(),
-            value, value.readableBytes());
+        Bytes keyBytes = new Bytes(key);
+        overwriteIfExists(keyBytes, new PutOp(key, value));
      }
 
      void put(byte[] key, byte[] value) {
        putCount++;
-        putOrDelete(new Bytes(key), key.length, value, value.length);
+        Bytes keyBytes = new Bytes(key);
+        overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value));
      }
 
      void delete(byte[] key) {
        delCount++;
-        putOrDelete(new Bytes(key), key.length, Op.DELETE, 0);
+        Bytes keyBytes = new Bytes(key);
+        overwriteIfExists(keyBytes, new DeleteOp(key));
      }
 
      String putString(int keySize, int valueSize) {
@@ -264,14 +345,12 @@ String putString(int keySize, int valueSize) {
      }
 
      String delString(int keySize) {
-        return String.format("del(key: %s), #del=%s",
-            byteSize2String(keySize), delCount);
+        return String.format("del(key: %s), #del=%s", byteSize2String(keySize), delCount);
      }
 
      String batchSizeDiscardedString() {
        return String.format("batchSize=%s, discarded: %s",
-            byteSize2String(batchSize),
-            countSize2String(discardedCount, discardedSize));
+            byteSize2String(batchSize), countSize2String(discardedCount, discardedSize));
      }
 
      @Override
@@ -303,7 +382,7 @@ UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
      return this::clear;
    }
 
-    void clear() {
+    private void clear() {
      for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
        e.getValue().clear();
      }
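
The heart of this change is replacing the old type-tag dispatch (map values of byte[]/CodecBuffer/enum Op, unpacked with instanceof) by a small polymorphic Op hierarchy, where recording a new op for a key first closes and discards any earlier op for that key. The standalone sketch below illustrates that dedup-by-overwrite pattern under simplified assumptions: all names in it (OpCacheSketch and its members) are illustrative only, it is not the Ozone API, and buffer release is modeled by Op.close().

import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public final class OpCacheSketch {

  // Counterpart of the patch's Op: one polymorphic type per operation kind,
  // instead of tagging map values with byte[]/CodecBuffer/enum.
  private abstract static class Op implements Closeable {
    abstract int totalLength();

    @Override
    public void close() {
      // subclasses owning buffers would release them here
    }
  }

  private static final class PutOp extends Op {
    private final byte[] key;
    private final byte[] value;

    PutOp(byte[] key, byte[] value) {
      this.key = key;
      this.value = value;
    }

    @Override
    int totalLength() {
      return key.length + value.length;
    }
  }

  private static final class DeleteOp extends Op {
    private final byte[] key;

    DeleteOp(byte[] key) {
      this.key = key;
    }

    @Override
    int totalLength() {
      return key.length;
    }
  }

  private final Map<String, Op> ops = new HashMap<>();
  private long batchSize;
  private long discardedSize;

  // Counterpart of FamilyCache.overwriteIfExists: close and drop any previous
  // op for the same key before recording the new one.
  void overwriteIfExists(String key, Op op) {
    final Op previous = ops.remove(key);
    if (previous != null) {
      previous.close();
      discardedSize += previous.totalLength();
    }
    batchSize += op.totalLength();
    ops.put(key, op);
  }

  public static void main(String[] args) {
    final OpCacheSketch cache = new OpCacheSketch();
    final byte[] key = "key1".getBytes(StandardCharsets.UTF_8);
    cache.overwriteIfExists("key1", new PutOp(key, "value1".getBytes(StandardCharsets.UTF_8)));
    cache.overwriteIfExists("key1", new DeleteOp(key)); // the put is closed and discarded
    // Only the DeleteOp survives, so the eventual write batch stays minimal.
    System.out.println("ops=" + cache.ops.size() + ", discarded=" + cache.discardedSize);
    // prints: ops=1, discarded=10
  }
}

Eagerly closing a superseded op is also what lets the patch's PutOp own both CodecBuffers and release them in close(), replacing the value.getReleaseFuture().thenAccept(v -> key.release()) chaining that the old put(CodecBuffer, CodecBuffer) needed.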