Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
/presto-testng-services @prestodb/committers
/presto-thrift-api @prestodb/committers
/presto-thrift-connector @prestodb/committers
/presto-thrift-connector-toolkit @prestodb/committers
/presto-thrift-spec @prestodb/committers
/presto-thrift-testing-server @prestodb/committers
/presto-thrift-testing-udf-server @prestodb/committers
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@
<module>presto-plan-checker-router-plugin</module>
<module>presto-sql-helpers/presto-sql-invoked-functions-plugin</module>
<module>presto-sql-helpers/presto-native-sql-invoked-functions-plugin</module>
<module>presto-thrift-connector-toolkit</module>
</modules>

<dependencyManagement>
Expand Down Expand Up @@ -1110,6 +1111,12 @@
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-thrift-connector-toolkit</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-thrift-testing-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.connector;

import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorCodec;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
Expand Down Expand Up @@ -96,4 +97,10 @@ public Optional<ConnectorCodec<ConnectorTableHandle>> getTableHandleCodec(String
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableHandleCodec);
}

public Optional<ConnectorCodec<ColumnHandle>> getColumnHandleCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getColumnHandleCodec);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.thrift;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.drift.codec.CodecThriftType;
import com.facebook.drift.codec.metadata.ThriftType;
import com.facebook.drift.protocol.TProtocolReader;
import com.facebook.drift.protocol.TProtocolWriter;
import com.facebook.presto.connector.ConnectorCodecManager;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.spi.ColumnHandle;

import javax.inject.Inject;

import java.nio.ByteBuffer;

import static java.util.Objects.requireNonNull;

public class ColumnHandleThriftCodec
extends AbstractTypedThriftCodec<ColumnHandle>
{
private static final ThriftType THRIFT_TYPE = createThriftType(ColumnHandle.class);
private final ConnectorCodecManager connectorCodecManager;

@Inject
public ColumnHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec<ColumnHandle> jsonCodec)
{
super(ColumnHandle.class,
requireNonNull(jsonCodec, "jsonCodec is null"),
requireNonNull(handleResolver, "handleResolver is null")::getId,
handleResolver::getColumnHandleClass);
this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
}

@CodecThriftType
public static ThriftType getThriftType()
{
return THRIFT_TYPE;
}

@Override
public ThriftType getType()
{
return THRIFT_TYPE;
}

@Override
public ColumnHandle readConcreteValue(String connectorId, TProtocolReader reader)
throws Exception
{
ByteBuffer byteBuffer = reader.readBinary();
assert (byteBuffer.position() == 0);
byte[] bytes = byteBuffer.array();
Comment on lines +63 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Potential risk with ByteBuffer.array() usage.

Directly accessing byteBuffer.array() may cause issues if the buffer is not array-backed or contains extra data. To ensure correct deserialization, use byteBuffer.get() with byteBuffer.remaining() to extract only the relevant bytes.

return connectorCodecManager.getColumnHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
}

@Override
public void writeConcreteValue(String connectorId, ColumnHandle value, TProtocolWriter writer)
throws Exception
{
requireNonNull(value, "value is null");
writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getColumnHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
}

@Override
public boolean isThriftCodecAvailable(String connectorId)
{
return connectorCodecManager.getColumnHandleCodec(connectorId).isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.server.thrift;

import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -41,6 +42,7 @@ public void configure(Binder binder)
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(ColumnHandleThriftCodec.class);

jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class);
jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class);
Expand All @@ -49,6 +51,7 @@ public void configure(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ColumnHandle.class);

binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@
import com.facebook.presto.metadata.Split;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.server.TaskUpdateRequest;
import com.facebook.presto.server.thrift.ColumnHandleThriftCodec;
import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec;
import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec;
import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec;
import com.facebook.presto.server.thrift.OutputTableHandleThriftCodec;
import com.facebook.presto.server.thrift.TableHandleThriftCodec;
import com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec;
import com.facebook.presto.server.thrift.TransactionHandleThriftCodec;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
Expand Down Expand Up @@ -392,6 +394,7 @@ public void configure(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ColumnHandle.class);

binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON);

Expand All @@ -402,6 +405,7 @@ public void configure(Binder binder)
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(ColumnHandleThriftCodec.class);
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class);
thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@
import com.facebook.presto.metadata.Split;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.server.TaskUpdateRequest;
import com.facebook.presto.server.thrift.ColumnHandleThriftCodec;
import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec;
import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec;
import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec;
import com.facebook.presto.server.thrift.OutputTableHandleThriftCodec;
import com.facebook.presto.server.thrift.TableHandleThriftCodec;
import com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec;
import com.facebook.presto.server.thrift.TransactionHandleThriftCodec;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
Expand Down Expand Up @@ -400,6 +402,7 @@ public void configure(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
jsonCodecBinder(binder).bindJsonCodec(ColumnHandle.class);

binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON);

Expand All @@ -410,6 +413,7 @@ public void configure(Binder binder)
thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(ColumnHandleThriftCodec.class);
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class);
thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.spi.connector;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorCodec;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
Expand Down Expand Up @@ -59,4 +60,9 @@ default Optional<ConnectorCodec<ConnectorTableHandle>> getConnectorTableHandleCo
{
return Optional.empty();
}

default Optional<ConnectorCodec<ColumnHandle>> getColumnHandleCodec()
{
return Optional.empty();
}
}
37 changes: 37 additions & 0 deletions presto-thrift-connector-toolkit/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-root</artifactId>
<version>0.296-SNAPSHOT</version>
</parent>

<artifactId>presto-thrift-connector-toolkit</artifactId>
<name>presto-thrift-toolkit</name>
<packaging>jar</packaging>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<air.check.skip-modernizer>true</air.check.skip-modernizer>
</properties>

<dependencies>
<dependency>
<groupId>com.facebook.airlift.drift</groupId>
<artifactId>drift-codec</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift.drift</groupId>
<artifactId>drift-protocol</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.thrift.codec;

import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.drift.protocol.TProtocolException;
import com.facebook.presto.spi.ConnectorCodec;
import com.facebook.presto.spi.PrestoException;

import java.lang.reflect.Type;

import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static com.facebook.presto.thrift.codec.ThriftCodecUtils.fromThrift;
import static com.facebook.presto.thrift.codec.ThriftCodecUtils.toThrift;
import static java.util.Objects.requireNonNull;

public class GenericThriftCodec<T>
implements ConnectorCodec<T>
{
private final ThriftCodec<T> thriftCodec;

public GenericThriftCodec(ThriftCodecManager codecManager, Type javaType, Class<T> expectedType)
{
requireNonNull(codecManager, "codecManager is null");
requireNonNull(javaType, "javaType is null");
requireNonNull(expectedType, "expectedType is null");

if (!(javaType instanceof Class<?>)) {
throw new IllegalArgumentException("Expected a Class type for javaType, but got: " + javaType.getTypeName());
}

Class<?> clazz = (Class<?>) javaType;
if (!expectedType.isAssignableFrom(clazz)) {
throw new IllegalArgumentException("javaType must be a subclass of " + expectedType.getName() + ", but got: " + clazz.getName());
}

this.thriftCodec = (ThriftCodec<T>) codecManager.getCodec(clazz);
}

@Override
public byte[] serialize(T value)
{
try {
return toThrift(value, thriftCodec);
}
catch (TProtocolException e) {
throw new PrestoException(INVALID_ARGUMENTS, "Unable to serialize object of type " + value.getClass().getSimpleName(), e);
}
}

@Override
public T deserialize(byte[] bytes)
{
try {
return fromThrift(bytes, thriftCodec);
}
catch (TProtocolException e) {
throw new PrestoException(INVALID_ARGUMENTS, "Unable to deserialize bytes to object of expected type", e);
}
}
}
Loading
Loading