Skip to content

Commit 6bbe094

Browse files
vhsu14Presto CUDF CI
authored andcommitted
[native] Add thrift codec for remote split and support for ExecutionWriterTargetUnion (prestodb#25595)
## Description 1. Add thrift codec for remote split 2. Add support for ExecutionWriterTargetUnion, including expanding multiple connector specific data 3. Depends on prestodb#25242 ## Motivation and Context prestodb/rfcs#38 ## Impact <!---Describe any public API or user-facing feature change or any performance impact--> ## Test Plan - Verifier: 234471 - Performance testing ## Contributor checklist - [x] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [x] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [x] Documented new properties (with its default value), SQL syntax, functions, or other functionality. - [x] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [x] Adequate tests were added if applicable. - [ ] CI passed. ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == RELEASE NOTES == General Changes * Improve efficiency by supporting thrift codec for connector-specific data. ```
1 parent 0ad6e60 commit 6bbe094

29 files changed

+1390
-333
lines changed

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
215215
protocol::TaskId taskId = pathMatch[1];
216216
bool summarize = message->hasQueryParam("summarize");
217217

218-
auto& headers = message->getHeaders();
218+
const auto& headers = message->getHeaders();
219219
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
220220
const auto sendThrift =
221221
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
@@ -392,7 +392,7 @@ proxygen::RequestHandler* TaskResource::deleteTask(
392392
message->getQueryParam(protocol::PRESTO_ABORT_TASK_URL_PARAM) == "true";
393393
}
394394
bool summarize = message->hasQueryParam("summarize");
395-
auto& headers = message->getHeaders();
395+
const auto& headers = message->getHeaders();
396396
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
397397
const auto sendThrift =
398398
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
@@ -544,7 +544,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
544544
auto currentState = getCurrentState(message);
545545
auto maxWait = getMaxWait(message);
546546

547-
auto& headers = message->getHeaders();
547+
const auto& headers = message->getHeaders();
548548
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
549549
const auto sendThrift =
550550
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
@@ -615,7 +615,7 @@ proxygen::RequestHandler* TaskResource::getTaskInfo(
615615
auto maxWait = getMaxWait(message);
616616
bool summarize = message->hasQueryParam("summarize");
617617

618-
auto& headers = message->getHeaders();
618+
const auto& headers = message->getHeaders();
619619
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
620620
const auto sendThrift =
621621
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;

presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <gtest/gtest.h>
1616
#include "presto_cpp/main/thrift/ProtocolToThrift.h"
17+
#include "presto_cpp/main/thrift/ThriftIO.h"
1718
#include "presto_cpp/main/common/tests/test_json.h"
1819
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
1920

@@ -100,7 +101,7 @@ TEST_F(TaskUpdateRequestTest, mapOutputBuffers) {
100101
ASSERT_EQ(outputBuffers.buffers["2"], 20);
101102
}
102103

103-
TEST_F(TaskUpdateRequestTest, binarySplitFromThrift) {
104+
TEST_F(TaskUpdateRequestTest, binaryHiveSplitFromThrift) {
104105
thrift::Split thriftSplit;
105106
thriftSplit.connectorId()->catalogName_ref() = "hive";
106107
thriftSplit.transactionHandle()->jsonValue_ref() = R"({
@@ -127,14 +128,89 @@ TEST_F(TaskUpdateRequestTest, binarySplitFromThrift) {
127128
protocol::NodeSelectionStrategy::NO_PREFERENCE);
128129
}
129130

130-
TEST_F(TaskUpdateRequestTest, binaryTableWriteInfo) {
131-
std::string str = slurp(getDataPath(BASE_DATA_PATH, "TableWriteInfo.json"));
132-
protocol::TableWriteInfo tableWriteInfo;
131+
TEST_F(TaskUpdateRequestTest, binaryRemoteSplitFromThrift) {
132+
thrift::Split thriftSplit;
133+
thrift::RemoteTransactionHandle thriftTransactionHandle;
134+
thrift::RemoteSplit thriftRemoteSplit;
135+
136+
thriftSplit.connectorId()->catalogName_ref() = "$remote";
137+
thriftSplit.transactionHandle()->customSerializedValue_ref() =
138+
thriftWrite(thriftTransactionHandle);
139+
140+
thriftRemoteSplit.location()->location_ref() = "/test_location";
141+
thriftRemoteSplit.remoteSourceTaskId()->id_ref() = 100;
142+
thriftRemoteSplit.remoteSourceTaskId()->attemptNumber_ref() = 200;
143+
thriftRemoteSplit.remoteSourceTaskId()->stageExecutionId()->id_ref() = 300;
144+
thriftRemoteSplit.remoteSourceTaskId()->stageExecutionId()->stageId()->id_ref() = 400;
145+
thriftRemoteSplit.remoteSourceTaskId()->stageExecutionId()->stageId()->queryId_ref() = "test_query_id";
146+
147+
thriftSplit.connectorSplit()->connectorId_ref() = "$remote";
148+
thriftSplit.connectorSplit()->customSerializedValue_ref() =
149+
thriftWrite(thriftRemoteSplit);
150+
151+
protocol::Split split;
152+
thrift::fromThrift(thriftSplit, split);
153+
154+
// Verify that connector specific fields are set correctly with thrift codec
155+
auto remoteSplit = std::dynamic_pointer_cast<protocol::RemoteSplit>(
156+
split.connectorSplit);
157+
ASSERT_EQ((remoteSplit->location).location, "/test_location");
158+
ASSERT_EQ(remoteSplit->remoteSourceTaskId, "test_query_id.400.300.100.200");
159+
}
160+
161+
TEST_F(TaskUpdateRequestTest, unionExecutionWriterTargetFromThrift) {
162+
// Construct ExecutionWriterTarget with CreateHandle
163+
thrift::CreateHandle thriftCreateHandle;
164+
thrift::ExecutionWriterTargetUnion thriftWriterTarget;
165+
thriftCreateHandle.schemaTableName()->schema_ref() = "test_schema";
166+
thriftCreateHandle.schemaTableName()->table_ref() = "test_table";
167+
thriftCreateHandle.handle()->connectorId()->catalogName_ref() = "hive";
168+
thriftCreateHandle.handle()->transactionHandle()->jsonValue_ref() = R"({
169+
"@type": "hive",
170+
"uuid": "8a4d6c83-60ee-46de-9715-bc91755619fa"
171+
})";
172+
thriftCreateHandle.handle()->connectorHandle()->jsonValue_ref() = slurp(getDataPath(BASE_DATA_PATH, "HiveOutputTableHandle.json"));;
173+
thriftWriterTarget.set_createHandle(std::move(thriftCreateHandle));
174+
175+
// Convert from thrift to protocol and verify fields
176+
auto writerTarget = std::make_shared<protocol::ExecutionWriterTarget>();
177+
thrift::fromThrift(thriftWriterTarget, writerTarget);
178+
179+
ASSERT_EQ(writerTarget->_type, "CreateHandle");
180+
auto createHandle = std::dynamic_pointer_cast<protocol::CreateHandle>(writerTarget);
181+
ASSERT_NE(createHandle, nullptr);
182+
ASSERT_EQ(createHandle->schemaTableName.schema, "test_schema");
183+
ASSERT_EQ(createHandle->schemaTableName.table, "test_table");
184+
185+
auto* hiveTxnHandle = dynamic_cast<protocol::hive::HiveTransactionHandle*>(createHandle->handle.transactionHandle.get());
186+
ASSERT_NE(hiveTxnHandle, nullptr);
187+
ASSERT_EQ(hiveTxnHandle->uuid, "8a4d6c83-60ee-46de-9715-bc91755619fa");
188+
189+
auto* hiveOutputTableHandle = dynamic_cast<protocol::hive::HiveOutputTableHandle*>(createHandle->handle.connectorHandle.get());
190+
ASSERT_NE(hiveOutputTableHandle, nullptr);
191+
ASSERT_EQ(hiveOutputTableHandle->schemaName, "test_schema");
192+
ASSERT_EQ(hiveOutputTableHandle->tableName, "test_table");
193+
ASSERT_EQ(hiveOutputTableHandle->tableStorageFormat, protocol::hive::HiveStorageFormat::ORC);
194+
ASSERT_EQ(hiveOutputTableHandle->locationHandle.targetPath, "/path/to/target");
195+
}
196+
197+
TEST_F(TaskUpdateRequestTest, unionExecutionWriterTargetToThrift) {
198+
// Construct thrift ExecutionWriterTarget with CreateHandle
199+
auto createHandle = std::make_shared<protocol::CreateHandle>();
200+
createHandle->schemaTableName.schema = "test_schema";
201+
createHandle->schemaTableName.table = "test_table";
202+
203+
auto writerTarget = std::make_shared<protocol::ExecutionWriterTarget>();
204+
writerTarget->_type = "CreateHandle";
205+
writerTarget = createHandle;
133206

134-
thrift::fromThrift(str, tableWriteInfo);
135-
auto hiveTableHandle = std::dynamic_pointer_cast<protocol::hive::HiveTableHandle>((*tableWriteInfo.analyzeTableHandle).connectorHandle);
136-
ASSERT_EQ(hiveTableHandle->tableName, "test_table");
137-
ASSERT_EQ(hiveTableHandle->analyzePartitionValues->size(), 2);
207+
// Convert to thrift and verify fields. Note that toThrift functions for connector fields are not implemented.
208+
thrift::ExecutionWriterTargetUnion thriftWriterTarget;
209+
thrift::toThrift(writerTarget, thriftWriterTarget);
210+
ASSERT_TRUE(thriftWriterTarget.createHandle_ref().has_value());
211+
const auto& thriftCreateHandle = thriftWriterTarget.createHandle_ref().value();
212+
ASSERT_EQ(thriftCreateHandle.schemaTableName()->schema_ref().value(), "test_schema");
213+
ASSERT_EQ(thriftCreateHandle.schemaTableName()->table_ref().value(), "test_table");
138214
}
139215

140216
TEST_F(TaskUpdateRequestTest, fragment) {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"@type": "hive",
3+
"schemaName": "test_schema",
4+
"tableName": "test_table",
5+
"inputColumns": [],
6+
"pageSinkMetadata": {
7+
"schemaTableName": {
8+
"schema": "test_schema",
9+
"table": "test_table"
10+
},
11+
"modifiedPartitions": {}
12+
},
13+
"locationHandle": {
14+
"targetPath": "/path/to/target",
15+
"writePath": "/path/to/write",
16+
"tableType": "NEW",
17+
"writeMode": "STAGE_AND_MOVE_TO_TARGET_DIRECTORY"
18+
},
19+
"tableStorageFormat": "ORC",
20+
"partitionStorageFormat": "ORC",
21+
"actualStorageFormat": "ORC",
22+
"compressionCodec": "NONE",
23+
"partitionedBy": [],
24+
"preferredOrderingColumns": [],
25+
"tableOwner": "owner_name",
26+
"additionalTableParameters": {}
27+
}

presto-native-execution/presto_cpp/main/tests/data/TableWriteInfo.json

Lines changed: 0 additions & 18 deletions
This file was deleted.

presto-native-execution/presto_cpp/main/thrift/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13+
.PHONY: presto_protocol-to-thrift-json.json presto_thrift.json
1314
all: ProtocolToThrift.h ProtocolToThrift.cpp
1415

1516
ProtocolToThrift.h: ProtocolToThrift-hpp.mustache presto_protocol-to-thrift-json.json
@@ -27,4 +28,3 @@ presto_protocol-to-thrift-json.json: presto_protocol-to-thrift-json.py presto_pr
2728

2829
presto_thrift.json: presto_thrift.thrift ./thrift2json.py
2930
./thrift2json.py presto_thrift.thrift | jq . > presto_thrift.json
30-

presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift-cpp.mustache

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
{{/.}}
2121

2222
#include "presto_cpp/main/thrift/ProtocolToThrift.h"
23+
#include "presto_cpp/main/thrift/ThriftIO.h"
2324
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
2425

2526
namespace facebook::presto::thrift {
@@ -190,13 +191,69 @@ void fromThrift(const std::map<K1, V1>& thriftMap, std::map<K2, V2>& protoMap) {
190191
{{&cinc}}
191192
{{/cinc}}
192193
{{^cinc}}
194+
{{#connector}}
195+
void toThrift(const std::shared_ptr<facebook::presto::protocol::{{class_name}}>& proto, {{class_name}}& thrift) {
196+
}
197+
void fromThrift(const {{class_name}}& thrift, std::shared_ptr<facebook::presto::protocol::{{class_name}}>& proto) {
198+
if (thrift.connectorId().has_value() && thrift.customSerializedValue().has_value()) {
199+
facebook::presto::protocol::getConnectorProtocol(thrift.connectorId().value())
200+
.deserialize(thrift.customSerializedValue().value(), proto);
201+
} else if (thrift.jsonValue().has_value()) {
202+
json j = json::parse(thrift.jsonValue().value());
203+
from_json(j, proto);
204+
}
205+
}
206+
207+
{{/connector}}
208+
{{#union}}
209+
void toThrift(
210+
const std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto,
211+
apache::thrift::optional_field_ref<{{class_name}}&> thrift) {
212+
if (proto) {
213+
thrift.ensure();
214+
toThrift(proto, *thrift);
215+
}
216+
}
217+
void toThrift(
218+
const std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto,
219+
{{class_name}}& thrift) {
220+
{{#fields}}
221+
if (auto {{field_name}} =
222+
std::dynamic_pointer_cast<facebook::presto::protocol::{{field_type}}>(proto)) {
223+
{{field_type}} thrift{{field_type}};
224+
toThrift(*{{field_name}}, thrift{{field_type}});
225+
thrift.set_{{field_name}}(std::move(thrift{{field_type}}));
226+
}
227+
{{/fields}}
228+
}
229+
void fromThrift(
230+
apache::thrift::optional_field_ref<const {{class_name}}&> thrift,
231+
std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto) {
232+
if (thrift.has_value()) {
233+
proto = std::make_shared<facebook::presto::protocol::{{proto_name}}>();
234+
fromThrift(thrift.value(), proto);
235+
}
236+
}
237+
void fromThrift(
238+
const {{class_name}}& thrift,
239+
std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto) {
240+
{{#fields}}
241+
if (thrift.getType() == {{class_name}}::Type::{{field_name}}) {
242+
std::shared_ptr<facebook::presto::protocol::{{#proto_field_type}}{{proto_field_type}}{{/proto_field_type}}{{^proto_field_type}}{{field_type}}{{/proto_field_type}}> {{field_name}};
243+
fromThrift(thrift.get_{{field_name}}(), {{field_name}});
244+
proto = {{field_name}};
245+
}
246+
{{/fields}}
247+
}
248+
249+
{{/union}}
193250
{{#struct}}
194251
void toThrift(const facebook::presto::protocol::{{class_name}}& proto, {{&class_name}}& thrift) {
195252
{{#fields}}
196253
toThrift(proto.{{proto_name}}, {{^optional}}*{{/optional}}thrift.{{field_name}}_ref());
197254
{{/fields}}
198255
}
199-
void fromThrift(const {{&class_name}}& thrift, facebook::presto::protocol::{{class_name}}& proto) {
256+
void fromThrift(const {{&class_name}}& thrift, facebook::presto::protocol::{{#proto_name}}{{proto_name}}{{/proto_name}}{{^proto_name}}{{class_name}}{{/proto_name}}& proto) {
200257
{{#fields}}
201258
fromThrift({{^optional}}*{{/optional}}thrift.{{field_name}}_ref(), proto.{{proto_name}});
202259
{{/fields}}
@@ -216,11 +273,11 @@ void fromThrift(const {{class_name}}& thrift, facebook::presto::protocol::{{clas
216273
}
217274
{{/wrapper}}
218275
{{#enum}}
219-
void toThrift(const facebook::presto::protocol::{{class_name}}& proto, {{class_name}}& thrift) {
276+
void toThrift(const facebook::presto::protocol::{{proto_name}}& proto, {{class_name}}& thrift) {
220277
thrift = ({{class_name}})(static_cast<int>(proto));
221278
}
222-
void fromThrift(const {{class_name}}& thrift, facebook::presto::protocol::{{class_name}}& proto) {
223-
proto = (facebook::presto::protocol::{{class_name}})(static_cast<int>(thrift));
279+
void fromThrift(const {{class_name}}& thrift, facebook::presto::protocol::{{proto_name}}& proto) {
280+
proto = (facebook::presto::protocol::{{proto_name}})(static_cast<int>(thrift));
224281
}
225282

226283
{{/enum}}

presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift-hpp.mustache

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,28 @@ void fromThrift(const double& thrift, facebook::presto::protocol::DataSize& data
3636
{{&hinc}}
3737
{{/hinc}}
3838
{{^hinc}}
39+
{{#connector}}
40+
void toThrift(const std::shared_ptr<facebook::presto::protocol::{{class_name}}>& proto, {{class_name}}& thrift);
41+
void fromThrift(const {{class_name}}& thrift, std::shared_ptr<facebook::presto::protocol::{{class_name}}>& proto);
42+
43+
{{/connector}}
44+
{{#union}}
45+
void toThrift(
46+
const std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto,
47+
apache::thrift::optional_field_ref<{{class_name}}&> thrift);
48+
void toThrift(
49+
const std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto,
50+
{{class_name}}& thrift);
51+
void fromThrift(
52+
apache::thrift::optional_field_ref<const {{class_name}}&> thrift,
53+
std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto);
54+
void fromThrift(
55+
const {{class_name}}& thrift,
56+
std::shared_ptr<facebook::presto::protocol::{{proto_name}}>& proto);
57+
{{/union}}
3958
{{#struct}}
40-
void toThrift(const facebook::presto::protocol::{{class_name}}& proto, {{class_name}}& thrift);
41-
void fromThrift(const {{&class_name}}& thrift, facebook::presto::protocol::{{class_name}}& proto);
59+
void toThrift(const facebook::presto::protocol::{{class_name}}& proto, {{&class_name}}& thrift);
60+
void fromThrift(const {{&class_name}}& thrift, facebook::presto::protocol::{{#proto_name}}{{proto_name}}{{/proto_name}}{{^proto_name}}{{class_name}}{{/proto_name}}& proto);
4261

4362
{{/struct}}
4463
{{#wrapper}}

0 commit comments

Comments
 (0)