Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
083bed5
Add the list of output files and backup files to TaskAttemptContext
blrunner Feb 24, 2016
b39c8d1
Add outputFiles and backupFiles to Protocol Buffer
blrunner Feb 24, 2016
e3b26ea
Add property for setting Direct Output Committer to TajoConf and Sess…
blrunner Feb 24, 2016
9efb466
Remove related property from SessionVars
blrunner Feb 25, 2016
234f282
Add temporary codes for testing
blrunner Mar 4, 2016
7effec1
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 15, 2016
cb76276
Prefix of output file name must be the id of query.
blrunner Mar 15, 2016
dce41c6
Implement direct Output Committer to FileTablespace
blrunner Mar 16, 2016
908ccd2
Implement a method for renaming recursively directories
blrunner Mar 16, 2016
bd1e1b3
Remove proto modifications
blrunner Mar 16, 2016
95e513a
Add session variable and add more unit test cases
blrunner Mar 16, 2016
5940940
Fix unit test bug
blrunner Mar 16, 2016
ec20061
Remove unnecessary updates
blrunner Mar 16, 2016
db53589
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
386d4a3
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
d8a9e96
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 23, 2016
aa8d777
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 30, 2016
fa26f5a
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 5, 2016
2b42597
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 8, 2016
01e6606
Change the size of query state column
blrunner Apr 11, 2016
eb95336
Add catalog base vesion history
blrunner Apr 11, 2016
6d67fa0
Change the type of start time and end time
blrunner Apr 11, 2016
9a6ee7e
Implement methods for handling informs of DirectOutputCommitter
blrunner Apr 11, 2016
caa9603
Add more unit tests
blrunner Apr 11, 2016
8e9ac5e
Implement TableSpace::clearDirectOutputCommit
blrunner Apr 12, 2016
ac0797a
Update the status of query history to catalog or delete output commit…
blrunner Apr 12, 2016
e4147f5
When TajoMaster starting, check the status of DOC histories to catalog.
blrunner Apr 12, 2016
2bb3a57
After finish clear logs, update the status of DOC
blrunner Apr 12, 2016
8001939
Update tab indent
blrunner Apr 12, 2016
4e89ea0
Recover existing method for committing output data
blrunner Apr 12, 2016
049c8ce
Move the location of code blocks
blrunner Apr 12, 2016
abbd072
Remove unnecessary conditions
blrunner Apr 12, 2016
7706a9b
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 19, 2016
45e2d86
Add unit test cases for cleansing output commit results in query fail…
blrunner Apr 20, 2016
dd42a2b
Add unit test cases for DirectOutputCommitter to remove output files …
blrunner Apr 20, 2016
ba0733c
Add unit test cases for verifying the status of query history from ca…
blrunner Apr 20, 2016
3f9aceb
Write a document for DirectOutputCommitter
blrunner Apr 20, 2016
3017959
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 21, 2016
2e02a65
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 22, 2016
8cbf697
Add missing package
blrunner Apr 22, 2016
7939fc3
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 25, 2016
7dd977a
Remove default staging directory
blrunner Apr 25, 2016
358b1f0
Fix bugs of AbstractDBStore and add unit test cases about it
blrunner Apr 26, 2016
58da95a
Fix bugs and add a method for renaming files recursively
blrunner Apr 27, 2016
cf7d0c3
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 28, 2016
8cc34cd
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 29, 2016
e0cc100
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner May 8, 2016
7ebc74c
Fix bugs of unit tests
blrunner May 8, 2016
ead35f1
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner May 12, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
import org.apache.tajo.catalog.CatalogProtocol.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
Expand Down Expand Up @@ -957,4 +958,65 @@ public void updateTableStats(final UpdateTableStatsProto updateTableStatsProto)
throw new RuntimeException(e);
}
}

@Override
public void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) throws DuplicateQueryIdException
, InsufficientPrivilegeException{
try {
final BlockingInterface stub = getStub();
final ReturnState state = stub.addDirectOutputCommitHistory(null, history);

throwsIfThisError(state, DuplicateQueryIdException.class);
throwsIfThisError(state, InsufficientPrivilegeException.class);
ensureOk(state);
} catch (ServiceException e) {
throw new RuntimeException(e);
}
}

@Override
public void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history)
throws UndefinedQueryIdException, InsufficientPrivilegeException {
try {
final BlockingInterface stub = getStub();
final ReturnState state = stub.updateDirectOutputCommitHistory(null, history);

throwsIfThisError(state, UndefinedQueryIdException.class);
throwsIfThisError(state, InsufficientPrivilegeException.class);
ensureOk(state);
} catch (ServiceException e) {
throw new RuntimeException(e);
}
}

@Override
public List<DirectOutputCommitHistoryProto> getAllDirectOutputCommitHistories() {
try {
final BlockingInterface stub = getStub();
final DirectOutputCommitHistoryListResponse response = stub.getAllDirectOutputCommitHistories(null,
ProtoUtil.NULL_PROTO);

ensureOk(response.getState());
return response.getCommitHistoryList();

} catch (ServiceException e) {
throw new RuntimeException(e);
}
}

@Override
public List<DirectOutputCommitHistoryProto> getIncompleteDirectOutputCommitHistories() {
try {
final BlockingInterface stub = getStub();
final DirectOutputCommitHistoryListResponse response = stub.getIncompleteDirectOutputCommitHistories(null,
ProtoUtil.NULL_PROTO);

ensureOk(response.getState());
return response.getCommitHistoryList();

} catch (ServiceException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,10 @@ service CatalogProtocolService {
rpc getFunctions(NullProto) returns (GetFunctionsResponse);
rpc getFunctionMeta(GetFunctionMetaRequest) returns (FunctionResponse);
rpc containFunction(ContainFunctionRequest) returns (ReturnState);

rpc addDirectOutputCommitHistory(DirectOutputCommitHistoryProto) returns (ReturnState);
rpc updateDirectOutputCommitHistory(UpdateDirectOutputCommitHistoryProto) returns (ReturnState);
rpc getAllDirectOutputCommitHistories(NullProto) returns (DirectOutputCommitHistoryListResponse);
rpc getIncompleteDirectOutputCommitHistories(NullProto) returns (DirectOutputCommitHistoryListResponse);

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class CatalogConstants {
public static final String TB_PARTITION_METHODS = "PARTITION_METHODS";
public static final String TB_PARTTIONS = "PARTITIONS";
public static final String TB_PARTTION_KEYS = "PARTITION_KEYS";
public static final String TB_DIRECT_OUTPUT_COMMIT_HISTORIES = "DIRECT_OUTPUT_COMMIT_HISTORIES";

public static final String COL_TABLESPACE_PK = "SPACE_ID";
public static final String COL_DATABASES_PK = "DB_ID";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,17 @@ void alterTable(AlterTableDesc desc)
NotImplementedException;

void updateTableStats(UpdateTableStatsProto stats) throws UndefinedTableException, InsufficientPrivilegeException;


/************************** DirectOutputCommitter *****************************/

void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) throws DuplicateQueryIdException
, InsufficientPrivilegeException;

void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history)
throws UndefinedQueryIdException, InsufficientPrivilegeException;

List<DirectOutputCommitHistoryProto> getAllDirectOutputCommitHistories();

List<DirectOutputCommitHistoryProto> getIncompleteDirectOutputCommitHistories();
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,3 +429,26 @@ message PartitionListResponse {
required ReturnState state = 1;
repeated PartitionDescProto partition = 2;
}


////////////////////////////////////////////////
// DirectOutputCommitter
////////////////////////////////////////////////

message DirectOutputCommitHistoryProto {
required string query_id = 1;
required string path = 2;
required int64 start_time = 3;
optional int64 end_time = 4;
required string query_state = 5;
}

message DirectOutputCommitHistoryListResponse {
required ReturnState state = 1;
repeated DirectOutputCommitHistoryProto commitHistory = 2;
}

message UpdateDirectOutputCommitHistoryProto {
required string query_id = 1;
required string query_state = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1294,4 +1294,24 @@ public List<TableDescriptorProto> getAllTables() {
public List<TablespaceProto> getTablespaces() {
return Lists.newArrayList(getTablespace(TajoConstants.DEFAULT_TABLESPACE_NAME));
}

@Override
public void addDirectOutputCommitHistory(DirectOutputCommitHistoryProto history) {
throw new TajoRuntimeException(new UnsupportedException());
}

@Override
public void updateDirectOutputCommitHistoryProto(UpdateDirectOutputCommitHistoryProto history) {
throw new TajoRuntimeException(new UnsupportedException());
}

@Override
public List<DirectOutputCommitHistoryProto> getAllDirectOutputCommitHistories() {
throw new TajoRuntimeException(new UnsupportedException());
}

@Override
public List<DirectOutputCommitHistoryProto> getIncompleteDirectOutputCommitHistories() {
throw new TajoRuntimeException(new UnsupportedException());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse;
Expand Down Expand Up @@ -1613,6 +1614,84 @@ public ReturnState containFunction(RpcController controller, ContainFunctionRequ
return returnError(t);
}
}

@Override
public ReturnState addDirectOutputCommitHistory(RpcController controller, DirectOutputCommitHistoryProto request)
throws ServiceException {
wlock.lock();
try {
store.addDirectOutputCommitHistory(request);
return OK;

} catch (Throwable t) {
printStackTraceIfError(LOG, t);
return returnError(t);

} finally {
wlock.unlock();
}
}

@Override
public ReturnState updateDirectOutputCommitHistory(RpcController controller,
UpdateDirectOutputCommitHistoryProto request) throws ServiceException {
wlock.lock();
try {
store.updateDirectOutputCommitHistoryProto(request);
return OK;

} catch (Throwable t) {
printStackTraceIfError(LOG, t);
return returnError(t);

} finally {
wlock.unlock();
}
}

@Override
public DirectOutputCommitHistoryListResponse getAllDirectOutputCommitHistories(RpcController controller,
NullProto request) throws ServiceException {
rlock.lock();
try {
return DirectOutputCommitHistoryListResponse.newBuilder()
.setState(OK)
.addAllCommitHistory(store.getAllDirectOutputCommitHistories())
.build();

} catch (Throwable t) {
printStackTraceIfError(LOG, t);

return DirectOutputCommitHistoryListResponse.newBuilder()
.setState(returnError(t))
.build();

} finally {
rlock.unlock();
}
}

@Override
public DirectOutputCommitHistoryListResponse getIncompleteDirectOutputCommitHistories(RpcController controller,
NullProto request) throws ServiceException {
rlock.lock();
try {
return DirectOutputCommitHistoryListResponse.newBuilder()
.setState(OK)
.addAllCommitHistory(store.getIncompleteDirectOutputCommitHistories())
.build();

} catch (Throwable t) {
printStackTraceIfError(LOG, t);

return DirectOutputCommitHistoryListResponse.newBuilder()
.setState(returnError(t))
.build();

} finally {
rlock.unlock();
}
}
}

private static class FunctionSignature {
Expand Down
Loading