diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
index 626376ac09a..8f35057e0c8 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
@@ -167,7 +167,7 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,
 
     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter = new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-        StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+        StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     }
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
index b82fcd556a1..72e7246be76 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
@@ -182,7 +182,7 @@ public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
 
     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter = new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-        StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+        StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     } catch (Exception ex) {
       LOG.error("Unable to populate File Size Count for {} in RocksDB.", taskName, ex);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
index 8027966231d..e13a5b27d82 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -34,7 +34,8 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.tuple.Triple;
-import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
@@ -65,7 +66,6 @@ public class OmTableInsightTask implements ReconOmTask {
   private Map<String, Long> objectCountMap;
   private Map<String, Long> unReplicatedSizeMap;
   private Map<String, Long> replicatedSizeMap;
-  private final int maxKeysInMemory;
   private final int maxIterators;
 
   @Inject
@@ -80,9 +80,6 @@ public OmTableInsightTask(ReconGlobalStatsManager reconGlobalStatsManager,
     tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler());
     tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler());
     tableHandlers.put(MULTIPART_INFO_TABLE, new MultipartInfoInsightHandler());
-    this.maxKeysInMemory = reconOMMetadataManager.getOzoneConfiguration().getInt(
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY,
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT);
     this.maxIterators = reconOMMetadataManager.getOzoneConfiguration().getInt(
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS,
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT);
@@ -177,9 +174,9 @@ private boolean usesNonStringKeys(String tableName) {
 
   private void processTableSequentially(String tableName, OMMetadataManager omMetadataManager) throws IOException {
     LOG.info("{}: Processing table {} sequentially (non-String keys)", getTaskName(), tableName);
-    Table<byte[], byte[]> table = omMetadataManager.getStore()
-        .getTable(tableName, ByteArrayCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);
-    try (TableIterator<byte[], byte[]> keyIterator = table.keyIterator()) {
+    Table<CodecBuffer, CodecBuffer> table = omMetadataManager.getStore()
+        .getTable(tableName, CodecBufferCodec.get(true), CodecBufferCodec.get(true), TableCache.CacheType.NO_CACHE);
+    try (TableIterator<CodecBuffer, CodecBuffer> keyIterator = table.keyIterator()) {
       long count = Iterators.size(keyIterator);
       objectCountMap.put(getTableCountKeyFromTable(tableName), count);
     }
@@ -192,8 +189,8 @@ private void processTableSequentially(String tableName, OMMetadataManager omMetadataManager)
 
   private void processTableInParallel(String tableName, OMMetadataManager omMetadataManager) throws Exception {
     int workerCount = 2; // Only 2 workers needed for simple counting
-    Table<String, byte[]> table = omMetadataManager.getStore()
-        .getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);
+    Table<String, CodecBuffer> table = omMetadataManager.getStore()
+        .getTable(tableName, StringCodec.get(), CodecBufferCodec.get(true), TableCache.CacheType.NO_CACHE);
 
     long estimatedCount = 100000; // Default
     try {
@@ -205,9 +202,8 @@ private void processTableInParallel(String tableName, OMMetadataManager omMetadataManager)
 
     AtomicLong count = new AtomicLong(0);
 
-    try (ParallelTableIteratorOperation<String, byte[]> parallelIter = new ParallelTableIteratorOperation<>(
-        omMetadataManager, table, StringCodec.get(),
-        maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
+    try (ParallelTableIteratorOperation<String, CodecBuffer> parallelIter = new ParallelTableIteratorOperation<>(
+        omMetadataManager, table, StringCodec.get(), maxIterators, loggingThreshold)) {
 
       parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
         if (kv != null) {
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
index c91a3d97775..a8b0b9f1308 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
@@ -20,7 +20,6 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -28,7 +27,6 @@
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -56,37 +54,26 @@ public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable {
 
   // Thread Pools
   private final ExecutorService iteratorExecutor; // 5
-  private final ExecutorService valueExecutors; // 20
-
-  private final int maxNumberOfVals;
+  private final long maxNumberOfVals;
   private final OMMetadataManager metadataManager;
   private final int maxIteratorTasks;
-  private final int maxWorkerTasks;
   private final long logCountThreshold;
 
   private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
 
   public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec,
-      int iteratorCount, int workerCount, int maxNumberOfValsInMemory,
-      long logThreshold) {
+      int iteratorCount, long logThreshold) {
     this.table = table;
     this.keyCodec = keyCodec;
     this.metadataManager = metadataManager;
     this.maxIteratorTasks = 2 * iteratorCount; // Allow up to 10 pending iterator tasks
-    this.maxWorkerTasks = workerCount * 2; // Allow up to 40 pending worker tasks
 
     // Create team of 5 iterator threads with UNLIMITED queue
     // LinkedBlockingQueue() with no size = can hold infinite pending tasks
     this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES,
         new LinkedBlockingQueue<>());
-
-    // Create team of 20 worker threads with UNLIMITED queue
-    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES,
-        new LinkedBlockingQueue<>());
-
-    // Calculate batch size per worker (e.g., 2000 / 20 = 100 keys per batch per worker)
-    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount));
     this.logCountThreshold = logThreshold;
+    this.maxNumberOfVals = Math.max(1000, this.logCountThreshold / (iteratorCount));
   }
 
   private List<K> getBounds(K startKey, K endKey) throws IOException {
@@ -166,9 +153,6 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
 
     // Queue to track iterator threads (5 threads creating work)
     Queue<Future<?>> iterFutures = new LinkedList<>();
-    // Queue to track worker threads (20 threads doing work)
-    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
-
     AtomicLong keyCounter = new AtomicLong();
     AtomicLong prevLogCounter = new AtomicLong();
     Object logLock = new Object();
@@ -190,84 +174,49 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
       iterFutures.add(iteratorExecutor.submit(() -> {
         try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) {
           iter.seek(beg);
+          int count = 0;
           while (iter.hasNext()) {
-            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
-            boolean reachedEnd = false;
-            while (iter.hasNext()) {
-              Table.KeyValue<K, V> kv = iter.next();
-              K key = kv.getKey();
-
-              // Check if key is within this segment's range
-              boolean withinBounds;
-              if (inclusive) {
-                // Last segment: include everything from beg onwards (or until endKey if specified)
-                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
-              } else {
-                // Middle segment: include keys in range [beg, end)
-                withinBounds = key.compareTo(end) < 0;
-              }
-
-              if (withinBounds) {
-                keyValues.add(kv);
-              } else {
-                reachedEnd = true;
-                break;
-              }
-
-              // If batch is full (2000 keys), stop collecting
-              if (keyValues.size() >= maxNumberOfVals) {
-                break;
-              }
+            Table.KeyValue<K, V> kv = iter.next();
+            K key = kv.getKey();
+            // Check if key is within this segment's range
+            boolean withinBounds;
+            if (inclusive) {
+              // Last segment: include everything from beg onwards (or until endKey if specified)
+              withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+            } else {
+              // Middle segment: include keys in range [beg, end)
+              withinBounds = key.compareTo(end) < 0;
             }
-
-            // ===== STEP 5: HAND BATCH TO WORKER THREAD =====
-            if (!keyValues.isEmpty()) {
-              // WAIT if worker queue is too full (max 39 pending tasks)
-              waitForQueueSize(workerFutures, maxWorkerTasks - 1);
-
-              // Submit batch to worker thread pool
-              workerFutures.add(valueExecutors.submit(() -> {
-                for (Table.KeyValue<K, V> kv : keyValues) {
-                  keyOperation.apply(kv);
-                }
-                keyCounter.addAndGet(keyValues.size());
-                if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                  synchronized (logLock) {
-                    if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                      long cnt = keyCounter.get();
-                      LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
-                      prevLogCounter.set(cnt);
-                    }
+            if (!withinBounds) {
+              break;
+            }
+            keyOperation.apply(kv);
+            count++;
+            if (count % maxNumberOfVals == 0) {
+              keyCounter.addAndGet(count);
+              count = 0;
+              if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                synchronized (logLock) {
+                  if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                    long cnt = keyCounter.get();
+                    LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
+                    prevLogCounter.set(cnt);
                   }
                 }
               }
-                // Worker task done! Future is now complete.
-              }));
-            }
-            // If we reached the end of our segment, stop reading
-            if (reachedEnd) {
-              break;
+            }
           }
+          keyCounter.addAndGet(count);
         } catch (IOException e) {
           LOG.error("IO error during parallel iteration on table {}", taskName, e);
           throw new RuntimeException("IO error during iteration", e);
-        } catch (InterruptedException e) {
-          LOG.warn("Parallel iteration interrupted for task {}", taskName, e);
-          Thread.currentThread().interrupt();
-          throw new RuntimeException("Iteration interrupted", e);
-        } catch (ExecutionException e) {
-          Throwable cause = e.getCause();
-          LOG.error("Task execution failed for {}: {}", taskName, cause.getMessage(), cause);
-          throw new RuntimeException("Task execution failed", cause);
         }
       }));
     }
 
-    // ===== STEP 7: WAIT FOR EVERYONE TO FINISH =====
+    // ===== STEP 5: WAIT FOR EVERYONE TO FINISH =====
     // Wait for all 5 iterator threads to finish reading
     waitForQueueSize(iterFutures, 0);
 
-    // Wait for all 20 worker threads to finish processing
-    waitForQueueSize(workerFutures, 0);
-
     // Log final stats
     LOG.info("{}: Parallel iteration completed - Total keys processed: {}", taskName, keyCounter.get());
@@ -276,17 +225,12 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
   @Override
   public void close() throws IOException {
     iteratorExecutor.shutdown();
-    valueExecutors.shutdown();
     try {
       if (!iteratorExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
         iteratorExecutor.shutdownNow();
       }
-      if (!valueExecutors.awaitTermination(60, TimeUnit.SECONDS)) {
-        valueExecutors.shutdownNow();
-      }
     } catch (InterruptedException e) {
       iteratorExecutor.shutdownNow();
-      valueExecutors.shutdownNow();
       Thread.currentThread().interrupt();
     }
   }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
index 4f64d27297b..3104f566955 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.recon.tasks;
 
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELEGATION_TOKEN_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
@@ -48,6 +49,7 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -55,9 +57,14 @@
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
 import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -861,4 +868,103 @@ private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
         .setObjectID(objectID)
         .build();
   }
+
+  @Test
+  public void testSequentialProcessingWithCodecBufferCodec()
+      throws Exception {
+    OmTableInsightTask task =
+        new OmTableInsightTask(reconGlobalStatsManager, reconOMMetadataManager) {
+          @Override
+          public Collection<String> getTaskTables() {
+            return Collections.singletonList(DELEGATION_TOKEN_TABLE);
+          }
+        };
+    OMMetadataManager omMetadataManager = mock(OMMetadataManager.class);
+    DBStore store = mock(DBStore.class);
+    when(omMetadataManager.getStore()).thenReturn(store);
+    @SuppressWarnings("unchecked")
+    Table<CodecBuffer, CodecBuffer> table =
+        (Table<CodecBuffer, CodecBuffer>) mock(Table.class);
+    @SuppressWarnings("unchecked")
+    TableIterator<CodecBuffer, CodecBuffer> iterator =
+        (TableIterator<CodecBuffer, CodecBuffer>) mock(TableIterator.class);
+    when(store.getTable(
+        eq(DELEGATION_TOKEN_TABLE),
+        any(CodecBufferCodec.class),
+        any(CodecBufferCodec.class),
+        eq(TableCache.CacheType.NO_CACHE)))
+        .thenReturn((Table) table);
+    when(table.keyIterator()).thenReturn(iterator);
+    when(iterator.hasNext()).thenReturn(true, true, true, false);
+    ReconOmTask.TaskResult result = task.reprocess(omMetadataManager);
+    assertTrue(result.isTaskSuccess(),
+        "Sequential processing should succeed");
+    String countKey =
+        OmTableInsightTask.getTableCountKeyFromTable(DELEGATION_TOKEN_TABLE);
+    Long count = task.initializeCountMap().get(countKey);
+    assertEquals(3L, count,
+        "Sequential iterator must count all keys");
+  }
+
+  @Test
+  public void testParallelProcessingWithCodecBufferCodec()
+      throws Exception {
+    // Parallel processing is enabled only for tables with String keys.
+    OmTableInsightTask task =
+        new OmTableInsightTask(reconGlobalStatsManager, reconOMMetadataManager) {
+          @Override
+          public Collection<String> getTaskTables() {
+            return Collections.singletonList(KEY_TABLE);
+          }
+        };
+
+    OMMetadataManager omMetadataManager = mock(OMMetadataManager.class);
+    DBStore store = mock(DBStore.class);
+    when(omMetadataManager.getStore()).thenReturn(store);
+
+    @SuppressWarnings("unchecked")
+    Table<String, CodecBuffer> mockCodecBufferTable =
+        (Table<String, CodecBuffer>) mock(Table.class);
+
+    // Mock KeyValueIterator returned by iterator().
+    @SuppressWarnings("unchecked")
+    Table.KeyValueIterator<String, CodecBuffer> kvIterator =
+        (Table.KeyValueIterator<String, CodecBuffer>)
+            mock(Table.KeyValueIterator.class);
+
+    @SuppressWarnings("unchecked")
+    Table.KeyValue<String, CodecBuffer> kv =
+        (Table.KeyValue<String, CodecBuffer>) mock(Table.KeyValue.class);
+
+    when(kv.getKey()).thenReturn("/vol1/buck1/key-001");
+    when(kv.getValue()).thenReturn(mock(CodecBuffer.class));
+
+    // Simulate a KeyValueIterator with 5 entries.
+    when(kvIterator.hasNext())
+        .thenReturn(true, true, true, true, true, false);
+    when(kvIterator.next()).thenReturn(kv);
+
+    when(mockCodecBufferTable.iterator()).thenReturn(kvIterator);
+    when(mockCodecBufferTable.getEstimatedKeyCount()).thenReturn(5L);
+
+    // DBStore.getTable(...) must return the CodecBuffer-backed table.
+    when(store.getTable(
+        eq(KEY_TABLE),
+        eq(StringCodec.get()),
+        any(CodecBufferCodec.class),
+        eq(TableCache.CacheType.NO_CACHE)))
+        .thenReturn((Table) mockCodecBufferTable);
+
+    // Invoke reprocess (which triggers parallel processing for tables with String keys).
+    ReconOmTask.TaskResult result = task.reprocess(omMetadataManager);
+    assertTrue(result.isTaskSuccess(),
+        "Parallel processing should succeed");
+
+    String countKey =
+        OmTableInsightTask.getTableCountKeyFromTable(KEY_TABLE);
+    Long count = task.initializeCountMap().get(countKey);
+    // Validate iterator count
+    assertEquals(5L, count,
+        "Parallel iterator must count all keys");
+  }
 }
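
Reviewer note: a minimal, hypothetical usage sketch of the slimmed-down API follows; it is not code from this patch. It assumes an OMMetadataManager instance is available, and the class name KeyCountSketch, the task name "key-count-sketch", and the iterator count / log threshold values (5 and 100_000) are illustrative only. The constructor signature and the kv -> {...} operation shape mirror the call sites changed above.

// Hypothetical sketch (not part of this patch): driving the simplified
// ParallelTableIteratorOperation, which now takes only an iterator count
// and a logging threshold instead of workerCount/maxKeysInMemory.
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.recon.tasks.util.ParallelTableIteratorOperation;

public final class KeyCountSketch {
  private KeyCountSketch() {
  }

  static long countKeys(OMMetadataManager omMetadataManager) throws Exception {
    Table<String, OmKeyInfo> keyTable = omMetadataManager.getKeyTable(BucketLayout.LEGACY);
    AtomicLong counter = new AtomicLong();
    try (ParallelTableIteratorOperation<String, OmKeyInfo> iterOp =
             new ParallelTableIteratorOperation<>(omMetadataManager, keyTable,
                 StringCodec.get(), 5, 100_000)) {
      // The operation is applied inline on each iterator thread; shared
      // state (here an AtomicLong) must therefore be thread-safe.
      iterOp.performTaskOnTableVals("key-count-sketch", null, null, kv -> {
        if (kv != null) {
          counter.incrementAndGet();
        }
      });
    }
    return counter.get();
  }
}

Design-wise, the batch/worker handoff is gone: each of the iteratorCount range iterators applies the operation as it reads, and maxNumberOfVals is now derived from the log threshold purely to amortize updates to the shared key counter.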