Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<dep.okhttp.version>4.12.0</dep.okhttp.version>
<dep.jdbi3.version>3.4.0</dep.jdbi3.version>
<dep.oracle.version>19.3.0.0</dep.oracle.version>
<dep.drift.version>1.43</dep.drift.version>
<dep.drift.version>1.45</dep.drift.version>
<!-- Changing joda version changes tzdata which must match deployed JVM tzdata
Do not change this without also making sure it matches -->
<dep.joda.version>2.13.1</dep.joda.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.hive;

import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
Expand All @@ -24,6 +27,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

@ThriftStruct
public class CacheQuotaRequirement
{
public static final CacheQuotaRequirement NO_CACHE_REQUIREMENT = new CacheQuotaRequirement(GLOBAL, Optional.empty());
Expand All @@ -32,6 +36,7 @@ public class CacheQuotaRequirement
private final Optional<DataSize> quota;

@JsonCreator
@ThriftConstructor
public CacheQuotaRequirement(
@JsonProperty("cacheQuotaScope") CacheQuotaScope cacheQuotaScope,
@JsonProperty("quota") Optional<DataSize> quota)
Expand All @@ -41,12 +46,14 @@ public CacheQuotaRequirement(
}

@JsonProperty
@ThriftField(1)
public CacheQuotaScope getCacheQuotaScope()
{
return cacheQuotaScope;
}

@JsonProperty
@ThriftField(2)
public Optional<DataSize> getQuota()
{
return quota;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,28 @@
*/
package com.facebook.presto.hive;

import com.facebook.drift.annotations.ThriftEnum;
import com.facebook.drift.annotations.ThriftEnumValue;

@ThriftEnum
public enum CacheQuotaScope
{
GLOBAL, SCHEMA, TABLE, PARTITION
GLOBAL(0),
SCHEMA(1),
TABLE(2),
PARTITION(3),
/**/;

private final int value;

CacheQuotaScope(int value)
{
this.value = value;
}

@ThriftEnumValue
public int getValue()
{
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector;

import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.presto.spi.ConnectorCodec;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.thrift.RemoteCodecProvider;
import com.google.inject.Provider;

import javax.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
import static java.util.Objects.requireNonNull;

public class ConnectorCodecManager
{
private final Map<String, ConnectorCodecProvider> connectorCodecProviders = new ConcurrentHashMap<>();

@Inject
public ConnectorCodecManager(Provider<ThriftCodecManager> thriftCodecManagerProvider)
{
requireNonNull(thriftCodecManagerProvider, "thriftCodecManager is null");

connectorCodecProviders.put(REMOTE_CONNECTOR_ID.toString(), new RemoteCodecProvider(thriftCodecManagerProvider));
}

public void addConnectorCodecProvider(ConnectorId connectorId, ConnectorCodecProvider connectorCodecProvider)
{
requireNonNull(connectorId, "connectorId is null");
requireNonNull(connectorCodecProvider, "connectorThriftCodecProvider is null");
connectorCodecProviders.put(connectorId.getCatalogName(), connectorCodecProvider);
}

public Optional<ConnectorCodec<ConnectorSplit>> getConnectorSplitCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorSplitCodec);
}

public Optional<ConnectorCodec<ConnectorTransactionHandle>> getTransactionHandleCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTransactionHandleCodec);
}

public Optional<ConnectorCodec<ConnectorOutputTableHandle>> getOutputTableHandleCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorOutputTableHandleCodec);
}

public Optional<ConnectorCodec<ConnectorInsertTableHandle>> getInsertTableHandleCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorInsertTableHandleCodec);
}

public Optional<ConnectorCodec<ConnectorDeleteTableHandle>> getDeleteTableHandleCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorDeleteTableHandleCodec);
}

public Optional<ConnectorCodec<ConnectorTableLayoutHandle>> getTableLayoutHandleCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableLayoutHandleCodec);
}

public Optional<ConnectorCodec<ConnectorTableHandle>> getTableHandleCodec(String connectorId)
{
requireNonNull(connectorId, "connectorId is null");
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableHandleCodec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorAccessControl;
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.connector.ConnectorIndexProvider;
Expand Down Expand Up @@ -118,6 +119,7 @@ public class ConnectorManager
private final FilterStatsCalculator filterStatsCalculator;
private final BlockEncodingSerde blockEncodingSerde;
private final ConnectorSystemConfig connectorSystemConfig;
private final ConnectorCodecManager connectorCodecManager;

@GuardedBy("this")
private final ConcurrentMap<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -151,7 +153,8 @@ public ConnectorManager(
DeterminismEvaluator determinismEvaluator,
FilterStatsCalculator filterStatsCalculator,
BlockEncodingSerde blockEncodingSerde,
FeaturesConfig featuresConfig)
FeaturesConfig featuresConfig,
ConnectorCodecManager connectorCodecManager)
{
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
Expand All @@ -176,6 +179,7 @@ public ConnectorManager(
this.filterStatsCalculator = requireNonNull(filterStatsCalculator, "filterStatsCalculator is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.connectorSystemConfig = () -> featuresConfig.isNativeExecutionEnabled();
this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
}

@PreDestroy
Expand Down Expand Up @@ -303,7 +307,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
connector.getPlanOptimizerProvider()
.ifPresent(planOptimizerProvider -> connectorPlanOptimizerManager.addPlanOptimizerProvider(connectorId, planOptimizerProvider));
}

connector.getConnectorCodecProvider().ifPresent(connectorCodecProvider -> connectorCodecManager.addConnectorCodecProvider(connectorId, connectorCodecProvider));
metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());

connector.getAccessControl()
Expand Down Expand Up @@ -392,6 +396,7 @@ private static class MaterializedConnector
private final Optional<ConnectorIndexProvider> indexProvider;
private final Optional<ConnectorNodePartitioningProvider> partitioningProvider;
private final Optional<ConnectorPlanOptimizerProvider> planOptimizerProvider;
private final Optional<ConnectorCodecProvider> connectorCodecProvider;
private final Optional<ConnectorAccessControl> accessControl;
private final List<PropertyMetadata<?>> sessionProperties;
private final List<PropertyMetadata<?>> tableProperties;
Expand Down Expand Up @@ -472,6 +477,15 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
}
this.planOptimizerProvider = Optional.ofNullable(planOptimizerProvider);

ConnectorCodecProvider connectorCodecProvider = null;
try {
connectorCodecProvider = connector.getConnectorCodecProvider();
requireNonNull(connectorCodecProvider, format("Connector %s returned null connector specific codec provider", connectorId));
}
catch (UnsupportedOperationException ignored) {
}
this.connectorCodecProvider = Optional.ofNullable(connectorCodecProvider);

ConnectorAccessControl accessControl = null;
try {
accessControl = connector.getAccessControl();
Expand Down Expand Up @@ -580,5 +594,10 @@ public List<PropertyMetadata<?>> getAnalyzeProperties()
{
return analyzeProperties;
}

public Optional<ConnectorCodecProvider> getConnectorCodecProvider()
{
return connectorCodecProvider;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,30 @@
*/
package com.facebook.presto.execution;

import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.net.URI;

import static java.util.Objects.requireNonNull;

@ThriftStruct
public class Location
{
private final String location;

@JsonCreator
@ThriftConstructor
public Location(@JsonProperty("location") String location)
{
this.location = requireNonNull(location, "location is null");
}

@JsonProperty
@ThriftField(1)
public String getLocation()
{
return location;
Expand Down
Loading
Loading