Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,7 @@ public Object clone() throws CloneNotSupportedException {
}
hash.quantity = quantity;
if (specifiers != null) {
hash.specifiers = new ArrayList<>();
for (PartitionSpecifier specifier : specifiers) {
hash.specifiers.add(specifier);
}
hash.specifiers = new ArrayList<>(specifiers);
}
return hash;
}
Expand Down Expand Up @@ -471,10 +468,7 @@ public Object clone() throws CloneNotSupportedException {
}
}
if (specifiers != null) {
listPartition.specifiers = new ArrayList<>();
for (ListPartitionSpecifier specifier : specifiers) {
listPartition.specifiers.add(specifier);
}
listPartition.specifiers = new ArrayList<>(specifiers);
}
return listPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.tajo.catalog.CatalogUtil.buildTableIdentifier;
import static org.apache.tajo.error.Errors.ResultCode.*;
Expand Down Expand Up @@ -522,9 +523,7 @@ public void addPartitions(String databaseName, String tableName, List<PartitionD
.setTableName(tableName);
builder.setTableIdentifier(identifier.build());

for (PartitionDescProto partition: partitions) {
builder.addPartitionDesc(partition);
}
partitions.forEach(builder::addPartitionDesc);
builder.setIfNotExists(ifNotExists);

ReturnState state = stub.addPartitions(null, builder.build());
Expand Down Expand Up @@ -782,10 +781,7 @@ public final Collection<IndexDesc> getAllIndexesByTable(final String databaseNam
final IndexListResponse response = stub.getAllIndexesByTable(null, proto);
ensureOk(response.getState());

List<IndexDesc> indexDescs = new ArrayList<>();
for (IndexDescProto descProto : response.getIndexDescList()) {
indexDescs.add(new IndexDesc(descProto));
}
List<IndexDesc> indexDescs = response.getIndexDescList().stream().map(IndexDesc::new).collect(Collectors.toList());
return indexDescs;
} catch (ServiceException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

import java.io.File;
import java.util.*;
import java.util.stream.Collectors;

public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
protected final Log LOG = LogFactory.getLog(getClass());
Expand Down Expand Up @@ -689,11 +690,10 @@ private void renameColumn(String databaseName, String tableName, CatalogProtos.A
Table table = client.getHiveClient().getTable(databaseName, tableName);
List<FieldSchema> columns = table.getSd().getCols();

for (final FieldSchema currentColumn : columns) {
if (currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName())) {
currentColumn.setName(alterColumnProto.getNewColumnName());
}
}
columns.stream().filter(currentColumn -> currentColumn.getName()
.equalsIgnoreCase(alterColumnProto.getOldColumnName())).forEach(currentColumn -> {
currentColumn.setName(alterColumnProto.getNewColumnName());
});
client.getHiveClient().alter_table(databaseName, tableName, table);

} catch (NoSuchObjectException nsoe) {
Expand Down Expand Up @@ -744,10 +744,8 @@ private void addPartition(String databaseName, String tableName, CatalogProtos.P
params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(partitionDescProto.getNumBytes()));
partition.setParameters(params);

List<String> values = Lists.newArrayList();
for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) {
values.add(keyProto.getPartitionValue());
}
List<String> values = partitionDescProto.getPartitionKeysList().stream()
.map(PartitionKeyProto::getPartitionValue).collect(Collectors.toList());
partition.setValues(values);

Table table = client.getHiveClient().getTable(databaseName, tableName);
Expand All @@ -772,10 +770,8 @@ private void dropPartition(String databaseName, String tableName, CatalogProtos.

client = clientPool.getClient();

List<String> values = Lists.newArrayList();
for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) {
values.add(keyProto.getPartitionValue());
}
List<String> values = partitionDescProto.getPartitionKeysList().stream()
.map(PartitionKeyProto::getPartitionValue).collect(Collectors.toList());
client.getHiveClient().dropPartition(databaseName, tableName, values, true);
} catch (Exception e) {
throw new TajoInternalError(e);
Expand Down Expand Up @@ -1249,10 +1245,8 @@ public void addPartitions(String databaseName, String tableName, List<CatalogPro
partition.setDbName(databaseName);
partition.setTableName(tableName);

List<String> values = Lists.newArrayList();
for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) {
values.add(keyProto.getPartitionValue());
}
List<String> values = partitionDescProto.getPartitionKeysList().stream()
.map(PartitionKeyProto::getPartitionValue).collect(Collectors.toList());
partition.setValues(values);

Table table = client.getHiveClient().getTable(databaseName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,9 +1032,7 @@ public GetPartitionsResponse getPartitionsByTableName(RpcController controller,
List<PartitionDescProto> partitions = store.getPartitionsOfTable(dbName, tbName);

GetPartitionsResponse.Builder builder = GetPartitionsResponse.newBuilder();
for (PartitionDescProto partition : partitions) {
builder.addPartition(partition);
}
partitions.forEach(builder::addPartition);

builder.setState(OK);
return builder.build();
Expand Down Expand Up @@ -1414,12 +1412,8 @@ private FunctionDescProto findFunction(String signature, List<TajoDataTypes.Data
List<FunctionDescProto> candidates = Lists.newArrayList();

if (functions.containsKey(signature)) {
for (FunctionDescProto func : functions.get(signature)) {
if (func.getSignature().getParameterTypesList() != null &&
func.getSignature().getParameterTypesList().equals(params)) {
candidates.add(func);
}
}
functions.get(signature).stream().filter(func -> func.getSignature().getParameterTypesList() != null &&
func.getSignature().getParameterTypesList().equals(params)).forEach(candidates::add);
}

/*
Expand All @@ -1432,12 +1426,8 @@ private FunctionDescProto findFunction(String signature, List<TajoDataTypes.Data
*
* */
if (functions.containsKey(signature)) {
for (FunctionDescProto func : functions.get(signature)) {
if (func.getSignature().getParameterTypesList() != null &&
CatalogUtil.isMatchedFunction(func.getSignature().getParameterTypesList(), params)) {
candidates.add(func);
}
}
functions.get(signature).stream().filter(func -> func.getSignature().getParameterTypesList() != null &&
CatalogUtil.isMatchedFunction(func.getSignature().getParameterTypesList(), params)).forEach(candidates::add);

// if there are more than one function candidates, we choose the nearest matched function.
if (candidates.size() > 0) {
Expand All @@ -1456,19 +1446,11 @@ private FunctionDescProto findFunction(String signature, FunctionType type, List

if (functions.containsKey(signature)) {
if (strictTypeCheck) {
for (FunctionDescProto func : functions.get(signature)) {
if (func.getSignature().getType() == type &&
func.getSignature().getParameterTypesList().equals(params)) {
candidates.add(func);
}
}
functions.get(signature).stream().filter(func -> func.getSignature().getType() == type &&
func.getSignature().getParameterTypesList().equals(params)).forEach(candidates::add);
} else {
for (FunctionDescProto func : functions.get(signature)) {
if (func.getSignature().getParameterTypesList() != null &&
CatalogUtil.isMatchedFunction(func.getSignature().getParameterTypesList(), params)) {
candidates.add(func);
}
}
functions.get(signature).stream().filter(func -> func.getSignature().getParameterTypesList() != null &&
CatalogUtil.isMatchedFunction(func.getSignature().getParameterTypesList(), params)).forEach(candidates::add);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.tajo.exception.UndefinedTableException;
import org.apache.tajo.catalog.proto.CatalogProtos;
Expand Down Expand Up @@ -78,12 +79,8 @@ public String getSystemDatabaseName() {
}

public List<String> getAllSystemTables() {
List<String> systemTableNames = new ArrayList<>();

for (TableDescriptor descriptor: schemaInfoTableDescriptors) {
systemTableNames.add(descriptor.getTableNameString());
}

List<String> systemTableNames = schemaInfoTableDescriptors.stream().map(TableDescriptor::getTableNameString).collect(Collectors.toList());

return systemTableNames;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.*;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class XMLCatalogSchemaManager {
protected final Log LOG = LogFactory.getLog(getClass());
Expand Down Expand Up @@ -302,16 +304,10 @@ public void upgradeBaseSchema(Connection conn, int currentVersion) {
throw new TajoInternalError("Database schema files are not loaded.");
}

final List<SchemaPatch> candidatePatches = new ArrayList<>();
final List<SchemaPatch> candidatePatches = this.catalogStore.getPatches().stream()
.filter(patch -> currentVersion >= patch.getPriorVersion()).sorted().collect(Collectors.toList());
Statement stmt;

for (SchemaPatch patch: this.catalogStore.getPatches()) {
if (currentVersion >= patch.getPriorVersion()) {
candidatePatches.add(patch);
}
}

Collections.sort(candidatePatches);

try {
stmt = conn.createStatement();
} catch (SQLException e) {
Expand Down Expand Up @@ -602,18 +598,9 @@ protected List<DatabaseObject> mergeDatabaseObjects(List<DatabaseObject> objects
unorderedObjects.add(object);
}
}

for (DatabaseObject object: orderedObjects) {
if (object != null) {
mergedObjects.add(object);
}
}

for (DatabaseObject object: unorderedObjects) {
if (object != null) {
mergedObjects.add(object);
}
}

Stream.concat(orderedObjects.stream(), unorderedObjects.stream())
.filter(object -> object != null).forEach(mergedObjects::add);

return mergedObjects;
}
Expand Down Expand Up @@ -643,75 +630,52 @@ protected void validatePatch(List<SchemaPatch> patches, SchemaPatch testPatch) {
}

protected void mergePatches(List<SchemaPatch> patches) {
final List<DatabaseObject> objects = new ArrayList<>();

Collections.sort(patches);

for (SchemaPatch patch: patches) {
patches.stream().forEachOrdered(patch -> {
validatePatch(patches, patch);

objects.clear();

List<DatabaseObject> tempObjects = new ArrayList<>();
tempObjects.addAll(patch.getObjects());
patch.clearObjects();
patch.addObjects(mergeDatabaseObjects(tempObjects));
patch.addObjects(mergeDatabaseObjects(tempObjects));

targetStore.addPatch(patch);
}
});
}

protected void validateSQLObject(List<SQLObject> queries, SQLObject testQuery) {
int occurredCount = 0;

for (SQLObject query: queries) {
if (query.getType() == testQuery.getType()) {
occurredCount++;
}
}
int occurredCount = (int) queries.stream().filter(query -> query.getType() == testQuery.getType()).count();

if (occurredCount > 1) {
throw new TajoInternalError("Duplicate Query type (" + testQuery.getType() + ") has found.");
}
}

protected void mergeExistQueries(List<SQLObject> queries) {
for (SQLObject query: queries) {
validateSQLObject(queries, query);

targetStore.addExistQuery(query);
}
}

protected void mergeDropStatements(List<SQLObject> queries) {
for (SQLObject query: queries) {
validateSQLObject(queries, query);

targetStore.addDropStatement(query);
}
}

public StoreObject merge() {
boolean alreadySetDatabaseObject = false;

// first pass
for (StoreObject store : this.storeObjects) {
copySchemaInfo(store);
}
this.storeObjects.forEach(this::copySchemaInfo);

// second pass
for (StoreObject store: this.storeObjects) {
if (store.getSchema().getVersion() == targetStore.getSchema().getVersion() &&
if (store.getSchema().getVersion() == targetStore.getSchema().getVersion() &&
!alreadySetDatabaseObject) {
BaseSchema targetSchema = targetStore.getSchema();
targetSchema.clearObjects();
targetSchema.addObjects(mergeDatabaseObjects(store.getSchema().getObjects()));

alreadySetDatabaseObject = true;
}

mergePatches(store.getPatches());
mergeExistQueries(store.getExistQueries());
mergeDropStatements(store.getDropStatements());
store.getExistQueries().forEach(query -> {
validateSQLObject(store.getExistQueries(), query);
targetStore.addExistQuery(query);
});
store.getDropStatements().forEach(query -> {
validateSQLObject(store.getDropStatements(), query);
targetStore.addDropStatement(query);
});
}

return this.targetStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ class CliClientParamsFactory {

public static Properties get(@Nullable Properties connParam) {
Properties copy = connParam == null ? new Properties() : (Properties) connParam.clone();
for (Map.Entry<String, String> entry : DEFAULT_PARAMS.entrySet()) {
if (!copy.contains(entry.getKey())) {
copy.setProperty(entry.getKey(), entry.getValue());
}
}
DEFAULT_PARAMS.entrySet().stream().filter(entry -> !copy.contains(entry.getKey())).forEach(entry -> {
copy.setProperty(entry.getKey(), entry.getValue());
});
return copy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,10 @@ public void invoke(String[] cmd) throws Exception {
Map<String, CatalogProtos.FunctionDescProto> functionMap =
new HashMap<>();

for (CatalogProtos.FunctionDescProto eachFunction: functions) {
if (!functionMap.containsKey(eachFunction.getSupplement().getShortDescription())) {
functionMap.put(eachFunction.getSupplement().getShortDescription(), eachFunction);
}
}
functions.stream().filter(eachFunction -> !functionMap.containsKey(eachFunction.getSupplement()
.getShortDescription())).forEach(eachFunction -> {
functionMap.put(eachFunction.getSupplement().getShortDescription(), eachFunction);
});

for (CatalogProtos.FunctionDescProto eachFunction: functionMap.values()) {
String signature = eachFunction.getSignature().getReturnType().getType() + " " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,8 @@ protected Collection<String> getBatchQueries(Collection<Path> paths) throws IOEx
List<String> queries = Lists.newArrayList();

for (Path p : paths) {
for (ParsedResult statement: SimpleParser.parseScript(FileUtil.readTextFile(new File(p.toUri())))) {
queries.add(statement.getStatement());
}
SimpleParser.parseScript(FileUtil.readTextFile(new File(p.toUri()))).stream()
.map(ParsedResult::getStatement).forEach(queries::add);
}

return queries;
Expand Down
Loading