Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.api.DistributedStorageAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
Expand All @@ -28,6 +29,7 @@
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -159,16 +161,17 @@ private ImportManager createImportManager(
null,
scalarDbTransactionManager.getDistributedTransactionManager());
} else {
DatabaseConfig databaseConfig = new DatabaseConfig(configFile);
ScalarDbStorageManager scalarDbStorageManager =
new ScalarDbStorageManager(StorageFactory.create(configFile));
new ScalarDbStorageManager(new SingleCrudOperationTransactionManager(databaseConfig));
importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
ScalarDbMode.STORAGE,
scalarDbStorageManager.getDistributedStorage(),
scalarDbStorageManager.getSingleCrudOperationTransactionManager(),
null);
}
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
Expand All @@ -13,6 +12,7 @@
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -44,7 +44,7 @@ public class ImportManager implements ImportEventListener {
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDbMode scalarDbMode;
private final DistributedStorage distributedStorage;
private final SingleCrudOperationTransactionManager singleCrudOperationTransactionManager;
private final DistributedTransactionManager distributedTransactionManager;

/**
Expand All @@ -62,7 +62,7 @@ public void startImport() {
.tableMetadataByTableName(tableMetadata)
.dao(new ScalarDbDao())
.distributedTransactionManager(distributedTransactionManager)
.distributedStorage(distributedStorage)
.singleCrudOperationTransactionManager(singleCrudOperationTransactionManager)
.tableColumnDataTypes(getTableColumnDataTypes())
.build();
ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
Expand Down Expand Up @@ -169,8 +169,8 @@ public void onAllDataChunksCompleted() {
/** Close resources properly once the process is completed */
public void closeResources() {
try {
if (distributedStorage != null) {
distributedStorage.close();
if (singleCrudOperationTransactionManager != null) {
singleCrudOperationTransactionManager.close();
} else if (distributedTransactionManager != null) {
distributedTransactionManager.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Get;
import com.scalar.db.api.GetBuilder;
import com.scalar.db.api.Put;
Expand All @@ -10,12 +11,14 @@
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanBuilder;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.io.Column;
import com.scalar.db.io.Key;
import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -56,6 +59,36 @@ public Optional<Result> get(
}
}

/**
* Retrieve record from ScalarDB instance in storage mode
*
* @param namespace Namespace name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key for get
* @param manager SingleCrudOperationTransactionManager object
* @return Optional get result
* @throws ScalarDbDaoException if something goes wrong while reading the data
*/
public Optional<Result> get(
String namespace,
String table,
Key partitionKey,
Key clusteringKey,
SingleCrudOperationTransactionManager manager)
throws ScalarDbDaoException {

// Retrieving the key data for logging
String loggingKey = keysToString(partitionKey, clusteringKey);

try {
Get get = createGetWith(namespace, table, partitionKey, clusteringKey);
return manager.get(get);
} catch (CrudException e) {
throw new ScalarDbDaoException("error GET " + loggingKey, e);
}
}
Comment on lines +73 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This pull request introduces several new overloaded methods for SingleCrudOperationTransactionManager and DistributedTransactionManager (get, put, scan, createScanner). These new methods are very similar to existing ones, leading to significant code duplication. For instance, this new get method is almost identical to the one for DistributedTransaction.

Since SingleCrudOperationTransactionManager, DistributedTransactionManager, and DistributedTransaction all implement or can be treated as CrudOperable, you could refactor this to use a single generic method that accepts a CrudOperable argument. This would greatly reduce code duplication and improve the maintainability of this class.

A generic method might look something like this:

public Optional<Result> get(
    String namespace,
    String table,
    Key partitionKey,
    Key clusteringKey,
    CrudOperable operable)
    throws ScalarDbDaoException {
  // ...
}

This approach could be applied to put, scan, and createScanner methods as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the contents will be removed in another PR. I will also consider this and make necessary changes.


/**
* Retrieve record from ScalarDB instance in transaction mode
*
Expand Down Expand Up @@ -114,6 +147,34 @@ public void put(
}
}

/**
* Save record in ScalarDB instance
*
* @param namespace Namespace name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key
* @param columns List of column values to be inserted or updated
* @param manager SingleCrudOperationTransactionManager object
* @throws ScalarDbDaoException if something goes wrong while executing the transaction
*/
public void put(
String namespace,
String table,
Key partitionKey,
Key clusteringKey,
List<Column<?>> columns,
SingleCrudOperationTransactionManager manager)
throws ScalarDbDaoException {
Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns);
try {
manager.put(put);
} catch (CrudException e) {
throw new ScalarDbDaoException(
DataLoaderError.ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
}
}

/**
* Save record in ScalarDB instance
*
Expand Down Expand Up @@ -179,6 +240,44 @@ public List<Result> scan(
}
}

/**
* Scan a ScalarDB table
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param range Optional range to set ScalarDB scan start and end values
* @param sorts Optional scan clustering key sorting values
* @param projections List of column projection to use during scan
* @param limit Scan limit value
* @param manager SingleCrudOperationTransactionManager object
* @return List of ScalarDB scan results
* @throws ScalarDbDaoException if scan fails
*/
public List<Result> scan(
String namespace,
String table,
Key partitionKey,
ScanRange range,
List<Scan.Ordering> sorts,
List<String> projections,
int limit,
SingleCrudOperationTransactionManager manager)
throws ScalarDbDaoException {

// Create scan
Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit);

// scan data
try {
return manager.scan(scan);
} catch (CrudException | NoSuchElementException e) {
// No such element Exception is thrown when the scan is done in transaction mode but
// ScalarDB is running in storage mode
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Scan a ScalarDB table
*
Expand Down Expand Up @@ -245,6 +344,60 @@ public Scanner createScanner(
}
}

/**
* Create a ScalarDB scanner instance
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param projectionColumns List of column projection to use during scan
* @param limit Scan limit value
* @param transaction Distributed transaction object
* @return ScalarDB Scanner object
* @throws ScalarDbDaoException if scan fails
*/
public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
List<String> projectionColumns,
int limit,
DistributedTransactionManager transaction)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
try {
return transaction.getScanner(scan);
} catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create a ScalarDB scanner instance
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param projectionColumns List of column projection to use during scan
* @param limit Scan limit value
* @param manager SingleCrudOperationTransactionManager object
* @return ScalarDB Scanner object
* @throws ScalarDbDaoException if scan fails
*/
public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
List<String> projectionColumns,
int limit,
SingleCrudOperationTransactionManager manager)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
try {
return manager.getScanner(scan);
} catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create a ScalarDB scanner instance
*
Expand Down Expand Up @@ -278,6 +431,71 @@ public Scanner createScanner(
}
}

/**
* Create a ScalarDB scanner instance
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param scanRange Optional range to set ScalarDB scan start and end values
* @param sortOrders Optional scan clustering key sorting values
* @param projectionColumns List of column projection to use during scan
* @param limit Scan limit value
* @param manager SingleCrudOperationTransactionManager object
* @return ScalarDB Scanner object
* @throws ScalarDbDaoException if scan fails
*/
public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
@Nullable Key partitionKey,
@Nullable ScanRange scanRange,
@Nullable List<Scan.Ordering> sortOrders,
@Nullable List<String> projectionColumns,
int limit,
SingleCrudOperationTransactionManager manager)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
try {
return manager.getScanner(scan);
} catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create a ScalarDB scanner instance
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param scanRange Optional range to set ScalarDB scan start and end values
* @param sortOrders Optional scan clustering key sorting values
* @param projectionColumns List of column projection to use during scan
* @param limit Scan limit value
* @param transaction Distributed transaction object
* @return ScalarDB Scanner object
*/
public TransactionManagerCrudOperable.Scanner createScanner(
String namespace,
String table,
@Nullable Key partitionKey,
@Nullable ScanRange scanRange,
@Nullable List<Scan.Ordering> sortOrders,
@Nullable List<String> projectionColumns,
int limit,
DistributedTransactionManager transaction)
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
try {
return transaction.getScanner(scan);
} catch (CrudException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Create ScalarDB scan instance
*
Expand Down
Loading
Loading