Add RFC to add thrift support for task status/updateRequest/info #38
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,325 @@ | ||
# **RFC0 for Presto** | ||
|
||
See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions on creating your RFC and the process surrounding it. | ||
|
||
## [Thrift Serialization for TaskStatus, TaskInfo and TaskUpdateRequest] | ||
|
||
Proposers | ||
|
||
* Shang Ma | ||
* Vivian Hsu | ||
|
||
## [Related Issues] | ||
|
||
Related issues may include GitHub issues, PRs or other RFCs. | ||
- prestodb/presto#25020 | ||
- prestodb/presto#25079 | ||
|
||
## Summary | ||
|
||
Add Thrift serialization support for the TaskStatus, TaskInfo, and TaskUpdateRequest classes used by the getTaskStatus and createOrUpdateTask APIs, for both Java and C++ workers, to reduce CPU overhead. | ||
|
||
## Background | ||
|
||
The Presto coordinator sends task updates to workers, and workers respond with taskInfo. Both taskUpdateRequest and taskInfo are currently serialized as JSON, which can be CPU intensive. Under high task concurrency, this serialization can become a bottleneck for the coordinator, which in turn becomes a bottleneck for the whole cluster. | ||
|
||
|
||
### Goals | ||
1. Support Thrift serde for the TaskStatus, TaskInfo, and TaskUpdateRequest classes for both Java and C++ workers | ||
2. Maintain backward compatibility with existing JSON serialization | ||
3. Allow custom codec implementations for different connectors through codec providers | ||
4. Deprecate legacy metaDataUpdate and connectorTypeSerde | ||
5. Use the Drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers | ||
* The final IDL for all Thrift structs needed for TaskStatus, TaskInfo, and TaskUpdateRequest can be generated automatically by building the "presto-thrift-spec" module in the Presto repo. This module is also built automatically as part of the Presto build. | ||
* For the C++ worker, there is an extra step: run `make` with presto-native-execution/presto_cpp/main/thrift/Makefile to generate some utility code. | ||
6. Allow multiple serialization formats to coexist | ||
7. Support future serialization formats without SPI changes | ||
8. Allow gradual migration from current design to new design | ||
|
||
|
||
## Proposed Implementation | ||
|
||
### Disclaimer: The code below is pseudocode; the real implementation will differ. | ||
### Current Architecture for JSON Serde | ||
```java | ||
|
||
// Use jackson annotation | ||
public class Split { | ||
@JsonProperty | ||
private final ConnectorSplit connectorSplit; | ||
// ... other fields and methods | ||
@JsonCreator | ||
public Split(...); | ||
} | ||
``` | ||
|
||
#### For Polymorphic Types e.g. ConnectorSplit | ||
```java | ||
|
||
public class HandleJsonModule | ||
implements Module | ||
{ | ||
@Override | ||
public void configure(Binder binder) | ||
{ | ||
jsonBinder(binder).addModuleBinding().to(TableHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(TableLayoutHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(ColumnHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class); | ||
jsonBinder(binder).addModuleBinding().to(MetadataUpdateJacksonModule.class); | ||
|
||
binder.bind(HandleResolver.class).in(Scopes.SINGLETON); | ||
} | ||
} | ||
|
||
// A handle resolver to return the correct type info in runtime | ||
public HandleResolver() | ||
{ | ||
handleResolvers.put(REMOTE_CONNECTOR_ID.toString(), new MaterializedHandleResolver(new RemoteHandleResolver())); | ||
handleResolvers.put("$system", new MaterializedHandleResolver(new SystemHandleResolver())); | ||
handleResolvers.put("$info_schema", new MaterializedHandleResolver(new InformationSchemaHandleResolver())); | ||
handleResolvers.put("$empty", new MaterializedHandleResolver(new EmptySplitHandleResolver())); | ||
|
||
functionHandleResolvers.put("$static", new MaterializedFunctionHandleResolver(new BuiltInFunctionNamespaceHandleResolver())); | ||
functionHandleResolvers.put("$session", new MaterializedFunctionHandleResolver(new SessionFunctionHandleResolver())); | ||
} | ||
|
||
// Register correct serde methods for different types | ||
protected AbstractTypedJacksonModule( | ||
Class<T> baseClass, | ||
Function<T, String> nameResolver, | ||
Function<String, Class<? extends T>> classResolver) | ||
{ | ||
super(baseClass.getSimpleName() + "Module", Version.unknownVersion()); | ||
|
||
TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver); | ||
|
||
addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver)); | ||
addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver)); | ||
} | ||
|
||
// type information | ||
private static final String TYPE_PROPERTY = "@type"; | ||
|
||
public InternalTypeSerializer(Class<T> baseClass, TypeIdResolver typeIdResolver) | ||
{ | ||
super(baseClass); | ||
this.typeSerializer = new AsPropertyTypeSerializer(typeIdResolver, null, TYPE_PROPERTY); | ||
} | ||
|
||
public InternalTypeDeserializer(Class<T> baseClass, TypeIdResolver typeIdResolver) | ||
{ | ||
super(baseClass); | ||
this.typeDeserializer = new AsPropertyTypeDeserializer( | ||
TypeFactory.defaultInstance().constructType(baseClass), | ||
typeIdResolver, | ||
TYPE_PROPERTY, | ||
false, | ||
null); | ||
} | ||
|
||
``` | ||
|
||
### Preferred Option: Pluggable Serde for Polymorphic Types | ||
Review comment: @shangm2: It would be great to see an example of what the native code corresponding to this would look like. Can you add a code sample for that?
Reply: Hey @aditi-pandit, apologies for the slow response. Here is a code change for the native worker by @vhsu14: prestodb/presto#25079. Let us know if you have any questions.
||
|
||
```java | ||
// In presto-spi, an interface for connector-specific data serde | ||
public interface ConnectorCodec<T> | ||
{ | ||
byte[] serialize(T value); | ||
|
||
T deserialize(byte[] bytes); | ||
} | ||
|
||
// A codec provider interface so that different connectors can provide their custom way for serde | ||
public interface ConnectorCodecProvider | ||
{ | ||
default Optional<ConnectorCodec<ConnectorSplit>> getConnectorSplitCodec() | ||
{ | ||
return Optional.empty(); | ||
} | ||
|
||
default Optional<ConnectorCodec<ConnectorTransactionHandle>> getConnectorTransactionHandleCodec() | ||
{ | ||
return Optional.empty(); | ||
} | ||
|
||
default Optional<ConnectorCodec<ConnectorOutputTableHandle>> getConnectorOutputTableHandleCodec() | ||
{ | ||
return Optional.empty(); | ||
} | ||
|
||
default Optional<ConnectorCodec<ConnectorInsertTableHandle>> getConnectorInsertTableHandleCodec() | ||
{ | ||
return Optional.empty(); | ||
} | ||
|
||
default Optional<ConnectorCodec<ConnectorDeleteTableHandle>> getConnectorDeleteTableHandleCodec() | ||
{ | ||
return Optional.empty(); | ||
} | ||
|
||
default Optional<ConnectorCodec<ConnectorTableLayoutHandle>> getConnectorTableLayoutHandleCodec() | ||
{ | ||
return Optional.empty(); | ||
} | ||
|
||
default Optional<ConnectorCodec<ConnectorTableHandle>> getConnectorTableHandleCodec() | ||
{ | ||
return Optional.empty(); | ||
} | ||
} | ||
|
||
// Similar to JSON, this is where connector-specific Thrift codecs are registered. | ||
public class HandleThriftModule | ||
implements Module | ||
{ | ||
@Override | ||
public void configure(Binder binder) | ||
{ | ||
thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class); | ||
thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class); | ||
thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class); | ||
thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class); | ||
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class); | ||
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class); | ||
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class); | ||
|
||
jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class); | ||
jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class); | ||
jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class); | ||
jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class); | ||
jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class); | ||
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class); | ||
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class); | ||
|
||
binder.bind(HandleResolver.class).in(Scopes.SINGLETON); | ||
} | ||
} | ||
|
||
// An example: TpcdsSplitCodec | ||
public class TpcdsSplitCodec | ||
implements ConnectorCodec<ConnectorSplit> | ||
{ | ||
private final ThriftCodec<TpcdsSplit> thriftCodec; | ||
|
||
public TpcdsSplitCodec(ThriftCodecManager thriftCodecManager) | ||
{ | ||
this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsSplit.class); | ||
} | ||
|
||
@Override | ||
public byte[] serialize(ConnectorSplit split) | ||
{ | ||
try { | ||
return toThrift((TpcdsSplit) split, thriftCodec); | ||
} | ||
catch (TProtocolException e) { | ||
throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds split", e); | ||
} | ||
} | ||
|
||
@Override | ||
public ConnectorSplit deserialize(byte[] bytes) | ||
{ | ||
try { | ||
return fromThrift(bytes, thriftCodec); | ||
} | ||
catch (TProtocolException e) { | ||
throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds split", e); | ||
} | ||
} | ||
} | ||
``` | ||
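
As a rough illustration of how the engine side might consume such a provider, here is a minimal, dependency-free sketch. It assumes that a connector returning `Optional.empty()` keeps using the default (JSON) path; the RFC does not spell this out. All names (`Codec`, `CodecProvider`, `PrefixCodec`, `CodecSelection`, `CustomCodecProvider`) are hypothetical stand-ins rather than Presto classes, and splits are modeled as plain strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-in for the ConnectorCodec interface shown above.
interface Codec<T>
{
    byte[] serialize(T value);

    T deserialize(byte[] bytes);
}

// Hypothetical stand-in for ConnectorCodecProvider, reduced to a single handle type.
interface CodecProvider
{
    default Optional<Codec<String>> getSplitCodec()
    {
        return Optional.empty();
    }
}

// Toy codec that only tags the payload with a prefix, so the chosen format is visible.
final class PrefixCodec
        implements Codec<String>
{
    private final String prefix;

    PrefixCodec(String prefix)
    {
        this.prefix = prefix;
    }

    @Override
    public byte[] serialize(String value)
    {
        return (prefix + value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes)
    {
        return new String(bytes, StandardCharsets.UTF_8).substring(prefix.length());
    }
}

// A connector that opts in by returning its own codec.
final class CustomCodecProvider
        implements CodecProvider
{
    @Override
    public Optional<Codec<String>> getSplitCodec()
    {
        return Optional.of(new PrefixCodec("custom:"));
    }
}

final class CodecSelection
{
    // Assumption: this plays the role of the engine's default JSON codec.
    static final Codec<String> DEFAULT_CODEC = new PrefixCodec("json:");

    private CodecSelection() {}

    // Prefer the connector-provided codec; otherwise use the default one.
    static Codec<String> select(CodecProvider provider)
    {
        return provider.getSplitCodec().orElse(DEFAULT_CODEC);
    }
}
```

With this shape, `CodecSelection.select(new CustomCodecProvider())` yields the connector's codec, while a provider that overrides nothing yields the default one, so connectors can migrate one at a time.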
|
||
#### Pros | ||
- Each connector can choose its own serialization format | ||
- The internal details of how serialization is handled are hidden | ||
- Connectors can evolve their serialization format independently | ||
- New connectors can adopt newer, more efficient serialization formats without waiting for the entire system to upgrade | ||
- Existing connectors can migrate to better formats without forcing other connectors to change | ||
- Performance optimizations can be made on a per-connector basis | ||
|
||
### Q & A | ||
1. What modules are involved | ||
* presto-spi | ||
* presto-main-base | ||
* presto-main | ||
2. Any new terminologies/concepts/SQL language additions | ||
* N/A | ||
3. Method/class/interface contracts which you deem fit for implementation. | ||
* See above code example | ||
4. Code flow using bullet points or pseudo code as applicable | ||
* See above code example | ||
5. Any new user facing metrics that can be shown on CLI or UI. | ||
* N/A | ||
|
||
## Metrics | ||
|
||
How can we measure the impact of this feature? | ||
1. taskUpdateSerializedCpuNanos | ||
2. taskUpdateDeliveredWallTimeNanos | ||
3. CPU usage for task update serde | ||
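
One way a counter such as taskUpdateSerializedCpuNanos could be collected is to read the current thread's CPU time around each serialization call. The sketch below uses the standard `java.lang.management.ThreadMXBean` API; the class name `SerdeCpuTimer` is hypothetical, and this is not how Presto records the metric today as far as this RFC states.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.function.Supplier;

// Hypothetical helper: accumulates CPU nanoseconds spent inside serialization calls.
// Not thread-safe; a real implementation would likely use an atomic or per-thread counter.
final class SerdeCpuTimer
{
    private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();

    private long totalCpuNanos;

    // Runs the serialization and adds its CPU cost to the running total.
    <T> T time(Supplier<T> serialization)
    {
        long start = currentCpuNanos();
        try {
            return serialization.get();
        }
        finally {
            totalCpuNanos += Math.max(0L, currentCpuNanos() - start);
        }
    }

    long getTotalCpuNanos()
    {
        return totalCpuNanos;
    }

    // Uses thread CPU time when the JVM supports it; falls back to wall time otherwise.
    private static long currentCpuNanos()
    {
        if (THREAD_MX_BEAN.isCurrentThreadCpuTimeSupported() && THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
            return THREAD_MX_BEAN.getCurrentThreadCpuTime();
        }
        return System.nanoTime();
    }
}
```

Wrapping both the JSON and the Thrift serialization paths with the same timer would give directly comparable totals for the two formats.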
|
||
## Adoption Plan | ||
|
||
### Rollout | ||
* As the first step, we will use Drift to annotate all primitive-typed fields within the three classes mentioned above, while keeping complicated data types (e.g. Split, MetadataUpdate, TableWriteInfo) as JSON. | ||
* In the second step, we will add the related interfaces/methods to support connector-specific fields, but leave the concrete implementation to the user of the connector. | ||
|
||
- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar? | ||
* Thrift serde will be disabled by default and can be enabled through a config. | ||
- If we are changing behaviour how will we phase out the older behaviour? | ||
* We will NOT maintain two serde schemes at the same time. If the feature toggle for Thrift serde is enabled, only Thrift serde will be available for the getTaskStatus and createOrUpdateTask APIs. If Thrift serde is not working, we should let the system fail so that developers get a clean signal. JSON serde keeps working when the feature toggle is disabled. | ||
- If we need special migration tools, describe them here. | ||
* N/A | ||
- When will we remove the existing behaviour, if applicable. | ||
* N/A | ||
- How should this feature be taught to new and existing users? Basically mention if documentation changes/new blog are needed? | ||
* This feature will be documented in the Presto documentation. | ||
- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC? | ||
* N/A | ||
- What are the commands/instructions to generate the thrift idl and use it for cpp workers? | ||
* To generate the IDL file, run `mvn clean install -X -e -DskipTests -pl presto-thrift-spec`. The result is written to "presto-thrift-spec/target/thrift/presto-thrift-protocol.thrift"; then move that file to "presto-native-execution/presto_cpp/main/thrift/presto_thrift.thrift". | ||
* To update the protocol for cpp worker: | ||
* Follow the instructions in the [Presto Protocol README](https://github.com/prestodb/presto/blob/master/presto-native-execution/presto_cpp/presto_protocol/README.md) to generate the protocol code. This process produces the presto_protocol_core.json file, which is required for generating the thrift conversion code. | ||
``` | ||
cd presto/presto-native-execution/presto_cpp/presto_protocol | ||
make presto_protocol | ||
``` | ||
* Run the following commands to generate ProtocolToThrift.h and ProtocolToThrift.cpp. These files enable conversion between internal classes and the C++ classes generated by the thrift IDL. Do not edit these files manually. For more details on the generation process, see the [Thrift README](https://github.com/prestodb/presto/blob/master/presto-native-execution/presto_cpp/main/thrift/README.md). | ||
``` | ||
cd presto/presto-native-execution/presto_cpp/main/thrift | ||
make | ||
``` | ||
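
The toggle behaviour described in the rollout answers above (Thrift only when enabled, no silent fallback to JSON) could be sketched as follows. `TaskSerde` and its constructor parameters are hypothetical names, the payload is modeled as a plain string, and the two serializers are passed in as functions so the sketch stays independent of any Thrift or JSON library.

```java
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

// Hypothetical selector: exactly one serde scheme is active, chosen by a feature toggle.
final class TaskSerde
{
    private final boolean thriftEnabled;
    private final Function<String, byte[]> thriftSerializer;
    private final Function<String, byte[]> jsonSerializer;

    TaskSerde(
            boolean thriftEnabled,
            Function<String, byte[]> thriftSerializer,
            Function<String, byte[]> jsonSerializer)
    {
        this.thriftEnabled = thriftEnabled;
        this.thriftSerializer = requireNonNull(thriftSerializer, "thriftSerializer is null");
        this.jsonSerializer = requireNonNull(jsonSerializer, "jsonSerializer is null");
    }

    byte[] serialize(String taskStatus)
    {
        // Deliberately no try/catch: if Thrift is enabled and fails, the error
        // propagates to the caller instead of falling back to JSON.
        return thriftEnabled
                ? thriftSerializer.apply(taskStatus)
                : jsonSerializer.apply(taskStatus);
    }
}
```

Keeping the choice in one place means a misconfigured Thrift path surfaces as an immediate failure, which matches the "clean signal" intent stated above.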
|
||
|
||
## Test Plan | ||
|
||
How do we ensure the feature works as expected? Mention if any functional tests/integration tests are needed. Special mention for product-test changes. If any PoC has been done already, please mention the relevant test results here that you think will bolster your case of getting this RFC approved. | ||
|
||
- A PoC for step 1 (primitive types) can be found in the following PRs: | ||
* prestodb/presto#25020 | ||
* prestodb/presto#25079 | ||
* prestodb/presto#25242 | ||
|
||
## Q & A | ||
- If we're using Thrift, why can't we just use either the Drift IDL generator, or have both Java and C++ auto-generate the Thrift structures from a checked in IDL? | ||
- Answer: We are using Drift to annotate classes in Java and generate an IDL for C++ to auto-gen the Thrift structures. Using a checked-in IDL has some cons in Java: | ||
- While we are in the process of switching, we would need to maintain two sets of classes and write toThrift/fromThrift methods to convert between them. This requires a large amount of effort, is error-prone, and adds GC pressure. | ||
- Thrift structures don't support having class methods, so for every class that has methods, we will need to have a utils class for it. | ||
- Everyone needs to be aware of the checked-in IDL. Whenever we want to modify classes in the Thrift flow, we need to make corresponding changes to the IDL. | ||
|
||
- Does the existing readme in the Thrift directory apply to the current changes? | ||
- Answer: If you are referring to the pipeline that converts Thrift to JSON with the help of a Python script and a YAML file, then yes, we still need this pipeline to generate some code (e.g. presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.cpp) so that we don't need to write it manually. | ||
- What is the timeline for maintaining both JSON and Thrift, or do we plan to maintain both? | ||
- Answer: We don't have a timeline to deprecate JSON, at least for now. We can discuss if and how to do this once the Thrift migration is in a stable state. | ||
- Is the current Thrift file manually generated? Is this transitional, or going forward would it be a part of the developer workflow to regenerate the Thrift IDL using the Drift tool? | ||
- Answer: The IDL file is auto-generated using the Drift library. This happens when presto-thrift-spec is built, which happens every time you build Presto. |
Review comment: Is this linked to the main `make` command, or is it a separate command required post-build?
Reply: This is a separate `make` command to invoke that Makefile. It is necessary even now whenever a protocol change (between the Java coordinator and the C++ worker) is needed.