From 05160045c5abc1d93092d9d8be04da0ab66dc52f Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Fri, 3 Jun 2022 13:00:44 -0400 Subject: [PATCH 1/9] Initial Work --- .../store/druid/DruidBatchRecordReader.java | 161 ++++++++++++++++++ .../exec/store/druid/DruidGroupScan.java | 40 ++++- .../store/druid/DruidScanBatchCreator.java | 45 +++++ .../exec/store/druid/DruidStoragePlugin.java | 38 ++++- .../drill/exec/store/druid/DruidSubScan.java | 28 ++- 5 files changed, 296 insertions(+), 16 deletions(-) create mode 100644 contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java new file mode 100644 index 00000000000..264be74bb32 --- /dev/null +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.druid; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec; +import org.apache.drill.exec.store.druid.common.DruidFilter; +import org.apache.drill.exec.store.druid.druid.DruidScanResponse; +import org.apache.drill.exec.store.druid.druid.ScanQuery; +import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder; +import org.apache.drill.exec.store.druid.rest.DruidQueryClient; +import org.apache.drill.exec.store.easy.json.loader.JsonLoader; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; +import org.apache.drill.exec.vector.BaseValueVector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +public class DruidBatchRecordReader implements ManagedReader { + private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class); + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private final DruidStoragePlugin plugin; + private final DruidSubScan.DruidSubScanSpec scanSpec; + private final List columns; + private final DruidFilter filter; + private final DruidQueryClient druidQueryClient; + private final FragmentContext fragmentContext; + + private final TupleMetadata schema; + private BigInteger nextOffset = BigInteger.ZERO; + private int maxRecordsToRead = -1; + + private JsonLoaderBuilder jsonBuilder; + + private JsonLoader jsonLoader; + private ResultSetLoader resultSetLoader; + + private CustomErrorContext errorContext; + + + public DruidBatchRecordReader(DruidSubScanSpec subScanSpec, + List projectedColumns, + int maxRecordsToRead, + FragmentContext context, + DruidStoragePlugin plugin) { + columns = new ArrayList<>(); + setColumns(projectedColumns); + this.maxRecordsToRead = maxRecordsToRead; + this.plugin = plugin; + scanSpec = subScanSpec; + this.schema = scanSpec.getSchema(); + fragmentContext = context; + this.filter = subScanSpec.getFilter(); + this.druidQueryClient = plugin.getDruidQueryClient(); + } + + @Override + public boolean open(SchemaNegotiator negotiator) { + resultSetLoader = negotiator.build(); + errorContext = negotiator.parentErrorContext(); + + negotiator + jsonBuilder = new JsonLoaderBuilder() + .resultSetLoader(resultSetLoader) + .errorContext(errorContext); + + + + return true; + } + + @Override + public boolean next() { + boolean result = false; + try { + String query = getQuery(); + DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query); + setNextOffset(druidScanResponse); + + for (ObjectNode eventNode : druidScanResponse.getEvents()) { + jsonLoader = jsonBuilder + .fromString(eventNode.asText()) + .build(); + result = jsonLoader.readBatch(); + } + return result; + } catch (Exception e) { + throw UserException + .dataReadError(e) + 
.message("Failure while executing druid query: " + e.getMessage()) + .addContext(errorContext) + .build(logger); + } + } + + @Override + public void close() { + if (jsonLoader != null) { + jsonLoader.close(); + jsonLoader = null; + } + + if (! nextOffset.equals(BigInteger.ZERO)) { + nextOffset = BigInteger.ZERO; + } + } + + private String getQuery() throws JsonProcessingException { + int queryThreshold = + maxRecordsToRead >= 0 + ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, maxRecordsToRead) + : BaseValueVector.INITIAL_VALUE_ALLOCATION; + ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder(); + ScanQuery scanQuery = + scanQueryBuilder.build( + scanSpec.dataSourceName, + columns, + filter, + nextOffset, + queryThreshold, + scanSpec.getMinTime(), + scanSpec.getMaxTime() + ); + return objectMapper.writeValueAsString(scanQuery); + } + + private void setNextOffset(DruidScanResponse druidScanResponse) { + nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); + } +} diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java index 24dce99da6b..36673ebc98f 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java @@ -23,14 +23,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.calcite.avatica.Meta; import org.apache.drill.common.PlanStringBuilder; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.schedule.AffinityCreator; @@ -44,6 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -55,6 +59,8 @@ public class DruidGroupScan extends AbstractGroupScan { private final DruidScanSpec scanSpec; private final DruidStoragePlugin storagePlugin; + private MetadataProviderManager metadataProviderManager; + private List columns; private boolean filterPushedDown = false; private int maxRecordsToRead; @@ -73,19 +79,20 @@ public DruidGroupScan(@JsonProperty("userName") String userName, pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class), scanSpec, columns, - maxRecordsToRead); + maxRecordsToRead, null); } public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, DruidScanSpec scanSpec, List columns, - int maxRecordsToRead) { + int maxRecordsToRead, MetadataProviderManager metadataProviderManager) { super(userName); this.storagePlugin = storagePlugin; this.scanSpec = scanSpec; this.columns = columns == null || columns.size() == 0? 
ALL_COLUMNS : columns; this.maxRecordsToRead = maxRecordsToRead; + this.metadataProviderManager = metadataProviderManager; init(); } @@ -102,6 +109,7 @@ private DruidGroupScan(DruidGroupScan that) { this.filterPushedDown = that.filterPushedDown; this.druidWorkList = that.druidWorkList; this.assignments = that.assignments; + this.metadataProviderManager = that.metadataProviderManager; } @Override @@ -163,7 +171,8 @@ private void init() { getScanSpec().getFilter(), getDatasourceSize(), getDataSourceMinTime(), - getDataSourceMaxTime() + getDataSourceMaxTime(), + getSchema() ) ); druidWorkList.add(druidWork); @@ -225,12 +234,13 @@ public DruidSubScan getSpecificScan(int minorFragmentId) { druidWork.getDruidSubScanSpec().getFilter(), druidWork.getDruidSubScanSpec().getDataSourceSize(), druidWork.getDruidSubScanSpec().getMinTime(), - druidWork.getDruidSubScanSpec().getMaxTime() + druidWork.getDruidSubScanSpec().getMaxTime(), + druidWork.getDruidSubScanSpec().getSchema() ) ); } - return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead); + return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead, getSchema()); } @JsonIgnore @@ -283,13 +293,25 @@ public int getMaxRecordsToRead() { return maxRecordsToRead; } + public TupleMetadata getSchema() { + if (metadataProviderManager == null) { + return null; + } + try { + return metadataProviderManager.getSchemaProvider().read().getSchema(); + } catch (IOException | NullPointerException e) { + return null; + } + } + @Override public String toString() { return new PlanStringBuilder(this) - .field("druidScanSpec", scanSpec) - .field("columns", columns) - .field("druidStoragePlugin", storagePlugin) - .toString(); + .field("druidScanSpec", scanSpec) + .field("columns", columns) + .field("druidStoragePlugin", storagePlugin) + .field("schema", getSchema()) + .toString(); } @Override diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java index 45bac99adf5..e95fb321cfb 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java @@ -17,16 +17,27 @@ */ package org.apache.drill.exec.store.druid; +import org.apache.drill.common.exceptions.ChildErrorContext; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; +import 
org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.RecordReader; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,4 +64,38 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc logger.debug("Number of record readers initialized - {}", readers.size()); return new ScanBatch(subScan, context, readers); } + + private ScanFrameworkBuilder createBuilder(OptionManager options, + DruidSubScan subScan, + DruidSubScanSpec scanSpec) { + ScanFrameworkBuilder builder = new ScanFrameworkBuilder(); + builder.projection(subScan.getColumns()); + builder.providedSchema(subScan.getSchema()); + builder.setUserName(subScan.getUserName()); + // Provide custom error context + builder.errorContext(new ChildErrorContext(builder.errorContext()) {}); + + // Reader + ReaderFactory readerFactory = new DruidReaderFactory(subScan); + builder.setReaderFactory(readerFactory); + builder.nullType(Types.optional(MinorType.VARCHAR)); + + return builder; + } + + private static class DruidReaderFactory() implements ReaderFactory { + + private final DruidSubScan subScan; + public DruidReaderFactory(DruidSubScan subScan) { + this.subScan = subScan; + } + + @Override + public void bind(ManagedScanFramework framework) { } + + @Override + public ManagedReader next() { + return new DruidBatchRecordReader(); + } + } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java index f702f1a2904..6571ba3ef67 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java @@ -20,9 +20,13 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.planner.PlannerPhase; +import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; @@ -35,6 +39,7 @@ import com.google.common.collect.ImmutableSet; import java.io.IOException; +import java.util.List; import java.util.Set; public class DruidStoragePlugin extends AbstractStoragePlugin { @@ -57,9 +62,36 @@ public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, DrillbitContext } @Override - public DruidGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { - DruidScanSpec scanSpec = selection.getListWith(new TypeReference() {}); - return new DruidGroupScan(userName, this, scanSpec, null, -1); + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, + SessionOptionManager options) throws IOException { + return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, + options, null); + } + + + @Override + public AbstractGroupScan 
getPhysicalScan(String userName, JSONOptions selection, + SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException { + return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, + options, metadataProviderManager); + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, + List columns) throws IOException { + return getPhysicalScan(userName, selection, columns, null, null); + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { + return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null); + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List columns, SessionOptionManager options, + MetadataProviderManager metadataProviderManager) throws IOException { + DruidScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference() {}); + return new DruidGroupScan(userName, this, scanSpec, null, -1, metadataProviderManager); } @Override diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java index 05405126141..bdc30ca1bb2 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.druid.common.DruidFilter; import com.google.common.base.Preconditions; @@ -53,30 +54,36 @@ public class DruidSubScan extends AbstractBase implements SubScan { private final List columns; private final int maxRecordsToRead; + private final TupleMetadata schema; + @JsonCreator public DruidSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, @JsonProperty("config") StoragePluginConfig config, @JsonProperty("scanSpec") LinkedList datasourceScanSpecList, @JsonProperty("columns") List columns, - @JsonProperty("maxRecordsToRead") int maxRecordsToRead) { + @JsonProperty("maxRecordsToRead") int maxRecordsToRead, + @JsonProperty("schema") TupleMetadata schema) { super(userName); druidStoragePlugin = registry.resolve(config, DruidStoragePlugin.class); this.scanSpec = datasourceScanSpecList; this.columns = columns; this.maxRecordsToRead = maxRecordsToRead; + this.schema = schema; } public DruidSubScan(String userName, DruidStoragePlugin plugin, List dataSourceInfoList, List columns, - int maxRecordsToRead) { + int maxRecordsToRead, + TupleMetadata schema) { super(userName); this.druidStoragePlugin = plugin; this.scanSpec = dataSourceInfoList; this.columns = columns; this.maxRecordsToRead = maxRecordsToRead; + this.schema = schema; } @Override @@ -93,6 +100,10 @@ public List getColumns() { return columns; } + public TupleMetadata getSchema() { + return schema; + } + public int getMaxRecordsToRead() { return maxRecordsToRead; } @JsonIgnore @@ -109,7 +120,7 @@ public DruidStoragePlugin getStorageEngine(){ @Override public PhysicalOperator getNewWithChildren(List children) { Preconditions.checkArgument(children.isEmpty()); - return new 
DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns, maxRecordsToRead); + return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns, maxRecordsToRead, schema); } @JsonIgnore @@ -131,17 +142,21 @@ public static class DruidSubScanSpec { protected final String maxTime; protected final String minTime; + protected final TupleMetadata schema; + @JsonCreator public DruidSubScanSpec(@JsonProperty("dataSourceName") String dataSourceName, @JsonProperty("filter") DruidFilter filter, @JsonProperty("dataSourceSize") long dataSourceSize, @JsonProperty("minTime") String minTime, - @JsonProperty("maxTime") String maxTime) { + @JsonProperty("maxTime") String maxTime, + @JsonProperty("schema") TupleMetadata schema) { this.dataSourceName = dataSourceName; this.filter = filter; this.dataSourceSize = dataSourceSize; this.minTime = minTime; this.maxTime = maxTime; + this.schema = schema; } public String getDataSourceName() { @@ -158,6 +173,10 @@ public String getMinTime() { return minTime; } + public TupleMetadata getSchema() { + return schema; + } + public String getMaxTime() { return maxTime; } @@ -170,6 +189,7 @@ public String toString() { .field("dataSourceSize", dataSourceSize) .field("minTime", minTime) .field("maxTime", maxTime) + .field("schema", schema) .toString(); } } From 9fe0186ea640ae0cf69c8daa44194c5205487968 Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Thu, 7 Jul 2022 10:23:42 -0400 Subject: [PATCH 2/9] WIP --- .../store/druid/DruidBatchRecordReader.java | 11 +++-- .../store/druid/DruidScanBatchCreator.java | 43 ++++++++++++++++--- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java index 264be74bb32..209a553e698 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -57,6 +57,8 @@ public class DruidBatchRecordReader implements ManagedReader { private final DruidQueryClient druidQueryClient; private final FragmentContext fragmentContext; + private final DruidSubScan subScan; + private final TupleMetadata schema; private BigInteger nextOffset = BigInteger.ZERO; private int maxRecordsToRead = -1; @@ -69,17 +71,18 @@ public class DruidBatchRecordReader implements ManagedReader { private CustomErrorContext errorContext; - public DruidBatchRecordReader(DruidSubScanSpec subScanSpec, + public DruidBatchRecordReader(DruidSubScan subScan, + DruidSubScanSpec subScanSpec, List projectedColumns, int maxRecordsToRead, FragmentContext context, DruidStoragePlugin plugin) { + this.subScan = subScan; columns = new ArrayList<>(); - setColumns(projectedColumns); this.maxRecordsToRead = maxRecordsToRead; this.plugin = plugin; scanSpec = subScanSpec; - this.schema = scanSpec.getSchema(); + this.schema = subScan.getSchema(); fragmentContext = context; this.filter = subScanSpec.getFilter(); this.druidQueryClient = plugin.getDruidQueryClient(); @@ -89,8 +92,8 @@ public DruidBatchRecordReader(DruidSubScanSpec subScanSpec, public boolean open(SchemaNegotiator negotiator) { resultSetLoader = negotiator.build(); errorContext = negotiator.parentErrorContext(); + negotiator.setErrorContext(errorContext); - negotiator jsonBuilder = new JsonLoaderBuilder() .resultSetLoader(resultSetLoader) .errorContext(errorContext); diff 
--git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java index e95fb321cfb..cc618d18a8a 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java @@ -19,7 +19,6 @@ import org.apache.drill.common.exceptions.ChildErrorContext; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; @@ -41,12 +40,33 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; public class DruidScanBatchCreator implements BatchCreator { private static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class); + /* + @Override + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, + SplunkSubScan subScan, List children) throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + + try { + ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan); + return builder.buildScanOperator(context, subScan); + } catch (UserException e) { + // Rethrow user exceptions directly + throw e; + } catch (Throwable e) { + // Wrap all others + throw new ExecutionSetupException(e); + } + } + */ + + @Override public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); @@ -55,6 +75,7 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) { try { + ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan, context); columns = subScan.getColumns(); readers.add(new DruidRecordReader(scanSpec, columns, subScan.getMaxRecordsToRead(), context, subScan.getStorageEngine())); } catch (Exception ex) { @@ -67,7 +88,7 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan subScan, - DruidSubScanSpec scanSpec) { + ExecutorFragmentContext context) { ScanFrameworkBuilder builder = new ScanFrameworkBuilder(); builder.projection(subScan.getColumns()); builder.providedSchema(subScan.getSchema()); @@ -76,18 +97,21 @@ private ScanFrameworkBuilder createBuilder(OptionManager options, builder.errorContext(new ChildErrorContext(builder.errorContext()) {}); // Reader - ReaderFactory readerFactory = new DruidReaderFactory(subScan); + ReaderFactory readerFactory = new DruidReaderFactory(subScan, context); builder.setReaderFactory(readerFactory); builder.nullType(Types.optional(MinorType.VARCHAR)); return builder; } - private static class DruidReaderFactory() implements ReaderFactory { - + private static class DruidReaderFactory implements ReaderFactory { private final DruidSubScan subScan; - public DruidReaderFactory(DruidSubScan subScan) { + private final ExecutorFragmentContext context; + private final Iterator subScanSpecIterator; + public DruidReaderFactory(DruidSubScan subScan, ExecutorFragmentContext context) { this.subScan = subScan; + this.context = context; + 
this.subScanSpecIterator = subScan.getScanSpec().iterator(); } @Override @@ -95,7 +119,12 @@ public void bind(ManagedScanFramework framework) { } @Override public ManagedReader next() { - return new DruidBatchRecordReader(); + return new DruidBatchRecordReader(subScan, + subScanSpecIterator.next(), + subScan.getColumns(), + subScan.getMaxRecordsToRead(), + context, + subScan.getStorageEngine()); } } } From e17b6422064cd3b8fb0b523715bbca4b371b19ae Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Tue, 20 Sep 2022 22:41:54 -0400 Subject: [PATCH 3/9] Almost working --- contrib/storage-druid/pom.xml | 2 +- .../store/druid/DruidBatchRecordReader.java | 26 +-- .../exec/store/druid/DruidGroupScan.java | 10 +- .../druid/DruidPushDownFilterForScan.java | 3 +- .../exec/store/druid/DruidRecordReader.java | 183 ------------------ .../store/druid/DruidScanBatchCreator.java | 76 +++----- .../store/druid/DruidFilterBuilderTest.java | 2 +- .../exec/store/druid/DruidTestConstants.java | 1 + .../exec/store/druid/TestDruidQueries.java | 9 + .../test/resources/druid/docker-compose.yaml | 2 +- 10 files changed, 50 insertions(+), 264 deletions(-) delete mode 100755 contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java mode change 100755 => 100644 contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java diff --git a/contrib/storage-druid/pom.xml b/contrib/storage-druid/pom.xml index 9a9d8175615..c7761bcf117 100755 --- a/contrib/storage-druid/pom.xml +++ b/contrib/storage-druid/pom.xml @@ -29,7 +29,7 @@ drill-druid-storage Drill : Contrib : Storage : Druid - **/DruidTestSuit.class + **/DruidTestSuite.class diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java index 209a553e698..d4c41badc56 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -24,7 +24,6 @@ import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; @@ -47,27 +46,18 @@ public class DruidBatchRecordReader implements ManagedReader { private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class); - private static final ObjectMapper objectMapper = new ObjectMapper(); - private final DruidStoragePlugin plugin; private final DruidSubScan.DruidSubScanSpec scanSpec; private final List columns; private final DruidFilter filter; private final DruidQueryClient druidQueryClient; - private final FragmentContext fragmentContext; - - private final DruidSubScan subScan; - private final TupleMetadata schema; private BigInteger nextOffset = BigInteger.ZERO; private int maxRecordsToRead = -1; - private JsonLoaderBuilder jsonBuilder; - private JsonLoader jsonLoader; private ResultSetLoader resultSetLoader; - private CustomErrorContext errorContext; @@ -75,15 +65,11 @@ public DruidBatchRecordReader(DruidSubScan subScan, DruidSubScanSpec subScanSpec, List projectedColumns, int 
maxRecordsToRead, - FragmentContext context, DruidStoragePlugin plugin) { - this.subScan = subScan; - columns = new ArrayList<>(); + this.columns = new ArrayList<>(); this.maxRecordsToRead = maxRecordsToRead; this.plugin = plugin; - scanSpec = subScanSpec; - this.schema = subScan.getSchema(); - fragmentContext = context; + this.scanSpec = subScanSpec; this.filter = subScanSpec.getFilter(); this.druidQueryClient = plugin.getDruidQueryClient(); } @@ -96,10 +82,9 @@ public boolean open(SchemaNegotiator negotiator) { jsonBuilder = new JsonLoaderBuilder() .resultSetLoader(resultSetLoader) + .standardOptions(negotiator.queryOptions()) .errorContext(errorContext); - - return true; } @@ -112,9 +97,10 @@ public boolean next() { setNextOffset(druidScanResponse); for (ObjectNode eventNode : druidScanResponse.getEvents()) { - jsonLoader = jsonBuilder - .fromString(eventNode.asText()) + JsonLoader jsonLoader = jsonBuilder + .fromString(eventNode.toString()) .build(); + result = jsonLoader.readBatch(); } return result; diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java index 36673ebc98f..55b7bd9871e 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonIgnore; -import org.apache.calcite.avatica.Meta; import org.apache.drill.common.PlanStringBuilder; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.metastore.MetadataProviderManager; @@ -58,9 +57,7 @@ public class DruidGroupScan extends AbstractGroupScan { private static final long DEFAULT_TABLET_SIZE = 1000; private final DruidScanSpec scanSpec; private final DruidStoragePlugin storagePlugin; - - private MetadataProviderManager metadataProviderManager; - + private final MetadataProviderManager metadataProviderManager; private List columns; private boolean filterPushedDown = false; private int maxRecordsToRead; @@ -293,6 +290,11 @@ public int getMaxRecordsToRead() { return maxRecordsToRead; } + @JsonIgnore + public MetadataProviderManager getMetadataProviderManager() { + return metadataProviderManager; + } + public TupleMetadata getSchema() { if (metadataProviderManager == null) { return null; diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java index ba5ab9dc3b8..3b31f5bc307 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java @@ -70,7 +70,8 @@ public void onMatch(RelOptRuleCall relOptRuleCall) { groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns(), - groupScan.getMaxRecordsToRead()); + groupScan.getMaxRecordsToRead(), + groupScan.getMetadataProviderManager()); newGroupsScan.setFilterPushedDown(true); ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType()); diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java 
b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java deleted file mode 100755 index 5c437c3e681..00000000000 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.druid; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.util.JacksonUtils; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.store.AbstractRecordReader; -import org.apache.drill.exec.store.druid.common.DruidFilter; -import org.apache.drill.exec.store.druid.druid.DruidScanResponse; -import org.apache.drill.exec.store.druid.druid.ScanQuery; -import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder; -import org.apache.drill.exec.store.druid.rest.DruidQueryClient; -import org.apache.drill.exec.vector.BaseValueVector; -import org.apache.drill.exec.vector.complex.fn.JsonReader; -import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -public class DruidRecordReader extends AbstractRecordReader { - - private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class); - private static final ObjectMapper objectMapper = JacksonUtils.createObjectMapper(); - private final DruidStoragePlugin plugin; - private final DruidSubScan.DruidSubScanSpec scanSpec; - private final List columns; - private final DruidFilter filter; - private BigInteger nextOffset = BigInteger.ZERO; - private int maxRecordsToRead = -1; - - private JsonReader jsonReader; - private VectorContainerWriter writer; - - private final FragmentContext fragmentContext; - private final DruidQueryClient druidQueryClient; - - public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec, - List projectedColumns, - int maxRecordsToRead, - FragmentContext context, - DruidStoragePlugin plugin) { - columns = new ArrayList<>(); - setColumns(projectedColumns); - this.maxRecordsToRead = 
maxRecordsToRead; - this.plugin = plugin; - scanSpec = subScanSpec; - fragmentContext = context; - this.filter = subScanSpec.getFilter(); - this.druidQueryClient = plugin.getDruidQueryClient(); - } - - @Override - protected Collection transformColumns(Collection projectedColumns) { - Set transformed = Sets.newLinkedHashSet(); - if (isStarQuery()) { - transformed.add(SchemaPath.STAR_COLUMN); - } else { - for (SchemaPath column : projectedColumns) { - String fieldName = column.getRootSegment().getPath(); - transformed.add(column); - this.columns.add(fieldName); - } - } - return transformed; - } - - @Override - public void setup(OperatorContext context, OutputMutator output) { - this.writer = new VectorContainerWriter(output); - - this.jsonReader = - new JsonReader.Builder(fragmentContext.getManagedBuffer()) - .schemaPathColumns(ImmutableList.copyOf(getColumns())) - .skipOuterList(true) - .build(); - } - - @Override - public int next() { - writer.allocate(); - writer.reset(); - Stopwatch watch = Stopwatch.createStarted(); - try { - String query = getQuery(); - DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query); - setNextOffset(druidScanResponse); - - int docCount = 0; - for (ObjectNode eventNode : druidScanResponse.getEvents()) { - writer.setPosition(docCount); - jsonReader.setSource(eventNode); - try { - jsonReader.write(writer); - } catch (IOException e) { - throw UserException - .dataReadError(e) - .message("Failure while reading document") - .addContext("Failed Query", query) - .addContext("Parser was at record", eventNode.toString()) - .addContext(e.getMessage()) - .build(logger); - } - docCount++; - } - - writer.setValueCount(docCount); - logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), docCount); - return docCount; - } catch (Exception e) { - throw UserException - .dataReadError(e) - .message("Failure while executing druid query") - .addContext(e.getMessage()) - .build(logger); - } - } - - private String getQuery() throws JsonProcessingException { - int queryThreshold = - this.maxRecordsToRead >= 0 - ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead) - : BaseValueVector.INITIAL_VALUE_ALLOCATION; - ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder(); - ScanQuery scanQuery = - scanQueryBuilder.build( - scanSpec.dataSourceName, - this.columns, - this.filter, - this.nextOffset, - queryThreshold, - scanSpec.getMinTime(), - scanSpec.getMaxTime() - ); - return objectMapper.writeValueAsString(scanQuery); - } - - private void setNextOffset(DruidScanResponse druidScanResponse) { - this.nextOffset = this.nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); - } - - @Override - public void close() throws Exception { - if (writer != null) { - writer.close(); - } - if (!this.nextOffset.equals(BigInteger.ZERO)) { - this.nextOffset = BigInteger.ZERO; - } - jsonReader = null; - } -} diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java old mode 100755 new mode 100644 index cc618d18a8a..01be10dc676 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java @@ -15,16 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.drill.exec.store.druid; -import org.apache.drill.common.exceptions.ChildErrorContext; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework; import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory; @@ -47,84 +46,55 @@ public class DruidScanBatchCreator implements BatchCreator { private static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class); - /* - @Override - public CloseableRecordBatch getBatch(ExecutorFragmentContext context, - SplunkSubScan subScan, List children) throws ExecutionSetupException { - Preconditions.checkArgument(children.isEmpty()); + @Override + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, + DruidSubScan subScan, + List children) throws ExecutionSetupException { try { ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan); return builder.buildScanOperator(context, subScan); } catch (UserException e) { - // Rethrow user exceptions directly throw e; } catch (Throwable e) { - // Wrap all others - throw new ExecutionSetupException(e); - } - } - */ - - - @Override - public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List children) throws ExecutionSetupException { - Preconditions.checkArgument(children.isEmpty()); - List readers = Lists.newArrayList(); - List columns; - - for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) { - try { - ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan, context); - columns = subScan.getColumns(); - readers.add(new DruidRecordReader(scanSpec, columns, subScan.getMaxRecordsToRead(), context, subScan.getStorageEngine())); - } catch (Exception ex) { - throw new ExecutionSetupException(ex); - } + throw e; } - logger.debug("Number of record readers initialized - {}", readers.size()); - return new ScanBatch(subScan, context, readers); } - private ScanFrameworkBuilder createBuilder(OptionManager options, - DruidSubScan subScan, - ExecutorFragmentContext context) { + private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan subScan) { ScanFrameworkBuilder builder = new ScanFrameworkBuilder(); builder.projection(subScan.getColumns()); builder.providedSchema(subScan.getSchema()); builder.setUserName(subScan.getUserName()); - // Provide custom error context - builder.errorContext(new ChildErrorContext(builder.errorContext()) {}); - // Reader - ReaderFactory readerFactory = new DruidReaderFactory(subScan, context); + ReaderFactory readerFactory = new DruidReaderFactory(subScan); builder.setReaderFactory(readerFactory); builder.nullType(Types.optional(MinorType.VARCHAR)); - return builder; } private static class DruidReaderFactory implements ReaderFactory { private final DruidSubScan subScan; - private final ExecutorFragmentContext context; - private final Iterator subScanSpecIterator; - public DruidReaderFactory(DruidSubScan subScan, ExecutorFragmentContext context) { + + private final 
Iterator scanSpecIterator; + + public DruidReaderFactory(DruidSubScan subScan) { this.subScan = subScan; - this.context = context; - this.subScanSpecIterator = subScan.getScanSpec().iterator(); + this.scanSpecIterator = subScan.getScanSpec().listIterator(); } @Override - public void bind(ManagedScanFramework framework) { } + public void bind(ManagedScanFramework framework) { + + } @Override - public ManagedReader next() { - return new DruidBatchRecordReader(subScan, - subScanSpecIterator.next(), - subScan.getColumns(), - subScan.getMaxRecordsToRead(), - context, - subScan.getStorageEngine()); + public ManagedReader next() { + if (scanSpecIterator.hasNext()) { + DruidSubScanSpec scanSpec = scanSpecIterator.next(); + return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine()); + } + return null; } } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java index 9ca7c0ec449..1a84c8fcc55 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java @@ -71,7 +71,7 @@ public void setup() { when(logicalExpression.accept(any(), any())).thenReturn(druidScanSpecRight); } catch (Exception ignored) { } - DruidGroupScan druidGroupScan = new DruidGroupScan("some username", null, druidScanSpecLeft, null, 5); + DruidGroupScan druidGroupScan = new DruidGroupScan("some username", null, druidScanSpecLeft, null, 5, null); druidFilterBuilder = new DruidFilterBuilder(druidGroupScan, logicalExpression); } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java index 78c8de00e67..24ac6dfd6ef 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java @@ -25,4 +25,5 @@ public interface DruidTestConstants { String TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1 = "select * from druid.`%s` as ds where ds.user = 'Dansker' OR ds.page = '1904'"; String TEST_QUERY_PROJECT_PUSH_DOWN_TEMPLATE_1 = "SELECT ds.`comment` FROM druid.`%s` as ds"; String TEST_QUERY_COUNT_QUERY_TEMPLATE = "SELECT count(*) as mycount FROM druid.`%s` as ds"; + String TEST_STAR_QUERY = "SELECT * FROM druid.`%s`"; } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java index deea3f1edaf..1b0d9361eae 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java @@ -28,6 +28,15 @@ @Category({SlowTest.class, DruidStorageTest.class}) public class TestDruidQueries extends DruidTestBase { + @Test + public void testStarQuery() throws Exception { + testBuilder() + .sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA)) + .unOrdered() + .expectsNumRecords(2) + .go(); + } + @Test public void testEqualsFilter() throws Exception { testBuilder() diff --git 
a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml index 700e9704dd8..a651cb61e19 100644 --- a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml +++ b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml @@ -31,7 +31,7 @@ volumes: services: postgres: container_name: postgres - image: postgres:latest + image: postgres:12 volumes: - metadata_data:/var/lib/postgresql/data environment: From 7b3ad77f462ed399f6865b4402ee5acc6bc9e32c Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Wed, 21 Sep 2022 10:24:02 -0400 Subject: [PATCH 4/9] Removed unused imports --- .../drill/exec/store/druid/DruidBatchRecordReader.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java index d4c41badc56..e26179f216c 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -27,14 +27,13 @@ import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; -import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec; import org.apache.drill.exec.store.druid.common.DruidFilter; import org.apache.drill.exec.store.druid.druid.DruidScanResponse; import org.apache.drill.exec.store.druid.druid.ScanQuery; import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder; import org.apache.drill.exec.store.druid.rest.DruidQueryClient; -import org.apache.drill.exec.store.easy.json.loader.JsonLoader; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; import org.apache.drill.exec.vector.BaseValueVector; import org.slf4j.Logger; @@ -56,7 +55,7 @@ public class DruidBatchRecordReader implements ManagedReader { private BigInteger nextOffset = BigInteger.ZERO; private int maxRecordsToRead = -1; private JsonLoaderBuilder jsonBuilder; - private JsonLoader jsonLoader; + private JsonLoaderImpl jsonLoader; private ResultSetLoader resultSetLoader; private CustomErrorContext errorContext; @@ -97,7 +96,7 @@ public boolean next() { setNextOffset(druidScanResponse); for (ObjectNode eventNode : druidScanResponse.getEvents()) { - JsonLoader jsonLoader = jsonBuilder + jsonLoader = (JsonLoaderImpl) jsonBuilder .fromString(eventNode.toString()) .build(); From 6d695488c4d3c60bb665329506335d865178ff3c Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Fri, 23 Sep 2022 11:41:40 -0400 Subject: [PATCH 5/9] Added offset tracker --- .../store/druid/DruidBatchRecordReader.java | 15 ++++---- .../exec/store/druid/DruidOffsetTracker.java | 36 +++++++++++++++++++ .../store/druid/DruidScanBatchCreator.java | 5 +-- 3 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java 
b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java index e26179f216c..b5811203d30 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -51,8 +51,7 @@ public class DruidBatchRecordReader implements ManagedReader { private final List columns; private final DruidFilter filter; private final DruidQueryClient druidQueryClient; - - private BigInteger nextOffset = BigInteger.ZERO; + private final DruidOffsetTracker offsetTracker; private int maxRecordsToRead = -1; private JsonLoaderBuilder jsonBuilder; private JsonLoaderImpl jsonLoader; @@ -64,13 +63,14 @@ public DruidBatchRecordReader(DruidSubScan subScan, DruidSubScanSpec subScanSpec, List projectedColumns, int maxRecordsToRead, - DruidStoragePlugin plugin) { + DruidStoragePlugin plugin, DruidOffsetTracker offsetTracker) { this.columns = new ArrayList<>(); this.maxRecordsToRead = maxRecordsToRead; this.plugin = plugin; this.scanSpec = subScanSpec; this.filter = subScanSpec.getFilter(); this.druidQueryClient = plugin.getDruidQueryClient(); + this.offsetTracker = offsetTracker; } @Override @@ -118,10 +118,6 @@ public void close() { jsonLoader.close(); jsonLoader = null; } - - if (! nextOffset.equals(BigInteger.ZERO)) { - nextOffset = BigInteger.ZERO; - } } private String getQuery() throws JsonProcessingException { @@ -135,7 +131,7 @@ private String getQuery() throws JsonProcessingException { scanSpec.dataSourceName, columns, filter, - nextOffset, + offsetTracker.getOffset(), queryThreshold, scanSpec.getMinTime(), scanSpec.getMaxTime() @@ -144,6 +140,7 @@ private String getQuery() throws JsonProcessingException { } private void setNextOffset(DruidScanResponse druidScanResponse) { - nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); + //nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); + offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size())); } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java new file mode 100644 index 00000000000..da15b309cdc --- /dev/null +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.druid; + +import java.math.BigInteger; + +public class DruidOffsetTracker { + private BigInteger nextOffset; + + public DruidOffsetTracker() { + this.nextOffset = BigInteger.ZERO; + } + + public BigInteger getOffset() { + return nextOffset; + } + + public void setNextOffset(BigInteger offset) { + nextOffset = nextOffset.add(offset); + } +} diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java index 01be10dc676..1aa6893204d 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java @@ -75,12 +75,13 @@ private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan s private static class DruidReaderFactory implements ReaderFactory { private final DruidSubScan subScan; - + private final DruidOffsetTracker offsetTracker; private final Iterator scanSpecIterator; public DruidReaderFactory(DruidSubScan subScan) { this.subScan = subScan; this.scanSpecIterator = subScan.getScanSpec().listIterator(); + this.offsetTracker = new DruidOffsetTracker(); } @Override @@ -92,7 +93,7 @@ public void bind(ManagedScanFramework framework) { public ManagedReader next() { if (scanSpecIterator.hasNext()) { DruidSubScanSpec scanSpec = scanSpecIterator.next(); - return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine()); + return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine(), offsetTracker); } return null; } From 1805c9e6268e598087940fca29054951f17365ba Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Sun, 12 Feb 2023 12:33:26 -0500 Subject: [PATCH 6/9] Working? 
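
This revision pages through the Druid scan API in fixed-size batches: the scan query is issued with the shared offset kept in DruidOffsetTracker, the returned events are concatenated into newline-delimited JSON for Drill's JsonLoader, the offset advances by the number of events actually received, and reading stops once a page comes back with fewer than BATCH_SIZE events. Below is a minimal, self-contained sketch of that paging and termination logic only; the class and the fetchPage helper are invented for illustration and are not Drill or Druid APIs.

  import java.math.BigInteger;
  import java.util.List;

  // Illustrative stand-in for the DruidOffsetTracker / DruidBatchRecordReader paging flow.
  public class ScanPagingSketch {
    static final int BATCH_SIZE = 4096;        // mirrors DruidBatchRecordReader.BATCH_SIZE
    static BigInteger nextOffset = BigInteger.ZERO;

    // Stands in for a Druid scan query issued with the given offset and limit.
    static List<String> fetchPage(BigInteger offset, int limit) {
      // A real implementation would call the Druid broker here; this short page
      // simulates the final (partial) batch of a scan.
      return List.of("{\"page\":\"1904\"}", "{\"user\":\"Dansker\"}");
    }

    public static void main(String[] args) {
      boolean more = true;
      while (more) {
        List<String> events = fetchPage(nextOffset, BATCH_SIZE);

        // Events are joined as newline-delimited JSON, the form handed to the JSON loader.
        StringBuilder ndjson = new StringBuilder();
        for (String event : events) {
          ndjson.append(event).append('\n');
        }
        System.out.print(ndjson);

        // The offset advances by the number of events actually returned.
        nextOffset = nextOffset.add(BigInteger.valueOf(events.size()));

        // A page shorter than BATCH_SIZE means Druid has no more rows for this scan.
        more = events.size() == BATCH_SIZE;
      }
    }
  }

The real reader delegates the JSON-to-vector work to JsonLoader/ResultSetLoader; the sketch only shows the offset bookkeeping and the short-page termination check.
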
--- contrib/storage-druid/.gitignore | 2 + contrib/storage-druid/pom.xml | 7 --- .../store/druid/DruidBatchRecordReader.java | 46 +++++++++++++------ .../exec/store/druid/DruidOffsetTracker.java | 5 ++ .../druid/DruidStoragePluginConfigTest.java | 16 ++++--- .../exec/store/druid/TestDruidQueries.java | 46 +++++++++++++++++-- 6 files changed, 89 insertions(+), 33 deletions(-) create mode 100644 contrib/storage-druid/.gitignore diff --git a/contrib/storage-druid/.gitignore b/contrib/storage-druid/.gitignore new file mode 100644 index 00000000000..9341ff44dc5 --- /dev/null +++ b/contrib/storage-druid/.gitignore @@ -0,0 +1,2 @@ +# Directory to store oauth tokens for testing Googlesheets Storage plugin +/src/test/resources/logback-test.xml diff --git a/contrib/storage-druid/pom.xml b/contrib/storage-druid/pom.xml index c7761bcf117..54cf88e887c 100755 --- a/contrib/storage-druid/pom.xml +++ b/contrib/storage-druid/pom.xml @@ -53,13 +53,6 @@ ${project.version} test - - org.assertj - assertj-core - - 3.11.1 - test - diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java index b5811203d30..d9068954f62 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.store.druid.rest.DruidQueryClient; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; -import org.apache.drill.exec.vector.BaseValueVector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +44,7 @@ public class DruidBatchRecordReader implements ManagedReader { private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class); + private static final int BATCH_SIZE = 4096; private static final ObjectMapper objectMapper = new ObjectMapper(); private final DruidStoragePlugin plugin; private final DruidSubScan.DruidSubScanSpec scanSpec; @@ -55,6 +55,7 @@ public class DruidBatchRecordReader implements ManagedReader { private int maxRecordsToRead = -1; private JsonLoaderBuilder jsonBuilder; private JsonLoaderImpl jsonLoader; + private SchemaNegotiator negotiator; private ResultSetLoader resultSetLoader; private CustomErrorContext errorContext; @@ -75,34 +76,50 @@ public DruidBatchRecordReader(DruidSubScan subScan, @Override public boolean open(SchemaNegotiator negotiator) { - resultSetLoader = negotiator.build(); - errorContext = negotiator.parentErrorContext(); - negotiator.setErrorContext(errorContext); + this.negotiator = negotiator; + this.errorContext = this.negotiator.parentErrorContext(); + this.negotiator.batchSize(BATCH_SIZE); + this.negotiator.setErrorContext(errorContext); + + resultSetLoader = this.negotiator.build(); - jsonBuilder = new JsonLoaderBuilder() - .resultSetLoader(resultSetLoader) - .standardOptions(negotiator.queryOptions()) - .errorContext(errorContext); return true; } @Override public boolean next() { + jsonBuilder = new JsonLoaderBuilder() + .resultSetLoader(resultSetLoader) + .standardOptions(negotiator.queryOptions()) + .errorContext(errorContext); + int eventCounter = 0; boolean result = false; try { String query = getQuery(); + logger.debug("Executing query: {}", query); DruidScanResponse druidScanResponse = 
druidQueryClient.executeQuery(query); setNextOffset(druidScanResponse); + StringBuilder events = new StringBuilder(); for (ObjectNode eventNode : druidScanResponse.getEvents()) { - jsonLoader = (JsonLoaderImpl) jsonBuilder - .fromString(eventNode.toString()) + events.append(eventNode); + events.append("\n"); + eventCounter++; + } + + + jsonLoader = (JsonLoaderImpl) jsonBuilder + .fromString(events.toString()) .build(); - result = jsonLoader.readBatch(); + result = jsonLoader.readBatch(); + + if (eventCounter < BATCH_SIZE) { + return false; + } else { + return result; } - return result; } catch (Exception e) { throw UserException .dataReadError(e) @@ -123,8 +140,8 @@ public void close() { private String getQuery() throws JsonProcessingException { int queryThreshold = maxRecordsToRead >= 0 - ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, maxRecordsToRead) - : BaseValueVector.INITIAL_VALUE_ALLOCATION; + ? Math.min(BATCH_SIZE, maxRecordsToRead) + : BATCH_SIZE; ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder(); ScanQuery scanQuery = scanQueryBuilder.build( @@ -140,7 +157,6 @@ private String getQuery() throws JsonProcessingException { } private void setNextOffset(DruidScanResponse druidScanResponse) { - //nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size())); } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java index da15b309cdc..16604f0b494 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java @@ -17,9 +17,13 @@ */ package org.apache.drill.exec.store.druid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigInteger; public class DruidOffsetTracker { + private static final Logger logger = LoggerFactory.getLogger(DruidOffsetTracker.class); private BigInteger nextOffset; public DruidOffsetTracker() { @@ -32,5 +36,6 @@ public BigInteger getOffset() { public void setNextOffset(BigInteger offset) { nextOffset = nextOffset.add(offset); + logger.debug("Incrementing offset by {}", offset); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java index dd76a64ed37..027f80c8ee6 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java @@ -28,7 +28,9 @@ import java.io.IOException; import java.net.URISyntaxException; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class DruidStoragePluginConfigTest { @@ -40,11 +42,11 @@ public void testDruidStoragePluginConfigSuccessfullyParsed() Resources.getResource("bootstrap-storage-plugins.json").toURI())); DruidStoragePluginConfig druidStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class); - 
assertThat(druidStoragePluginConfig).isNotNull(); - assertThat(druidStoragePluginConfig.getBrokerAddress()).isEqualTo("http://localhost:8082"); - assertThat(druidStoragePluginConfig.getCoordinatorAddress()).isEqualTo("http://localhost:8081"); - assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(200); - assertThat(druidStoragePluginConfig.isEnabled()).isFalse(); + assertNotNull(druidStoragePluginConfig); + assertEquals("http://localhost:8082", druidStoragePluginConfig.getBrokerAddress()); + assertEquals("http://localhost:8081", druidStoragePluginConfig.getCoordinatorAddress()); + assertEquals(200, druidStoragePluginConfig.getAverageRowSizeBytes()); + assertFalse(druidStoragePluginConfig.isEnabled()); } @Test @@ -59,6 +61,6 @@ public void testDefaultRowSizeUsedWhenNotProvidedInConfig() JsonNode storagePluginJson = mapper.readTree(druidConfigStr); DruidStoragePluginConfig druidStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class); - assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(100); + assertEquals(100, druidStoragePluginConfig.getAverageRowSizeBytes()); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java index 1b0d9361eae..c4a3a43b578 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java @@ -20,10 +20,19 @@ import org.apache.drill.categories.DruidStorageTest; import org.apache.drill.categories.SlowTest; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; + + @Ignore("These tests require a running druid instance. 
You may start druid by using the docker-compose provide in resources/druid and enable these tests") @Category({SlowTest.class, DruidStorageTest.class}) public class TestDruidQueries extends DruidTestBase { @@ -33,7 +42,7 @@ public void testStarQuery() throws Exception { testBuilder() .sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA)) .unOrdered() - .expectsNumRecords(2) + .expectsNumRecords(876) .go(); } @@ -60,7 +69,7 @@ public void testTwoOrdEqualsFilter() throws Exception { testBuilder() .sqlQuery(String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA)) .unOrdered() - .expectsNumRecords(3) + .expectsNumRecords(1) .go(); } @@ -72,7 +81,7 @@ public void testSingleColumnProject() throws Exception { .sqlQuery(query) .unOrdered() .baselineColumns("comment") - .expectsNumRecords(24433) + .expectsNumRecords(876) .go(); } @@ -84,7 +93,36 @@ public void testCountAllRowsQuery() throws Exception { .sqlQuery(query) .unOrdered() .baselineColumns("mycount") - .baselineValues(24433L) + .baselineValues(876L) .go(); } + + @Test + public void testGroupByQuery() throws Exception { + String sql = String.format("SELECT `namespace`, COUNT(*) AS user_count FROM druid.`%s` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5",TEST_DATASOURCE_WIKIPEDIA); + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL) + .add("user_count", MinorType.BIGINT) + .buildSchema(); + + RowSet expected = client.rowSetBuilder(expectedSchema) + .addRow("Main", 702) + .addRow("User talk", 29) + .addRow("Wikipedia", 26) + .addRow("Talk", 17) + .addRow("User", 12) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSerDe() throws Exception { + String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA); + String plan = queryBuilder().sql(sql).explainJson(); + long cnt = queryBuilder().physical(plan).singletonLong(); + assertEquals("Counts should match", 876L, cnt); + } } From f4a9cd28aa6b4aa752c432f6f0bc0e1641184c0c Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Tue, 14 Feb 2023 14:43:41 -0500 Subject: [PATCH 7/9] WIP --- .../org/apache/drill/exec/store/druid/TestDruidQueries.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java index c4a3a43b578..45f2267dd1e 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java @@ -120,6 +120,8 @@ public void testGroupByQuery() throws Exception { @Test public void testSerDe() throws Exception { + // TODO Start here... 
filters are not deserializing properly + String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA); String plan = queryBuilder().sql(sql).explainJson(); long cnt = queryBuilder().physical(plan).singletonLong(); From 652275118e16cb2390ddd705b47e820406d27840 Mon Sep 17 00:00:00 2001 From: cgivre Date: Sun, 14 Jul 2024 21:03:28 -0400 Subject: [PATCH 8/9] Fixed unit tests --- .../store/druid/DruidScanBatchCreator.java | 5 -- .../exec/store/druid/DruidStoragePlugin.java | 5 +- .../store/druid/DruidFilterBuilderTest.java | 8 +-- .../store/druid/DruidScanSpecBuilderTest.java | 52 +++++++++++-------- .../druid/rest/DruidQueryClientTest.java | 18 +++---- 5 files changed, 46 insertions(+), 42 deletions(-) diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java index 1aa6893204d..de59cd813e2 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java @@ -32,9 +32,6 @@ import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.RecordReader; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +53,6 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, return builder.buildScanOperator(context, subScan); } catch (UserException e) { throw e; - } catch (Throwable e) { - throw e; } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java index 6571ba3ef67..f066cde6a35 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java @@ -18,13 +18,15 @@ package org.apache.drill.exec.store.druid; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.ops.OptimizerRulesContext; -import org.apache.drill.exec.planner.PlannerPhase; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.planner.PlannerPhase; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.AbstractStoragePlugin; @@ -36,7 +38,6 @@ import org.apache.drill.exec.store.druid.rest.RestClient; import org.apache.drill.exec.store.druid.rest.RestClientWrapper; import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory; -import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.List; diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java 
b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java index 1a84c8fcc55..5dcb42bfa8b 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java @@ -28,7 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -80,7 +80,7 @@ public void parseTreeWithAndOfTwoSelectorFilters() { DruidScanSpec parsedSpec = druidFilterBuilder.parseTree(); String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}"; String actual = parsedSpec.getFilter().toJson(); - assertThat(actual).isEqualTo(expectedFilterJson); + assertEquals(expectedFilterJson, actual); } @Test @@ -99,7 +99,7 @@ public void visitBooleanOperatorWithAndOperator() { druidFilterBuilder.visitBooleanOperator(booleanOperator, null); String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}"; String actual = druidScanSpec.getFilter().toJson(); - assertThat(actual).isEqualTo(expectedFilterJson); + assertEquals(expectedFilterJson, actual); } @Test @@ -118,6 +118,6 @@ public void visitBooleanOperatorWithOrOperator() { druidFilterBuilder.visitBooleanOperator(booleanOperator, null); String expectedFilterJson = "{\"type\":\"or\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}"; String actual = druidScanSpec.getFilter().toJson(); - assertThat(actual).isEqualTo(expectedFilterJson); + assertEquals(actual, expectedFilterJson); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java index 6c8a4f06ce4..c2adbe80b7e 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java @@ -22,8 +22,9 @@ import org.apache.drill.exec.store.druid.common.DruidConstants; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; -import static org.assertj.core.api.Assertions.assertThat; public class DruidScanSpecBuilderTest { @@ -54,8 +55,8 @@ public void buildCalledWithEqualFxShouldBuildSelectorFilter() { FunctionNames.EQ, schemaPath, SOME_VALUE); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}"); + String actual = "{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -70,8 +71,8 @@ public void 
buildCalledWithEqualFxIntervalFieldShouldBuildIntervalFilter() { FunctionNames.EQ, schemaPath, SOME_VALUE); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"eventInterval\":\"some value\"}"); + String actual = "{\"eventInterval\":\"some value\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -86,8 +87,8 @@ public void buildCalledWithNotEqualFxShouldBuildSelectorFilter() { FunctionNames.NE, schemaPath, SOME_VALUE ); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}}"); + String actual = "{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -103,8 +104,8 @@ public void buildCalledWithGreaterThanOrEqualToFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE ); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -120,8 +121,8 @@ public void buildCalledWithGreaterThanFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE ); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"lowerStrict\":true,\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"lowerStrict\":true,\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -138,7 +139,8 @@ public void buildCalledWithGreaterThanFxAndNumericValueShouldBuildBoundFilter() "1" ); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"1\",\"lowerStrict\":true,\"ordering\":\"numeric\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"1\",\"lowerStrict\":true,\"ordering\":\"numeric\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -154,7 +156,8 @@ public void buildCalledWithLessThanOrEqualToFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -169,7 +172,8 @@ public void buildCalledWithLessThanFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"upperStrict\":true,\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"upperStrict\":true,\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -184,7 +188,8 @@ public void buildCalledWithLessThanFxAndNumericValueShouldBuildBoundFilter() { schemaPath, "1"); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some 
field\",\"upper\":\"1\",\"upperStrict\":true,\"ordering\":\"numeric\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"1\",\"upperStrict\":true,\"ordering\":\"numeric\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -199,8 +204,9 @@ public void buildCalledWithIsNullFxShouldBuildSelectorFilter() { FunctionNames.IS_NULL, schemaPath, null); - assertThat(druidScanSpec).isNotNull(); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}"); + assertNotNull(druidScanSpec); + String actual = "{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -215,8 +221,9 @@ public void buildCalledWithIsNotNullFxShouldBuildSelectorFilter() { FunctionNames.IS_NOT_NULL, schemaPath, null); - assertThat(druidScanSpec).isNotNull(); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}}"); + assertNotNull(druidScanSpec); + String actual = "{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -232,7 +239,8 @@ public void buildCalledWithLikeFxButIfValueIsPrefixedWithRegexKeywordHintShouldB schemaPath, "$regex$_some_regular_expression"); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"regex\",\"dimension\":\"some field\",\"pattern\":\"some_regular_expression\"}"); + String actual = "{\"type\":\"regex\",\"dimension\":\"some field\",\"pattern\":\"some_regular_expression\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -247,7 +255,7 @@ public void buildCalledWithLikeFxShouldBuildSearchFilter() { FunctionNames.LIKE, schemaPath, "some search string"); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"search\",\"dimension\":\"some field\",\"query\":{\"type\":\"contains\",\"value\":\"some search string\",\"caseSensitive\":false}}"); + String actual = "{\"type\":\"search\",\"dimension\":\"some field\",\"query\":{\"type\":\"contains\",\"value\":\"some search string\",\"caseSensitive\":false}}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java index bb065ab7b31..7818ee33905 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java @@ -17,23 +17,23 @@ */ package org.apache.drill.exec.store.druid.rest; +import okhttp3.Response; +import okhttp3.ResponseBody; import org.apache.drill.exec.store.druid.druid.DruidScanResponse; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import okhttp3.Response; -import okhttp3.ResponseBody; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.assertj.core.api.Assertions.assertThat; public class 
DruidQueryClientTest {
@@ -79,7 +79,7 @@ public void executeQueryCalledNoResponsesFoundReturnsEmptyEventList()
     when(httpResponseBody.byteStream()).thenReturn(inputStream);
 
     DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
-    assertThat(response.getEvents()).isEmpty();
+    assertEquals(0, response.getEvents().size());
   }
 
   @Test
@@ -91,9 +91,9 @@ public void executeQueryCalledSuccessfullyParseQueryResults()
     when(httpResponseBody.byteStream()).thenReturn(inputStream);
 
     DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
-    assertThat(response.getEvents()).isNotEmpty();
-    assertThat(response.getEvents().size()).isEqualTo(1);
-    assertThat(response.getEvents().get(0).get("user").textValue()).isEqualTo("Dansker");
-    assertThat(response.getEvents().get(0).get("sum_deleted").intValue()).isEqualTo(133);
+    assertFalse(response.getEvents().isEmpty());
+    assertEquals(response.getEvents().size(), 1);
+    assertEquals(response.getEvents().get(0).get("user").textValue(), "Dansker");
+    assertEquals(response.getEvents().get(0).get("sum_deleted").intValue(), 133);
   }
 }

From 818c9787c2cd1da7ca71d3f5f252c695e6abcb6f Mon Sep 17 00:00:00 2001
From: cgivre
Date: Sun, 14 Jul 2024 21:34:51 -0400
Subject: [PATCH 9/9] Tested with latest version of Druid

---
 contrib/storage-druid/README.md                  | 12 ++++++------
 .../src/test/resources/druid/docker-compose.yaml |  8 ++++----
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/contrib/storage-druid/README.md b/contrib/storage-druid/README.md
index 479024aa067..5b88b285a05 100644
--- a/contrib/storage-druid/README.md
+++ b/contrib/storage-druid/README.md
@@ -4,7 +4,7 @@ Drill druid storage plugin allows you to perform SQL queries against Druid datas
 This storage plugin is part of [Apache Drill](https://github.com/apache/drill)
 
 ### Tested with Druid version
-[0.22.0](https://github.com/apache/druid/releases/tag/druid-0.22.0)
+[30.0.0](https://github.com/apache/druid/releases/tag/druid-30.0.0)
 
 ### Druid API
 
@@ -33,27 +33,27 @@ Following is the default registration configuration.
 ### Druid storage plugin developer notes.
 
-* Building the plugin 
+* Building the plugin
 `mvn install -pl contrib/storage-druid`
 
 * Building DRILL
 `mvn clean install -DskipTests`
- 
+
 * Start Drill In Embedded Mode (mac)
 ```shell script
 distribution/target/apache-drill-1.20.0-SNAPSHOT/apache-drill-1.20.0-SNAPSHOT/bin/drill-embedded
 ```
- 
+
 * Starting Druid (Docker and Docker Compose required)
 ```
 cd contrib/storage-druid/src/test/resources/druid
 docker-compose up -d
 ```
- 
+
 * There is an `Indexing Task Json` in the same folder as the docker compose file. It can be used to ingest the wikipedia datasource.
- 
+
 * Make sure the druid storage plugin is enabled in Drill.
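
* Once the plugin is enabled, the Druid datasource can be queried like any other Drill table. The snippet below is an illustrative sketch only: it assumes Drill's JDBC driver (`drill-jdbc-all`) on the classpath, a Drillbit reachable on `localhost` with default ports, the plugin registered under the name `druid`, and the `wikipedia` datasource loaded by the indexing task mentioned above.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DruidPluginQueryExample {
  public static void main(String[] args) throws Exception {
    // Connect to a local Drillbit (for example one started with drill-embedded).
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement();
         // `druid` is the storage plugin name; `wikipedia` is the ingested datasource.
         ResultSet rs = stmt.executeQuery(
             "SELECT `namespace`, COUNT(*) AS user_count "
                 + "FROM druid.`wikipedia` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5")) {
      while (rs.next()) {
        System.out.println(rs.getString("namespace") + ": " + rs.getLong("user_count"));
      }
    }
  }
}
```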
diff --git a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml index a651cb61e19..53cca236965 100644 --- a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml +++ b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml @@ -65,7 +65,7 @@ services: - environment.env broker: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 container_name: broker volumes: - broker_var:/opt/druid/var @@ -81,7 +81,7 @@ services: - environment.env historical: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 container_name: historical volumes: - druid_shared:/opt/shared @@ -98,7 +98,7 @@ services: - environment.env middlemanager: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 container_name: middlemanager volumes: - druid_shared:/opt/shared @@ -116,7 +116,7 @@ services: - environment.env router: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 container_name: router volumes: - router_var:/opt/druid/var