Skip to content

Commit 47af2d5

Browse files
committed
Add new generic thrift toolkit module for connectors
1 parent 1ed8cb4 commit 47af2d5

File tree

20 files changed

+418
-311
lines changed

20 files changed

+418
-311
lines changed

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
/presto-testng-services @prestodb/committers
9090
/presto-thrift-api @prestodb/committers
9191
/presto-thrift-connector @prestodb/committers
92+
/presto-thrift-connector-toolkit @prestodb/committers
9293
/presto-thrift-spec @prestodb/committers
9394
/presto-thrift-testing-server @prestodb/committers
9495
/presto-thrift-testing-udf-server @prestodb/committers

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@
227227
<module>presto-plan-checker-router-plugin</module>
228228
<module>presto-sql-helpers/presto-sql-invoked-functions-plugin</module>
229229
<module>presto-sql-helpers/presto-native-sql-invoked-functions-plugin</module>
230+
<module>presto-thrift-connector-toolkit</module>
230231
</modules>
231232

232233
<dependencyManagement>
@@ -1110,6 +1111,12 @@
11101111
<type>test-jar</type>
11111112
</dependency>
11121113

1114+
<dependency>
1115+
<groupId>com.facebook.presto</groupId>
1116+
<artifactId>presto-thrift-connector-toolkit</artifactId>
1117+
<version>${project.version}</version>
1118+
</dependency>
1119+
11131120
<dependency>
11141121
<groupId>com.facebook.presto</groupId>
11151122
<artifactId>presto-thrift-testing-server</artifactId>

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.connector;
1515

1616
import com.facebook.drift.codec.ThriftCodecManager;
17+
import com.facebook.presto.spi.ColumnHandle;
1718
import com.facebook.presto.spi.ConnectorCodec;
1819
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
1920
import com.facebook.presto.spi.ConnectorId;
@@ -96,4 +97,10 @@ public Optional<ConnectorCodec<ConnectorTableHandle>> getTableHandleCodec(String
9697
requireNonNull(connectorId, "connectorId is null");
9798
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableHandleCodec);
9899
}
100+
101+
public Optional<ConnectorCodec<ColumnHandle>> getColumnHandleCodec(String connectorId)
102+
{
103+
requireNonNull(connectorId, "connectorId is null");
104+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getColumnHandleCodec);
105+
}
99106
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.server.thrift;
15+
16+
import com.facebook.airlift.json.JsonCodec;
17+
import com.facebook.drift.codec.CodecThriftType;
18+
import com.facebook.drift.codec.metadata.ThriftType;
19+
import com.facebook.drift.protocol.TProtocolReader;
20+
import com.facebook.drift.protocol.TProtocolWriter;
21+
import com.facebook.presto.connector.ConnectorCodecManager;
22+
import com.facebook.presto.metadata.HandleResolver;
23+
import com.facebook.presto.spi.ColumnHandle;
24+
25+
import javax.inject.Inject;
26+
27+
import java.nio.ByteBuffer;
28+
29+
import static java.util.Objects.requireNonNull;
30+
31+
public class ColumnHandleThriftCodec
32+
extends AbstractTypedThriftCodec<ColumnHandle>
33+
{
34+
private static final ThriftType THRIFT_TYPE = createThriftType(ColumnHandle.class);
35+
private final ConnectorCodecManager connectorCodecManager;
36+
37+
@Inject
38+
public ColumnHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec<ColumnHandle> jsonCodec)
39+
{
40+
super(ColumnHandle.class,
41+
requireNonNull(jsonCodec, "jsonCodec is null"),
42+
requireNonNull(handleResolver, "handleResolver is null")::getId,
43+
handleResolver::getColumnHandleClass);
44+
this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
45+
}
46+
47+
@CodecThriftType
48+
public static ThriftType getThriftType()
49+
{
50+
return THRIFT_TYPE;
51+
}
52+
53+
@Override
54+
public ThriftType getType()
55+
{
56+
return THRIFT_TYPE;
57+
}
58+
59+
@Override
60+
public ColumnHandle readConcreteValue(String connectorId, TProtocolReader reader)
61+
throws Exception
62+
{
63+
ByteBuffer byteBuffer = reader.readBinary();
64+
assert (byteBuffer.position() == 0);
65+
byte[] bytes = byteBuffer.array();
66+
return connectorCodecManager.getColumnHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
67+
}
68+
69+
@Override
70+
public void writeConcreteValue(String connectorId, ColumnHandle value, TProtocolWriter writer)
71+
throws Exception
72+
{
73+
requireNonNull(value, "value is null");
74+
writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getColumnHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
75+
}
76+
77+
@Override
78+
public boolean isThriftCodecAvailable(String connectorId)
79+
{
80+
return connectorCodecManager.getColumnHandleCodec(connectorId).isPresent();
81+
}
82+
}

presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.server.thrift;
1515

1616
import com.facebook.presto.metadata.HandleResolver;
17+
import com.facebook.presto.spi.ColumnHandle;
1718
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
1819
import com.facebook.presto.spi.ConnectorInsertTableHandle;
1920
import com.facebook.presto.spi.ConnectorOutputTableHandle;
@@ -41,6 +42,7 @@ public void configure(Binder binder)
4142
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
4243
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
4344
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
45+
thriftCodecBinder(binder).bindCustomThriftCodec(ColumnHandleThriftCodec.class);
4446

4547
jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class);
4648
jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class);
@@ -49,6 +51,7 @@ public void configure(Binder binder)
4951
jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class);
5052
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
5153
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
54+
jsonCodecBinder(binder).bindJsonCodec(ColumnHandle.class);
5255

5356
binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
5457
}

presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import com.facebook.presto.metadata.Split;
6060
import com.facebook.presto.server.InternalCommunicationConfig;
6161
import com.facebook.presto.server.TaskUpdateRequest;
62+
import com.facebook.presto.server.thrift.ColumnHandleThriftCodec;
6263
import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec;
6364
import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec;
6465
import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec;
@@ -402,6 +403,7 @@ public void configure(Binder binder)
402403
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
403404
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
404405
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
406+
thriftCodecBinder(binder).bindCustomThriftCodec(ColumnHandleThriftCodec.class);
405407
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
406408
thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class);
407409
thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class);

presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.facebook.presto.metadata.Split;
5858
import com.facebook.presto.server.InternalCommunicationConfig;
5959
import com.facebook.presto.server.TaskUpdateRequest;
60+
import com.facebook.presto.server.thrift.ColumnHandleThriftCodec;
6061
import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec;
6162
import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec;
6263
import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec;
@@ -410,6 +411,7 @@ public void configure(Binder binder)
410411
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
411412
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
412413
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
414+
thriftCodecBinder(binder).bindCustomThriftCodec(ColumnHandleThriftCodec.class);
413415
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
414416
thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class);
415417
thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class);

presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.spi.connector;
1515

16+
import com.facebook.presto.spi.ColumnHandle;
1617
import com.facebook.presto.spi.ConnectorCodec;
1718
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
1819
import com.facebook.presto.spi.ConnectorInsertTableHandle;
@@ -59,4 +60,9 @@ default Optional<ConnectorCodec<ConnectorTableHandle>> getConnectorTableHandleCo
5960
{
6061
return Optional.empty();
6162
}
63+
64+
default Optional<ConnectorCodec<ColumnHandle>> getColumnHandleCodec()
65+
{
66+
return Optional.empty();
67+
}
6268
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<parent>
6+
<groupId>com.facebook.presto</groupId>
7+
<artifactId>presto-root</artifactId>
8+
<version>0.296-SNAPSHOT</version>
9+
</parent>
10+
11+
<artifactId>presto-thrift-connector-toolkit</artifactId>
12+
<name>presto-thrift-toolkit</name>
13+
<packaging>jar</packaging>
14+
15+
<properties>
16+
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
17+
<air.check.skip-modernizer>true</air.check.skip-modernizer>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>com.facebook.airlift.drift</groupId>
23+
<artifactId>drift-codec</artifactId>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>com.facebook.airlift.drift</groupId>
28+
<artifactId>drift-protocol</artifactId>
29+
</dependency>
30+
31+
<dependency>
32+
<groupId>com.facebook.presto</groupId>
33+
<artifactId>presto-spi</artifactId>
34+
<scope>provided</scope>
35+
</dependency>
36+
</dependencies>
37+
</project>
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.thrift.codec;
15+
16+
import com.facebook.drift.codec.ThriftCodec;
17+
import com.facebook.drift.codec.ThriftCodecManager;
18+
import com.facebook.drift.protocol.TProtocolException;
19+
import com.facebook.presto.spi.ConnectorCodec;
20+
import com.facebook.presto.spi.PrestoException;
21+
22+
import java.lang.reflect.Type;
23+
24+
import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
25+
import static com.facebook.presto.thrift.codec.ThriftCodecUtils.fromThrift;
26+
import static com.facebook.presto.thrift.codec.ThriftCodecUtils.toThrift;
27+
import static java.util.Objects.requireNonNull;
28+
29+
public class GenericThriftCodec<T>
30+
implements ConnectorCodec<T>
31+
{
32+
private final ThriftCodec<T> thriftCodec;
33+
34+
public GenericThriftCodec(ThriftCodecManager codecManager, Type javaType, Class<T> expectedType)
35+
{
36+
requireNonNull(codecManager, "codecManager is null");
37+
requireNonNull(javaType, "javaType is null");
38+
requireNonNull(expectedType, "expectedType is null");
39+
40+
if (!(javaType instanceof Class<?>)) {
41+
throw new IllegalArgumentException("Expected a Class type for javaType, but got: " + javaType.getTypeName());
42+
}
43+
44+
Class<?> clazz = (Class<?>) javaType;
45+
if (!expectedType.isAssignableFrom(clazz)) {
46+
throw new IllegalArgumentException("javaType must be a subclass of " + expectedType.getName() + ", but got: " + clazz.getName());
47+
}
48+
49+
this.thriftCodec = (ThriftCodec<T>) codecManager.getCodec(clazz);
50+
}
51+
52+
@Override
53+
public byte[] serialize(T value)
54+
{
55+
try {
56+
return toThrift(value, thriftCodec);
57+
}
58+
catch (TProtocolException e) {
59+
throw new PrestoException(INVALID_ARGUMENTS, "Unable to serialize object of type " + value.getClass().getSimpleName(), e);
60+
}
61+
}
62+
63+
@Override
64+
public T deserialize(byte[] bytes)
65+
{
66+
try {
67+
return fromThrift(bytes, thriftCodec);
68+
}
69+
catch (TProtocolException e) {
70+
throw new PrestoException(INVALID_ARGUMENTS, "Unable to deserialize bytes to object of expected type", e);
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)