From a215ce050f905046d12a9c05706a3aa0593d6d92 Mon Sep 17 00:00:00 2001 From: Peckstadt Yves Date: Thu, 6 Nov 2025 09:05:23 +0900 Subject: [PATCH 1/2] Revert threads argument to replace incorrectly renamed max-threads argument --- .../cli/command/dataexport/ExportCommand.java | 6 +- .../dataexport/ExportCommandOptions.java | 24 ++++- .../cli/command/dataimport/ImportCommand.java | 9 +- .../dataimport/ImportCommandOptions.java | 25 +++-- .../command/dataexport/ExportCommandTest.java | 94 ++++++++++++++++++- .../command/dataimport/ImportCommandTest.java | 83 ++++++++++++++-- .../cli/util/CommandLineInputUtilsTest.java | 4 +- .../db/dataloader/core/DataLoaderError.java | 4 +- .../core/dataimport/ImportOptions.java | 2 +- .../dataimport/processor/ImportProcessor.java | 4 +- .../processor/CsvImportProcessorTest.java | 2 +- .../processor/ImportProcessorTest.java | 14 +-- .../processor/JsonImportProcessorTest.java | 2 +- .../JsonLinesImportProcessorTest.java | 2 +- 14 files changed, 222 insertions(+), 53 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 712a57c12c..adbb5d8000 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -61,7 +61,7 @@ public Integer call() throws Exception { FileUtils.validateFilePath(scalarDbPropertiesFilePath); validatePositiveValue( spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE); - validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); + validatePositiveValue(spec.commandLine(), threadCount, DataLoaderError.INVALID_THREAD_COUNT); StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); TableMetadataService metaDataService = @@ -126,6 +126,8 @@ private void validateDeprecatedOptions() { DEPRECATED_END_EXCLUSIVE_OPTION, END_INCLUSIVE_OPTION, END_INCLUSIVE_OPTION_SHORT); + validateDeprecatedOptionPair( + spec.commandLine(), DEPRECATED_MAX_THREADS_OPTION, THREADS_OPTION, null); } private String getScalarDbPropertiesFilePath() { @@ -170,7 +172,7 @@ private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange) .includeTransactionMetadata(includeTransactionMetadata) .delimiter(delimiter) .limit(limit) - .maxThreadCount(maxThreads) + .maxThreadCount(threadCount) .dataChunkSize(dataChunkSize) .prettyPrintJson(prettyPrintJson) .scanRange(scanRange); diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java index 617338fb03..287d480909 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java @@ -16,6 +16,8 @@ public class ExportCommandOptions { public static final String END_INCLUSIVE_OPTION = "--end-inclusive"; public static final String END_INCLUSIVE_OPTION_SHORT = "-ei"; public static final String DEPRECATED_END_EXCLUSIVE_OPTION = "--end-exclusive"; + public static final String THREADS_OPTION = "--threads"; + public static final String DEPRECATED_MAX_THREADS_OPTION = "--max-threads"; @CommandLine.Option( names = {"--config", "-c"}, @@ -73,11 +75,20 @@ public class ExportCommandOptions { protected boolean includeTransactionMetadata; @CommandLine.Option( - names = {"--max-threads", "-mt"}, - paramLabel = "", + names = {"--threads"}, + paramLabel = "", description = - "Maximum number of threads to use for parallel processing (default: number of available processors)") - protected int maxThreads; + "Number of threads to use for parallel processing (default: number of available processors)") + protected int threadCount; + + // Deprecated option - kept for backward compatibility + @CommandLine.Option( + names = {DEPRECATED_MAX_THREADS_OPTION, "-mt"}, + paramLabel = "", + description = "Deprecated: Use --threads instead", + hidden = true) + @Deprecated + protected Integer maxThreadsDeprecated; @CommandLine.Option( names = {"--start-key", "-sk"}, @@ -184,5 +195,10 @@ public void applyDeprecatedOptions() { if (endExclusiveDeprecated != null) { scanEndInclusive = !endExclusiveDeprecated; } + + // If the deprecated option is set, use its value + if (maxThreadsDeprecated != null) { + threadCount = maxThreadsDeprecated; + } } } diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index 73f40b0577..e52fecbb6b 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -61,7 +61,7 @@ public Integer call() throws Exception { spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE); validatePositiveValue( spec.commandLine(), transactionSize, DataLoaderError.INVALID_TRANSACTION_SIZE); - validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); + validatePositiveValue(spec.commandLine(), threadCount, DataLoaderError.INVALID_THREAD_COUNT); validatePositiveValue( spec.commandLine(), dataChunkQueueSize, DataLoaderError.INVALID_DATA_CHUNK_QUEUE_SIZE); ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null); @@ -282,10 +282,7 @@ private Optional parseControlFileFromPath(String controlFilePath) { */ private void validateDeprecatedOptions() { validateDeprecatedOptionPair( - spec.commandLine(), - DEPRECATED_THREADS_OPTION, - MAX_THREADS_OPTION, - MAX_THREADS_OPTION_SHORT); + spec.commandLine(), DEPRECATED_MAX_THREADS_OPTION, THREADS_OPTION, null); } /** @@ -308,7 +305,7 @@ private ImportOptions createImportOptions(ControlFile controlFile) { .namespace(namespace) .dataChunkSize(dataChunkSize) .transactionBatchSize(transactionSize) - .maxThreads(maxThreads) + .threadCount(threadCount) .dataChunkQueueSize(dataChunkQueueSize) .tableName(tableName); diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandOptions.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandOptions.java index aa8ecd5a88..b11f9695ec 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandOptions.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandOptions.java @@ -9,9 +9,8 @@ public class ImportCommandOptions { public static final String FILE_OPTION_NAME_LONG_FORMAT = "--file"; - public static final String MAX_THREADS_OPTION = "--max-threads"; - public static final String MAX_THREADS_OPTION_SHORT = "-mt"; - public static final String DEPRECATED_THREADS_OPTION = "--threads"; + public static final String THREADS_OPTION = "--threads"; + public static final String DEPRECATED_MAX_THREADS_OPTION = "--max-threads"; @CommandLine.Option( names = {"--mode", "-m"}, @@ -35,21 +34,21 @@ public class ImportCommandOptions { protected String sourceFilePath; @CommandLine.Option( - names = {"--max-threads", "-mt"}, - paramLabel = "", + names = {"--threads"}, + paramLabel = "", description = - "Maximum number of threads to use for parallel processing (default: number of available processors)", + "Number of threads to use for parallel processing (default: number of available processors)", defaultValue = "16") - protected int maxThreads; + protected int threadCount; // Deprecated option - kept for backward compatibility @CommandLine.Option( - names = {DEPRECATED_THREADS_OPTION}, - paramLabel = "", - description = "Deprecated: Use --max-threads instead", + names = {DEPRECATED_MAX_THREADS_OPTION, "-mt"}, + paramLabel = "", + description = "Deprecated: Use --threads instead", hidden = true) @Deprecated - protected Integer threadsDeprecated; + protected Integer maxThreadsDeprecated; @CommandLine.Option( names = {"--namespace", "-ns"}, @@ -180,8 +179,8 @@ public class ImportCommandOptions { */ public void applyDeprecatedOptions() { // If the deprecated option is set, use its value - if (threadsDeprecated != null) { - maxThreads = threadsDeprecated; + if (maxThreadsDeprecated != null) { + threadCount = maxThreadsDeprecated; } } } diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java index 907140e46f..4c02087e52 100755 --- a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java @@ -44,7 +44,7 @@ void removeFileIfCreated() { void call_withBlankScalarDBConfigurationFile_shouldThrowException() { exportCommand.configFilePath = ""; exportCommand.dataChunkSize = 100; - exportCommand.maxThreads = 4; + exportCommand.threadCount = 4; exportCommand.namespace = "scalar"; exportCommand.table = "asset"; exportCommand.outputDirectory = ""; @@ -62,7 +62,7 @@ void call_withBlankScalarDBConfigurationFile_shouldThrowException() { void call_withInvalidScalarDBConfigurationFile_shouldReturnOne() throws Exception { exportCommand.configFilePath = "scalardb.properties"; exportCommand.dataChunkSize = 100; - exportCommand.maxThreads = 4; + exportCommand.threadCount = 4; exportCommand.namespace = "scalar"; exportCommand.table = "asset"; exportCommand.outputDirectory = ""; @@ -190,4 +190,94 @@ void call_withOnlyDeprecatedEndExclusive_shouldApplyInvertedValue() { // end-exclusive=false should become end-inclusive=true assertEquals(true, command.scanEndInclusive); } + + @Test + void call_withBothThreadsAndMaxThreads_shouldThrowException() { + assertBothDeprecatedAndNewOptionsThrowException( + "--max-threads=8", "--threads=16", "--max-threads", "--threads"); + } + + @Test + void call_withOnlyDeprecatedMaxThreads_shouldApplyValue() { + // Simulate command line parsing with only deprecated option + String[] args = { + "--config", + "scalardb.properties", + "--namespace", + "scalar", + "--table", + "asset", + "--format", + "JSON", + "--max-threads", + "12" + }; + ExportCommand command = new ExportCommand(); + CommandLine cmd = new CommandLine(command); + cmd.parseArgs(args); + + // Verify the deprecated value was parsed + assertEquals(12, command.maxThreadsDeprecated); + + // Apply deprecated options + command.applyDeprecatedOptions(); + + // Verify the value was applied to threadCount + assertEquals(12, command.threadCount); + } + + @Test + void call_withOnlyThreads_shouldUseValue() { + // Simulate command line parsing with only new --threads option + String[] args = { + "--config", + "scalardb.properties", + "--namespace", + "scalar", + "--table", + "asset", + "--format", + "JSON", + "--threads", + "20" + }; + ExportCommand command = new ExportCommand(); + CommandLine cmd = new CommandLine(command); + cmd.parseArgs(args); + + // Verify the value was set to threadCount + assertEquals(20, command.threadCount); + + // Verify the deprecated value was not set + assertEquals(null, command.maxThreadsDeprecated); + } + + @Test + void call_withDeprecatedShortOption_shouldApplyValue() { + // Simulate command line parsing with deprecated short option -mt + String[] args = { + "--config", + "scalardb.properties", + "--namespace", + "scalar", + "--table", + "asset", + "--format", + "JSON", + "-mt", + "15" + }; + ExportCommand command = new ExportCommand(); + CommandLine cmd = new CommandLine(command); + cmd.parseArgs(args); + + // Verify the deprecated value was parsed + assertEquals(15, command.maxThreadsDeprecated); + + // Apply deprecated options + command.applyDeprecatedOptions(); + + // Verify the value was applied to threadCount + assertEquals(15, command.threadCount); + } } diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java index cfe1a8af48..13f7f6bf85 100755 --- a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java @@ -57,7 +57,7 @@ void call_WithoutValidConfigFile_ShouldThrowException() throws Exception { importCommand.importMode = ImportMode.UPSERT; importCommand.dataChunkSize = 100; importCommand.transactionSize = 10; - importCommand.maxThreads = 4; + importCommand.threadCount = 4; importCommand.dataChunkQueueSize = 64; assertThrows(IllegalArgumentException.class, () -> importCommand.call()); } @@ -93,9 +93,9 @@ void call_withBothThreadsAndMaxThreads_shouldThrowException() throws Exception { "sample", "--table", "table", - "--threads", - "8", "--max-threads", + "8", + "--threads", "16" }; ImportCommand command = new ImportCommand(); @@ -113,11 +113,11 @@ void call_withBothThreadsAndMaxThreads_shouldThrowException() throws Exception { thrown .getMessage() .contains( - "Cannot specify both deprecated option '--threads' and new option '--max-threads'")); + "Cannot specify both deprecated option '--max-threads' and new option '--threads'")); } @Test - void call_withOnlyDeprecatedThreads_shouldApplyValue() throws Exception { + void call_withOnlyDeprecatedMaxThreads_shouldApplyValue() throws Exception { Path configFile = tempDir.resolve("config.properties"); Files.createFile(configFile); Path importFile = tempDir.resolve("import.json"); @@ -133,7 +133,7 @@ void call_withOnlyDeprecatedThreads_shouldApplyValue() throws Exception { "sample", "--table", "table", - "--threads", + "--max-threads", "12" }; ImportCommand command = new ImportCommand(); @@ -141,12 +141,77 @@ void call_withOnlyDeprecatedThreads_shouldApplyValue() throws Exception { cmd.parseArgs(args); // Verify the deprecated value was parsed - assertEquals(12, command.threadsDeprecated); + assertEquals(12, command.maxThreadsDeprecated); // Apply deprecated options (this is what the command does after validation) command.applyDeprecatedOptions(); - // Verify the value was applied to maxThreads - assertEquals(12, command.maxThreads); + // Verify the value was applied to threadCount + assertEquals(12, command.threadCount); + } + + @Test + void call_withOnlyThreads_shouldUseValue() throws Exception { + Path configFile = tempDir.resolve("config.properties"); + Files.createFile(configFile); + Path importFile = tempDir.resolve("import.json"); + Files.createFile(importFile); + + // Simulate command line parsing with only new --threads option + String[] args = { + "--config", + configFile.toString(), + "--file", + importFile.toString(), + "--namespace", + "sample", + "--table", + "table", + "--threads", + "20" + }; + ImportCommand command = new ImportCommand(); + CommandLine cmd = new CommandLine(command); + cmd.parseArgs(args); + + // Verify the value was set to threadCount + assertEquals(20, command.threadCount); + + // Verify the deprecated value was not set + assertEquals(null, command.maxThreadsDeprecated); + } + + @Test + void call_withDeprecatedShortOption_shouldApplyValue() throws Exception { + Path configFile = tempDir.resolve("config.properties"); + Files.createFile(configFile); + Path importFile = tempDir.resolve("import.json"); + Files.createFile(importFile); + + // Simulate command line parsing with deprecated short option -mt + String[] args = { + "--config", + configFile.toString(), + "--file", + importFile.toString(), + "--namespace", + "sample", + "--table", + "table", + "-mt", + "15" + }; + ImportCommand command = new ImportCommand(); + CommandLine cmd = new CommandLine(command); + cmd.parseArgs(args); + + // Verify the deprecated value was parsed + assertEquals(15, command.maxThreadsDeprecated); + + // Apply deprecated options + command.applyDeprecatedOptions(); + + // Verify the value was applied to threadCount + assertEquals(15, command.threadCount); } } diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/util/CommandLineInputUtilsTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/util/CommandLineInputUtilsTest.java index 3fd774a6d0..81ca27b6a8 100644 --- a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/util/CommandLineInputUtilsTest.java +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/util/CommandLineInputUtilsTest.java @@ -170,9 +170,9 @@ public void validatePositiveValue_differentErrorTypes_shouldUseCorrectErrorMessa CommandLine.ParameterException.class, () -> CommandLineInputUtils.validatePositiveValue( - commandLine, negativeValue, DataLoaderError.INVALID_MAX_THREADS)); + commandLine, negativeValue, DataLoaderError.INVALID_THREAD_COUNT)); assertTrue( - exception1.getMessage().contains(DataLoaderError.INVALID_MAX_THREADS.buildMessage())); + exception1.getMessage().contains(DataLoaderError.INVALID_THREAD_COUNT.buildMessage())); // Act & Assert for DATA_LOADER_INVALID_DATA_CHUNK_QUEUE_SIZE CommandLine.ParameterException exception2 = diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java index 605da3bc94..f9a42d77b0 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java @@ -208,8 +208,8 @@ public enum DataLoaderError implements ScalarDbError { Category.USER_ERROR, "0054", "Data chunk size must be greater than 0", "", ""), INVALID_TRANSACTION_SIZE( Category.USER_ERROR, "0055", "Transaction size must be greater than 0", "", ""), - INVALID_MAX_THREADS( - Category.USER_ERROR, "0056", "Number of max threads must be greater than 0", "", ""), + INVALID_THREAD_COUNT( + Category.USER_ERROR, "0056", "Number of threads must be greater than 0", "", ""), DEPRECATED_AND_NEW_OPTION_BOTH_SPECIFIED( Category.USER_ERROR, "0057", diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java index 359fb1f881..1db08b0988 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java @@ -34,7 +34,7 @@ public class ImportOptions { private final ControlFile controlFile; private final String namespace; private final String tableName; - private final int maxThreads; + private final int threadCount; private final String customHeaderRow; private final int dataChunkQueueSize; } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 81daf9646e..768d88c73b 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -65,12 +65,12 @@ public abstract class ImportProcessor { public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) { ExecutorService dataChunkReaderExecutor = Executors.newSingleThreadExecutor(); ExecutorService dataChunkProcessorExecutor = - Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads()); + Executors.newFixedThreadPool(params.getImportOptions().getThreadCount()); BlockingQueue dataChunkQueue = new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); // Semaphore controls concurrent task submissions, small buffer to be two times of threads - Semaphore taskSemaphore = new Semaphore(params.getImportOptions().getMaxThreads() * 2); + Semaphore taskSemaphore = new Semaphore(params.getImportOptions().getThreadCount() * 2); // Phaser tracks task completion (start with 1 for the main thread) Phaser phaser = new Phaser(1); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index 5457bc51d1..069632e279 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -56,7 +56,7 @@ void setup() throws ScalarDbDaoException, TransactionException { .dataChunkSize(5) .tableName("table") .logMode(LogMode.SINGLE_FILE) - .maxThreads(8) + .threadCount(8) .dataChunkQueueSize(256) .build(); Mockito.when( diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index d60ebecb00..7a12f25c10 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -76,7 +76,7 @@ void setUp() { tableMetadataByTableName = new HashMap<>(); tableMetadataByTableName.put("namespace.table", UnitTestUtils.createTestTableMetadata()); - when(importOptions.getMaxThreads()).thenReturn(2); + when(importOptions.getThreadCount()).thenReturn(2); when(importOptions.getDataChunkQueueSize()).thenReturn(10); when(params.getImportOptions()).thenReturn(importOptions); } @@ -147,8 +147,8 @@ void process_withEmptyData_shouldNotProcessAnyDataChunks() { @Test void process_withMultipleDataChunks_shouldUseThreadPool() { // Arrange - final int maxThreads = 4; - when(importOptions.getMaxThreads()).thenReturn(maxThreads); + final int threadCount = 4; + when(importOptions.getThreadCount()).thenReturn(threadCount); when(params.getDao()).thenReturn(dao); when(params.getDistributedStorage()).thenReturn(distributedStorage); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); @@ -173,10 +173,10 @@ void process_withMultipleDataChunks_shouldUseThreadPool() { processor.process(2, 1, reader); // Assert - // Verify that multiple threads were used but not more than maxThreads + // Verify that multiple threads were used but not more than threadCount assertTrue(processor.getMaxConcurrentThreads().get() > 1, "Should use multiple threads"); assertTrue( - processor.getMaxConcurrentThreads().get() <= maxThreads, "Should not exceed max threads"); + processor.getMaxConcurrentThreads().get() <= threadCount, "Should not exceed max threads"); // Verify that all data chunks were processed verify(eventListener, times(1)).onAllDataChunksCompleted(); @@ -202,8 +202,8 @@ void process_withInterruption_shouldShutdownGracefully() { @Test void process_withLargeNumberOfTasks_shouldWaitForAllTasksToComplete() { // Arrange - final int maxThreads = 2; - when(importOptions.getMaxThreads()).thenReturn(maxThreads); + final int threadCount = 2; + when(importOptions.getThreadCount()).thenReturn(threadCount); when(params.getDao()).thenReturn(dao); when(params.getDistributedStorage()).thenReturn(distributedStorage); when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java index a5705d3684..69b6d7db38 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java @@ -56,7 +56,7 @@ void setup() throws ScalarDbDaoException, TransactionException { .dataChunkSize(5) .tableName("table") .logMode(LogMode.SINGLE_FILE) - .maxThreads(8) + .threadCount(8) .dataChunkQueueSize(256) .build(); Mockito.when( diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java index 30992f1d35..7a299317d9 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java @@ -55,7 +55,7 @@ void setup() throws ScalarDbDaoException, TransactionException { .transactionBatchSize(1) .dataChunkSize(5) .tableName("table") - .maxThreads(8) + .threadCount(8) .dataChunkQueueSize(256) .logMode(LogMode.SINGLE_FILE) .build(); From d67e80071ad4148c6f420b79ffe9cd7ba2f0702f Mon Sep 17 00:00:00 2001 From: Peckstadt Yves Date: Thu, 6 Nov 2025 10:04:05 +0900 Subject: [PATCH 2/2] Refactoring --- .../cli/command/dataexport/ExportCommand.java | 4 +++- .../dataexport/ExportCommandOptions.java | 14 ++++++++++++++ .../cli/command/dataimport/ImportCommand.java | 2 ++ .../dataimport/ImportCommandOptions.java | 17 +++++++++++++++-- .../core/dataexport/ExportManager.java | 7 ++----- .../core/dataexport/ExportOptions.java | 2 +- 6 files changed, 37 insertions(+), 9 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index adbb5d8000..300256aae3 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -54,6 +54,8 @@ public class ExportCommand extends ExportCommandOptions implements CallableThis method is called AFTER applyDeprecatedOptions() to resolve any default values that + * depend on runtime information. For threadCount, a value of 0 indicates that the number of + * available processors should be used. + */ + public void resolveDefaults() { + // Resolve threadCount: 0 means use available processors + if (threadCount == 0) { + threadCount = Runtime.getRuntime().availableProcessors(); + } + } } diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index e52fecbb6b..baf33559bc 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -55,6 +55,8 @@ public class ImportCommand extends ImportCommandOptions implements CallableThis method is called AFTER applyDeprecatedOptions() to resolve any default values that + * depend on runtime information. For threadCount, a value of 0 indicates that the number of + * available processors should be used. + */ + public void resolveDefaults() { + // Resolve threadCount: 0 means use available processors + if (threadCount == 0) { + threadCount = Runtime.getRuntime().availableProcessors(); + } + } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index fdc27d664c..7afbbd4614 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -77,11 +77,8 @@ public ExportReport startExport( handleTransactionMetadata(exportOptions, tableMetadata); processHeader(exportOptions, tableMetadata, writer); - int maxThreadCount = - exportOptions.getMaxThreadCount() == 0 - ? Runtime.getRuntime().availableProcessors() - : exportOptions.getMaxThreadCount(); - ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount); + ExecutorService executorService = + Executors.newFixedThreadPool(exportOptions.getThreadCount()); BufferedWriter bufferedWriter = new BufferedWriter(writer); boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java index 3c7ed9ef56..518a154fba 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java @@ -21,7 +21,7 @@ public class ExportOptions { private final FileFormat outputFileFormat; private final ScanRange scanRange; private final int limit; - private final int maxThreadCount; + private final int threadCount; private final boolean prettyPrintJson; @Builder.Default private final int dataChunkSize = 200;