@@ -167,7 +167,7 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,

     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-            StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+            StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     }
 
@@ -182,7 +182,7 @@ public static boolean reprocessBucketLayout(BucketLayout bucketLayout,

     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-            StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+            StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     } catch (Exception ex) {
       LOG.error("Unable to populate File Size Count for {} in RocksDB.", taskName, ex);
@@ -34,7 +34,8 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.tuple.Triple;
-import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
@@ -65,7 +66,6 @@ public class OmTableInsightTask implements ReconOmTask {
   private Map<String, Long> objectCountMap;
   private Map<String, Long> unReplicatedSizeMap;
   private Map<String, Long> replicatedSizeMap;
-  private final int maxKeysInMemory;
   private final int maxIterators;
 
   @Inject
@@ -80,9 +80,6 @@ public OmTableInsightTask(ReconGlobalStatsManager reconGlobalStatsManager,
     tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler());
     tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler());
     tableHandlers.put(MULTIPART_INFO_TABLE, new MultipartInfoInsightHandler());
-    this.maxKeysInMemory = reconOMMetadataManager.getOzoneConfiguration().getInt(
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY,
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT);
     this.maxIterators = reconOMMetadataManager.getOzoneConfiguration().getInt(
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS,
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT);
@@ -177,9 +174,9 @@ private boolean usesNonStringKeys(String tableName) {
   private void processTableSequentially(String tableName, OMMetadataManager omMetadataManager) throws IOException {
     LOG.info("{}: Processing table {} sequentially (non-String keys)", getTaskName(), tableName);
 
-    Table<byte[], byte[]> table = omMetadataManager.getStore()
-        .getTable(tableName, ByteArrayCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);
-    try (TableIterator<byte[], byte[]> keyIterator = table.keyIterator()) {
+    Table<CodecBuffer, CodecBuffer> table = omMetadataManager.getStore()
+        .getTable(tableName, CodecBufferCodec.get(true), CodecBufferCodec.get(true), TableCache.CacheType.NO_CACHE);
+    try (TableIterator<CodecBuffer, CodecBuffer> keyIterator = table.keyIterator()) {
       long count = Iterators.size(keyIterator);
       objectCountMap.put(getTableCountKeyFromTable(tableName), count);
     }
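Since this sequential path only needs the number of entries, nothing is ever decoded: the table is opened with buffer codecs on both sides and the iterator is simply exhausted. A compact restatement of the counting step, reusing only calls that already appear in this diff (whether the true flag to CodecBufferCodec.get selects direct buffers is an assumption here, not something the diff states):

    Table<CodecBuffer, CodecBuffer> table = omMetadataManager.getStore()
        .getTable(tableName, CodecBufferCodec.get(true), CodecBufferCodec.get(true),
            TableCache.CacheType.NO_CACHE);
    try (TableIterator<CodecBuffer, CodecBuffer> keyIterator = table.keyIterator()) {
      // Guava's Iterators.size walks the iterator to the end: O(n) time,
      // O(1) heap, and no per-row byte[] copies, unlike the old ByteArrayCodec path.
      long count = Iterators.size(keyIterator);
      objectCountMap.put(getTableCountKeyFromTable(tableName), count);
    }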
@@ -192,8 +189,8 @@ private void processTableSequentially(String tableName, OMMetadataManager omMeta
   private void processTableInParallel(String tableName, OMMetadataManager omMetadataManager) throws Exception {
     int workerCount = 2; // Only 2 workers needed for simple counting
 
-    Table<String, byte[]> table = omMetadataManager.getStore()
-        .getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);
+    Table<String, CodecBuffer> table = omMetadataManager.getStore()
+        .getTable(tableName, StringCodec.get(), CodecBufferCodec.get(true), TableCache.CacheType.NO_CACHE);
 
     long estimatedCount = 100000; // Default
     try {
@@ -205,9 +202,8 @@ private void processTableInParallel(String tableName, OMMetadataManager omMetada

     AtomicLong count = new AtomicLong(0);
 
-    try (ParallelTableIteratorOperation<String, byte[]> parallelIter = new ParallelTableIteratorOperation<>(
-        omMetadataManager, table, StringCodec.get(),
-        maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
+    try (ParallelTableIteratorOperation<String, CodecBuffer> parallelIter = new ParallelTableIteratorOperation<>(
+        omMetadataManager, table, StringCodec.get(), maxIterators, loggingThreshold)) {
 
       parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
         if (kv != null) {
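The parallel counting callback is truncated in this view; a plausible, purely illustrative completion (a hypothetical reconstruction, not the file's actual code) would only bump the shared counter and never touch the CodecBuffer value:

    parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
      if (kv != null) {
        count.incrementAndGet(); // presence of the key is all that matters
      }
    });
    objectCountMap.put(getTableCountKeyFromTable(tableName), count.get());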
@@ -20,15 +20,13 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -56,37 +54,26 @@ public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implemen

   // Thread Pools
   private final ExecutorService iteratorExecutor; // 5
-  private final ExecutorService valueExecutors; // 20
 
-  private final int maxNumberOfVals;
+  private final long maxNumberOfVals;
   private final OMMetadataManager metadataManager;
   private final int maxIteratorTasks;
-  private final int maxWorkerTasks;
   private final long logCountThreshold;
 
   private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
 
   public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec,
-      int iteratorCount, int workerCount, int maxNumberOfValsInMemory,
-      long logThreshold) {
+      int iteratorCount, long logThreshold) {
     this.table = table;
     this.keyCodec = keyCodec;
     this.metadataManager = metadataManager;
     this.maxIteratorTasks = 2 * iteratorCount; // Allow up to 10 pending iterator tasks
-    this.maxWorkerTasks = workerCount * 2; // Allow up to 40 pending worker tasks
 
     // Create team of 5 iterator threads with UNLIMITED queue
     // LinkedBlockingQueue() with no size = can hold infinite pending tasks
     this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES,
         new LinkedBlockingQueue<>());
 
-    // Create team of 20 worker threads with UNLIMITED queue
-    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES,
-        new LinkedBlockingQueue<>());
-
-    // Calculate batch size per worker (e.g., 2000 / 20 = 100 keys per batch per worker)
-    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount));
     this.logCountThreshold = logThreshold;
+    this.maxNumberOfVals = Math.max(1000, this.logCountThreshold / (iteratorCount));
   }
 
   private List<K> getBounds(K startKey, K endKey) throws IOException {
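The fold interval is now derived from the logging threshold instead of a memory cap. A worked example of the new formula, with assumed inputs (logThreshold = 100,000 and iteratorCount = 5 are illustrative, not defaults taken from the code):

    long logThreshold = 100_000L;
    int iteratorCount = 5;
    // max(1000, 100_000 / 5) = 20_000: each iterator thread folds its local
    // tally into the shared AtomicLong once every 20,000 keys, so counter
    // contention stays low even without the old worker pool.
    long maxNumberOfVals = Math.max(1000, logThreshold / iteratorCount);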
@@ -166,9 +153,6 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
     // Queue to track iterator threads (5 threads creating work)
     Queue<Future<?>> iterFutures = new LinkedList<>();
 
-    // Queue to track worker threads (20 threads doing work)
-    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
-
     AtomicLong keyCounter = new AtomicLong();
     AtomicLong prevLogCounter = new AtomicLong();
     Object logLock = new Object();
@@ -190,84 +174,49 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
       iterFutures.add(iteratorExecutor.submit(() -> {
         try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) {
           iter.seek(beg);
+          int count = 0;
           while (iter.hasNext()) {
-            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
-            boolean reachedEnd = false;
-            while (iter.hasNext()) {
-              Table.KeyValue<K, V> kv = iter.next();
-              K key = kv.getKey();
-
-              // Check if key is within this segment's range
-              boolean withinBounds;
-              if (inclusive) {
-                // Last segment: include everything from beg onwards (or until endKey if specified)
-                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
-              } else {
-                // Middle segment: include keys in range [beg, end)
-                withinBounds = key.compareTo(end) < 0;
-              }
-
-              if (withinBounds) {
-                keyValues.add(kv);
-              } else {
-                reachedEnd = true;
-                break;
-              }
-
-              // If batch is full (2000 keys), stop collecting
-              if (keyValues.size() >= maxNumberOfVals) {
-                break;
-              }
-            }
-
-            // ===== STEP 5: HAND BATCH TO WORKER THREAD =====
-            if (!keyValues.isEmpty()) {
-              // WAIT if worker queue is too full (max 39 pending tasks)
-              waitForQueueSize(workerFutures, maxWorkerTasks - 1);
-
-              // Submit batch to worker thread pool
-              workerFutures.add(valueExecutors.submit(() -> {
-                for (Table.KeyValue<K, V> kv : keyValues) {
-                  keyOperation.apply(kv);
-                }
-                keyCounter.addAndGet(keyValues.size());
-                if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                  synchronized (logLock) {
-                    if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                      long cnt = keyCounter.get();
-                      LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
-                      prevLogCounter.set(cnt);
-                    }
-                  }
-                }
-                // Worker task done! Future is now complete.
-              }));
-            }
-            // If we reached the end of our segment, stop reading
-            if (reachedEnd) {
-              break;
-            }
+            Table.KeyValue<K, V> kv = iter.next();
+            K key = kv.getKey();
+            // Check if key is within this segment's range
+            boolean withinBounds;
+            if (inclusive) {
+              // Last segment: include everything from beg onwards (or until endKey if specified)
+              withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+            } else {
+              // Middle segment: include keys in range [beg, end)
+              withinBounds = key.compareTo(end) < 0;
+            }
+            if (!withinBounds) {
+              break;
+            }
+            keyOperation.apply(kv);
+            count++;
+            if (count % maxNumberOfVals == 0) {
+              keyCounter.addAndGet(count);
+              count = 0;
+              if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                synchronized (logLock) {
+                  if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                    long cnt = keyCounter.get();
+                    LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
+                    prevLogCounter.set(cnt);
+                  }
+                }
+              }
+            }
           }
+          keyCounter.addAndGet(count);
         } catch (IOException e) {
           LOG.error("IO error during parallel iteration on table {}", taskName, e);
           throw new RuntimeException("IO error during iteration", e);
         } catch (InterruptedException e) {
           LOG.warn("Parallel iteration interrupted for task {}", taskName, e);
           Thread.currentThread().interrupt();
           throw new RuntimeException("Iteration interrupted", e);
         } catch (ExecutionException e) {
           Throwable cause = e.getCause();
           LOG.error("Task execution failed for {}: {}", taskName, cause.getMessage(), cause);
           throw new RuntimeException("Task execution failed", cause);
         }
       }));
     }
 
-    // ===== STEP 7: WAIT FOR EVERYONE TO FINISH =====
+    // ===== STEP 5: WAIT FOR EVERYONE TO FINISH =====
     // Wait for all 5 iterator threads to finish reading
     waitForQueueSize(iterFutures, 0);
-    // Wait for all 20 worker threads to finish processing
-    waitForQueueSize(workerFutures, 0);
 
     // Log final stats
     LOG.info("{}: Parallel iteration completed - Total keys processed: {}", taskName, keyCounter.get());
@@ -276,17 +225,12 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
   @Override
   public void close() throws IOException {
     iteratorExecutor.shutdown();
-    valueExecutors.shutdown();
     try {
       if (!iteratorExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
         iteratorExecutor.shutdownNow();
       }
-      if (!valueExecutors.awaitTermination(60, TimeUnit.SECONDS)) {
-        valueExecutors.shutdownNow();
-      }
     } catch (InterruptedException e) {
       iteratorExecutor.shutdownNow();
-      valueExecutors.shutdownNow();
       Thread.currentThread().interrupt();
     }
   }
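Taken together, the class now runs a single pool of iterator threads, each owning one key-range segment and applying the operation inline, with counts folded in batches. A self-contained toy version of that shape, for readers who want the whole lifecycle in one place (plain Java over an in-memory list; hypothetical names, no Ozone APIs):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicLong;

    public final class SegmentedScanDemo {
      public static void main(String[] args) throws Exception {
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
          keys.add(i);
        }
        int iteratorCount = 5;                         // like maxIterators
        long batch = Math.max(1000, 100_000L / iteratorCount);
        AtomicLong total = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(iteratorCount);
        int segment = keys.size() / iteratorCount;
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < iteratorCount; i++) {
          final int beg = i * segment;                 // this thread's segment
          final int end = (i == iteratorCount - 1) ? keys.size() : beg + segment;
          futures.add(pool.submit(() -> {
            long local = 0;
            for (int j = beg; j < end; j++) {
              // per-key operation would run here, on the iterator thread
              if (++local % batch == 0) {              // fold like maxNumberOfVals
                total.addAndGet(local);
                local = 0;
              }
            }
            total.addAndGet(local);                    // flush the remainder
          }));
        }
        for (Future<?> f : futures) {
          f.get();                                     // surface worker failures
        }
        pool.shutdown();
        System.out.println("Processed " + total.get() + " keys");
      }
    }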