Skip to content

Commit 8b419f6

Browse files
vhsu14Presto CUDF CI
authored andcommitted
[native] [presto] Expand connector specific fields (prestodb#25474)
## Description - Native changes to expand connector specific fields for thrift migration - Depends on prestodb#25242 ## Motivation and Context prestodb/rfcs#38 ## Test Plan Build package and test in verifier: 230498 ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == RELEASE NOTES == Prestissimo (Native Execution) Changes * Update thrift IDL to expand connector specific fields ```
1 parent 1b24755 commit 8b419f6

21 files changed

+539
-229
lines changed

presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,59 +18,62 @@
1818
#include "presto_cpp/main/common/tests/test_json.h"
1919
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
2020

21-
using namespace facebook;
22-
using namespace facebook::presto::protocol;
21+
using namespace facebook::presto;
2322

24-
class TaskInfoTest : public ::testing::Test {};
23+
class TaskInfoTest : public ::testing::Test {
24+
protected:
25+
void SetUp() override {
26+
registerPrestoToVeloxConnector(
27+
std::make_unique<facebook::presto::HivePrestoToVeloxConnector>("hive"));
28+
}
29+
30+
void TearDown() override {
31+
unregisterPrestoToVeloxConnector("hive");
32+
}
33+
};
2534

2635
const std::string BASE_DATA_PATH = "/github/presto-trunk/presto-native-execution/presto_cpp/main/tests/data/";
2736

2837
TEST_F(TaskInfoTest, duration) {
2938
double thrift = 0;
30-
facebook::presto::thrift::toThrift(Duration(123, TimeUnit::MILLISECONDS), thrift);
39+
thrift::toThrift(protocol::Duration(123, protocol::TimeUnit::MILLISECONDS), thrift);
3140
ASSERT_EQ(thrift, 123);
3241
}
3342

3443
TEST_F(TaskInfoTest, binaryMetadataUpdates) {
3544
std::string str = slurp(getDataPath(BASE_DATA_PATH, "MetadataUpdates.json"));
3645
json j = json::parse(str);
37-
registerPrestoToVeloxConnector(std::make_unique<facebook::presto::HivePrestoToVeloxConnector>("hive"));
38-
MetadataUpdates metadataUpdates = j;
46+
protocol::MetadataUpdates metadataUpdates = j;
3947
std::unique_ptr<std::string> thriftMetadataUpdates = std::make_unique<std::string>();
40-
facebook::presto::thrift::toThrift(metadataUpdates, *thriftMetadataUpdates);
48+
thrift::toThrift(metadataUpdates, *thriftMetadataUpdates);
4149

4250
json thriftJson = json::parse(*thriftMetadataUpdates);
4351
ASSERT_EQ(j, thriftJson);
44-
45-
presto::unregisterPrestoToVeloxConnector("hive");
4652
}
4753

4854
TEST_F(TaskInfoTest, taskInfo) {
4955
std::string str = slurp(getDataPath(BASE_DATA_PATH, "TaskInfo.json"));
5056
json j = json::parse(str);
51-
registerPrestoToVeloxConnector(std::make_unique<facebook::presto::HivePrestoToVeloxConnector>("hive"));
52-
TaskInfo taskInfo = j;
53-
facebook::presto::thrift::TaskInfo thriftTaskInfo;
54-
facebook::presto::thrift::toThrift(taskInfo, thriftTaskInfo);
57+
protocol::TaskInfo taskInfo = j;
58+
thrift::TaskInfo thriftTaskInfo;
59+
thrift::toThrift(taskInfo, thriftTaskInfo);
5560

5661
json thriftJson = json::parse(*thriftTaskInfo.metadataUpdates()->metadataUpdates());
5762
ASSERT_EQ(taskInfo.metadataUpdates, thriftJson);
5863
ASSERT_EQ(thriftTaskInfo.needsPlan(), false);
5964
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()->size(), 2);
6065
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[0].bufferId()->id(), 100);
6166
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[1].bufferId()->id(), 200);
62-
ASSERT_EQ(thriftTaskInfo.stats()->blockedReasons()->count(facebook::presto::thrift::BlockedReason::WAITING_FOR_MEMORY), 1);
67+
ASSERT_EQ(thriftTaskInfo.stats()->blockedReasons()->count(thrift::BlockedReason::WAITING_FOR_MEMORY), 1);
6368
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()->size(), 2);
6469
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric1"].sum(), 123);
6570
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric2"].name(), "test_metric2");
66-
67-
presto::unregisterPrestoToVeloxConnector("hive");
6871
}
6972

7073
TEST_F(TaskInfoTest, taskId) {
71-
TaskId taskId = "queryId.1.2.3.4";
72-
facebook::presto::thrift::TaskId thriftTaskId;
73-
facebook::presto::thrift::toThrift(taskId, thriftTaskId);
74+
protocol::TaskId taskId = "queryId.1.2.3.4";
75+
thrift::TaskId thriftTaskId;
76+
thrift::toThrift(taskId, thriftTaskId);
7477

7578
ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->queryId(), "queryId");
7679
ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->id(), 1);
@@ -83,9 +86,9 @@ TEST_F(TaskInfoTest, taskId) {
8386
TEST_F(TaskInfoTest, operatorStatsEmptyBlockedReason) {
8487
std::string str = slurp(getDataPath(BASE_DATA_PATH, "OperatorStatsEmptyBlockedReason.json"));
8588
json j = json::parse(str);
86-
OperatorStats operatorStats = j;
87-
facebook::presto::thrift::OperatorStats thriftOperatorStats;
88-
facebook::presto::thrift::toThrift(operatorStats, thriftOperatorStats);
89+
protocol::OperatorStats operatorStats = j;
90+
thrift::OperatorStats thriftOperatorStats;
91+
thrift::toThrift(operatorStats, thriftOperatorStats);
8992

9093
ASSERT_EQ(thriftOperatorStats.blockedReason().has_value(), false);
9194
ASSERT_EQ(thriftOperatorStats.blockedWall(), 80);
@@ -95,9 +98,9 @@ TEST_F(TaskInfoTest, operatorStatsEmptyBlockedReason) {
9598
TEST_F(TaskInfoTest, operatorStats) {
9699
std::string str = slurp(getDataPath(BASE_DATA_PATH, "OperatorStats.json"));
97100
json j = json::parse(str);
98-
OperatorStats operatorStats = j;
99-
facebook::presto::thrift::OperatorStats thriftOperatorStats;
100-
facebook::presto::thrift::toThrift(operatorStats, thriftOperatorStats);
101+
protocol::OperatorStats operatorStats = j;
102+
thrift::OperatorStats thriftOperatorStats;
103+
thrift::toThrift(operatorStats, thriftOperatorStats);
101104

102-
ASSERT_EQ(thriftOperatorStats.blockedReason(), facebook::presto::thrift::BlockedReason::WAITING_FOR_MEMORY);
105+
ASSERT_EQ(thriftOperatorStats.blockedReason(), thrift::BlockedReason::WAITING_FOR_MEMORY);
103106
}

presto-native-execution/presto_cpp/main/tests/TaskStatusTest.cpp

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,17 @@
1616
#include "presto_cpp/main/thrift/ProtocolToThrift.h"
1717
#include "presto_cpp/main/common/tests/test_json.h"
1818

19-
using namespace facebook;
20-
using namespace facebook::presto::protocol;
19+
using namespace facebook::presto;
2120

2221
class TaskStatusTest : public ::testing::Test {};
2322

2423
TEST_F(TaskStatusTest, lifeSpan) {
2524
std::string str = R"("Group1001")";
2625

2726
json j = json::parse(str);
28-
Lifespan lifeSpan = j;
29-
facebook::presto::thrift::Lifespan thriftLifespan;
30-
facebook::presto::thrift::toThrift(lifeSpan, thriftLifespan);
27+
protocol::Lifespan lifeSpan = j;
28+
thrift::Lifespan thriftLifespan;
29+
thrift::toThrift(lifeSpan, thriftLifespan);
3130

3231
ASSERT_EQ(thriftLifespan.grouped(), true);
3332
ASSERT_EQ(thriftLifespan.groupId(), 1001);
@@ -42,13 +41,13 @@ TEST_F(TaskStatusTest, errorCode) {
4241
})";
4342

4443
json j = json::parse(str);
45-
ErrorCode errorCode = j;
46-
facebook::presto::thrift::ErrorCode thriftErrorCode;
47-
facebook::presto::thrift::toThrift(errorCode, thriftErrorCode);
44+
protocol::ErrorCode errorCode = j;
45+
thrift::ErrorCode thriftErrorCode;
46+
thrift::toThrift(errorCode, thriftErrorCode);
4847

4948
ASSERT_EQ(thriftErrorCode.code(), 1234);
5049
ASSERT_EQ(thriftErrorCode.name(), "name");
51-
ASSERT_EQ(thriftErrorCode.type(), facebook::presto::thrift::ErrorType::INTERNAL_ERROR);
50+
ASSERT_EQ(thriftErrorCode.type(), thrift::ErrorType::INTERNAL_ERROR);
5251
ASSERT_EQ(thriftErrorCode.retriable(), false);
5352
}
5453

@@ -73,16 +72,16 @@ TEST_F(TaskStatusTest, executionFailureInfoOptionalFieldsEmpty) {
7372
})";
7473

7574
json j = json::parse(str);
76-
ExecutionFailureInfo executionFailureInfo = j;
77-
facebook::presto::thrift::ExecutionFailureInfo thriftExecutionFailureInfo;
78-
facebook::presto::thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo);
75+
protocol::ExecutionFailureInfo executionFailureInfo = j;
76+
thrift::ExecutionFailureInfo thriftExecutionFailureInfo;
77+
thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo);
7978

8079
ASSERT_EQ(thriftExecutionFailureInfo.type(), "type");
8180
ASSERT_EQ(thriftExecutionFailureInfo.errorLocation()->columnNumber(), 2);
8281
ASSERT_EQ(thriftExecutionFailureInfo.remoteHost()->hostPortString(), "localhost:8080");
83-
ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->type(), facebook::presto::thrift::ErrorType::INTERNAL_ERROR);
82+
ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->type(), thrift::ErrorType::INTERNAL_ERROR);
8483
ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->retriable(), false);
85-
ASSERT_EQ(thriftExecutionFailureInfo.errorCause(), facebook::presto::thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT);
84+
ASSERT_EQ(thriftExecutionFailureInfo.errorCause(), thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT);
8685
ASSERT_EQ(thriftExecutionFailureInfo.cause(), nullptr);
8786
ASSERT_EQ(thriftExecutionFailureInfo.suppressed()->size(), 0);
8887
}
@@ -91,18 +90,18 @@ TEST_F(TaskStatusTest, executionFailureInfoOptionalFieldsNonempty) {
9190
std::string str = slurp(getDataPath("/github/presto-trunk/presto-native-execution/presto_cpp/main/tests/data/", "ExecutionFailureInfo.json"));
9291

9392
json j = json::parse(str);
94-
ExecutionFailureInfo executionFailureInfo = j;
95-
facebook::presto::thrift::ExecutionFailureInfo thriftExecutionFailureInfo;
96-
facebook::presto::thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo);
93+
protocol::ExecutionFailureInfo executionFailureInfo = j;
94+
thrift::ExecutionFailureInfo thriftExecutionFailureInfo;
95+
thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo);
9796

9897
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).type(), "cause");
99-
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCause(), facebook::presto::thrift::ErrorCause::UNKNOWN);
100-
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->type(), facebook::presto::thrift::ErrorType::INSUFFICIENT_RESOURCES);
98+
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCause(), thrift::ErrorCause::UNKNOWN);
99+
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->type(), thrift::ErrorType::INSUFFICIENT_RESOURCES);
101100
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->retriable(), true);
102101
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].type(), "suppressed1");
103-
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCause(), facebook::presto::thrift::ErrorCause::LOW_PARTITION_COUNT);
104-
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCode()->type(), facebook::presto::thrift::ErrorType::EXTERNAL);
102+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCause(), thrift::ErrorCause::LOW_PARTITION_COUNT);
103+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCode()->type(), thrift::ErrorType::EXTERNAL);
105104
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].type(), "suppressed2");
106-
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCause(), facebook::presto::thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT);
107-
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCode()->type(), facebook::presto::thrift::ErrorType::INTERNAL_ERROR);
105+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCause(), thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT);
106+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCode()->type(), thrift::ErrorType::INTERNAL_ERROR);
108107
}

0 commit comments

Comments
 (0)