@@ -20,12 +20,15 @@
import static org.apache.hadoop.hdds.StringUtils.bytes2String;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
@@ -50,8 +53,6 @@ public final class RDBBatchOperation implements BatchOperation {

private final OpCache opCache = new OpCache();

private enum Op { DELETE }

public static RDBBatchOperation newAtomicOperation() {
return newAtomicOperation(new ManagedWriteBatch());
}
@@ -97,10 +98,6 @@ static final class Bytes {
this.hash = ByteBuffer.wrap(array).hashCode();
}

byte[] array() {
return array;
}

ByteBuffer asReadOnlyByteBuffer() {
return buffer.asReadOnlyByteBuffer();
}
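
The Bytes wrapper exists because a raw byte[] hashes by identity, so it cannot serve as a HashMap key; hashing the wrapping ByteBuffer (as the constructor above does) gives content-based behavior. A minimal stand-alone illustration of the difference, using made-up keys rather than Ozone code:

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class ByteKeySketch {
  public static void main(String[] args) {
    byte[] a = {1, 2, 3};
    byte[] b = {1, 2, 3};

    Map<byte[], String> raw = new HashMap<>();
    raw.put(a, "x");
    System.out.println(raw.get(b)); // null: byte[] hashes by identity

    Map<ByteBuffer, String> wrapped = new HashMap<>();
    wrapped.put(ByteBuffer.wrap(a), "x");
    System.out.println(wrapped.get(ByteBuffer.wrap(b))); // x: content equality
  }
}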
@@ -135,6 +132,112 @@ public String toString() {
}
}

private abstract static class Op implements Closeable {

abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;

abstract int keyLen();

int valLen() {
return 0;
}

int totalLength() {
return keyLen() + valLen();
}

@Override
public void close() {
}
}

/**
* Delete operation to be applied to a {@link ColumnFamily} batch.
*/
private static final class DeleteOp extends Op {
private final byte[] key;

private DeleteOp(byte[] key) {
this.key = Objects.requireNonNull(key, "key == null");
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchDelete(batch, this.key);
}

@Override
public int keyLen() {
return key.length;
}
}

/**
* Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer API.
*/
private final class PutOp extends Op {
private final CodecBuffer key;
private final CodecBuffer value;
private final AtomicBoolean closed = new AtomicBoolean(false);

private PutOp(CodecBuffer key, CodecBuffer value) {
this.key = key;
this.value = value;
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
}

@Override
public int keyLen() {
return key.readableBytes();
}

@Override
public int valLen() {
return value.readableBytes();
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
key.release();
value.release();
}
super.close();
}
}
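
PutOp guards its cleanup with an AtomicBoolean so that a second close() call cannot release the key/value CodecBuffers twice. A self-contained sketch of that close-once idiom, using a stand-in class rather than the actual buffer code:

import java.util.concurrent.atomic.AtomicBoolean;

final class CloseOnce implements AutoCloseable {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  @Override
  public void close() {
    // compareAndSet succeeds exactly once, even under concurrent close() calls.
    if (closed.compareAndSet(false, true)) {
      System.out.println("buffers released"); // stand-in for key/value release()
    }
  }

  public static void main(String[] args) {
    try (CloseOnce op = new CloseOnce()) {
      op.close(); // explicit close
    } // try-with-resources closes again: a no-op, message printed only once
  }
}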

/**
* Put operation to be applied to a {@link ColumnFamily} batch using the byte array API.
*/
private static final class ByteArrayPutOp extends Op {
private final byte[] key;
private final byte[] value;

private ByteArrayPutOp(byte[] key, byte[] value) {
this.key = Objects.requireNonNull(key, "key == null");
this.value = Objects.requireNonNull(value, "value == null");
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchPut(batch, key, value);
}

@Override
public int keyLen() {
return key.length;
}

@Override
public int valLen() {
return value.length;
}
}
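
Taken together, these three classes replace the old enum Op { DELETE } and its instanceof dispatch: each op now carries its own apply() and size accounting, so committing becomes one virtual call per entry. A self-contained sketch of the pattern, with a plain Map standing in for the (family, batch) pair:

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

abstract class SketchOp implements Closeable {
  abstract void apply(Map<String, String> db); // stand-in for (family, batch)
  abstract int totalLength();

  @Override
  public void close() {
  }
}

final class SketchPut extends SketchOp {
  private final String key, value;

  SketchPut(String key, String value) {
    this.key = key;
    this.value = value;
  }

  @Override
  void apply(Map<String, String> db) {
    db.put(key, value);
  }

  @Override
  int totalLength() {
    return key.length() + value.length();
  }
}

final class SketchDelete extends SketchOp {
  private final String key;

  SketchDelete(String key) {
    this.key = key;
  }

  @Override
  void apply(Map<String, String> db) {
    db.remove(key);
  }

  @Override
  int totalLength() {
    return key.length();
  }
}

public class DispatchSketch {
  public static void main(String[] args) {
    Map<String, SketchOp> ops = new HashMap<>();
    ops.put("k", new SketchPut("k", "v1"));
    ops.put("k", new SketchDelete("k")); // dedup: the last op per key wins

    Map<String, String> db = new HashMap<>();
    for (SketchOp op : ops.values()) {
      op.apply(db); // one virtual call; no instanceof chain
    }
    System.out.println(db); // {} -- only the delete was applied
  }
}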

/** Cache and deduplicate db ops (put/delete). */
private class OpCache {
/** A (family name -> {@link FamilyCache}) map. */
@@ -143,13 +246,18 @@ private class OpCache {
/** A cache for a {@link ColumnFamily}. */
private class FamilyCache {
private final ColumnFamily family;

/**
* A (dbKey -> dbValue) map, where the dbKey type is {@link Bytes}
* and the dbValue type is {@link Object}.
* When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op.
* Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}).
* A (key -> op) map of the pending batch updates for this {@link FamilyCache}.
* Keys are wrapped as {@link Bytes} so that byte arrays and buffers get
* content-based equality and hashing; values are {@link Op} instances
* describing the put or delete to apply to the {@link ColumnFamily}.
* <p>
* A later op on the same key replaces (and closes) the earlier one,
* so at most one op per key is written to the database.
*/
private final Map<Bytes, Object> ops = new HashMap<>();
private final Map<Bytes, Op> ops = new HashMap<>();
private boolean isCommit;

private long batchSize;
@@ -166,22 +274,9 @@ private class FamilyCache {
void prepareBatchWrite() throws RocksDatabaseException {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
isCommit = true;
for (Map.Entry<Bytes, Object> op : ops.entrySet()) {
final Bytes key = op.getKey();
final Object value = op.getValue();
if (value instanceof byte[]) {
family.batchPut(writeBatch, key.array(), (byte[]) value);
} else if (value instanceof CodecBuffer) {
family.batchPut(writeBatch, key.asReadOnlyByteBuffer(),
((CodecBuffer) value).asReadOnlyByteBuffer());
} else if (value == Op.DELETE) {
family.batchDelete(writeBatch, key.array());
} else {
throw new IllegalStateException("Unexpected value: " + value
+ ", class=" + value.getClass().getSimpleName());
}
for (Op op : ops.values()) {
op.apply(family, writeBatch);
}

debug(this::summary);
}

@@ -194,68 +289,54 @@ void clear() {
final boolean warn = !isCommit && batchSize > 0;
String details = warn ? summary() : null;

for (Object value : ops.values()) {
if (value instanceof CodecBuffer) {
((CodecBuffer) value).release(); // the key will also be released
}
}
IOUtils.close(LOG, ops.values());
ops.clear();

if (warn) {
LOG.warn("discarding changes {}", details);
}
}
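
clear() now hands the whole value collection to IOUtils.close(LOG, ops.values()), closing every Op: PutOp releases its CodecBuffers, while the inherited no-op close() covers the other two. A stand-in sketch of what such a close-all helper does (not the hdds IOUtils implementation):

import java.io.Closeable;
import java.util.List;

public class CloseAllSketch {
  // Close each element; one failure must not prevent closing the rest.
  static void closeQuietly(Iterable<? extends Closeable> closeables) {
    for (Closeable c : closeables) {
      try {
        c.close();
      } catch (Exception e) {
        System.err.println("close failed: " + e); // the real helper logs via LOG
      }
    }
  }

  public static void main(String[] args) {
    closeQuietly(List.<Closeable>of(() -> System.out.println("op closed")));
  }
}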

void putOrDelete(Bytes key, int keyLen, Object val, int valLen) {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
batchSize += keyLen + valLen;
// remove previous first in order to call release()
final Object previous = ops.remove(key);
private void deleteIfExists(Bytes key) {
final Op previous = ops.remove(key);
if (previous != null) {
final boolean isPut = previous != Op.DELETE;
final int preLen;
if (!isPut) {
preLen = 0;
} else if (previous instanceof CodecBuffer) {
final CodecBuffer previousValue = (CodecBuffer) previous;
preLen = previousValue.readableBytes();
previousValue.release(); // key will also be released
} else if (previous instanceof byte[]) {
preLen = ((byte[]) previous).length;
} else {
throw new IllegalStateException("Unexpected previous: " + previous
+ ", class=" + previous.getClass().getSimpleName());
}
discardedSize += keyLen + preLen;
previous.close();
discardedSize += previous.totalLength();
discardedCount++;
debug(() -> String.format("%s overwriting a previous %s", this,
isPut ? "put (value: " + byteSize2String(preLen) + ")" : "del"));
debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this,
previous.getClass().getName(), previous.valLen()));
}
final Object overwritten = ops.put(key, val);
}

void overwriteIfExists(Bytes key, Op op) {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
deleteIfExists(key);
batchSize += op.totalLength();
Op overwritten = ops.put(key, op);
Preconditions.checkState(overwritten == null);

debug(() -> String.format("%s %s, %s; key=%s", this,
valLen == 0 ? delString(keyLen) : putString(keyLen, valLen),
op instanceof DeleteOp ? delString(op.totalLength()) : putString(op.keyLen(), op.valLen()),
batchSizeDiscardedString(), key));
}

void put(CodecBuffer key, CodecBuffer value) {
putCount++;

// always release the key with the value
value.getReleaseFuture().thenAccept(v -> key.release());
putOrDelete(new Bytes(key), key.readableBytes(),
value, value.readableBytes());
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new PutOp(key, value));
}

void put(byte[] key, byte[] value) {
putCount++;
putOrDelete(new Bytes(key), key.length, value, value.length);
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value));
}

void delete(byte[] key) {
delCount++;
putOrDelete(new Bytes(key), key.length, Op.DELETE, 0);
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new DeleteOp(key));
}
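
Because overwriteIfExists adds every op's totalLength() to batchSize and credits the replaced op to discardedSize, the counters evolve as in this hypothetical trace for one 3-byte key (the 10- and 4-byte values are made-up numbers):

// put(key, v10):  batchSize = 3 + 10 = 13
// put(key, v4):   previous PutOp closed; discardedSize = 13, discardedCount = 1;
//                 batchSize = 13 + (3 + 4) = 20
// delete(key):    previous PutOp closed; discardedSize = 13 + 7 = 20, discardedCount = 2;
//                 batchSize = 20 + 3 = 23
// prepareBatchWrite() then applies only the final DeleteOp;
// batchSize tracks gross bytes submitted, not the net bytes written.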

String putString(int keySize, int valueSize) {
@@ -264,14 +345,12 @@ String putString(int keySize, int valueSize) {
}

String delString(int keySize) {
return String.format("del(key: %s), #del=%s",
byteSize2String(keySize), delCount);
return String.format("del(key: %s), #del=%s", byteSize2String(keySize), delCount);
}

String batchSizeDiscardedString() {
return String.format("batchSize=%s, discarded: %s",
byteSize2String(batchSize),
countSize2String(discardedCount, discardedSize));
byteSize2String(batchSize), countSize2String(discardedCount, discardedSize));
}

@Override
@@ -303,7 +382,7 @@ UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
return this::clear;
}

void clear() {
private void clear() {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
e.getValue().clear();
}