Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions hadoop-hdds/framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-managed-rocksdb</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-test-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
*/
public final class CodecBufferCodec implements Codec<CodecBuffer> {

private static final Codec<CodecBuffer> DIRECT_INSTANCE = new CodecBufferCodec(true);
private static final Codec<CodecBuffer> NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true);
private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false);

private final CodecBuffer.Allocator allocator;

public static Codec<CodecBuffer> get(boolean direct) {
public static CodecBufferCodec get(boolean direct) {
return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation {
static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class);

private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);

private final String name = "Batch-" + BATCH_COUNT.getAndIncrement();

Expand Down Expand Up @@ -136,15 +137,29 @@ public void close() {
}

private abstract static class Op implements Closeable {
private final CodecBuffer keyBuffer;
private final Bytes keyBytes;
private final AtomicBoolean closed = new AtomicBoolean(false);

private Op(CodecBuffer keyBuffer) {
this.keyBuffer = keyBuffer;
this.keyBytes = keyBuffer == null ? null : Bytes.newBytes(keyBuffer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am allowing null so that deleteRange can do things independently

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still keeping the Bytes.newBytes() since we still need to support byte array operations for delete range

}

private Op(Bytes keyBytes) {
this.keyBytes = keyBytes;
CodecBuffer getKeyBuffer() {
return keyBuffer;
}

Bytes getKeyBytes() {
return keyBytes;
}

abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;

abstract int keyLen();
int keyLen() {
CodecBuffer key = getKeyBuffer();
return key == null ? 0 : key.readableBytes();
}

int valLen() {
return 0;
Expand All @@ -154,58 +169,49 @@ int totalLength() {
return keyLen() + valLen();
}

@Override
public void close() {
if (keyBytes != null) {
keyBytes.close();
boolean closeImpl() {
if (closed.compareAndSet(false, true)) {
IOUtils.close(LOG, keyBuffer, keyBytes);
return true;
}
return false;
}

@Override
public final void close() {
closeImpl();
}
}

/**
* Delete operation to be applied to a {@link ColumnFamily} batch.
*/
private static final class DeleteOp extends Op {
private final byte[] key;

private DeleteOp(byte[] key, Bytes keyBytes) {
super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
this.key = Objects.requireNonNull(key, "key == null");
private DeleteOp(CodecBuffer key) {
super(Objects.requireNonNull(key, "keyBytes == null"));
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchDelete(batch, this.key);
}

@Override
public int keyLen() {
return key.length;
family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer());
}
}

/**
* Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
*/
private final class PutOp extends Op {
private final CodecBuffer key;
private final CodecBuffer value;
private final AtomicBoolean closed = new AtomicBoolean(false);

private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
super(keyBytes);
this.key = key;
this.value = value;
private PutOp(CodecBuffer key, CodecBuffer value) {
super(Objects.requireNonNull(key, "key == null"));
this.value = Objects.requireNonNull(value, "value == null");
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
}

@Override
public int keyLen() {
return key.readableBytes();
family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
}

@Override
Expand All @@ -214,41 +220,12 @@ public int valLen() {
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
key.release();
value.release();
boolean closeImpl() {
if (super.closeImpl()) {
IOUtils.close(LOG, value);
return true;
}
super.close();
}
}

/**
* Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
*/
private static final class ByteArrayPutOp extends Op {
private final byte[] key;
private final byte[] value;

private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
super(keyBytes);
this.key = Objects.requireNonNull(key, "key == null");
this.value = Objects.requireNonNull(value, "value == null");
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchPut(batch, key, value);
}

@Override
public int keyLen() {
return key.length;
}

@Override
public int valLen() {
return value.length;
return false;
}
}

Expand Down Expand Up @@ -322,8 +299,9 @@ private void deleteIfExist(Bytes key) {
}
}

void overwriteIfExists(Bytes key, Op op) {
void overwriteIfExists(Op op) {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
Bytes key = op.getKeyBytes();
deleteIfExist(key);
batchSize += op.totalLength();
Op overwritten = ops.put(key, op);
Expand All @@ -336,21 +314,25 @@ void overwriteIfExists(Bytes key, Op op) {

void put(CodecBuffer key, CodecBuffer value) {
putCount++;
// always release the key with the value
Bytes keyBytes = Bytes.newBytes(key);
overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes));
overwriteIfExists(new PutOp(key, value));
}

void put(byte[] key, byte[] value) {
putCount++;
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value);
overwriteIfExists(new PutOp(keyBuffer, valueBuffer));
}

void delete(byte[] key) {
delCount++;
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
overwriteIfExists(new DeleteOp(keyBuffer));
}

void delete(CodecBuffer key) {
delCount++;
overwriteIfExists(new DeleteOp(key));
}

String putString(int keySize, int valueSize) {
Expand Down Expand Up @@ -388,6 +370,10 @@ void delete(ColumnFamily family, byte[] key) {
.delete(key);
}

void delete(ColumnFamily family, CodecBuffer key) {
name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
}

/** Prepare batch write for the entire cache. */
UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
Expand Down Expand Up @@ -464,6 +450,10 @@ public void delete(ColumnFamily family, byte[] key) {
opCache.delete(family, key);
}

public void delete(ColumnFamily family, CodecBuffer key) {
opCache.delete(family, key);
}

public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
opCache.put(family, key, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce
db.deleteRange(family, beginKey, endKey);
}

void deleteWithBatch(BatchOperation batch, CodecBuffer key) {
if (batch instanceof RDBBatchOperation) {
((RDBBatchOperation) batch).delete(family, key);
} else {
throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName());
}
}

@Override
public void deleteWithBatch(BatchOperation batch, byte[] key) {
if (batch instanceof RDBBatchOperation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() {
return handle;
}

public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
writeBatch.delete(getHandle(), key);
Expand All @@ -308,20 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
}
}

public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("batchPut array key {}", bytes2String(key));
LOG.debug("batchPut array value {}", bytes2String(value));
}

try (UncheckedAutoCloseable ignored = acquire()) {
writeBatch.put(getHandle(), key, value);
} catch (RocksDBException e) {
throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e);
}
}

public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
ByteBuffer value) throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {

@Override
public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
rawTable.deleteWithBatch(batch, encodeKey(key));
if (supportCodecBuffer) {
CodecBuffer keyBuffer = null;
try {
keyBuffer = keyCodec.toDirectCodecBuffer(key);
// The buffers will be released after commit.
rawTable.deleteWithBatch(batch, keyBuffer);
} catch (Exception e) {
IOUtils.closeQuietly(keyBuffer);
throw e;
}
} else {
rawTable.deleteWithBatch(batch, encodeKey(key));
}
}

@Override
Expand Down
Loading