Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,11 @@ public class HiveCatalogStoreClient {

private HiveCatalogStoreClient(HiveConf hiveConf) {
try {
HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
@Override
public HiveMetaHook getHook(Table table) throws MetaException {
/* metadata hook implementation, or null if this
* storage handler does not need any metadata notifications
*/
return null;
}
HiveMetaHookLoader hookLoader = table -> {
/* metadata hook implementation, or null if this
* storage handler does not need any metadata notifications
*/
return null;
};

this.hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, hookLoader, HiveMetaStoreClient.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,10 @@ public GetTablespaceListResponse getAllTablespaces(RpcController controller, Nul

// retrieves tablespaces from linked meta data
tableSpaces.addAll(Collections2.transform(linkedMetadataManager.getTablespaces(),
new Function<Pair<String, URI>, TablespaceProto>() {
@Override
public TablespaceProto apply(Pair<String, URI> input) {
return TablespaceProto.newBuilder()
.setSpaceName(input.getFirst())
.setUri(input.getSecond().toString())
.build();
}
}));
input -> TablespaceProto.newBuilder()
.setSpaceName(input.getFirst())
.setUri(input.getSecond().toString())
.build()));

return GetTablespaceListResponse.newBuilder()
.setState(OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,7 @@ public Collection<String> getTablespaceNames() {
*/
public Optional<Pair<String, URI>> getTablespace(final String spaceName) {
Collection<MetadataProvider> filtered = filter(providerMap.values(),
new Predicate<MetadataProvider>() {
@Override
public boolean apply(@Nullable MetadataProvider input) {
return input.getTablespaceName().equals(spaceName);
}
});
input -> input.getTablespaceName().equals(spaceName));

if (filtered.isEmpty()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,14 +447,8 @@ protected String[] listJarResources(URL dirURL, FilenameFilter filter)
protected String[] listResources() throws IOException, URISyntaxException {
String[] files = new String[0];
URL dirURL = ClassLoader.getSystemResource(schemaPath);
FilenameFilter fileFilter = new FilenameFilter() {

@Override
public boolean accept(File dir, String name) {
return ((name.lastIndexOf('.') > -1) &&
(".xml".equalsIgnoreCase(name.substring(name.lastIndexOf('.')))));
}
};
FilenameFilter fileFilter = (dir, name) -> ((name.lastIndexOf('.') > -1) &&
(".xml".equalsIgnoreCase(name.substring(name.lastIndexOf('.')))));

if (dirURL == null) {
throw new FileNotFoundException(schemaPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,7 @@ public void testGetTablespace() throws Exception {
@Test
public void testGetTablespaces() throws Exception {
Collection<String> names = Collections2.transform(catalog.getAllTablespaces(),
new Function<CatalogProtos.TablespaceProto, String>() {
@Override
public String apply(@Nullable CatalogProtos.TablespaceProto input) {
return input.getSpaceName();
}
});
input -> input.getSpaceName());

assertEquals(Sets.newHashSet("space1", "space2", "default"), Sets.newHashSet(names));
}
Expand Down
13 changes: 5 additions & 8 deletions tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,15 +440,12 @@ private Collection<String> getKeywords() {
}

private void addShutdownHook() {
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
try {
history.flush();
} catch (IOException e) {
}
client.close();
ShutdownHookManager.get().addShutdownHook(() -> {
try {
history.flush();
} catch (IOException e) {
}
client.close();
}, SHUTDOWN_HOOK_PRIORITY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,12 +902,7 @@ private Collection<Path> getPositiveQueryFiles() throws IOException {
throw new IOException("Cannot find " + positiveQueryDir);
}

return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)), new Function<FileStatus, Path>(){
@Override
public Path apply(@Nullable FileStatus fileStatus) {
return fileStatus.getPath();
}
});
return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)), fileStatus -> fileStatus.getPath());
}

private Collection<Path> getNegativeQueryFiles() throws IOException {
Expand All @@ -918,12 +913,7 @@ private Collection<Path> getNegativeQueryFiles() throws IOException {
throw new IOException("Cannot find " + positiveQueryDir);
}

return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)),new Function<FileStatus, Path>(){
@Override
public Path apply(@Nullable FileStatus fileStatus) {
return fileStatus.getPath();
}
});
return Collections2.transform(Lists.newArrayList(fs.listStatus(positiveQueryDir)), fileStatus -> fileStatus.getPath());
}

private Path getQueryFilePath(String fileName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,12 +770,7 @@ public void testGetQueryInfoAndHistory() throws Exception {

List<ClientProtos.StageHistoryProto> taskHistories =
new ArrayList<>(queryHistory.getStageHistoriesList());
Collections.sort(taskHistories, new Comparator<StageHistoryProto>() {
@Override
public int compare(ClientProtos.StageHistoryProto o1, StageHistoryProto o2) {
return o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId());
}
});
Collections.sort(taskHistories, (o1, o2) -> o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId()));
assertEquals(8, taskHistories.get(0).getTotalReadRows());
assertEquals(1, taskHistories.get(0).getTotalWriteRows());
assertEquals(1, taskHistories.get(1).getTotalReadRows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,30 +157,27 @@ public void testExecuteQueryAsyncWithListener() throws TajoException, IOExceptio
final AtomicBoolean success = new AtomicBoolean(false);
final List<ResultSet> resultContainer = Lists.newArrayList();

future.addListener(new FutureListener<QueryFuture>() {
@Override
public void processingCompleted(QueryFuture future) {
try {
ResultSet result = future.get();
resultContainer.add(result); // for better error handling, it should be verified outside this future.

assertTrue(future.isDone());
assertEquals(QueryState.COMPLETED, future.state());
assertTrue(future.isSuccessful());
assertFalse(future.isFailed());
assertFalse(future.isKilled());
assertTrue(1.0f == future.progress());
assertEquals("default", future.queue());

assertTrue(future.submitTime() > 0);
assertTrue(future.startTime() > 0);
assertTrue(future.finishTime() > 0);

success.set(true);

} catch (Throwable t) {
throw new RuntimeException(t);
}
future.addListener(queryFuture -> {
try {
ResultSet result = queryFuture.get();
resultContainer.add(result); // for better error handling, it should be verified outside this future.

assertTrue(queryFuture.isDone());
assertEquals(QueryState.COMPLETED, queryFuture.state());
assertTrue(queryFuture.isSuccessful());
assertFalse(queryFuture.isFailed());
assertFalse(queryFuture.isKilled());
assertTrue(1.0f == queryFuture.progress());
assertEquals("default", queryFuture.queue());

assertTrue(queryFuture.submitTime() > 0);
assertTrue(queryFuture.startTime() > 0);
assertTrue(queryFuture.finishTime() > 0);

success.set(true);

} catch (Throwable t) {
throw new RuntimeException(t);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1355,12 +1355,7 @@ public void testSelectFromSelfDescTable() throws Exception {

// projection column test
List<Target> targets = projectionNode.getTargets();
Collections.sort(targets, new Comparator<Target>() {
@Override
public int compare(Target o1, Target o2) {
return o1.getCanonicalName().compareTo(o2.getCanonicalName());
}
});
Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName()));
assertEquals(3, targets.size());
assertEquals("default.self_desc_table1.dept", targets.get(0).getCanonicalName());
assertEquals("default.self_desc_table1.id", targets.get(1).getCanonicalName());
Expand All @@ -1370,12 +1365,7 @@ public int compare(Target o1, Target o2) {
assertEquals(NodeType.SCAN, projectionNode.getChild().getType());
ScanNode scanNode = projectionNode.getChild();
targets = scanNode.getTargets();
Collections.sort(targets, new Comparator<Target>() {
@Override
public int compare(Target o1, Target o2) {
return o1.getCanonicalName().compareTo(o2.getCanonicalName());
}
});
Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName()));
assertEquals(3, targets.size());
assertEquals("default.self_desc_table1.dept", targets.get(0).getCanonicalName());
assertEquals("default.self_desc_table1.id", targets.get(1).getCanonicalName());
Expand Down Expand Up @@ -1409,12 +1399,7 @@ public void testSelectWhereFromSelfDescTable() throws Exception {

// projection column test
List<Target> targets = projectionNode.getTargets();
Collections.sort(targets, new Comparator<Target>() {
@Override
public int compare(Target o1, Target o2) {
return o1.getCanonicalName().compareTo(o2.getCanonicalName());
}
});
Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName()));
assertEquals(2, targets.size());
assertEquals("default.self_desc_table1.dept", targets.get(0).getCanonicalName());
assertEquals("default.self_desc_table1.name", targets.get(1).getCanonicalName());
Expand All @@ -1428,12 +1413,7 @@ public int compare(Target o1, Target o2) {
assertEquals(NodeType.SCAN, selectionNode.getChild().getType());
ScanNode scanNode = selectionNode.getChild();
targets = scanNode.getTargets();
Collections.sort(targets, new Comparator<Target>() {
@Override
public int compare(Target o1, Target o2) {
return o1.getCanonicalName().compareTo(o2.getCanonicalName());
}
});
Collections.sort(targets, (o1, o2) -> o1.getCanonicalName().compareTo(o2.getCanonicalName()));
assertEquals(4, targets.size());
assertEquals("?greaterthan", targets.get(0).getCanonicalName());
assertEquals("default.self_desc_table1.dept", targets.get(1).getCanonicalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,21 @@ protected static void createCommonTables() throws Exception {

// According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner.
// testMultipleBroadcastDataFileWithZeroLength testcase is for the leaf node
createMultiFile("nation", 2, new TupleCreator() {
public Tuple createTuple(String[] columnDatas) {
return new VTuple(new Datum[]{
columnDatas[0].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[0])),
columnDatas[1].equals("") ? NullDatum.get() : new TextDatum(columnDatas[1]),
columnDatas[2].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[2])),
columnDatas[3].equals("") ? NullDatum.get() : new TextDatum(columnDatas[3])
});
}
});
//<<<<<<< 1c44272bff0fc0022a1c8ce060b70d11a30c59e0
createMultiFile("nation", 2, columnDatas -> new VTuple(new Datum[]{
columnDatas[0].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[0])),
columnDatas[1].equals("") ? NullDatum.get() : new TextDatum(columnDatas[1]),
columnDatas[2].equals("") ? NullDatum.get() : new Int4Datum(Integer.parseInt(columnDatas[2])),
columnDatas[3].equals("") ? NullDatum.get() : new TextDatum(columnDatas[3])
}));
//=======
// createMultiFile("nation", 2, columnDatas -> new VTuple(new Datum[]{
// new Int4Datum(Integer.parseInt(columnDatas[0])),
// new TextDatum(columnDatas[1]),
// new Int4Datum(Integer.parseInt(columnDatas[2])),
// new TextDatum(columnDatas[3])
// }));
//>>>>>>> initial commit
addEmptyDataFile("nation_multifile", false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,43 +67,40 @@ public void testBroadcastTableCache() throws Exception {
}

private Callable<CacheHolder<Long>> createTask(final TableCacheKey key, final ExecutionBlockSharedResource resource) {
return new Callable<CacheHolder<Long>>() {
@Override
public CacheHolder<Long> call() throws Exception {
CacheHolder<Long> result;
synchronized (resource.getLock()) {
if (!TableCache.getInstance().hasCache(key)) {
final long nanoTime = System.nanoTime();
final TableStats tableStats = new TableStats();
tableStats.setNumRows(100);
tableStats.setNumBytes(1000);

final CacheHolder<Long> cacheHolder = new CacheHolder<Long>() {

@Override
public Long getData() {
return nanoTime;
}

@Override
public TableStats getTableStats() {
return tableStats;
}

@Override
public void release() {

}
};

resource.addBroadcastCache(key, cacheHolder);
}
return () -> {
CacheHolder<Long> result;
synchronized (resource.getLock()) {
if (!TableCache.getInstance().hasCache(key)) {
final long nanoTime = System.nanoTime();
final TableStats tableStats = new TableStats();
tableStats.setNumRows(100);
tableStats.setNumBytes(1000);

final CacheHolder<Long> cacheHolder = new CacheHolder<Long>() {

@Override
public Long getData() {
return nanoTime;
}

@Override
public TableStats getTableStats() {
return tableStats;
}

@Override
public void release() {

}
};

resource.addBroadcastCache(key, cacheHolder);
}

CacheHolder<?> holder = resource.getBroadcastCache(key);
result = (CacheHolder<Long>) holder;
return result;
}

CacheHolder<?> holder = resource.getBroadcastCache(key);
result = (CacheHolder<Long>) holder;
return result;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,10 @@ public QueryContext getQueryContext() {

@Override
protected boolean startQuery(final QueryId queryId, final AllocationResourceProto allocation) {
executorService.schedule(new Runnable() {
@Override
public void run() {
barrier.release();
qmAllocationMap.put(queryId, allocation);
rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
}
executorService.schedule((Runnable) () -> {
barrier.release();
qmAllocationMap.put(queryId, allocation);
rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
}, testDelay, TimeUnit.MILLISECONDS);
return true;
}
Expand Down
Loading