diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index 49693bd2967..76f0bdad543 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -186,6 +186,47 @@ public void close() {
     }
   }
 
+  /**
+   * Delete operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
+   */
+  private final class CodecBufferDeleteOperation extends Operation {
+    private final CodecBuffer key;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private CodecBufferDeleteOperation(CodecBuffer key, Bytes keyBytes) {
+      super(keyBytes);
+      this.key = key;
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchDelete(batch, key.asReadOnlyByteBuffer());
+    }
+
+    @Override
+    public int keyLen() {
+      return key.readableBytes();
+    }
+
+    @Override
+    public int valLen() {
+      return 0;
+    }
+
+    @Override
+    public Op getOpType() {
+      return Op.DELETE;
+    }
+
+    @Override
+    public void close() {
+      if (closed.compareAndSet(false, true)) {
+        key.release();
+      }
+      super.close();
+    }
+  }
+
   /**
    * Delete operation to be applied to a {@link ColumnFamily} batch.
    */
@@ -546,6 +587,12 @@ void delete(byte[] key) {
       overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes));
     }
 
+    void delete(CodecBuffer key) {
+      delCount++;
+      Bytes keyBytes = new Bytes(key);
+      overWriteOpIfExist(keyBytes, new CodecBufferDeleteOperation(key, keyBytes));
+    }
+
     void deleteRange(byte[] startKey, byte[] endKey) {
       delRangeCount++;
       batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
@@ -588,6 +635,10 @@ void delete(ColumnFamily family, byte[] key) {
           .delete(key);
     }
 
+    void delete(ColumnFamily family, CodecBuffer key) {
+      name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
+    }
+
     void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
       name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
           .deleteRange(startKey, endKey);
@@ -671,6 +722,10 @@ public void delete(ColumnFamily family, byte[] key) {
     opCache.delete(family, key);
   }
 
+  public void delete(ColumnFamily family, CodecBuffer key) {
+    opCache.delete(family, key);
+  }
+
   public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
     opCache.put(family, key, value);
   }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 045f020b2fe..ec9d900a1d4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce
     db.deleteRange(family, beginKey, endKey);
   }
 
+  void deleteWithBatch(BatchOperation batch, CodecBuffer key) {
+    if (batch instanceof RDBBatchOperation) {
+      ((RDBBatchOperation) batch).delete(family, key);
+    } else {
+      throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName());
+    }
+  }
+
   @Override
   public void deleteWithBatch(BatchOperation batch, byte[] key) {
     if (batch instanceof RDBBatchOperation) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index bdc5124ac3b..544c7c33e67 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -308,6 +308,15 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
     }
   }
 
+  public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
+      throws RocksDatabaseException {
+    try (UncheckedAutoCloseable ignored = acquire()) {
+      writeBatch.delete(getHandle(), key);
+    } catch (RocksDBException e) {
+      throw toRocksDatabaseException(this, "batchDelete key " + bytes2String(key), e);
+    }
+  }
+
   public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey)
       throws RocksDatabaseException {
     try (UncheckedAutoCloseable ignored = acquire()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 6d2fa3a99ff..bd0f6321b5b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {
 
   @Override
   public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
-    rawTable.deleteWithBatch(batch, encodeKey(key));
+    if (supportCodecBuffer) {
+      CodecBuffer keyBuffer = null;
+      try {
+        keyBuffer = keyCodec.toDirectCodecBuffer(key);
+        // The buffers will be released after commit.
+        rawTable.deleteWithBatch(batch, keyBuffer);
+      } catch (Exception e) {
+        IOUtils.closeQuietly(keyBuffer);
+        throw e;
+      }
+    } else {
+      rawTable.deleteWithBatch(batch, encodeKey(key));
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
index bd33ab070ce..d77e4074c7e 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
@@ -19,16 +19,16 @@
 import static java.util.Arrays.asList;
 import static org.apache.hadoop.hdds.StringUtils.bytes2String;
-import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.tuple.Pair;
@@ -54,31 +54,47 @@
  * 2. Mocking of methods to track operations performed on
  */
 public class TestRDBBatchOperation {
   @Test
-  public void testBatchOperationWithDeleteRange() throws RocksDatabaseException {
-    final List<Pair<Pair<String, String>, Integer>> deleteKeyRangePairs = new ArrayList<>();
-    final List<Pair<Pair<String, String>, Integer>> putKeys = new ArrayList<>();
-    final List<Pair<String, Integer>> deleteKeys = new ArrayList<>();
+  public void testBatchOperationWithDeleteRange() throws RocksDatabaseException, CodecException {
+    final List<Pair<Pair<String, String>, Pair<Integer, Boolean>>> deleteKeyRangePairs = new ArrayList<>();
+    final List<Pair<Pair<String, String>, Pair<Integer, Boolean>>> putKeys = new ArrayList<>();
+    final List<Pair<String, Pair<Integer, Boolean>>> deleteKeys = new ArrayList<>();
     AtomicInteger cnt = new AtomicInteger(0);
     try (MockedConstruction<ManagedWriteBatch> mockedConstruction =
         Mockito.mockConstruction(ManagedWriteBatch.class, (writeBatch, context) -> {
           doAnswer(i -> {
             deleteKeyRangePairs.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
-                bytes2String((byte[]) i.getArgument(2))), cnt.getAndIncrement()));
+                bytes2String((byte[]) i.getArgument(2))), Pair.of(cnt.getAndIncrement(), false)));
             return null;
           }).when(writeBatch).deleteRange(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class),
               Mockito.any(byte[].class));
           doAnswer(i -> {
             putKeys.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
                 bytes2String((byte[]) i.getArgument(2))),
-                cnt.getAndIncrement()));
+                Pair.of(cnt.getAndIncrement(), false)));
             return null;
           }).when(writeBatch)
              .put(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class), Mockito.any(byte[].class));
+
+          doAnswer(i -> {
+            ByteBuffer key = i.getArgument(1);
+            ByteBuffer value = i.getArgument(2);
+            putKeys.add(Pair.of(Pair.of(bytes2String(key), bytes2String(value)),
+                Pair.of(cnt.getAndIncrement(), true)));
+            return null;
+          }).when(writeBatch)
+              .put(Mockito.any(ColumnFamilyHandle.class), Mockito.any(ByteBuffer.class), Mockito.any(ByteBuffer.class));
+
           doAnswer(i -> {
-            deleteKeys.add(Pair.of(bytes2String((byte[]) i.getArgument(1)), cnt.getAndIncrement()));
+            deleteKeys.add(Pair.of(bytes2String((byte[]) i.getArgument(1)), Pair.of(cnt.getAndIncrement(), false)));
             return null;
           }).when(writeBatch).delete(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class));
+          doAnswer(i -> {
+            ByteBuffer key = i.getArgument(1);
+            deleteKeys.add(Pair.of(bytes2String(key), Pair.of(cnt.getAndIncrement(), true)));
+            return null;
+          }).when(writeBatch).delete(Mockito.any(ColumnFamilyHandle.class), Mockito.any(ByteBuffer.class));
+
         });
         RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation()) {
       ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
@@ -89,6 +105,12 @@ public void testBatchOperationWithDeleteRange() throws RocksDatabaseException {
         return null;
       }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class));
 
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2));
+        return null;
+      }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class));
+
       doAnswer((i) -> {
         ((ManagedWriteBatch)i.getArgument(0))
             .deleteRange(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2));
@@ -101,29 +123,38 @@ public void testBatchOperationWithDeleteRange() throws RocksDatabaseException {
         return null;
       }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(byte[].class));
 
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1));
+        return null;
+      }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class));
+
       when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
       when(columnFamily.getName()).thenReturn("test");
-      batchOperation.put(columnFamily, string2Bytes("key01"), string2Bytes("value01"));
-      batchOperation.put(columnFamily, string2Bytes("key02"), string2Bytes("value02"));
-      batchOperation.put(columnFamily, string2Bytes("key03"), string2Bytes("value03"));
-      batchOperation.put(columnFamily, string2Bytes("key03"), string2Bytes("value04"));
-      batchOperation.delete(columnFamily, string2Bytes("key05"));
-      batchOperation.deleteRange(columnFamily, string2Bytes("key01"), string2Bytes("key02"));
-      batchOperation.deleteRange(columnFamily, string2Bytes("key02"), string2Bytes("key03"));
-      batchOperation.put(columnFamily, string2Bytes("key04"), string2Bytes("value04"));
-      batchOperation.put(columnFamily, string2Bytes("key06"), string2Bytes("value05"));
-      batchOperation.deleteRange(columnFamily, string2Bytes("key06"), string2Bytes("key12"));
-      batchOperation.deleteRange(columnFamily, string2Bytes("key09"), string2Bytes("key10"));
+      Codec<String> codec = StringCodec.get();
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02"));
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03"));
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04"));
+      batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05"));
+      batchOperation.delete(columnFamily, codec.toPersistedFormat("key10"));
+      batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key01"), codec.toPersistedFormat("key02"));
+      batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("key03"));
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04"));
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05"));
+      batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("key12"));
+      batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key09"), codec.toPersistedFormat("key10"));
       RocksDatabase db = Mockito.mock(RocksDatabase.class);
       doNothing().when(db).batchWrite(any());
       batchOperation.commit(db);
-      assertEquals(deleteKeys, Collections.singletonList(Pair.of("key05", 1)));
-      assertEquals(deleteKeyRangePairs, asList(Pair.of(Pair.of("key01", "key02"), 2),
-          Pair.of(Pair.of("key02", "key03"), 3),
-          Pair.of(Pair.of("key06", "key12"), 5),
-          Pair.of(Pair.of("key09", "key10"), 6)));
-      assertEquals(putKeys, Arrays.asList(Pair.of(Pair.of("key03", "value04"), 0),
-          Pair.of(Pair.of("key04", "value04"), 4)));
+      assertEquals(deleteKeys, ImmutableList.of(Pair.of("key05", Pair.of(1, true)),
+          Pair.of("key10", Pair.of(2, false))));
+      assertEquals(deleteKeyRangePairs, asList(Pair.of(Pair.of("key01", "key02"), Pair.of(3, false)),
+          Pair.of(Pair.of("key02", "key03"), Pair.of(4, false)),
+          Pair.of(Pair.of("key06", "key12"), Pair.of(6, false)),
+          Pair.of(Pair.of("key09", "key10"), Pair.of(7, false))));
+      assertEquals(putKeys, Arrays.asList(Pair.of(Pair.of("key03", "value04"), Pair.of(0, false)),
+          Pair.of(Pair.of("key04", "value04"), Pair.of(5, true))));
     }
   }
 }
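
For reviewers, a minimal sketch of how the new CodecBuffer delete path is meant to be driven end to end. It only uses APIs that appear in this patch and its test (`RDBBatchOperation.newAtomicOperation()`, the new `delete(ColumnFamily, CodecBuffer)` overload, `StringCodec`, `commit`); the `db` and `columnFamily` handles are assumed to come from an already-open RocksDB instance, which the test above replaces with mocks.

```java
// Sketch only: `db` (RocksDatabase) and `columnFamily` (ColumnFamily) are
// assumed to be obtained from an open database, as mocked in the test above.
Codec<String> codec = StringCodec.get();
try (RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation()) {
  // Direct-buffer delete: the CodecBuffer key is released by
  // CodecBufferDeleteOperation.close() once the batch is committed.
  batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05"));
  // The existing byte[] overload remains usable alongside the new one.
  batchOperation.delete(columnFamily, codec.toPersistedFormat("key10"));
  batchOperation.commit(db);
}
```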