From 40d17e3cd30230fedd25c7a59e4f4bba153b9aa6 Mon Sep 17 00:00:00 2001 From: Shang Ma Date: Wed, 30 Jul 2025 08:57:51 -0700 Subject: [PATCH] Add thrift support for split, transactionHandle, and tableWriteInfo --- pom.xml | 2 +- .../presto/hive/CacheQuotaRequirement.java | 7 + .../facebook/presto/hive/CacheQuotaScope.java | 23 +- .../connector/ConnectorCodecManager.java | 99 ++++++++ .../presto/connector/ConnectorManager.java | 23 +- .../facebook/presto/execution/Location.java | 6 + .../scheduler/ExecutionWriterTarget.java | 23 ++ .../scheduler/ExecutionWriterTargetUnion.java | 156 ++++++++++++ .../execution/scheduler/TableWriteInfo.java | 17 ++ .../presto/metadata/AnalyzeTableHandle.java | 8 + .../presto/metadata/DeleteTableHandle.java | 8 + .../presto/metadata/InsertTableHandle.java | 8 + .../presto/metadata/OutputTableHandle.java | 8 + .../metadata/RemoteTransactionHandle.java | 4 + .../com/facebook/presto/metadata/Split.java | 10 + .../thrift/AbstractTypedThriftCodec.java | 222 ++++++++++++++++++ .../thrift/ConnectorSplitThriftCodec.java | 82 +++++++ .../server/thrift/CustomCodecUtils.java | 109 --------- .../thrift/DeleteTableHandleThriftCodec.java | 82 +++++++ .../server/thrift/HandleThriftModule.java | 55 +++++ .../thrift/InsertTableHandleThriftCodec.java | 82 +++++++ .../thrift/OutputTableHandleThriftCodec.java | 82 +++++++ .../presto/server/thrift/SplitCodec.java | 74 ------ .../server/thrift/TableHandleThriftCodec.java | 82 +++++++ .../thrift/TableLayoutHandleThriftCodec.java | 82 +++++++ .../server/thrift/TableWriteInfoCodec.java | 74 ------ .../server/thrift/ThriftCodecUtils.java | 53 +++++ .../thrift/TransactionHandleThriftCodec.java | 82 +++++++ .../facebook/presto/split/RemoteSplit.java | 7 + .../presto/testing/LocalQueryRunner.java | 5 +- .../presto/thrift/RemoteCodecProvider.java | 48 ++++ .../presto/thrift/RemoteSplitCodec.java | 60 +++++ .../thrift/RemoteTransactionHandleCodec.java | 60 +++++ ...TestGroupInnerJoinsByConnectorRuleSet.java | 16 +- .../presto/server/ServerMainModule.java | 13 +- .../server/remotetask/TestHttpRemoteTask.java | 38 ++- .../TestHttpRemoteTaskWithEventLoop.java | 38 ++- .../PrestoNativeQueryRunnerUtils.java | 7 +- presto-spark-base/pom.xml | 5 + .../presto/spark/PrestoSparkModule.java | 6 + .../facebook/presto/spi/ConnectorCodec.java | 24 ++ .../facebook/presto/spi/SchemaTableName.java | 4 +- .../com/facebook/presto/spi/SplitContext.java | 6 + .../com/facebook/presto/spi/SplitWeight.java | 8 +- .../com/facebook/presto/spi/TableHandle.java | 23 ++ .../presto/spi/connector/Connector.java | 8 + .../spi/connector/ConnectorCodecProvider.java | 62 +++++ .../spi/schedule/NodeSelectionStrategy.java | 24 +- presto-thrift-spec/pom.xml | 21 +- presto-tpcds/pom.xml | 16 ++ .../presto/tpcds/TpcdsConnectorFactory.java | 9 + .../com/facebook/presto/tpcds/TpcdsSplit.java | 10 + .../presto/tpcds/TpcdsTableHandle.java | 7 + .../presto/tpcds/TpcdsTableLayoutHandle.java | 6 + .../presto/tpcds/TpcdsTransactionHandle.java | 18 +- .../presto/tpcds/thrift/ThriftCodecUtils.java | 53 +++++ .../tpcds/thrift/TpcdsCodecProvider.java | 61 +++++ .../presto/tpcds/thrift/TpcdsSplitCodec.java | 60 +++++ .../tpcds/thrift/TpcdsTableHandleCodec.java | 60 +++++ .../thrift/TpcdsTableLayoutHandleCodec.java | 60 +++++ .../thrift/TpcdsTransactionHandleCodec.java | 60 +++++ .../presto/tpcds/TestTpcdsWithThrift.java | 31 +++ 62 files changed, 2198 insertions(+), 299 deletions(-) create mode 100644 presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/AbstractTypedThriftCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/ConnectorSplitThriftCodec.java delete mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/DeleteTableHandleThriftCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/InsertTableHandleThriftCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/OutputTableHandleThriftCodec.java delete mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableHandleThriftCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableLayoutHandleThriftCodec.java delete mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecUtils.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/server/thrift/TransactionHandleThriftCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteCodecProvider.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteSplitCodec.java create mode 100644 presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteTransactionHandleCodec.java create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/ConnectorCodec.java create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java create mode 100644 presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/ThriftCodecUtils.java create mode 100644 presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java create mode 100644 presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsSplitCodec.java create mode 100644 presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableHandleCodec.java create mode 100644 presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableLayoutHandleCodec.java create mode 100644 presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTransactionHandleCodec.java create mode 100644 presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithThrift.java diff --git a/pom.xml b/pom.xml index f1d7c50b5d127..6a0c2a77c4adf 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ 4.12.0 3.4.0 19.3.0.0 - 1.43 + 1.45 2.13.1 diff --git a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java index fcc2404926d91..ec30b024c58e7 100644 --- a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java +++ b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.hive; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.airlift.units.DataSize; @@ -24,6 +27,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; +@ThriftStruct public class CacheQuotaRequirement { public static final CacheQuotaRequirement NO_CACHE_REQUIREMENT = new CacheQuotaRequirement(GLOBAL, Optional.empty()); @@ -32,6 +36,7 @@ public class CacheQuotaRequirement private final Optional quota; @JsonCreator + @ThriftConstructor public CacheQuotaRequirement( @JsonProperty("cacheQuotaScope") CacheQuotaScope cacheQuotaScope, @JsonProperty("quota") Optional quota) @@ -41,12 +46,14 @@ public CacheQuotaRequirement( } @JsonProperty + @ThriftField(1) public CacheQuotaScope getCacheQuotaScope() { return cacheQuotaScope; } @JsonProperty + @ThriftField(2) public Optional getQuota() { return quota; diff --git a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java index 2c8dddd3b6ba7..c10f05acebd4b 100644 --- a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java +++ b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java @@ -13,7 +13,28 @@ */ package com.facebook.presto.hive; +import com.facebook.drift.annotations.ThriftEnum; +import com.facebook.drift.annotations.ThriftEnumValue; + +@ThriftEnum public enum CacheQuotaScope { - GLOBAL, SCHEMA, TABLE, PARTITION + GLOBAL(0), + SCHEMA(1), + TABLE(2), + PARTITION(3), + /**/; + + private final int value; + + CacheQuotaScope(int value) + { + this.value = value; + } + + @ThriftEnumValue + public int getValue() + { + return value; + } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java new file mode 100644 index 0000000000000..fb5179d141507 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.connector; + +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.thrift.RemoteCodecProvider; +import com.google.inject.Provider; + +import javax.inject.Inject; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID; +import static java.util.Objects.requireNonNull; + +public class ConnectorCodecManager +{ + private final Map connectorCodecProviders = new ConcurrentHashMap<>(); + + @Inject + public ConnectorCodecManager(Provider thriftCodecManagerProvider) + { + requireNonNull(thriftCodecManagerProvider, "thriftCodecManager is null"); + + connectorCodecProviders.put(REMOTE_CONNECTOR_ID.toString(), new RemoteCodecProvider(thriftCodecManagerProvider)); + } + + public void addConnectorCodecProvider(ConnectorId connectorId, ConnectorCodecProvider connectorCodecProvider) + { + requireNonNull(connectorId, "connectorId is null"); + requireNonNull(connectorCodecProvider, "connectorThriftCodecProvider is null"); + connectorCodecProviders.put(connectorId.getCatalogName(), connectorCodecProvider); + } + + public Optional> getConnectorSplitCodec(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorSplitCodec); + } + + public Optional> getTransactionHandleCodec(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTransactionHandleCodec); + } + + public Optional> getOutputTableHandleCodec(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorOutputTableHandleCodec); + } + + public Optional> getInsertTableHandleCodec(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorInsertTableHandleCodec); + } + + public Optional> getDeleteTableHandleCodec(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorDeleteTableHandleCodec); + } + + public Optional> getTableLayoutHandleCodec(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableLayoutHandleCodec); + } + + public Optional> getTableHandleCodec(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableHandleCodec); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java index 32fada27a9b7b..b1747993fe0fe 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java @@ -40,6 +40,7 @@ import com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.facebook.presto.spi.connector.Connector; import com.facebook.presto.spi.connector.ConnectorAccessControl; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; import com.facebook.presto.spi.connector.ConnectorContext; import com.facebook.presto.spi.connector.ConnectorFactory; import com.facebook.presto.spi.connector.ConnectorIndexProvider; @@ -118,6 +119,7 @@ public class ConnectorManager private final FilterStatsCalculator filterStatsCalculator; private final BlockEncodingSerde blockEncodingSerde; private final ConnectorSystemConfig connectorSystemConfig; + private final ConnectorCodecManager connectorCodecManager; @GuardedBy("this") private final ConcurrentMap connectorFactories = new ConcurrentHashMap<>(); @@ -151,7 +153,8 @@ public ConnectorManager( DeterminismEvaluator determinismEvaluator, FilterStatsCalculator filterStatsCalculator, BlockEncodingSerde blockEncodingSerde, - FeaturesConfig featuresConfig) + FeaturesConfig featuresConfig, + ConnectorCodecManager connectorCodecManager) { this.metadataManager = requireNonNull(metadataManager, "metadataManager is null"); this.catalogManager = requireNonNull(catalogManager, "catalogManager is null"); @@ -176,6 +179,7 @@ public ConnectorManager( this.filterStatsCalculator = requireNonNull(filterStatsCalculator, "filterStatsCalculator is null"); this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); this.connectorSystemConfig = () -> featuresConfig.isNativeExecutionEnabled(); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); } @PreDestroy @@ -303,7 +307,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector) connector.getPlanOptimizerProvider() .ifPresent(planOptimizerProvider -> connectorPlanOptimizerManager.addPlanOptimizerProvider(connectorId, planOptimizerProvider)); } - + connector.getConnectorCodecProvider().ifPresent(connectorCodecProvider -> connectorCodecManager.addConnectorCodecProvider(connectorId, connectorCodecProvider)); metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures()); connector.getAccessControl() @@ -392,6 +396,7 @@ private static class MaterializedConnector private final Optional indexProvider; private final Optional partitioningProvider; private final Optional planOptimizerProvider; + private final Optional connectorCodecProvider; private final Optional accessControl; private final List> sessionProperties; private final List> tableProperties; @@ -472,6 +477,15 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector) } this.planOptimizerProvider = Optional.ofNullable(planOptimizerProvider); + ConnectorCodecProvider connectorCodecProvider = null; + try { + connectorCodecProvider = connector.getConnectorCodecProvider(); + requireNonNull(connectorCodecProvider, format("Connector %s returned null connector specific codec provider", connectorId)); + } + catch (UnsupportedOperationException ignored) { + } + this.connectorCodecProvider = Optional.ofNullable(connectorCodecProvider); + ConnectorAccessControl accessControl = null; try { accessControl = connector.getAccessControl(); @@ -580,5 +594,10 @@ public List> getAnalyzeProperties() { return analyzeProperties; } + + public Optional getConnectorCodecProvider() + { + return connectorCodecProvider; + } } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java b/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java index ed5e59718992c..fc93ee63c9458 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.execution; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -20,17 +23,20 @@ import static java.util.Objects.requireNonNull; +@ThriftStruct public class Location { private final String location; @JsonCreator + @ThriftConstructor public Location(@JsonProperty("location") String location) { this.location = requireNonNull(location, "location is null"); } @JsonProperty + @ThriftField(1) public String getLocation() { return location; diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java index 2b0c9e5167dac..2704ab5ab8960 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java @@ -14,6 +14,9 @@ package com.facebook.presto.execution.scheduler; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.metadata.DeleteTableHandle; import com.facebook.presto.metadata.InsertTableHandle; import com.facebook.presto.metadata.OutputTableHandle; @@ -36,6 +39,7 @@ @SuppressWarnings({"EmptyClass", "ClassMayBeInterface"}) public abstract class ExecutionWriterTarget { + @ThriftStruct public static class CreateHandle extends ExecutionWriterTarget { @@ -43,6 +47,7 @@ public static class CreateHandle private final SchemaTableName schemaTableName; @JsonCreator + @ThriftConstructor public CreateHandle( @JsonProperty("handle") OutputTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName) @@ -52,12 +57,14 @@ public CreateHandle( } @JsonProperty + @ThriftField(1) public OutputTableHandle getHandle() { return handle; } @JsonProperty + @ThriftField(2) public SchemaTableName getSchemaTableName() { return schemaTableName; @@ -70,6 +77,7 @@ public String toString() } } + @ThriftStruct public static class InsertHandle extends ExecutionWriterTarget { @@ -77,6 +85,7 @@ public static class InsertHandle private final SchemaTableName schemaTableName; @JsonCreator + @ThriftConstructor public InsertHandle( @JsonProperty("handle") InsertTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName) @@ -86,12 +95,14 @@ public InsertHandle( } @JsonProperty + @ThriftField(1) public InsertTableHandle getHandle() { return handle; } @JsonProperty + @ThriftField(2) public SchemaTableName getSchemaTableName() { return schemaTableName; @@ -104,6 +115,7 @@ public String toString() } } + @ThriftStruct public static class DeleteHandle extends ExecutionWriterTarget { @@ -111,6 +123,7 @@ public static class DeleteHandle private final SchemaTableName schemaTableName; @JsonCreator + @ThriftConstructor public DeleteHandle( @JsonProperty("handle") DeleteTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName) @@ -120,12 +133,14 @@ public DeleteHandle( } @JsonProperty + @ThriftField(1) public DeleteTableHandle getHandle() { return handle; } @JsonProperty + @ThriftField(2) public SchemaTableName getSchemaTableName() { return schemaTableName; @@ -138,6 +153,7 @@ public String toString() } } + @ThriftStruct public static class RefreshMaterializedViewHandle extends ExecutionWriterTarget { @@ -145,6 +161,7 @@ public static class RefreshMaterializedViewHandle private final SchemaTableName schemaTableName; @JsonCreator + @ThriftConstructor public RefreshMaterializedViewHandle( @JsonProperty("handle") InsertTableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName) @@ -154,12 +171,14 @@ public RefreshMaterializedViewHandle( } @JsonProperty + @ThriftField(1) public InsertTableHandle getHandle() { return handle; } @JsonProperty + @ThriftField(2) public SchemaTableName getSchemaTableName() { return schemaTableName; @@ -172,6 +191,7 @@ public String toString() } } + @ThriftStruct public static class UpdateHandle extends ExecutionWriterTarget { @@ -179,6 +199,7 @@ public static class UpdateHandle private final SchemaTableName schemaTableName; @JsonCreator + @ThriftConstructor public UpdateHandle( @JsonProperty("handle") TableHandle handle, @JsonProperty("schemaTableName") SchemaTableName schemaTableName) @@ -188,12 +209,14 @@ public UpdateHandle( } @JsonProperty + @ThriftField(1) public TableHandle getHandle() { return handle; } @JsonProperty + @ThriftField(2) public SchemaTableName getSchemaTableName() { return schemaTableName; diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java new file mode 100644 index 0000000000000..f61bd8228d1f1 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java @@ -0,0 +1,156 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution.scheduler; + +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftUnion; +import com.facebook.drift.annotations.ThriftUnionId; + +import static java.util.Objects.requireNonNull; + +@ThriftUnion +public class ExecutionWriterTargetUnion +{ + private short id; + private ExecutionWriterTarget.CreateHandle createHandle; + private ExecutionWriterTarget.InsertHandle insertHandle; + private ExecutionWriterTarget.DeleteHandle deleteHandle; + private ExecutionWriterTarget.RefreshMaterializedViewHandle refreshMaterializedViewHandle; + private ExecutionWriterTarget.UpdateHandle updateHandle; + + @ThriftConstructor + public ExecutionWriterTargetUnion() + { + this.id = 0; + } + + @ThriftConstructor + public ExecutionWriterTargetUnion(ExecutionWriterTarget.CreateHandle createHandle) + { + this.id = 1; + this.createHandle = createHandle; + } + + @ThriftField(1) + public ExecutionWriterTarget.CreateHandle getCreateHandle() + { + return createHandle; + } + + @ThriftConstructor + public ExecutionWriterTargetUnion(ExecutionWriterTarget.InsertHandle insertHandle) + { + this.id = 2; + this.insertHandle = insertHandle; + } + + @ThriftField(2) + public ExecutionWriterTarget.InsertHandle getInsertHandle() + { + return insertHandle; + } + + @ThriftConstructor + public ExecutionWriterTargetUnion(ExecutionWriterTarget.DeleteHandle deleteHandle) + { + this.id = 3; + this.deleteHandle = deleteHandle; + } + + @ThriftField(3) + public ExecutionWriterTarget.DeleteHandle getDeleteHandle() + { + return deleteHandle; + } + + @ThriftConstructor + public ExecutionWriterTargetUnion(ExecutionWriterTarget.RefreshMaterializedViewHandle refreshMaterializedViewHandle) + { + this.id = 4; + this.refreshMaterializedViewHandle = refreshMaterializedViewHandle; + } + + @ThriftField(4) + public ExecutionWriterTarget.RefreshMaterializedViewHandle getRefreshMaterializedViewHandle() + { + return refreshMaterializedViewHandle; + } + + @ThriftConstructor + public ExecutionWriterTargetUnion(ExecutionWriterTarget.UpdateHandle updateHandle) + { + this.id = 5; + this.updateHandle = updateHandle; + } + + @ThriftField(5) + public ExecutionWriterTarget.UpdateHandle getUpdateHandle() + { + return updateHandle; + } + + @ThriftUnionId + public short getId() + { + return id; + } + + public static ExecutionWriterTarget toExecutionWriterTarget(ExecutionWriterTargetUnion executionWriterTargetUnion) + { + requireNonNull(executionWriterTargetUnion, "executionWriterTargetUnion is null"); + if (executionWriterTargetUnion.getCreateHandle() != null) { + return executionWriterTargetUnion.getCreateHandle(); + } + else if (executionWriterTargetUnion.getInsertHandle() != null) { + return executionWriterTargetUnion.getInsertHandle(); + } + else if (executionWriterTargetUnion.getDeleteHandle() != null) { + return executionWriterTargetUnion.getDeleteHandle(); + } + else if (executionWriterTargetUnion.getRefreshMaterializedViewHandle() != null) { + return executionWriterTargetUnion.getRefreshMaterializedViewHandle(); + } + else if (executionWriterTargetUnion.getUpdateHandle() != null) { + return executionWriterTargetUnion.getUpdateHandle(); + } + else { + throw new IllegalArgumentException("Unrecognized execution writer target: " + executionWriterTargetUnion); + } + } + + public static ExecutionWriterTargetUnion fromExecutionWriterTarget(ExecutionWriterTarget executionWriterTarget) + { + requireNonNull(executionWriterTarget, "executionWriterTarget is null"); + + if (executionWriterTarget instanceof ExecutionWriterTarget.CreateHandle) { + return new ExecutionWriterTargetUnion((ExecutionWriterTarget.CreateHandle) executionWriterTarget); + } + else if (executionWriterTarget instanceof ExecutionWriterTarget.InsertHandle) { + return new ExecutionWriterTargetUnion((ExecutionWriterTarget.InsertHandle) executionWriterTarget); + } + else if (executionWriterTarget instanceof ExecutionWriterTarget.DeleteHandle) { + return new ExecutionWriterTargetUnion((ExecutionWriterTarget.DeleteHandle) executionWriterTarget); + } + else if (executionWriterTarget instanceof ExecutionWriterTarget.RefreshMaterializedViewHandle) { + return new ExecutionWriterTargetUnion((ExecutionWriterTarget.RefreshMaterializedViewHandle) executionWriterTarget); + } + else if (executionWriterTarget instanceof ExecutionWriterTarget.UpdateHandle) { + return new ExecutionWriterTargetUnion((ExecutionWriterTarget.UpdateHandle) executionWriterTarget); + } + else { + throw new IllegalArgumentException("Unsupported execution writer target: " + executionWriterTarget); + } + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java index 63d952c1c0ce1..2d2fa96b6882e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java @@ -14,6 +14,9 @@ package com.facebook.presto.execution.scheduler; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.Session; import com.facebook.presto.metadata.AnalyzeTableHandle; import com.facebook.presto.metadata.Metadata; @@ -38,6 +41,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; +@ThriftStruct public class TableWriteInfo { private final Optional writerTarget; @@ -53,6 +57,12 @@ public TableWriteInfo( checkArgument(!analyzeTableHandle.isPresent() || !writerTarget.isPresent(), "analyzeTableHandle is present, so no other fields should be present"); } + @ThriftConstructor + public TableWriteInfo(ExecutionWriterTargetUnion writerTargetUnion, Optional analyzeTableHandle) + { + this(Optional.ofNullable(writerTargetUnion).map(ExecutionWriterTargetUnion::toExecutionWriterTarget), analyzeTableHandle == null ? Optional.empty() : analyzeTableHandle); + } + public static TableWriteInfo createTableWriteInfo(StreamingSubPlan plan, Metadata metadata, Session session) { Optional writerTarget = createWriterTarget(plan, metadata, session); @@ -159,7 +169,14 @@ public Optional getWriterTarget() return writerTarget; } + @ThriftField(value = 1, name = "writerTargetUnion") + public ExecutionWriterTargetUnion getWriterTargetUnion() + { + return writerTarget.map(ExecutionWriterTargetUnion::fromExecutionWriterTarget).orElse(null); + } + @JsonProperty + @ThriftField(2) public Optional getAnalyzeTableHandle() { return analyzeTableHandle; diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java index 5e77adeaa1818..df97d5f1eb735 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.metadata; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; @@ -23,6 +26,7 @@ import static java.util.Objects.requireNonNull; +@ThriftStruct public class AnalyzeTableHandle { private final ConnectorId connectorId; @@ -30,6 +34,7 @@ public class AnalyzeTableHandle private final ConnectorTableHandle connectorHandle; @JsonCreator + @ThriftConstructor public AnalyzeTableHandle( @JsonProperty("connectorId") ConnectorId connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @@ -41,18 +46,21 @@ public AnalyzeTableHandle( } @JsonProperty + @ThriftField(1) public ConnectorId getConnectorId() { return connectorId; } @JsonProperty + @ThriftField(3) public ConnectorTableHandle getConnectorHandle() { return connectorHandle; } @JsonProperty + @ThriftField(2) public ConnectorTransactionHandle getTransactionHandle() { return transactionHandle; diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java index ea05b223843d5..de2868576bb3f 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.metadata; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.ConnectorDeleteTableHandle; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; @@ -23,6 +26,7 @@ import static java.util.Objects.requireNonNull; +@ThriftStruct public final class DeleteTableHandle { private final ConnectorId connectorId; @@ -30,6 +34,7 @@ public final class DeleteTableHandle private final ConnectorDeleteTableHandle connectorHandle; @JsonCreator + @ThriftConstructor public DeleteTableHandle( @JsonProperty("connectorId") ConnectorId connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @@ -41,18 +46,21 @@ public DeleteTableHandle( } @JsonProperty + @ThriftField(1) public ConnectorId getConnectorId() { return connectorId; } @JsonProperty + @ThriftField(2) public ConnectorTransactionHandle getTransactionHandle() { return transactionHandle; } @JsonProperty + @ThriftField(3) public ConnectorDeleteTableHandle getConnectorHandle() { return connectorHandle; diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java index 6cf3e0d2525d9..bb286e048185f 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.metadata; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; @@ -23,6 +26,7 @@ import static java.util.Objects.requireNonNull; +@ThriftStruct public final class InsertTableHandle { private final ConnectorId connectorId; @@ -30,6 +34,7 @@ public final class InsertTableHandle private final ConnectorInsertTableHandle connectorHandle; @JsonCreator + @ThriftConstructor public InsertTableHandle( @JsonProperty("connectorId") ConnectorId connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @@ -41,18 +46,21 @@ public InsertTableHandle( } @JsonProperty + @ThriftField(1) public ConnectorId getConnectorId() { return connectorId; } @JsonProperty + @ThriftField(2) public ConnectorTransactionHandle getTransactionHandle() { return transactionHandle; } @JsonProperty + @ThriftField(3) public ConnectorInsertTableHandle getConnectorHandle() { return connectorHandle; diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java index 8d3d3d2f6dc87..f88dabb0c243b 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.metadata; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; @@ -23,6 +26,7 @@ import static java.util.Objects.requireNonNull; +@ThriftStruct public final class OutputTableHandle { private final ConnectorId connectorId; @@ -30,6 +34,7 @@ public final class OutputTableHandle private final ConnectorOutputTableHandle connectorHandle; @JsonCreator + @ThriftConstructor public OutputTableHandle( @JsonProperty("connectorId") ConnectorId connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @@ -41,18 +46,21 @@ public OutputTableHandle( } @JsonProperty + @ThriftField(1) public ConnectorId getConnectorId() { return connectorId; } @JsonProperty + @ThriftField(2) public ConnectorTransactionHandle getTransactionHandle() { return transactionHandle; } @JsonProperty + @ThriftField(3) public ConnectorOutputTableHandle getConnectorHandle() { return connectorHandle; diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java index 2f89509efa0ee..9f2e60061dcdf 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java @@ -13,14 +13,18 @@ */ package com.facebook.presto.metadata; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +@ThriftStruct public class RemoteTransactionHandle implements ConnectorTransactionHandle { @JsonCreator + @ThriftConstructor public RemoteTransactionHandle() { } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java index d8581dc1730db..ba93e58a16efc 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.metadata; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.execution.Lifespan; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorSplit; @@ -33,6 +36,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; +@ThriftStruct public final class Split { private final ConnectorId connectorId; @@ -48,6 +52,7 @@ public Split(ConnectorId connectorId, ConnectorTransactionHandle transactionHand } @JsonCreator + @ThriftConstructor public Split( @JsonProperty("connectorId") ConnectorId connectorId, @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, @@ -63,30 +68,35 @@ public Split( } @JsonProperty + @ThriftField(1) public ConnectorId getConnectorId() { return connectorId; } @JsonProperty + @ThriftField(2) public ConnectorTransactionHandle getTransactionHandle() { return transactionHandle; } @JsonProperty + @ThriftField(3) public ConnectorSplit getConnectorSplit() { return connectorSplit; } @JsonProperty + @ThriftField(4) public Lifespan getLifespan() { return lifespan; } @JsonProperty + @ThriftField(5) public SplitContext getSplitContext() { return splitContext; diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/AbstractTypedThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/AbstractTypedThriftCodec.java new file mode 100644 index 0000000000000..f5d394b6cda7d --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/AbstractTypedThriftCodec.java @@ -0,0 +1,222 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.drift.annotations.ThriftField.Requiredness; +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.metadata.DefaultThriftTypeReference; +import com.facebook.drift.codec.metadata.FieldKind; +import com.facebook.drift.codec.metadata.ThriftFieldMetadata; +import com.facebook.drift.codec.metadata.ThriftMethodInjection; +import com.facebook.drift.codec.metadata.ThriftStructMetadata; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TField; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.drift.protocol.TStruct; +import com.facebook.drift.protocol.TType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +public abstract class AbstractTypedThriftCodec + implements ThriftCodec +{ + private static final Set NON_THRIFT_CONNECTOR = new HashSet<>(); + private static final Logger log = Logger.get(AbstractTypedThriftCodec.class); + private static final String TYPE_VALUE = "connectorId"; + private static final String CUSTOM_SERIALIZED_VALUE = "customSerializedValue"; + private static final String JSON_VALUE = "jsonValue"; + private static final short TYPE_FIELD_ID = 1; + private static final short CUSTOM_FIELD_ID = 2; + private static final short JSON_FIELD_ID = 3; + + private final Class baseClass; + private final JsonCodec jsonCodec; + private final Function nameResolver; + private final Function> classResolver; + + protected AbstractTypedThriftCodec(Class baseClass, + JsonCodec jsonCodec, + Function nameResolver, + Function> classResolver) + { + this.baseClass = requireNonNull(baseClass, "baseClass is null"); + this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); + this.nameResolver = requireNonNull(nameResolver, "nameResolver is null"); + this.classResolver = requireNonNull(classResolver, "classResolver is null"); + } + + @Override + public abstract ThriftType getType(); + + protected static ThriftType createThriftType(Class baseClass) + { + List fields = new ArrayList<>(); + try { + fields.add(new ThriftFieldMetadata( + TYPE_FIELD_ID, + false, false, Requiredness.OPTIONAL, ImmutableMap.of(), + new DefaultThriftTypeReference(ThriftType.STRING), + TYPE_VALUE, + FieldKind.THRIFT_FIELD, + ImmutableList.of(), + Optional.empty(), + // Drift requires at least one of the three arguments below, so we provide a dummy method here as a workaround. + // https://github.com/airlift/drift/blob/master/drift-codec/src/main/java/io/airlift/drift/codec/metadata/ThriftFieldMetadata.java#L99 + Optional.of(new ThriftMethodInjection(AbstractTypedThriftCodec.class.getDeclaredMethod("getTypeField"), ImmutableList.of())), + Optional.empty(), + Optional.empty())); + fields.add(new ThriftFieldMetadata( + CUSTOM_FIELD_ID, + false, false, Requiredness.OPTIONAL, ImmutableMap.of(), + new DefaultThriftTypeReference(ThriftType.BINARY), + CUSTOM_SERIALIZED_VALUE, + FieldKind.THRIFT_FIELD, + ImmutableList.of(), + Optional.empty(), + // Drift requires at least one of the three arguments below, so we provide a dummy method here as a workaround. + // https://github.com/airlift/drift/blob/master/drift-codec/src/main/java/io/airlift/drift/codec/metadata/ThriftFieldMetadata.java#L99 + Optional.of(new ThriftMethodInjection(AbstractTypedThriftCodec.class.getDeclaredMethod("getCustomField"), ImmutableList.of())), + Optional.empty(), + Optional.empty())); + // TODO: This field will be cleaned up: https://github.com/prestodb/presto/issues/25671 + fields.add(new ThriftFieldMetadata( + JSON_FIELD_ID, + false, false, Requiredness.OPTIONAL, ImmutableMap.of(), + new DefaultThriftTypeReference(ThriftType.STRING), + JSON_VALUE, + FieldKind.THRIFT_FIELD, + ImmutableList.of(), + Optional.empty(), + // Drift requires at least one of the three arguments below, so we provide a dummy method here as a workaround. + // https://github.com/airlift/drift/blob/master/drift-codec/src/main/java/io/airlift/drift/codec/metadata/ThriftFieldMetadata.java#L99 + Optional.of(new ThriftMethodInjection(AbstractTypedThriftCodec.class.getDeclaredMethod("getJsonField"), ImmutableList.of())), + Optional.empty(), + Optional.empty())); + } + catch (NoSuchMethodException e) { + throw new IllegalArgumentException("Failed to create ThriftFieldMetadata", e); + } + + return ThriftType.struct(new ThriftStructMetadata( + baseClass.getSimpleName(), + ImmutableMap.of(), baseClass, null, ThriftStructMetadata.MetadataType.STRUCT, + Optional.empty(), ImmutableList.of(), fields, Optional.empty(), ImmutableList.of())); + } + + @Override + public T read(TProtocolReader reader) + throws Exception + { + String connectorId = null; + T value = null; + String jsonValue = null; + + reader.readStructBegin(); + while (true) { + TField field = reader.readFieldBegin(); + if (field.getType() == TType.STOP) { + break; + } + switch (field.getId()) { + case JSON_FIELD_ID: + jsonValue = reader.readString(); + break; + case TYPE_FIELD_ID: + connectorId = reader.readString(); + break; + case CUSTOM_FIELD_ID: + requireNonNull(connectorId, "connectorId is null"); + Class concreteClass = classResolver.apply(connectorId); + requireNonNull(concreteClass, "concreteClass is null"); + value = readConcreteValue(connectorId, reader); + break; + } + reader.readFieldEnd(); + } + reader.readStructEnd(); + + if (jsonValue != null) { + return jsonCodec.fromJson(jsonValue); + } + if (value != null) { + return value; + } + throw new IllegalStateException("Neither thrift nor json value was present"); + } + + public abstract T readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception; + + public abstract void writeConcreteValue(String connectorId, T value, TProtocolWriter writer) + throws Exception; + + public abstract boolean isThriftCodecAvailable(String connectorId); + + @Override + public void write(T value, TProtocolWriter writer) + throws Exception + { + if (value == null) { + return; + } + String connectorId = nameResolver.apply(value); + requireNonNull(connectorId, "connectorId is null"); + + writer.writeStructBegin(new TStruct(baseClass.getSimpleName())); + if (isThriftCodecAvailable(connectorId)) { + writer.writeFieldBegin(new TField(TYPE_VALUE, TType.STRING, TYPE_FIELD_ID)); + writer.writeString(connectorId); + writer.writeFieldEnd(); + + writer.writeFieldBegin(new TField(CUSTOM_SERIALIZED_VALUE, TType.STRING, CUSTOM_FIELD_ID)); + writeConcreteValue(connectorId, value, writer); + writer.writeFieldEnd(); + } + else { + // If thrift codec is not available for this connector, fall back to its json + writer.writeFieldBegin(new TField(JSON_VALUE, TType.STRING, JSON_FIELD_ID)); + writer.writeString(jsonCodec.toJson(value)); + writer.writeFieldEnd(); + } + writer.writeFieldStop(); + writer.writeStructEnd(); + } + + private String getTypeField() + { + return "getTypeField"; + } + + private String getCustomField() + { + return "getCustomField"; + } + + private String getJsonField() + { + return "getJsonField"; + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ConnectorSplitThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ConnectorSplitThriftCodec.java new file mode 100644 index 0000000000000..234a53611f421 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ConnectorSplitThriftCodec.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.CodecThriftType; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorSplit; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +public class ConnectorSplitThriftCodec + extends AbstractTypedThriftCodec +{ + private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorSplit.class); + private final ConnectorCodecManager connectorCodecManager; + + @Inject + public ConnectorSplitThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec) + { + super(ConnectorSplit.class, + requireNonNull(jsonCodec, "jsonCodec is null"), + requireNonNull(handleResolver, "handleResolver is null")::getId, + handleResolver::getSplitClass); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); + } + + @CodecThriftType + public static ThriftType getThriftType() + { + return THRIFT_TYPE; + } + + @Override + public ThriftType getType() + { + return THRIFT_TYPE; + } + + @Override + public ConnectorSplit readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception + { + ByteBuffer byteBuffer = reader.readBinary(); + assert (byteBuffer.position() == 0); + byte[] bytes = byteBuffer.array(); + return connectorCodecManager.getConnectorSplitCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); + } + + @Override + public void writeConcreteValue(String connectorId, ConnectorSplit value, TProtocolWriter writer) + throws Exception + { + requireNonNull(value, "value is null"); + writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getConnectorSplitCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Cannot serialize " + value)))); + } + + @Override + public boolean isThriftCodecAvailable(String connectorId) + { + return connectorCodecManager.getConnectorSplitCodec(connectorId).isPresent(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java deleted file mode 100644 index 483c0c3def17d..0000000000000 --- a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.server.thrift; - -import com.facebook.airlift.json.JsonCodec; -import com.facebook.drift.TException; -import com.facebook.drift.codec.metadata.DefaultThriftTypeReference; -import com.facebook.drift.codec.metadata.FieldKind; -import com.facebook.drift.codec.metadata.ThriftFieldExtractor; -import com.facebook.drift.codec.metadata.ThriftFieldMetadata; -import com.facebook.drift.codec.metadata.ThriftStructMetadata; -import com.facebook.drift.codec.metadata.ThriftType; -import com.facebook.drift.protocol.TField; -import com.facebook.drift.protocol.TProtocolException; -import com.facebook.drift.protocol.TProtocolReader; -import com.facebook.drift.protocol.TProtocolWriter; -import com.facebook.drift.protocol.TStruct; -import com.facebook.drift.protocol.TType; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import java.util.Optional; - -import static com.facebook.drift.annotations.ThriftField.Requiredness.NONE; -import static java.lang.String.format; - -/*** - * When we need a custom codec for a primitive type, we need a wrapper to pass the needsCodec check within ThriftCodecByteCodeGenerator.java - */ -public class CustomCodecUtils -{ - private CustomCodecUtils() {} - - public static ThriftStructMetadata createSyntheticMetadata(short fieldId, String fieldName, Class originalType, Class referencedType, ThriftType thriftType) - { - ThriftFieldMetadata fieldMetaData = new ThriftFieldMetadata( - fieldId, - false, false, NONE, ImmutableMap.of(), - new DefaultThriftTypeReference(thriftType), - fieldName, - FieldKind.THRIFT_FIELD, - ImmutableList.of(), - Optional.empty(), - Optional.empty(), - Optional.of(new ThriftFieldExtractor( - fieldId, - fieldName, - FieldKind.THRIFT_FIELD, - originalType.getDeclaredFields()[0], // Any field should work since we are handing extraction in codec on our own - referencedType)), - Optional.empty()); - return new ThriftStructMetadata( - originalType.getSimpleName() + "Wrapper", - ImmutableMap.of(), - originalType, null, - ThriftStructMetadata.MetadataType.STRUCT, - Optional.empty(), ImmutableList.of(), ImmutableList.of(fieldMetaData), Optional.empty(), ImmutableList.of()); - } - - public static T readSingleJsonField(TProtocolReader protocol, JsonCodec jsonCodec, short fieldId, String fieldName) - throws TException - { - protocol.readStructBegin(); - String jsonValue = null; - TField field = protocol.readFieldBegin(); - while (field.getType() != TType.STOP) { - if (field.getId() == fieldId) { - if (field.getType() == TType.STRING) { - jsonValue = protocol.readString(); - } - else { - throw new TProtocolException(format("Unexpected field type: %s for field %s", field.getType(), fieldName)); - } - } - protocol.readFieldEnd(); - field = protocol.readFieldBegin(); - } - protocol.readStructEnd(); - - if (jsonValue == null) { - throw new TProtocolException(format("Required field '%s' was not found", fieldName)); - } - return jsonCodec.fromJson(jsonValue); - } - - public static void writeSingleJsonField(T value, TProtocolWriter protocol, JsonCodec jsonCodec, short fieldId, String fieldName, String structName) - throws TException - { - protocol.writeStructBegin(new TStruct(structName)); - - protocol.writeFieldBegin(new TField(fieldName, TType.STRING, fieldId)); - protocol.writeString(jsonCodec.toJson(value)); - protocol.writeFieldEnd(); - - protocol.writeFieldStop(); - protocol.writeStructEnd(); - } -} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/DeleteTableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/DeleteTableHandleThriftCodec.java new file mode 100644 index 0000000000000..37232f59e1fba --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/DeleteTableHandleThriftCodec.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.CodecThriftType; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorDeleteTableHandle; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +public class DeleteTableHandleThriftCodec + extends AbstractTypedThriftCodec +{ + private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorDeleteTableHandle.class); + private final ConnectorCodecManager connectorCodecManager; + + @Inject + public DeleteTableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec) + { + super(ConnectorDeleteTableHandle.class, + requireNonNull(jsonCodec, "jsonCodec is null"), + requireNonNull(handleResolver, "handleResolver is null")::getId, + handleResolver::getDeleteTableHandleClass); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); + } + + @CodecThriftType + public static ThriftType getThriftType() + { + return THRIFT_TYPE; + } + + @Override + public ThriftType getType() + { + return THRIFT_TYPE; + } + + @Override + public ConnectorDeleteTableHandle readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception + { + ByteBuffer byteBuffer = reader.readBinary(); + assert (byteBuffer.position() == 0); + byte[] bytes = byteBuffer.array(); + return connectorCodecManager.getDeleteTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); + } + + @Override + public void writeConcreteValue(String connectorId, ConnectorDeleteTableHandle value, TProtocolWriter writer) + throws Exception + { + requireNonNull(value, "value is null"); + writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getDeleteTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value)))); + } + + @Override + public boolean isThriftCodecAvailable(String connectorId) + { + return connectorCodecManager.getDeleteTableHandleCodec(connectorId).isPresent(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java new file mode 100644 index 0000000000000..6d4c169e82032 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static com.facebook.drift.codec.guice.ThriftCodecBinder.thriftCodecBinder; + +public class HandleThriftModule + implements Module +{ + @Override + public void configure(Binder binder) + { + thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class); + + jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class); + + binder.bind(HandleResolver.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/InsertTableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/InsertTableHandleThriftCodec.java new file mode 100644 index 0000000000000..c01480a5c535e --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/InsertTableHandleThriftCodec.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.CodecThriftType; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorInsertTableHandle; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +public class InsertTableHandleThriftCodec + extends AbstractTypedThriftCodec +{ + private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorInsertTableHandle.class); + private final ConnectorCodecManager connectorCodecManager; + + @Inject + public InsertTableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec) + { + super(ConnectorInsertTableHandle.class, + requireNonNull(jsonCodec, "jsonCodec is null"), + requireNonNull(handleResolver, "handleResolver is null")::getId, + handleResolver::getInsertTableHandleClass); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); + } + + @CodecThriftType + public static ThriftType getThriftType() + { + return THRIFT_TYPE; + } + + @Override + public ThriftType getType() + { + return THRIFT_TYPE; + } + + @Override + public ConnectorInsertTableHandle readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception + { + ByteBuffer byteBuffer = reader.readBinary(); + assert (byteBuffer.position() == 0); + byte[] bytes = byteBuffer.array(); + return connectorCodecManager.getInsertTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); + } + + @Override + public void writeConcreteValue(String connectorId, ConnectorInsertTableHandle value, TProtocolWriter writer) + throws Exception + { + requireNonNull(value, "value is null"); + writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getInsertTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value)))); + } + + @Override + public boolean isThriftCodecAvailable(String connectorId) + { + return connectorCodecManager.getInsertTableHandleCodec(connectorId).isPresent(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/OutputTableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/OutputTableHandleThriftCodec.java new file mode 100644 index 0000000000000..b364dc694f8b7 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/OutputTableHandleThriftCodec.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.CodecThriftType; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorOutputTableHandle; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +public class OutputTableHandleThriftCodec + extends AbstractTypedThriftCodec +{ + private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorOutputTableHandle.class); + private final ConnectorCodecManager connectorCodecManager; + + @Inject + public OutputTableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec) + { + super(ConnectorOutputTableHandle.class, + requireNonNull(jsonCodec, "jsonCodec is null"), + requireNonNull(handleResolver, "handleResolver is null")::getId, + handleResolver::getOutputTableHandleClass); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); + } + + @CodecThriftType + public static ThriftType getThriftType() + { + return THRIFT_TYPE; + } + + @Override + public ThriftType getType() + { + return THRIFT_TYPE; + } + + @Override + public ConnectorOutputTableHandle readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception + { + ByteBuffer byteBuffer = reader.readBinary(); + assert (byteBuffer.position() == 0); + byte[] bytes = byteBuffer.array(); + return connectorCodecManager.getOutputTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); + } + + @Override + public void writeConcreteValue(String connectorId, ConnectorOutputTableHandle value, TProtocolWriter writer) + throws Exception + { + requireNonNull(value, "value is null"); + writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getOutputTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value)))); + } + + @Override + public boolean isThriftCodecAvailable(String connectorId) + { + return connectorCodecManager.getOutputTableHandleCodec(connectorId).isPresent(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java deleted file mode 100644 index e47fdcf9b3e19..0000000000000 --- a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.server.thrift; - -import com.facebook.airlift.json.JsonCodec; -import com.facebook.drift.codec.CodecThriftType; -import com.facebook.drift.codec.ThriftCodec; -import com.facebook.drift.codec.metadata.ThriftCatalog; -import com.facebook.drift.codec.metadata.ThriftType; -import com.facebook.drift.protocol.TProtocolReader; -import com.facebook.drift.protocol.TProtocolWriter; -import com.facebook.presto.metadata.Split; - -import javax.inject.Inject; - -import static com.facebook.presto.server.thrift.CustomCodecUtils.createSyntheticMetadata; -import static com.facebook.presto.server.thrift.CustomCodecUtils.readSingleJsonField; -import static com.facebook.presto.server.thrift.CustomCodecUtils.writeSingleJsonField; -import static java.util.Objects.requireNonNull; - -public class SplitCodec - implements ThriftCodec -{ - private static final short SPLIT_DATA_FIELD_ID = 1; - private static final String SPLIT_DATA_FIELD_NAME = "split"; - private static final String SPLIT_DATA_STRUCT_NAME = "Split"; - private static final ThriftType SYNTHETIC_STRUCT_TYPE = ThriftType.struct(createSyntheticMetadata(SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME, Split.class, String.class, ThriftType.STRING)); - - private final JsonCodec jsonCodec; - - @Inject - public SplitCodec(JsonCodec jsonCodec, ThriftCatalog thriftCatalog) - { - this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); - thriftCatalog.addThriftType(SYNTHETIC_STRUCT_TYPE); - } - - @CodecThriftType - public static ThriftType getThriftType() - { - return SYNTHETIC_STRUCT_TYPE; - } - - @Override - public ThriftType getType() - { - return SYNTHETIC_STRUCT_TYPE; - } - - @Override - public Split read(TProtocolReader protocol) - throws Exception - { - return readSingleJsonField(protocol, jsonCodec, SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME); - } - - @Override - public void write(Split value, TProtocolWriter protocol) - throws Exception - { - writeSingleJsonField(value, protocol, jsonCodec, SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME, SPLIT_DATA_STRUCT_NAME); - } -} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableHandleThriftCodec.java new file mode 100644 index 0000000000000..95b209879c762 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableHandleThriftCodec.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.CodecThriftType; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorTableHandle; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +public class TableHandleThriftCodec + extends AbstractTypedThriftCodec +{ + private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorTableHandle.class); + private final ConnectorCodecManager connectorCodecManager; + + @Inject + public TableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec) + { + super(ConnectorTableHandle.class, + requireNonNull(jsonCodec, "jsonCodec is null"), + requireNonNull(handleResolver, "handleResolver is null")::getId, + handleResolver::getTableHandleClass); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); + } + + @CodecThriftType + public static ThriftType getThriftType() + { + return THRIFT_TYPE; + } + + @Override + public ThriftType getType() + { + return THRIFT_TYPE; + } + + @Override + public ConnectorTableHandle readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception + { + ByteBuffer byteBuffer = reader.readBinary(); + assert (byteBuffer.position() == 0); + byte[] bytes = byteBuffer.array(); + return connectorCodecManager.getTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); + } + + @Override + public void writeConcreteValue(String connectorId, ConnectorTableHandle value, TProtocolWriter writer) + throws Exception + { + requireNonNull(value, "value is null"); + writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value)))); + } + + @Override + public boolean isThriftCodecAvailable(String connectorId) + { + return connectorCodecManager.getTableHandleCodec(connectorId).isPresent(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableLayoutHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableLayoutHandleThriftCodec.java new file mode 100644 index 0000000000000..9387a6c0058b5 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableLayoutHandleThriftCodec.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.CodecThriftType; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +public class TableLayoutHandleThriftCodec + extends AbstractTypedThriftCodec +{ + private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorTableLayoutHandle.class); + private final ConnectorCodecManager connectorCodecManager; + + @Inject + public TableLayoutHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec) + { + super(ConnectorTableLayoutHandle.class, + requireNonNull(jsonCodec, "jsonCodec is null"), + requireNonNull(handleResolver, "handleResolver is null")::getId, + handleResolver::getTableLayoutHandleClass); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); + } + + @CodecThriftType + public static ThriftType getThriftType() + { + return THRIFT_TYPE; + } + + @Override + public ThriftType getType() + { + return THRIFT_TYPE; + } + + @Override + public ConnectorTableLayoutHandle readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception + { + ByteBuffer byteBuffer = reader.readBinary(); + assert (byteBuffer.position() == 0); + byte[] bytes = byteBuffer.array(); + return connectorCodecManager.getTableLayoutHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); + } + + @Override + public void writeConcreteValue(String connectorId, ConnectorTableLayoutHandle value, TProtocolWriter writer) + throws Exception + { + requireNonNull(value, "value is null"); + writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getTableLayoutHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value)))); + } + + @Override + public boolean isThriftCodecAvailable(String connectorId) + { + return connectorCodecManager.getTableLayoutHandleCodec(connectorId).isPresent(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java deleted file mode 100644 index 50754a0593bae..0000000000000 --- a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.server.thrift; - -import com.facebook.airlift.json.JsonCodec; -import com.facebook.drift.codec.CodecThriftType; -import com.facebook.drift.codec.ThriftCodec; -import com.facebook.drift.codec.metadata.ThriftCatalog; -import com.facebook.drift.codec.metadata.ThriftType; -import com.facebook.drift.protocol.TProtocolReader; -import com.facebook.drift.protocol.TProtocolWriter; -import com.facebook.presto.execution.scheduler.TableWriteInfo; - -import javax.inject.Inject; - -import static com.facebook.presto.server.thrift.CustomCodecUtils.createSyntheticMetadata; -import static com.facebook.presto.server.thrift.CustomCodecUtils.readSingleJsonField; -import static com.facebook.presto.server.thrift.CustomCodecUtils.writeSingleJsonField; -import static java.util.Objects.requireNonNull; - -public class TableWriteInfoCodec - implements ThriftCodec -{ - private static final short TABLE_WRITE_INFO_DATA_FIELD_ID = 1; - private static final String TABLE_WRITE_INFO_DATA_FIELD_NAME = "tableWriteInfo"; - private static final String TABLE_WRITE_INFO_STRUCT_NAME = "TableWriteInfo"; - private static final ThriftType SYNTHETIC_STRUCT_TYPE = ThriftType.struct(createSyntheticMetadata(TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME, TableWriteInfo.class, String.class, ThriftType.STRING)); - - private final JsonCodec jsonCodec; - - @Inject - public TableWriteInfoCodec(JsonCodec jsonCodec, ThriftCatalog thriftCatalog) - { - this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); - thriftCatalog.addThriftType(SYNTHETIC_STRUCT_TYPE); - } - - @CodecThriftType - public static ThriftType getThriftType() - { - return SYNTHETIC_STRUCT_TYPE; - } - - @Override - public ThriftType getType() - { - return SYNTHETIC_STRUCT_TYPE; - } - - @Override - public TableWriteInfo read(TProtocolReader protocol) - throws Exception - { - return readSingleJsonField(protocol, jsonCodec, TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME); - } - - @Override - public void write(TableWriteInfo value, TProtocolWriter protocol) - throws Exception - { - writeSingleJsonField(value, protocol, jsonCodec, TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME, TABLE_WRITE_INFO_STRUCT_NAME); - } -} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecUtils.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecUtils.java new file mode 100644 index 0000000000000..4819ac2a00a40 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.protocol.TBinaryProtocol; +import com.facebook.drift.protocol.TMemoryBuffer; +import com.facebook.drift.protocol.TMemoryBufferWriteOnly; +import com.facebook.drift.protocol.TProtocolException; + +public class ThriftCodecUtils +{ + private ThriftCodecUtils() {} + + public static T fromThrift(byte[] bytes, ThriftCodec thriftCodec) + throws TProtocolException + { + try { + TMemoryBuffer transport = new TMemoryBuffer(bytes.length); + transport.write(bytes); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + return thriftCodec.read(protocol); + } + catch (Exception e) { + throw new TProtocolException("Can not deserialize the data", e); + } + } + + public static byte[] toThrift(T value, ThriftCodec thriftCodec) + throws TProtocolException + { + TMemoryBufferWriteOnly transport = new TMemoryBufferWriteOnly(1024); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + try { + thriftCodec.write(value, protocol); + return transport.getBytes(); + } + catch (Exception e) { + throw new TProtocolException("Can not serialize the data", e); + } + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TransactionHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TransactionHandleThriftCodec.java new file mode 100644 index 0000000000000..65a96ec5f863d --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TransactionHandleThriftCodec.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.CodecThriftType; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +public class TransactionHandleThriftCodec + extends AbstractTypedThriftCodec +{ + private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorTransactionHandle.class); + private final ConnectorCodecManager connectorCodecManager; + + @Inject + public TransactionHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec) + { + super(ConnectorTransactionHandle.class, + requireNonNull(jsonCodec, "jsonCodec is null"), + requireNonNull(handleResolver, "handleResolver is null")::getId, + handleResolver::getTransactionHandleClass); + this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null"); + } + + @CodecThriftType + public static ThriftType getThriftType() + { + return THRIFT_TYPE; + } + + @Override + public ThriftType getType() + { + return THRIFT_TYPE; + } + + @Override + public ConnectorTransactionHandle readConcreteValue(String connectorId, TProtocolReader reader) + throws Exception + { + ByteBuffer byteBuffer = reader.readBinary(); + assert (byteBuffer.position() == 0); + byte[] bytes = byteBuffer.array(); + return connectorCodecManager.getTransactionHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null); + } + + @Override + public void writeConcreteValue(String connectorId, ConnectorTransactionHandle value, TProtocolWriter writer) + throws Exception + { + requireNonNull(value, "value is null"); + writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getTransactionHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value)))); + } + + @Override + public boolean isThriftCodecAvailable(String connectorId) + { + return connectorCodecManager.getTransactionHandleCodec(connectorId).isPresent(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java b/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java index c91d7f1e38395..b87429a880615 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java +++ b/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.split; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.execution.Location; import com.facebook.presto.execution.TaskId; import com.facebook.presto.spi.ConnectorSplit; @@ -29,6 +32,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; +@ThriftStruct public class RemoteSplit implements ConnectorSplit { @@ -36,6 +40,7 @@ public class RemoteSplit private final TaskId remoteSourceTaskId; @JsonCreator + @ThriftConstructor public RemoteSplit(@JsonProperty("location") Location location, @JsonProperty("remoteSourceTaskId") TaskId remoteSourceTaskId) { this.location = requireNonNull(location, "location is null"); @@ -43,12 +48,14 @@ public RemoteSplit(@JsonProperty("location") Location location, @JsonProperty("r } @JsonProperty + @ThriftField(1) public Location getLocation() { return location; } @JsonProperty + @ThriftField(2) public TaskId getRemoteSourceTaskId() { return remoteSourceTaskId; diff --git a/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 7d92bc2d27139..67e1c396d3c4b 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -14,6 +14,7 @@ package com.facebook.presto.testing; import com.facebook.airlift.node.NodeInfo; +import com.facebook.drift.codec.ThriftCodecManager; import com.facebook.presto.ClientRequestFilterManager; import com.facebook.presto.GroupByHashPageIndexerFactory; import com.facebook.presto.PagesIndexPageSorter; @@ -26,6 +27,7 @@ import com.facebook.presto.common.block.SortOrder; import com.facebook.presto.common.type.BooleanType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.connector.ConnectorCodecManager; import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.system.AnalyzePropertiesSystemTable; import com.facebook.presto.connector.system.CatalogSystemTable; @@ -506,7 +508,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, new RowExpressionDeterminismEvaluator(metadata.getFunctionAndTypeManager()), new FilterStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer), blockEncodingManager, - featuresConfig); + featuresConfig, + new ConnectorCodecManager(ThriftCodecManager::new)); GlobalSystemConnectorFactory globalSystemConnectorFactory = new GlobalSystemConnectorFactory(ImmutableSet.of( new NodeSystemTable(nodeManager), diff --git a/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteCodecProvider.java b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteCodecProvider.java new file mode 100644 index 0000000000000..fa68ecc23d993 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteCodecProvider.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.thrift; + +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.inject.Provider; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class RemoteCodecProvider + implements ConnectorCodecProvider +{ + private final Provider thriftCodecManagerProvider; + + public RemoteCodecProvider(Provider thriftCodecManagerProvider) + { + this.thriftCodecManagerProvider = requireNonNull(thriftCodecManagerProvider, "thriftCodecManagerProvider is null"); + } + + @Override + public Optional> getConnectorSplitCodec() + { + return Optional.of(new RemoteSplitCodec(thriftCodecManagerProvider)); + } + + @Override + public Optional> getConnectorTransactionHandleCodec() + { + return Optional.of(new RemoteTransactionHandleCodec(thriftCodecManagerProvider)); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteSplitCodec.java b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteSplitCodec.java new file mode 100644 index 0000000000000..f8d1c90f1c35f --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteSplitCodec.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.thrift; + +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.drift.protocol.TProtocolException; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.split.RemoteSplit; +import com.google.inject.Provider; + +import static com.facebook.presto.server.thrift.ThriftCodecUtils.fromThrift; +import static com.facebook.presto.server.thrift.ThriftCodecUtils.toThrift; +import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS; +import static java.util.Objects.requireNonNull; + +public class RemoteSplitCodec + implements ConnectorCodec +{ + private final Provider thriftCodecManagerProvider; + + public RemoteSplitCodec(Provider thriftCodecManagerProvider) + { + this.thriftCodecManagerProvider = requireNonNull(thriftCodecManagerProvider, "thriftCodecManagerProvider is null"); + } + + @Override + public byte[] serialize(ConnectorSplit split) + { + try { + return toThrift((RemoteSplit) split, thriftCodecManagerProvider.get().getCodec(RemoteSplit.class)); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize remote split", e); + } + } + + @Override + public ConnectorSplit deserialize(byte[] bytes) + { + try { + return fromThrift(bytes, thriftCodecManagerProvider.get().getCodec(RemoteSplit.class)); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize remote split", e); + } + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteTransactionHandleCodec.java b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteTransactionHandleCodec.java new file mode 100644 index 0000000000000..b3a664042764a --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteTransactionHandleCodec.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.thrift; + +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.drift.protocol.TProtocolException; +import com.facebook.presto.metadata.RemoteTransactionHandle; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.inject.Provider; + +import static com.facebook.presto.server.thrift.ThriftCodecUtils.fromThrift; +import static com.facebook.presto.server.thrift.ThriftCodecUtils.toThrift; +import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS; +import static java.util.Objects.requireNonNull; + +public class RemoteTransactionHandleCodec + implements ConnectorCodec +{ + private final Provider thriftCodecManagerProvider; + + public RemoteTransactionHandleCodec(Provider thriftCodecManagerProvider) + { + this.thriftCodecManagerProvider = requireNonNull(thriftCodecManagerProvider, "thriftCodecManagerProvider is null"); + } + + @Override + public byte[] serialize(ConnectorTransactionHandle handle) + { + try { + return toThrift((RemoteTransactionHandle) handle, thriftCodecManagerProvider.get().getCodec(RemoteTransactionHandle.class)); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize remote transaction handle", e); + } + } + + @Override + public ConnectorTransactionHandle deserialize(byte[] bytes) + { + try { + return fromThrift(bytes, thriftCodecManagerProvider.get().getCodec(RemoteTransactionHandle.class)); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize remote transaction handle", e); + } + } +} diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java index 3e3fbf6089d8d..d29241c1531a0 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java @@ -201,7 +201,7 @@ public void testDPartialPushDownTwoDifferentConnectors() .matches( project( filter( - "a1 = b1 and a1 = c1 and true", + "a1 = b1 and a1 = c1 and true", join( JoinTableScanMatcher.tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle1, "a1", "a2", "c1", "c2"), JoinTableScanMatcher.tableScan(LOCAL, tableHandle2, "b1", "b2"))))); @@ -277,7 +277,7 @@ public void testJoinPushDownHappenedWithFilters() .matches( project( filter( - "a1 = a2 and a1 > b1 and true", + "a1 = a2 and a1 > b1 and true", JoinTableScanMatcher.tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle, "a1", "a2", "b1")))); } @@ -339,11 +339,11 @@ public void testPushDownWithTwoDifferentConnectors() tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, "b1", "b2"), tableScan(OTHER_CATALOG_SUPPORTING_JOIN_PUSHDOWN, "c1", "c2"), new EquiJoinClause(newBigintVariable("b1"), newBigintVariable("c1"))), - new EquiJoinClause(newBigintVariable("c1"), newBigintVariable("d1")))) + new EquiJoinClause(newBigintVariable("c1"), newBigintVariable("d1")))) .matches( project( filter( - "((a1 = b1 and a1 = d1) and (b1 = c1 and c1 = d1)) and true", + "((a1 = b1 and a1 = d1) and (b1 = c1 and c1 = d1)) and true", join( JoinTableScanMatcher.tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle1, "a1", "b1"), JoinTableScanMatcher.tableScan(OTHER_CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle2, "c1", "d1"))))); @@ -352,9 +352,7 @@ public void testPushDownWithTwoDifferentConnectors() private RuleAssert assertGroupInnerJoinsByConnectorRuleSet() { // For testing, we do not wish to push down pulled up predicates - return tester.assertThat(new GroupInnerJoinsByConnectorRuleSet.OnlyJoinRule(tester.getMetadata(), - (plan, session, types, variableAllocator, idAllocator, warningCollector) -> - PlanOptimizerResult.optimizerResult(plan, false)), + return tester.assertThat(new GroupInnerJoinsByConnectorRuleSet.OnlyJoinRule(tester.getMetadata(), (plan, session, types, variableAllocator, idAllocator, warningCollector) -> PlanOptimizerResult.optimizerResult(plan, false)), ImmutableList.of(CATALOG_SUPPORTING_JOIN_PUSHDOWN, OTHER_CATALOG_SUPPORTING_JOIN_PUSHDOWN)); } @@ -473,8 +471,8 @@ public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session ses ConnectorTableHandle connectorHandle = otherTable.getConnectorHandle(); if (connectorId.equals(otherTable.getConnectorId()) && Objects.equals(otherTable.getConnectorId(), this.tableHandle.getConnectorId()) && - Objects.equals(otherTable.getConnectorHandle(), this.tableHandle.getConnectorHandle()) && - Objects.equals(otherTable.getLayout().isPresent(), this.tableHandle.getLayout().isPresent())) { + Objects.equals(otherTable.getConnectorHandle(), this.tableHandle.getConnectorHandle()) && + Objects.equals(otherTable.getLayout().isPresent(), this.tableHandle.getLayout().isPresent())) { return MatchResult.match(SymbolAliases.builder().putAll(Arrays.stream(columns).collect(toMap(identity(), SymbolReference::new))).build()); } diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 2b2772d7d0b51..ddb834d818dfc 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -41,6 +41,7 @@ import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorCodecManager; import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.system.SystemConnectorModule; import com.facebook.presto.cost.FilterStatsCalculator; @@ -98,7 +99,6 @@ import com.facebook.presto.metadata.SchemaPropertyManager; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.metadata.SessionPropertyProviderConfig; -import com.facebook.presto.metadata.Split; import com.facebook.presto.metadata.StaticCatalogStore; import com.facebook.presto.metadata.StaticCatalogStoreConfig; import com.facebook.presto.metadata.StaticFunctionNamespaceStore; @@ -141,8 +141,7 @@ import com.facebook.presto.resourcemanager.ResourceManagerResourceGroupService; import com.facebook.presto.server.remotetask.HttpLocationFactory; import com.facebook.presto.server.thrift.FixedAddressSelector; -import com.facebook.presto.server.thrift.SplitCodec; -import com.facebook.presto.server.thrift.TableWriteInfoCodec; +import com.facebook.presto.server.thrift.HandleThriftModule; import com.facebook.presto.server.thrift.ThriftServerInfoClient; import com.facebook.presto.server.thrift.ThriftServerInfoService; import com.facebook.presto.server.thrift.ThriftTaskClient; @@ -426,10 +425,13 @@ else if (serverConfig.isCoordinator()) { binder.bind(TaskManagementExecutor.class).in(Scopes.SINGLETON); install(new DefaultThriftCodecsModule()); + // handle resolve for thrift + binder.install(new HandleThriftModule()); + thriftCodecBinder(binder).bindCustomThriftCodec(SqlInvokedFunctionCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(SqlFunctionIdCodec.class); - thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class); - thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class); + + binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON); jsonCodecBinder(binder).bindListJsonCodec(TaskMemoryReservationSummary.class); binder.bind(SqlTaskManager.class).in(Scopes.SINGLETON); @@ -563,7 +565,6 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon jsonCodecBinder(binder).bindJsonCodec(TableCommitContext.class); jsonCodecBinder(binder).bindJsonCodec(SqlInvokedFunction.class); jsonCodecBinder(binder).bindJsonCodec(TaskSource.class); - jsonCodecBinder(binder).bindJsonCodec(Split.class); jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class); smileCodecBinder(binder).bindSmileCodec(TaskStatus.class); smileCodecBinder(binder).bindSmileCodec(TaskInfo.class); diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java index 5947963043a7c..a5a5a1bf88a65 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java @@ -33,6 +33,7 @@ import com.facebook.presto.common.ErrorCode; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorCodecManager; import com.facebook.presto.execution.Lifespan; import com.facebook.presto.execution.NodeTaskMap; import com.facebook.presto.execution.QueryManagerConfig; @@ -56,9 +57,21 @@ import com.facebook.presto.metadata.Split; import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.server.TaskUpdateRequest; -import com.facebook.presto.server.thrift.SplitCodec; -import com.facebook.presto.server.thrift.TableWriteInfoCodec; +import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec; +import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec; +import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec; +import com.facebook.presto.server.thrift.OutputTableHandleThriftCodec; +import com.facebook.presto.server.thrift.TableHandleThriftCodec; +import com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec; +import com.facebook.presto.server.thrift.TransactionHandleThriftCodec; +import com.facebook.presto.spi.ConnectorDeleteTableHandle; import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.Serialization; @@ -74,6 +87,7 @@ import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.Scopes; import io.airlift.units.DataSize; import io.airlift.units.Duration; import org.testng.annotations.DataProvider; @@ -370,14 +384,28 @@ public void configure(Binder binder) jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class); jsonCodecBinder(binder).bindJsonCodec(PlanFragment.class); jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class); - jsonCodecBinder(binder).bindJsonCodec(Split.class); jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class); jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class); + + binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON); + + thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class); thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class); thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class); thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class); - thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class); - thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(LocaleToLanguageTagCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(JodaDateTimeToEpochMillisThriftCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(DurationToMillisThriftCodec.class); diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java index d38892df6df33..e97cd8e37a2aa 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java @@ -32,6 +32,7 @@ import com.facebook.presto.common.ErrorCode; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorCodecManager; import com.facebook.presto.execution.Lifespan; import com.facebook.presto.execution.NodeTaskMap; import com.facebook.presto.execution.QueryManagerConfig; @@ -54,9 +55,21 @@ import com.facebook.presto.metadata.Split; import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.server.TaskUpdateRequest; -import com.facebook.presto.server.thrift.SplitCodec; -import com.facebook.presto.server.thrift.TableWriteInfoCodec; +import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec; +import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec; +import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec; +import com.facebook.presto.server.thrift.OutputTableHandleThriftCodec; +import com.facebook.presto.server.thrift.TableHandleThriftCodec; +import com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec; +import com.facebook.presto.server.thrift.TransactionHandleThriftCodec; +import com.facebook.presto.spi.ConnectorDeleteTableHandle; import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.Serialization; @@ -73,6 +86,7 @@ import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.Scopes; import io.airlift.units.DataSize; import io.airlift.units.Duration; import org.testng.annotations.DataProvider; @@ -378,14 +392,28 @@ public void configure(Binder binder) jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class); jsonCodecBinder(binder).bindJsonCodec(PlanFragment.class); jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class); - jsonCodecBinder(binder).bindJsonCodec(Split.class); jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class); jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class); + + binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON); + + thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class); thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class); thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class); thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class); - thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class); - thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(LocaleToLanguageTagCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(JodaDateTimeToEpochMillisThriftCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(DurationToMillisThriftCodec.class); diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index eeb8d1b8b6fc2..3fddc68925c89 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -70,7 +70,8 @@ public class PrestoNativeQueryRunnerUtils { - public enum QueryRunnerType { + public enum QueryRunnerType + { JAVA, NATIVE } @@ -176,8 +177,8 @@ public HiveQueryRunnerBuilder setFailOnNestedLoopJoin(boolean failOnNestedLoopJo public HiveQueryRunnerBuilder setUseThrift(boolean useThrift) { - this.extraProperties - .put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift)); + this.extraProperties.put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift)); + this.extraProperties.put("experimental.internal-communication.task-info-thrift-transport-enabled", String.valueOf(useThrift)); return this; } diff --git a/presto-spark-base/pom.xml b/presto-spark-base/pom.xml index 6926105a4fcaa..e2bcff9c8d4de 100644 --- a/presto-spark-base/pom.xml +++ b/presto-spark-base/pom.xml @@ -203,6 +203,11 @@ commons-text + + com.facebook.drift + drift-codec + + com.facebook.presto presto-testng-services diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index ab050bb2c63c2..53abdce4a89ff 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -19,6 +19,7 @@ import com.facebook.airlift.json.smile.SmileCodec; import com.facebook.airlift.node.NodeConfig; import com.facebook.airlift.node.NodeInfo; +import com.facebook.drift.codec.ThriftCodecManager; import com.facebook.presto.ClientRequestFilterManager; import com.facebook.presto.GroupByHashPageIndexerFactory; import com.facebook.presto.PagesIndexPageSorter; @@ -32,6 +33,7 @@ import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorCodecManager; import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.system.SystemConnectorModule; import com.facebook.presto.cost.CostCalculator; @@ -426,6 +428,10 @@ protected void setup(Binder binder) binder.bind(PageSourceManager.class).in(Scopes.SINGLETON); binder.bind(PageSourceProvider.class).to(PageSourceManager.class).in(Scopes.SINGLETON); + // for thrift serde + binder.bind(ThriftCodecManager.class).toInstance(new ThriftCodecManager()); + binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON); + // page sink provider binder.bind(PageSinkManager.class).in(Scopes.SINGLETON); binder.bind(PageSinkProvider.class).to(PageSinkManager.class).in(Scopes.SINGLETON); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorCodec.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorCodec.java new file mode 100644 index 0000000000000..baded6e02eb8e --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorCodec.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi; + +import com.facebook.presto.spi.api.Experimental; + +@Experimental +public interface ConnectorCodec +{ + byte[] serialize(T value); + + T deserialize(byte[] bytes); +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java b/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java index 349c41e7eb205..b34f1309e9b28 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java @@ -49,14 +49,14 @@ public static SchemaTableName valueOf(String schemaTableName) } @JsonProperty("schema") - @ThriftField(1) + @ThriftField(value = 1, name = "schema") public String getSchemaName() { return schemaName; } @JsonProperty("table") - @ThriftField(2) + @ThriftField(value = 2, name = "table") public String getTableName() { return tableName; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java b/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java index 16ccbfa188a9d..9b686fd7fdaf9 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.spi; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.common.predicate.TupleDomain; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -22,6 +25,7 @@ import static java.util.Objects.requireNonNull; // TODO: Use builder pattern for SplitContext if we are to add optional field +@ThriftStruct public class SplitContext { public static final SplitContext NON_CACHEABLE = new SplitContext(false); @@ -31,6 +35,7 @@ public class SplitContext private final Optional> dynamicFilterPredicate; @JsonCreator + @ThriftConstructor public SplitContext(@JsonProperty boolean cacheable) { this(cacheable, Optional.empty()); @@ -48,6 +53,7 @@ private SplitContext(boolean cacheable, Optional> dyna } @JsonProperty + @ThriftField(1) public boolean isCacheable() { return cacheable; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java b/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java index cc8289de8d312..b7f5843fa375f 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.spi; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; @@ -23,6 +26,7 @@ import static java.lang.Math.addExact; import static java.lang.Math.multiplyExact; +@ThriftStruct public final class SplitWeight { private static final long UNIT_VALUE = 100; @@ -31,7 +35,8 @@ public final class SplitWeight private final long value; - private SplitWeight(long value) + @ThriftConstructor + public SplitWeight(long value) { if (value <= 0) { throw new IllegalArgumentException("value must be > 0, found: " + value); @@ -43,6 +48,7 @@ private SplitWeight(long value) * @return The internal integer representation for this weight value */ @JsonValue + @ThriftField(value = 1, name = "value") public long getRawValue() { return value; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java b/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java index bbaa4a9422f3c..51df365849ee0 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.spi; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.fasterxml.jackson.annotation.JsonCreator; @@ -24,6 +27,7 @@ import static java.util.Objects.requireNonNull; +@ThriftStruct public final class TableHandle { private final ConnectorId connectorId; @@ -47,6 +51,16 @@ public TableHandle( this(connectorId, connectorHandle, transaction, layout, Optional.empty()); } + @ThriftConstructor + public TableHandle( + ConnectorId connectorId, + ConnectorTableHandle connectorHandle, + ConnectorTransactionHandle transaction, + ConnectorTableLayoutHandle connectorTableLayout) + { + this(connectorId, connectorHandle, transaction, Optional.of(connectorTableLayout), Optional.empty()); + } + public TableHandle( ConnectorId connectorId, ConnectorTableHandle connectorHandle, @@ -62,18 +76,21 @@ public TableHandle( } @JsonProperty + @ThriftField(1) public ConnectorId getConnectorId() { return connectorId; } @JsonProperty + @ThriftField(2) public ConnectorTableHandle getConnectorHandle() { return connectorHandle; } @JsonProperty + @ThriftField(3) public ConnectorTransactionHandle getTransaction() { return transaction; @@ -85,6 +102,12 @@ public Optional getLayout() return layout; } + @ThriftField(value = 4, name = "connectorTableLayout") + public ConnectorTableLayoutHandle getLayoutHandle() + { + return layout.orElse(null); + } + public Optional>> getDynamicFilter() { return dynamicFilter; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java index f4603e66016c1..6fd46075ff1c8 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java @@ -85,6 +85,14 @@ default ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider() throw new UnsupportedOperationException(); } + /** + * @throws UnsupportedOperationException if this connector does not support connector specific codec + */ + default ConnectorCodecProvider getConnectorCodecProvider() + { + throw new UnsupportedOperationException(); + } + /** * @return the set of system tables provided by this connector */ diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java new file mode 100644 index 0000000000000..4bd2d81d456b4 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.connector; + +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; + +import java.util.Optional; + +public interface ConnectorCodecProvider +{ + default Optional> getConnectorSplitCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorTransactionHandleCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorOutputTableHandleCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorInsertTableHandleCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorDeleteTableHandleCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorTableLayoutHandleCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorTableHandleCodec() + { + return Optional.empty(); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java b/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java index 384d5b98e7999..06930f8730853 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java @@ -13,9 +13,27 @@ */ package com.facebook.presto.spi.schedule; +import com.facebook.drift.annotations.ThriftEnum; +import com.facebook.drift.annotations.ThriftEnumValue; + +@ThriftEnum public enum NodeSelectionStrategy { - HARD_AFFINITY, - SOFT_AFFINITY, - NO_PREFERENCE + HARD_AFFINITY(0), + SOFT_AFFINITY(1), + NO_PREFERENCE(2), + /**/; + + private final int value; + + NodeSelectionStrategy(int value) + { + this.value = value; + } + + @ThriftEnumValue + public int getValue() + { + return value; + } } diff --git a/presto-thrift-spec/pom.xml b/presto-thrift-spec/pom.xml index 6b4e3e4baed8c..a403c718fbf18 100644 --- a/presto-thrift-spec/pom.xml +++ b/presto-thrift-spec/pom.xml @@ -20,6 +20,7 @@ com.facebook.presto presto-common + com.facebook.presto presto-main-base @@ -53,14 +54,30 @@ com.facebook.presto.execution.TaskStatus com.facebook.presto.execution.TaskInfo com.facebook.presto.server.TaskUpdateRequest + com.facebook.presto.spi.ConnectorSplit + com.facebook.presto.spi.connector.ConnectorTransactionHandle + com.facebook.presto.spi.ConnectorOutputTableHandle + com.facebook.presto.spi.ConnectorDeleteTableHandle + com.facebook.presto.spi.ConnectorInsertTableHandle + com.facebook.presto.spi.ConnectorTableHandle + com.facebook.presto.spi.ConnectorTableLayoutHandle + + facebook.presto.thrift + - com.facebook.presto.server.thrift.SplitCodec - com.facebook.presto.server.thrift.TableWriteInfoCodec + com.facebook.presto.server.thrift.ConnectorSplitThriftCodec + com.facebook.presto.server.thrift.TransactionHandleThriftCodec + com.facebook.presto.server.thrift.OutputTableHandleThriftCodec + com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec + com.facebook.presto.server.thrift.InsertTableHandleThriftCodec + com.facebook.presto.server.thrift.TableHandleThriftCodec + com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec com.facebook.drift.codec.utils.DurationToMillisThriftCodec com.facebook.drift.codec.utils.DataSizeToBytesThriftCodec com.facebook.drift.codec.utils.JodaDateTimeToEpochMillisThriftCodec com.facebook.drift.codec.utils.LocaleToLanguageTagCodec + com.facebook.drift.codec.utils.UuidToLeachSalzBinaryEncodingThriftCodec com.facebook.drift.codec.internal.builtin.OptionalIntThriftCodec com.facebook.drift.codec.internal.builtin.OptionalLongThriftCodec com.facebook.drift.codec.internal.builtin.OptionalDoubleThriftCodec diff --git a/presto-tpcds/pom.xml b/presto-tpcds/pom.xml index a1f4befb094cd..6d2d68f8f7c00 100644 --- a/presto-tpcds/pom.xml +++ b/presto-tpcds/pom.xml @@ -42,6 +42,16 @@ jackson-datatype-jdk8 + + com.facebook.drift + drift-codec + + + + com.facebook.drift + drift-protocol + + com.facebook.presto @@ -67,6 +77,12 @@ provided + + io.airlift + units + provided + + com.fasterxml.jackson.core jackson-annotations diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java index 092ded05a523c..0e3880c20c022 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java @@ -13,9 +13,11 @@ */ package com.facebook.presto.tpcds; +import com.facebook.drift.codec.ThriftCodecManager; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.NodeManager; import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; import com.facebook.presto.spi.connector.ConnectorContext; import com.facebook.presto.spi.connector.ConnectorFactory; import com.facebook.presto.spi.connector.ConnectorMetadata; @@ -24,6 +26,7 @@ import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.transaction.IsolationLevel; +import com.facebook.presto.tpcds.thrift.TpcdsCodecProvider; import java.util.Map; @@ -103,6 +106,12 @@ public ConnectorNodePartitioningProvider getNodePartitioningProvider() { return new TpcdsNodePartitioningProvider(nodeManager, splitsPerNode); } + + @Override + public ConnectorCodecProvider getConnectorCodecProvider() + { + return new TpcdsCodecProvider(new ThriftCodecManager()); + } }; } diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java index 466a9b96302cc..7ad33fb467e7a 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.tpcds; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.NodeProvider; @@ -29,6 +32,7 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; +@ThriftStruct public class TpcdsSplit implements ConnectorSplit { @@ -39,6 +43,7 @@ public class TpcdsSplit private final boolean noSexism; @JsonCreator + @ThriftConstructor public TpcdsSplit( @JsonProperty("tableHandle") TpcdsTableHandle tableHandle, @JsonProperty("partNumber") int partNumber, @@ -60,18 +65,21 @@ public TpcdsSplit( } @JsonProperty + @ThriftField(1) public TpcdsTableHandle getTableHandle() { return tableHandle; } @JsonProperty + @ThriftField(2) public int getTotalParts() { return totalParts; } @JsonProperty + @ThriftField(3) public int getPartNumber() { return partNumber; @@ -90,6 +98,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy() } @JsonProperty + @ThriftField(4) public List getAddresses() { return addresses; @@ -102,6 +111,7 @@ public List getPreferredNodes(NodeProvider nodeProvider) } @JsonProperty + @ThriftField(5) public boolean isNoSexism() { return noSexism; diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java index e2434dc254d78..581b091c1d52c 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.tpcds; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.ConnectorTableHandle; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -22,6 +25,7 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; +@ThriftStruct public class TpcdsTableHandle implements ConnectorTableHandle { @@ -29,6 +33,7 @@ public class TpcdsTableHandle private final double scaleFactor; @JsonCreator + @ThriftConstructor public TpcdsTableHandle(@JsonProperty("tableName") String tableName, @JsonProperty("scaleFactor") double scaleFactor) { this.tableName = requireNonNull(tableName, "tableName is null"); @@ -37,12 +42,14 @@ public TpcdsTableHandle(@JsonProperty("tableName") String tableName, @JsonProper } @JsonProperty + @ThriftField(1) public String getTableName() { return tableName; } @JsonProperty + @ThriftField(2) public double getScaleFactor() { return scaleFactor; diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java index 04b2ed175f482..45e8d9cd1f221 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java @@ -13,24 +13,30 @@ */ package com.facebook.presto.tpcds; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import static java.util.Objects.requireNonNull; +@ThriftStruct public class TpcdsTableLayoutHandle implements ConnectorTableLayoutHandle { private final TpcdsTableHandle table; @JsonCreator + @ThriftConstructor public TpcdsTableLayoutHandle(@JsonProperty("table") TpcdsTableHandle table) { this.table = requireNonNull(table, "table is null"); } @JsonProperty + @ThriftField(1) public TpcdsTableHandle getTable() { return table; diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java index 1e8e4d0c99e27..7f362324ec9a8 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java @@ -13,10 +13,26 @@ */ package com.facebook.presto.tpcds; +import com.facebook.drift.annotations.ThriftEnum; +import com.facebook.drift.annotations.ThriftEnumValue; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +@ThriftEnum public enum TpcdsTransactionHandle implements ConnectorTransactionHandle { - INSTANCE + INSTANCE(1); + + private final int value; + + TpcdsTransactionHandle(int value) + { + this.value = value; + } + + @ThriftEnumValue + public int getValue() + { + return value; + } } diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/ThriftCodecUtils.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/ThriftCodecUtils.java new file mode 100644 index 0000000000000..4dbff7ef02038 --- /dev/null +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/ThriftCodecUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tpcds.thrift; + +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.protocol.TBinaryProtocol; +import com.facebook.drift.protocol.TMemoryBuffer; +import com.facebook.drift.protocol.TMemoryBufferWriteOnly; +import com.facebook.drift.protocol.TProtocolException; + +public class ThriftCodecUtils +{ + private ThriftCodecUtils() {} + + public static T fromThrift(byte[] bytes, ThriftCodec thriftCodec) + throws TProtocolException + { + try { + TMemoryBuffer transport = new TMemoryBuffer(bytes.length); + transport.write(bytes); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + return thriftCodec.read(protocol); + } + catch (Exception e) { + throw new TProtocolException("Can not deserialize the data", e); + } + } + + public static byte[] toThrift(T value, ThriftCodec thriftCodec) + throws TProtocolException + { + TMemoryBufferWriteOnly transport = new TMemoryBufferWriteOnly(1024); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + try { + thriftCodec.write(value, protocol); + return transport.getBytes(); + } + catch (Exception e) { + throw new TProtocolException("Can not serialize the data", e); + } + } +} diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java new file mode 100644 index 0000000000000..09545c6b96fe7 --- /dev/null +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tpcds.thrift; + +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class TpcdsCodecProvider + implements ConnectorCodecProvider +{ + private final ThriftCodecManager thriftCodecManager; + + public TpcdsCodecProvider(ThriftCodecManager thriftCodecManager) + { + this.thriftCodecManager = requireNonNull(thriftCodecManager, "thriftCodecManager is null"); + } + + @Override + public Optional> getConnectorSplitCodec() + { + return Optional.of(new TpcdsSplitCodec(thriftCodecManager)); + } + + @Override + public Optional> getConnectorTransactionHandleCodec() + { + return Optional.of(new TpcdsTransactionHandleCodec(thriftCodecManager)); + } + + @Override + public Optional> getConnectorTableLayoutHandleCodec() + { + return Optional.of(new TpcdsTableLayoutHandleCodec(thriftCodecManager)); + } + + @Override + public Optional> getConnectorTableHandleCodec() + { + return Optional.of(new TpcdsTableHandleCodec(thriftCodecManager)); + } +} diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsSplitCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsSplitCodec.java new file mode 100644 index 0000000000000..32e45525de417 --- /dev/null +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsSplitCodec.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tpcds.thrift; + +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.drift.protocol.TProtocolException; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.tpcds.TpcdsSplit; + +import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift; +import static java.util.Objects.requireNonNull; + +public class TpcdsSplitCodec + implements ConnectorCodec +{ + private final ThriftCodec thriftCodec; + + public TpcdsSplitCodec(ThriftCodecManager thriftCodecManager) + { + this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsSplit.class); + } + + @Override + public byte[] serialize(ConnectorSplit split) + { + try { + return toThrift((TpcdsSplit) split, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds split", e); + } + } + + @Override + public ConnectorSplit deserialize(byte[] bytes) + { + try { + return fromThrift(bytes, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds split", e); + } + } +} diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableHandleCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableHandleCodec.java new file mode 100644 index 0000000000000..815d981bc1059 --- /dev/null +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableHandleCodec.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tpcds.thrift; + +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.drift.protocol.TProtocolException; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.tpcds.TpcdsTableHandle; + +import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift; +import static java.util.Objects.requireNonNull; + +public class TpcdsTableHandleCodec + implements ConnectorCodec +{ + private final ThriftCodec thriftCodec; + + public TpcdsTableHandleCodec(ThriftCodecManager thriftCodecManager) + { + this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsTableHandle.class); + } + + @Override + public byte[] serialize(ConnectorTableHandle handle) + { + try { + return toThrift((TpcdsTableHandle) handle, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds table handle", e); + } + } + + @Override + public ConnectorTableHandle deserialize(byte[] bytes) + { + try { + return fromThrift(bytes, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds table handle", e); + } + } +} diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableLayoutHandleCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableLayoutHandleCodec.java new file mode 100644 index 0000000000000..c7a84e168d3ce --- /dev/null +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableLayoutHandleCodec.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tpcds.thrift; + +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.drift.protocol.TProtocolException; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.tpcds.TpcdsTableLayoutHandle; + +import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift; +import static java.util.Objects.requireNonNull; + +public class TpcdsTableLayoutHandleCodec + implements ConnectorCodec +{ + private final ThriftCodec thriftCodec; + + public TpcdsTableLayoutHandleCodec(ThriftCodecManager thriftCodecManager) + { + this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsTableLayoutHandle.class); + } + + @Override + public byte[] serialize(ConnectorTableLayoutHandle handle) + { + try { + return toThrift((TpcdsTableLayoutHandle) handle, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds table Layout handle", e); + } + } + + @Override + public ConnectorTableLayoutHandle deserialize(byte[] bytes) + { + try { + return fromThrift(bytes, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds table Layout handle", e); + } + } +} diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTransactionHandleCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTransactionHandleCodec.java new file mode 100644 index 0000000000000..0f98eefb4d40d --- /dev/null +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTransactionHandleCodec.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tpcds.thrift; + +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.ThriftCodecManager; +import com.facebook.drift.protocol.TProtocolException; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.tpcds.TpcdsTransactionHandle; + +import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift; +import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift; +import static java.util.Objects.requireNonNull; + +public class TpcdsTransactionHandleCodec + implements ConnectorCodec +{ + private final ThriftCodec thriftCodec; + + public TpcdsTransactionHandleCodec(ThriftCodecManager thriftCodecManager) + { + this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsTransactionHandle.class); + } + + @Override + public byte[] serialize(ConnectorTransactionHandle handle) + { + try { + return toThrift((TpcdsTransactionHandle) handle, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds transaction handle", e); + } + } + + @Override + public ConnectorTransactionHandle deserialize(byte[] bytes) + { + try { + return fromThrift(bytes, thriftCodec); + } + catch (TProtocolException e) { + throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds transaction handle", e); + } + } +} diff --git a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithThrift.java b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithThrift.java new file mode 100644 index 0000000000000..5ce27d9be0815 --- /dev/null +++ b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithThrift.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tpcds; + +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; + +public class TestTpcdsWithThrift + extends AbstractTestTpcds +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return TpcdsQueryRunner.createQueryRunner(ImmutableMap.builder() + .put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true") + .put("experimental.internal-communication.task-update-request-thrift-serde-enabled", "true") + .build()); + } +}