diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 055581b04e..2d82175c94 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -78,6 +78,11 @@ public static TableDesc newTableDesc(String tableName, Schema schema, TableMeta return new TableDesc(tableName, schema, meta, path.toUri()); } + public static TableDesc newTableDesc(String tableName, Schema schema, TableMeta meta, Path path + , PartitionMethodDesc partitionMethodDesc) { + return new TableDesc(tableName, schema, meta, path.toUri(), partitionMethodDesc); + } + public static TableDesc newTableDesc(TableDescProto proto) { return new TableDesc(proto); } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java index 8122bd59db..948567001c 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java @@ -51,32 +51,46 @@ public class TableDesc implements ProtoObject, GsonObject, Clone public TableDesc() { } + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI uri, boolean external) { + this(tableName, schema, meta, uri, null, external); + } + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, - @Nullable URI uri, boolean external) { + @Nullable URI uri, @Nullable PartitionMethodDesc partitionMethodDesc, boolean external) { this.tableName = tableName; this.schema = schema; this.meta = meta; this.uri = uri; + this.partitionMethodDesc = partitionMethodDesc; this.external = external; } public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI path) { - this(tableName, schema, meta, path, true); + this(tableName, schema, meta, path, null, true); } - + + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI path, + @Nullable PartitionMethodDesc partitionMethodDesc) { + this(tableName, schema, meta, path, partitionMethodDesc, true); + } + public TableDesc(String tableName, @Nullable Schema schema, String dataFormat, KeyValueSet options, @Nullable URI path) { this(tableName, schema, new TableMeta(dataFormat, options), path); } - - public TableDesc(TableDescProto proto) { + + public TableDesc(String tableName, @Nullable Schema schema, String dataFormat, KeyValueSet options, + @Nullable URI path, @Nullable PartitionMethodDesc partitionMethodDesc) { + this(tableName, schema, new TableMeta(dataFormat, options), path, partitionMethodDesc); + } + + public TableDesc(TableDescProto proto) { this(proto.getTableName(), proto.hasSchema() ? SchemaFactory.newV1(proto.getSchema()) : null, - new TableMeta(proto.getMeta()), proto.hasPath() ? URI.create(proto.getPath()) : null, proto.getIsExternal()); + new TableMeta(proto.getMeta()), proto.hasPath() ? URI.create(proto.getPath()) : null, + proto.hasPartition() ? new PartitionMethodDesc(proto.getPartition()) : null, + proto.getIsExternal()); if(proto.hasStats()) { this.stats = new TableStats(proto.getStats()); - } - if (proto.hasPartition()) { - this.partitionMethodDesc = new PartitionMethodDesc(proto.getPartition()); } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java new file mode 100644 index 0000000000..d5b89e669c --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java @@ -0,0 +1,465 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.*; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.parser.sql.SQLAnalyzer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.partition.PartitionPruningHandle; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.junit.*; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.*; + +public class TestPartitionedTableRewriter { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPartitionedTableRewriter"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + private FileSystem fs; + + final static String PARTITION_TABLE_NAME = "tb_partition"; + final static String MULTIPLE_PARTITION_TABLE_NAME = "tb_multiple_partition"; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + util.startCatalogCluster(); + catalog = util.getCatalogService(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + fs = FileSystem.get(conf); + + Schema schema = SchemaBuilder.builder() + .add("n_nationkey", TajoDataTypes.Type.INT8) + .add("n_name", TajoDataTypes.Type.TEXT) + .add("n_regionkey", TajoDataTypes.Type.INT8) + .build(); + + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, util.getConfiguration()); + + createTableWithOnePartitionKeyColumn(fs, schema, meta); + createTableWithMultiplePartitionKeyColumns(fs, schema, meta); + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + } + + private void createTableWithOnePartitionKeyColumn(FileSystem fs, Schema schema, + TableMeta meta) throws Exception { + Schema partSchema = SchemaBuilder.builder() + .add("key", TajoDataTypes.Type.TEXT) + .build(); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc(DEFAULT_DATABASE_NAME, PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key", partSchema); + + Path tablePath = new Path(testDir, PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + TableDesc desc = CatalogUtil.newTableDesc(DEFAULT_DATABASE_NAME + "." + PARTITION_TABLE_NAME, schema, meta, + tablePath, partitionMethodDesc); + catalog.createTable(desc); + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME + "." + PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/key=part123"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part456"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part789"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + private void createTableWithMultiplePartitionKeyColumns(FileSystem fs, + Schema schema, TableMeta meta) throws Exception { + Schema partSchema = SchemaBuilder.builder() + .add("key1", TajoDataTypes.Type.TEXT) + .add("key2", TajoDataTypes.Type.TEXT) + .add("key3", TajoDataTypes.Type.INT8) + .build(); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc("default", MULTIPLE_PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key1,key2,key3", partSchema); + + Path tablePath = new Path(testDir, MULTIPLE_PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + TableDesc desc = CatalogUtil.newTableDesc(DEFAULT_DATABASE_NAME + "." + MULTIPLE_PARTITION_TABLE_NAME, schema, + meta, tablePath, partitionMethodDesc); + catalog.createTable(desc); + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME + "." + MULTIPLE_PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/key1=part123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=1"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=2"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789/key3=3"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + @Test + public void testFilterIncludePartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part456' ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Path[] filteredPaths = partitionPruningHandle.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertEquals("key=part456", filteredPaths[0].getName()); + + String[] partitionKeys = partitionPruningHandle.getPartitionKeys(); + assertEquals(1, partitionKeys.length); + assertEquals("key=part456", partitionKeys[0]); + + assertEquals(10L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testWithoutAnyFilters() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SCAN, sortNode.getChild().getType()); + ScanNode scanNode = sortNode.getChild(); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(3, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key=part123")); + assertTrue(partitionPathList.get(1).toString().endsWith("key=part456")); + assertTrue(partitionPathList.get(2).toString().endsWith("key=part789")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(3, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), "key=part123"); + assertEquals(partitionKeysList.get(1), "key=part456"); + assertEquals(partitionKeysList.get(2), "key=part789"); + + assertEquals(33L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonExistingPartitionValue() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part123456789'"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + assertEquals(0, partitionPruningHandle.getPartitionPaths().length); + assertEquals(0, partitionPruningHandle.getPartitionKeys().length); + + assertEquals(0L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonPartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE n_nationkey = 1"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(3, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key=part123")); + assertTrue(partitionPathList.get(1).toString().endsWith("key=part456")); + assertTrue(partitionPathList.get(2).toString().endsWith("key=part789")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(3, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), "key=part123"); + assertEquals(partitionKeysList.get(1), "key=part456"); + assertEquals(partitionKeysList.get(2), "key=part789"); + + assertEquals(33L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeEveryPartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part789' and key2 = 'supp789' and key3=3"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Path[] filteredPaths = partitionPruningHandle.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertEquals("key3=3", filteredPaths[0].getName()); + assertEquals("key2=supp789", filteredPaths[0].getParent().getName()); + assertEquals("key1=part789", filteredPaths[0].getParent().getParent().getName()); + + String[] partitionKeys = partitionPruningHandle.getPartitionKeys(); + assertEquals(1, partitionKeys.length); + assertEquals("key1=part789/key2=supp789/key3=3", partitionKeys[0]); + + assertEquals(10L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeSomeOfPartitionKeyColumns() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and key2 = 'supp123' order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(2, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key1=part123/key2=supp123/key3=1")); + assertTrue(partitionPathList.get(1).toString().endsWith("key1=part123/key2=supp123/key3=2")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(2, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), ("key1=part123/key2=supp123/key3=1")); + assertEquals(partitionKeysList.get(1), ("key1=part123/key2=supp123/key3=2")); + + assertEquals(23L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonPartitionKeyColumns() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and n_nationkey >= 2 order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(2, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key1=part123/key2=supp123/key3=1")); + assertTrue(partitionPathList.get(1).toString().endsWith("key1=part123/key2=supp123/key3=2")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(2, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), ("key1=part123/key2=supp123/key3=1")); + assertEquals(partitionKeysList.get(1), ("key1=part123/key2=supp123/key3=2")); + + assertEquals(23L, partitionPruningHandle.getTotalVolume()); + } + +} \ No newline at end of file diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java index a58744f8f2..2a200146bb 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java @@ -181,4 +181,28 @@ public void testBuildTupleFromPartitionPath() { assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); } + + @Test + public void testBuildTupleFromPartitionName() { + Schema schema = SchemaBuilder.builder() + .add("key1", Type.INT8) + .add("key2", Type.TEXT) + .build(); + + Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key1=123"); + assertNotNull(tuple); + assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); + assertEquals(DatumFactory.createNullDatum(), tuple.asDatum(1)); + + tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key1=123/key2=abc"); + assertNotNull(tuple); + assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); + assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); + + tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key2=abc"); + assertNotNull(tuple); + assertEquals(DatumFactory.createNullDatum(), tuple.asDatum(0)); + assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); + + } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java index cccff709f1..1d0363d484 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java @@ -74,7 +74,8 @@ public final void testCompareTo() { for (int i = num - 1; i >= 0; i--) { tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500); } - + + Arrays.sort(tablets); for(int i = 0; i < num; i++) { @@ -94,4 +95,62 @@ public final void testCompareTo2() { Collections.addAll(sortedSet, tablets); assertEquals(num, sortedSet.size()); } + + @Test + public final void testGetAndSetFieldsWithPartitionKeys() { + FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0/col1=1"), + 0, 500, "col1=1"); + + assertEquals("table1_1", fragment1.getInputSourceId()); + assertEquals(new Path(path, "table0/col1=1"), fragment1.getPath()); + assertEquals("col1=1", fragment1.getPartitionKeys()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testGetProtoAndRestoreWithPartitionKeys() { + FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0/col1=1"), 0, + 500, "col1=1"); + + FileFragment fragment1 = FragmentConvertor.convert(conf, BuiltinFragmentKinds.FILE, + FragmentConvertor.toFragmentProto(conf, fragment)); + + assertEquals("table1_1", fragment1.getInputSourceId()); + assertEquals(new Path(path, "table0/col1=1"), fragment1.getPath()); + assertEquals("col1=1", fragment1.getPartitionKeys()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testCompareToWithPartitionKeys() { + final int num = 10; + FileFragment[] tablets = new FileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500 + , "/col1=0/col2=0"); + } + + Arrays.sort(tablets); + + for(int i = 0; i < num; i++) { + assertEquals("tablet1_"+i, tablets[i].getInputSourceId()); + } + } + + @Test + public final void testCompareToWithPartitionKeys2() { + final int num = 1860; + FileFragment[] tablets = new FileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new FileFragment("tablet1", new Path(path, "tablet/col1=" +i), (long)i * 6553500, + (long) (i+1) * 6553500, "col1=" + i); + } + + SortedSet sortedSet = Sets.newTreeSet(); + Collections.addAll(sortedSet, tablets); + assertEquals(num, sortedSet.size()); + } + } diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan index 2f2ca890fb..7456b5238a 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -82,8 +82,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan index f1fa414673..46e0b4b7ee 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -100,8 +100,8 @@ Block Id: eb_0000000000000_0000_000004 [LEAF] [q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.c.c_custkey (INT4), num=32) PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan index 2f2ca890fb..7456b5238a 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -82,8 +82,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan index f1fa414673..46e0b4b7ee 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -100,8 +100,8 @@ Block Id: eb_0000000000000_0000_000004 [LEAF] [q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.c.c_custkey (INT4), num=32) PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index fce56fc366..4474402dae 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -896,12 +896,12 @@ private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack node) throws IOException { + FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); // check if an input is sorted in the same order to the subsequence sort operator. if (checkIfSortEquivalance(ctx, scanNode, node)) { - if (ctx.getTable(scanNode.getCanonicalName()) == null) { + if (fragments == null) { return new SeqScanExec(ctx, scanNode, null); } - FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); return new ExternalSortExec(ctx, (SortNode) node.peek(), scanNode, fragments); } else { Enforcer enforcer = ctx.getEnforcer(); @@ -915,31 +915,15 @@ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, St } } - if (scanNode instanceof PartitionedTableScanNode - && ((PartitionedTableScanNode)scanNode).getInputPaths() != null && - ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) { - - if (broadcastFlag) { - PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode; - List fileFragments = new ArrayList<>(); - - FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri()); - for (Path path : partitionedTableScanNode.getInputPaths()) { - fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path))); - } - - FragmentProto[] fragments = - FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()])); - - ctx.addFragments(scanNode.getCanonicalName(), fragments); - return new PartitionMergeScanExec(ctx, scanNode, fragments); - } + if (scanNode.getTableDesc().hasPartition() && broadcastFlag && fragments != null) { + ctx.addFragments(scanNode.getCanonicalName(), fragments); + return new PartitionMergeScanExec(ctx, scanNode, fragments); } - if (ctx.getTable(scanNode.getCanonicalName()) == null) { + if (fragments == null) { return new SeqScanExec(ctx, scanNode, null); } - FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); + return new SeqScanExec(ctx, scanNode, fragments); } } @@ -1216,3 +1200,4 @@ public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, Log } } } + diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index 43e8382ed2..e5ab0ad3bc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -343,6 +343,7 @@ private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalE if (node instanceof RelationNode) { switch (node.getType()) { case INDEX_SCAN: + case PARTITIONS_SCAN: case SCAN: ScanNode scanNode = (ScanNode) node; if (scanNode.getTableDesc().getStats() == null) { @@ -352,20 +353,6 @@ private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalE } else { return scanNode.getTableDesc().getStats().getNumBytes(); } - case PARTITIONS_SCAN: - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; - if (pScanNode.getTableDesc().getStats() == null) { - // TODO - this case means that data is not located in HDFS. So, we need additional - // broadcast method. - return Long.MAX_VALUE; - } else { - // if there is no selected partition - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - return 0; - } else { - return pScanNode.getTableDesc().getStats().getNumBytes(); - } - } case TABLE_SUBQUERY: return estimateOutputVolumeInternal(((TableSubQueryNode) node).getSubQuery()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java index fd9d02275a..8840e8ce67 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java @@ -96,6 +96,7 @@ public static long computeDescendentVolume(LogicalNode node) { if (node instanceof RelationNode) { switch (node.getType()) { + case PARTITIONS_SCAN: case SCAN: ScanNode scanNode = (ScanNode) node; if (scanNode.getTableDesc().getStats() == null) { @@ -105,20 +106,6 @@ public static long computeDescendentVolume(LogicalNode node) { } else { return scanNode.getTableDesc().getStats().getNumBytes(); } - case PARTITIONS_SCAN: - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; - if (pScanNode.getTableDesc().getStats() == null) { - // TODO - this case means that data is not located in HDFS. So, we need additional - // broadcast method. - return Long.MAX_VALUE; - } else { - // if there is no selected partition - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - return 0; - } else { - return pScanNode.getTableDesc().getStats().getNumBytes(); - } - } case TABLE_SUBQUERY: return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery()); default: diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index 42ada8bb32..8ce1d17b2e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -96,7 +96,8 @@ public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf t String[] previousPartitionPathNames = null; for (FileStatus eachFile: nonZeroLengthFiles) { - FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); + FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), + null, null); if (partitionDepth > 0) { // finding partition key; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 52cb080edd..dd14f6c43a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -99,9 +99,9 @@ private void rewriteColumnPartitionedTableSchema() throws IOException { if (fragments != null && fragments.length > 0) { List fileFragments = FragmentConvertor.convert(context.getConf(), fragments); - // Get a partition key value from a given path - partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( - columnPartitionSchema, fileFragments.get(0).getPath(), false); + // Get tuple from first partition fragment using parition keys + partitionRow = PartitionedTableRewriter.buildTupleFromPartitionKeys(columnPartitionSchema, + fileFragments.get(0).getPartitionKeys()); } // Targets or search conditions may contain column references. diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index d423c041f1..de8f98f4e9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -568,7 +568,8 @@ public GetQueryResultDataResponse getQueryResultData(RpcController controller, G queryId, scanNode, Integer.MAX_VALUE, - codecType); + codecType, + context.getCatalog()); queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java index c16e95bae1..aaebe11800 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java @@ -120,9 +120,6 @@ public LogicalNode visitPartitionedTableScan(PlanShapeFixerContext context, Logi throws TajoException { super.visitPartitionedTableScan(context, plan, block, node, stack); context.childNumbers.push(1); - Path[] inputPaths = node.getInputPaths(); - Arrays.sort(inputPaths); - node.setInputPaths(inputPaths); if (node.hasTargets()) { node.setTargets(sortTargets(node.getTargets())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 70268cb03a..766739a602 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -29,6 +29,7 @@ import org.apache.tajo.TajoProtos.CodecType; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; +import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableDesc; @@ -82,10 +83,12 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc final private Optional codecType; private MemoryRowBlock rowBlock; private Future nextFetch; + private CatalogService catalog; + public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService, - TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, - int maxRow, Optional codecType) throws IOException { + TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, + int maxRow, Optional codecType, CatalogService catalog) throws IOException { this.asyncTaskService = asyncTaskService; this.tajoConf = tajoConf; this.sessionId = sessionId; @@ -95,6 +98,7 @@ public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService, this.maxRow = maxRow; this.rowEncoder = RowStoreUtil.createEncoder(scanNode.getOutSchema()); this.codecType = codecType; + this.catalog = catalog; } public void init() throws IOException, TajoException { @@ -105,7 +109,7 @@ private void initSeqScanExec() throws IOException, TajoException { Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()); List fragments = Lists.newArrayList( - SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true)); + SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true, catalog, tajoConf)); if (!fragments.isEmpty()) { FragmentProto[] fragmentProtos = diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 58f4ec013f..0186d16785 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -311,7 +311,8 @@ public void execSimpleQuery(QueryContext queryContext, Session session, String q queryInfo.getQueryId(), scanNode, maxRow, - Optional.empty()); + Optional.empty(), + context.getCatalog()); queryResultScanner.init(); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index c264e3e75f..79e8dc2cbd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -31,6 +31,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.PhysicalPlannerImpl; import org.apache.tajo.engine.planner.RangePartitionAlgorithm; @@ -92,13 +93,23 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC throws IOException, TajoException { ExecutionBlock execBlock = stage.getBlock(); QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext(); + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); ScanNode[] scans = execBlock.getScanNodes(); + ScanNode[] clonedScans = new ScanNode[scans.length]; Fragment[] fragments = new Fragment[scans.length]; long[] stats = new long[scans.length]; // initialize variables from the child operators for (int i = 0; i < scans.length; i++) { + // Clone ScanNode to reuse scan filter in broadcast join phase. + try { + clonedScans[i] = (ScanNode) scans[i].clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + TableDesc tableDesc = masterContext.getTableDesc(scans[i]); if (tableDesc == null) { // if it is a real table stored on storage @@ -124,7 +135,7 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC // So, we need to handle FileFragment by its size. // If we don't check its size, it can cause IndexOutOfBoundsException. List fileFragments = SplitUtil.getSplits( - TablespaceManager.get(tableDesc.getUri()), scans[i], tableDesc, false); + TablespaceManager.get(tableDesc.getUri()), scans[i], tableDesc, false, catalog, conf); if (fileFragments.size() > 0) { fragments[i] = fileFragments.get(0); @@ -243,7 +254,7 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0); LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d", scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); - scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments); + scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, clonedScans); } else { if (largeScanIndexList.size() > 2) { throw new IOException("Symmetric Repartition Join should have two scan node, but " + nonLeafScanNames); @@ -264,6 +275,11 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC for (Integer eachIdx : broadcastIndexList) { scans[eachIdx].setBroadcastTable(true); broadcastScans[index] = scans[eachIdx]; + + if (broadcastScans[index].getType() == NodeType.PARTITIONS_SCAN && clonedScans[eachIdx].hasQual()) { + broadcastScans[index].setQual(clonedScans[eachIdx].getQual()); + } + broadcastStats[index] = stats[eachIdx]; broadcastFragments[index] = fragments[eachIdx]; index++; @@ -388,17 +404,17 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster //In this phase a ScanNode has a single fragment. //If there are more than one data files, that files should be added to fragments or partition path - for (ScanNode eachScan: broadcastScans) { - // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with - // plain tables. - if (eachScan.getType() != NodeType.PARTITIONS_SCAN) { - TableDesc tableDesc = masterContext.getTableDesc(eachScan); + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); - Collection scanFragments = SplitUtil.getSplits( - TablespaceManager.get(tableDesc.getUri()), eachScan, tableDesc, false); - if (scanFragments != null) { - rightFragments.addAll(scanFragments); - } + for (int i = 0; i < broadcastScans.length; i++) { + ScanNode eachScan = broadcastScans[i]; + TableDesc tableDesc = masterContext.getTableDesc(eachScan); + + Collection scanFragments = SplitUtil.getSplits( + TablespaceManager.get(tableDesc.getUri()), eachScan, tableDesc, false, catalog, conf); + if (scanFragments != null) { + rightFragments.addAll(scanFragments); } } } @@ -462,7 +478,7 @@ public static Map>> merge } private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage, - int baseScanId, Fragment[] fragments) + int baseScanId, ScanNode[] clonedScans) throws IOException, TajoException { ExecutionBlock execBlock = stage.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); @@ -484,21 +500,25 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch // . add all fragments to broadcastFragments Collection baseFragments = null; List broadcastFragments = new ArrayList<>(); + + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + for (int i = 0; i < scans.length; i++) { ScanNode scan = scans[i]; TableDesc desc = stage.getContext().getTableDesc(scan); + if (scans[i].getType() == NodeType.PARTITIONS_SCAN && clonedScans[i].hasQual()) { + scans[i].setQual(clonedScans[i].getQual()); + } - Collection scanFragments = SplitUtil.getSplits(TablespaceManager.get(desc.getUri()), scan, desc, false); + Collection scanFragments = SplitUtil.getSplits(TablespaceManager.get(desc.getUri()), scan, desc, false + , catalog, conf); if (scanFragments != null) { if (i == baseScanId) { baseFragments = scanFragments; } else { - // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with - // plain tables. - if (scan.getType() != NodeType.PARTITIONS_SCAN) { - broadcastFragments.addAll(scanFragments); - } + broadcastFragments.addAll(scanFragments); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index c055d119e4..e0bcf56253 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -28,10 +28,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; import org.apache.tajo.*; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; @@ -1183,9 +1180,11 @@ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOExceptio ScanNode scan = scans[0]; TableDesc table = stage.context.getTableDesc(scan); + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + Collection fragments = SplitUtil.getSplits( - TablespaceManager.get(scan.getTableDesc().getUri()), scan, table, false); - SplitUtil.preparePartitionScanPlanForSchedule(scan); + TablespaceManager.get(scan.getTableDesc().getUri()), scan, table, false, catalog, conf); Stage.scheduleFragments(stage, fragments); // The number of leaf tasks should be the number of fragments. diff --git a/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java index 5f66b07e85..84e8c4bdd6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java @@ -20,11 +20,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.PartitionedTableScanNode; import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.partition.PartitionPruningHandle; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.storage.FileTablespace; import org.apache.tajo.storage.Tablespace; @@ -33,6 +37,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Optional; public class SplitUtil { @@ -56,12 +61,13 @@ public class SplitUtil { public static List getSplits(Tablespace tablespace, ScanNode scan, TableDesc tableDesc, - boolean requireSort) + boolean requireSort, + CatalogService catalog, + TajoConf conf) throws IOException, TajoException { List fragments; if (tableDesc.hasPartition()) { - // TODO: Partition tables should also be handled by tablespace. - fragments = SplitUtil.getFragmentsFromPartitionedTable(tablespace, scan, tableDesc, requireSort); + fragments = SplitUtil.getFragmentsFromPartitionedTable(tablespace, scan, tableDesc, requireSort, catalog, conf); } else { fragments = tablespace.getSplits(scan.getCanonicalName(), tableDesc, requireSort, scan.getQual()); } @@ -75,32 +81,25 @@ public static List getSplits(Tablespace tablespace, private static List getFragmentsFromPartitionedTable(Tablespace tsHandler, ScanNode scan, TableDesc table, - boolean requireSort) throws IOException { + boolean requireSort, + CatalogService catalog, + TajoConf conf) throws IOException, TajoException { Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace"); if (!(scan instanceof PartitionedTableScanNode)) { throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type."); } List fragments = Lists.newArrayList(); PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; - fragments.addAll(((FileTablespace) tsHandler).getSplits( - scan.getCanonicalName(), table.getMeta(), table.getSchema(), requireSort, partitionsScan.getInputPaths())); + partitionsScan.init(scan); + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + PartitionPruningHandle pruningHandle = rewriter.getPartitionPruningHandle(conf, partitionsScan); + + FileTablespace tablespace = (FileTablespace) tsHandler; + fragments.addAll(tablespace.getSplits(scan.getCanonicalName(), table.getMeta(), table.getSchema() + , requireSort, Optional.of(pruningHandle.getPartitionKeys()), pruningHandle.getPartitionPaths())); + return fragments; - } - /** - * Clear input paths of {@link PartitionedTableScanNode}. - * This is to avoid unnecessary transmission of a lot of partition table paths to workers. - * So, this method should be invoked before {@link org.apache.tajo.querymaster.Stage#scheduleFragment(Stage, Fragment)} - * unless the scan is broadcasted. - * - * @param scanNode scan node - */ - public static void preparePartitionScanPlanForSchedule(ScanNode scanNode) { - if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { - // TODO: The partition input paths don't have to be kept in a logical node at all. - // This should be improved by implementing a specialized fragment for partition tables. - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scanNode; - partitionScan.clearInputPaths(); - } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java index 7d5c78ac7b..abbfd33cde 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -144,7 +144,8 @@ private static NonForwardQueryResultScanner getNonForwardQueryResultScanner( queryId, scanNode, Integer.MAX_VALUE, - Optional.empty()); + Optional.empty(), + masterContext.getCatalog()); resultScanner.init(); session.addNonForwardQueryResultScanner(resultScanner); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java index d3f714837f..a03b169241 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java @@ -18,133 +18,21 @@ package org.apache.tajo.plan.logical; -import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.plan.PlanString; -import org.apache.tajo.plan.Target; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.util.TUtil; - -import java.util.ArrayList; -import java.util.Arrays; - public class PartitionedTableScanNode extends ScanNode { - @Expose private Path [] inputPaths; - public PartitionedTableScanNode(int pid) { super(pid, NodeType.PARTITIONS_SCAN); } - public void init(ScanNode scanNode, Path[] inputPaths) { + public void init(ScanNode scanNode) { tableDesc = scanNode.tableDesc; setInSchema(scanNode.getInSchema()); setOutSchema(scanNode.getOutSchema()); this.qual = scanNode.qual; this.targets = scanNode.targets; - setInputPaths(inputPaths); if (scanNode.hasAlias()) { alias = scanNode.alias; } } - public void clearInputPaths() { - this.inputPaths = null; - } - - public void setInputPaths(Path [] paths) { - this.inputPaths = paths; - if (this.inputPaths != null) { - Arrays.sort(inputPaths); - } - } - - public Path [] getInputPaths() { - return inputPaths; - } - - @Override - public int hashCode() { - return Objects.hashCode(this.tableDesc, this.qual, this.targets); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PartitionedTableScanNode) { - PartitionedTableScanNode other = (PartitionedTableScanNode) obj; - - boolean eq = super.equals(other); - eq = eq && TUtil.checkEquals(this.tableDesc, other.tableDesc); - eq = eq && TUtil.checkEquals(this.qual, other.qual); - eq = eq && TUtil.checkEquals(this.targets, other.targets); - eq = eq && TUtil.checkEquals(this.inputPaths, other.inputPaths); - - return eq; - } - - return false; - } - - @Override - public Object clone() throws CloneNotSupportedException { - PartitionedTableScanNode unionScan = (PartitionedTableScanNode) super.clone(); - - unionScan.tableDesc = (TableDesc) this.tableDesc.clone(); - - if (hasQual()) { - unionScan.qual = (EvalNode) this.qual.clone(); - } - - if (hasTargets()) { - unionScan.targets = new ArrayList<>(); - for (Target t : targets) { - unionScan.targets.add((Target) t.clone()); - } - } - - unionScan.inputPaths = inputPaths; - - return unionScan; - } - - @Override - public void preOrder(LogicalNodeVisitor visitor) { - visitor.visit(this); - } - - public void postOrder(LogicalNodeVisitor visitor) { - visitor.visit(this); - } - - @Override - public PlanString getPlanString() { - PlanString planStr = new PlanString(this).appendTitle(" on " + getTableName()); - if (hasAlias()) { - planStr.appendTitle(" as ").appendTitle(alias); - } - - if (hasQual()) { - planStr.addExplan("filter: ").appendExplain(this.qual.toString()); - } - - if (hasTargets()) { - planStr.addExplan("target list: ").appendExplain(StringUtils.join(targets, ", ")); - } - - planStr.addDetail("out schema: ").appendDetail(getOutSchema().toString()); - planStr.addDetail("in schema: ").appendDetail(getInSchema().toString()); - - if (inputPaths != null) { - planStr.addExplan("num of filtered paths: ").appendExplain(""+ inputPaths.length); - int i = 0; - for (Path path : inputPaths) { - planStr.addDetail((i++) + ": ").appendDetail(path.toString()); - } - } - - return planStr; - } } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java b/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java new file mode 100644 index 0000000000..9271786efa --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.partition; + +import org.apache.hadoop.fs.Path; + +/** + * This includes result informs of partition pruning. + * + */ +public class PartitionPruningHandle { + private Path[] partitionPaths; + private String[] partitionKeys; + private long totalVolume; + + public PartitionPruningHandle(Path[] partitionPaths, String[] partitionKeys, long totalVolume) { + this.partitionPaths = partitionPaths; + this.partitionKeys = partitionKeys; + this.totalVolume = totalVolume; + } + + public Path[] getPartitionPaths() { + return partitionPaths; + } + + public String[] getPartitionKeys() { + return partitionKeys; + } + + public long getTotalVolume() { + return totalVolume; + } +} \ No newline at end of file diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 54b4844875..6b9f5f49e7 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -18,26 +18,27 @@ package org.apache.tajo.plan.rewrite.rules; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.*; import org.apache.tajo.OverridableConf; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; -import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.partition.PartitionPruningHandle; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.util.EvalNodeToExprConverter; @@ -56,7 +57,6 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { private CatalogService catalog; - private long totalVolume; private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class); @@ -92,6 +92,10 @@ public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoExc return plan; } + public void setCatalog(CatalogService catalog) { + this.catalog = catalog; + } + private static class PartitionPathFilter implements PathFilter { private Schema schema; @@ -124,11 +128,11 @@ public String toString() { } } - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - return findFilteredPaths(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); + return getPartitionPruningHandle(conf, tableName, partitionColumns, conjunctiveForms, tablePath, null); } /** @@ -141,13 +145,13 @@ public String toString() { * @return * @throws IOException */ - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath, ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - Path [] filteredPaths = null; - FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); + PartitionPruningHandle partitionPruningHandle = null; + FileSystem fs = tablePath.getFileSystem(conf); String [] splits = IdentifierUtil.splitFQTableName(tableName); List partitions = null; @@ -155,17 +159,19 @@ public String toString() { if (conjunctiveForms == null) { partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } } else { if (catalog.existPartitions(splits[0], splits[1])) { PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); partitions = catalog.getPartitionsByAlgebra(request); - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } else { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } } } catch (UnsupportedException ue) { @@ -174,31 +180,36 @@ public String toString() { LOG.warn(ue.getMessage()); partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms)); } + LOG.info("Filtered directory or files: " + partitionPruningHandle.getPartitionPaths().length); + LOG.info("Filtered partition keys: " + partitionPruningHandle.getPartitionKeys().length); - LOG.info("Filtered directory or files: " + filteredPaths.length); - return filteredPaths; + return partitionPruningHandle; } /** - * Build list of partition path by PartitionDescProto which is generated from CatalogStore. + * Build list of partition path and partition keys by PartitionDescProto which is generated from CatalogStore. * * @param partitions * @return */ - private Path[] findFilteredPathsByPartitionDesc(List partitions) { - Path [] filteredPaths = new Path[partitions.size()]; + private PartitionPruningHandle getPartitionPruningHandleByCatalog(List partitions) { + long totalVolume = 0L; + Path[] filteredPaths = new Path[partitions.size()]; + String[] partitionKeys = new String[partitions.size()]; for (int i = 0; i < partitions.size(); i++) { - PartitionDescProto partition = partitions.get(i); + CatalogProtos.PartitionDescProto partition = partitions.get(i); filteredPaths[i] = new Path(partition.getPath()); + partitionKeys[i] = partition.getPartitionName(); totalVolume += partition.getNumBytes(); } - return filteredPaths; + return new PartitionPruningHandle(filteredPaths, partitionKeys, totalVolume); } /** @@ -212,10 +223,14 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti * @return * @throws IOException */ - private Path [] findFilteredPathsFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, - FileSystem fs, Path tablePath) throws IOException{ + private PartitionPruningHandle getPartitionPruningHandleByFileSystem(Schema partitionColumns, + EvalNode [] conjunctiveForms, FileSystem fs, Path tablePath) throws IOException{ Path [] filteredPaths = null; PathFilter [] filters; + int startIdx; + long totalVolume = 0L; + ContentSummary summary = null; + String[] partitionKeys = null; if (conjunctiveForms == null) { filters = buildAllAcceptingPathFilters(partitionColumns); @@ -230,7 +245,18 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti // Get all file status matched to a ith level path filter. filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); } - return filteredPaths; + + // Get partition keys and volume from the list of partition directories + partitionKeys = new String[filteredPaths.length]; + for (int i = 0; i < partitionKeys.length; i++) { + Path path = filteredPaths[i]; + startIdx = path.toString().indexOf(getColumnPartitionPathPrefix(partitionColumns)); + partitionKeys[i] = path.toString().substring(startIdx); + summary = fs.getContentSummary(path); + totalVolume += summary.getLength(); + } + + return new PartitionPruningHandle(filteredPaths, partitionKeys, totalVolume); } /** @@ -330,14 +356,16 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( for (int i = 0; i < fileStatuses.length; i++) { FileStatus fileStatus = fileStatuses[i]; paths[i] = fileStatus.getPath(); - totalVolume += fileStatus.getLen(); } return paths; } - public Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException, - UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + public PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, ScanNode scanNode) + throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { + long startTime = System.currentTimeMillis(); + PartitionPruningHandle pruningHandle = null; + TableDesc table = scanNode.getTableDesc(); PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod(); @@ -375,11 +403,18 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( } if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, + pruningHandle = getPartitionPruningHandle(conf, table.getName(), paritionValuesSchema, indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri()), scanNode); } else { // otherwise, we will get all partition paths. - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); + pruningHandle = getPartitionPruningHandle(conf, table.getName(), paritionValuesSchema, null, + new Path(table.getUri())); } + + long finishTime = System.currentTimeMillis(); + long elapsedMills = finishTime - startTime; + + LOG.info(String.format("Partition pruning: %d ms elapsed.", elapsedMills)); + return pruningHandle; } private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) { @@ -496,6 +531,49 @@ public static String getColumnPartitionPathPrefix(Schema partitionColumn) { return sb.toString(); } + /** + * This transforms a partition name into a tupe with a given partition column schema. When a file path + * Assume that an user gives partition name 'country=KOREA/city=SEOUL'. + * + * The first datum of tuple : KOREA + * The second datum of tuple : SEOUL + * + * @param partitionColumnSchema The partition column schema + * @param partitionKeys The keys of partition + * @return The tuple transformed from a column values part. + */ + public static Tuple buildTupleFromPartitionKeys(Schema partitionColumnSchema, String partitionKeys) { + Preconditions.checkNotNull(partitionColumnSchema); + Preconditions.checkNotNull(partitionKeys); + + String [] columnValues = partitionKeys.split("/"); + Preconditions.checkArgument(partitionColumnSchema.size() >= columnValues.length, + "Invalid Partition Keys :" + partitionKeys); + + Tuple tuple = new VTuple(partitionColumnSchema.size()); + + for (int i = 0; i < tuple.size(); i++) { + tuple.put(i, NullDatum.get()); + } + + for (int i = 0; i < columnValues.length; i++) { + String [] parts = columnValues[i].split("="); + if (parts.length == 2) { + int columnId = partitionColumnSchema.getColumnIdByName(parts[0]); + Column keyColumn = partitionColumnSchema.getColumn(columnId); + + if (parts[1].equals(StorageConstants.DEFAULT_PARTITION_NAME)){ + tuple.put(columnId, DatumFactory.createNullDatum()); + } else { + tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), + StringUtils.unescapePathName(parts[1]))); + } + } + } + + return tuple; + } + private final class Rewriter extends BasicLogicalPlanVisitor { @Override public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalPlan.QueryBlock block, @@ -506,24 +584,19 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP return null; } - try { - Path [] filteredPaths = findFilteredPartitionPaths(queryContext, scanNode); - plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions"); - PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); - rewrittenScanNode.init(scanNode, filteredPaths); - rewrittenScanNode.getTableDesc().getStats().setNumBytes(totalVolume); + PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); + rewrittenScanNode.init(scanNode); - // if it is topmost node, set it as the rootnode of this block. - if (stack.empty() || block.getRoot().equals(scanNode)) { - block.setRoot(rewrittenScanNode); - } else { - PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode); - } - block.registerNode(rewrittenScanNode); - } catch (IOException e) { - throw new TajoInternalError("Partitioned Table Rewrite Failed: \n" + e.getMessage()); + // if it is topmost node, set it as the rootnode of this block. + if (stack.empty() || block.getRoot().equals(scanNode)) { + block.setRoot(rewrittenScanNode); + } else { + PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode); } + + block.registerNode(rewrittenScanNode); return null; } } + } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 02b2ad9e28..23eab8411b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -20,7 +20,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.Path; import org.apache.tajo.OverridableConf; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.annotation.Nullable; @@ -457,13 +456,6 @@ private static PartitionedTableScanNode convertPartitionScan(OverridableConf con PlanProto.LogicalNode protoNode) { PartitionedTableScanNode partitionedScan = new PartitionedTableScanNode(protoNode.getNodeId()); fillScanNode(context, evalContext, protoNode, partitionedScan); - - PlanProto.PartitionScanSpec partitionScanProto = protoNode.getPartitionScan(); - Path [] paths = new Path[partitionScanProto.getPathsCount()]; - for (int i = 0; i < partitionScanProto.getPathsCount(); i++) { - paths[i] = new Path(partitionScanProto.getPaths(i)); - } - partitionedScan.setInputPaths(paths); return partitionedScan; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index a536b020ca..db8cfba1f5 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -19,7 +19,6 @@ package org.apache.tajo.plan.serder; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.Path; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -473,21 +472,9 @@ public LogicalNode visitPartitionedTableScan(SerializeContext context, LogicalPl throws TajoException { PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node); - - PlanProto.PartitionScanSpec.Builder partitionScan = PlanProto.PartitionScanSpec.newBuilder(); - List pathStrs = new ArrayList<>(); - if (node.getInputPaths() != null) { - for (Path p : node.getInputPaths()) { - pathStrs.add(p.toString()); - } - partitionScan.addAllPaths(pathStrs); - } - PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); nodeBuilder.setScan(scanBuilder); - nodeBuilder.setPartitionScan(partitionScan); context.treeBuilder.addNodes(nodeBuilder); - return node; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index bfeb4494be..c01d656998 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -990,13 +990,6 @@ public static boolean hasAsterisk(List namedExprs) { public static long getTableVolume(ScanNode scanNode) { if (scanNode.getTableDesc().hasStats()) { long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); - if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - scanBytes = 0L; - } - } - return scanBytes; } else { return TajoConstants.UNKNOWN_LENGTH; diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index da9501c090..7a166e1c56 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -73,36 +73,35 @@ message LogicalNode { optional SchemaProto out_schema = 5; optional ScanNode scan = 6; - optional PartitionScanSpec partitionScan = 7; - optional IndexScanSpec indexScan = 8; - optional JoinNode join = 9; - optional FilterNode filter = 10; - optional GroupbyNode groupby = 11; - optional DistinctGroupbyNode distinctGroupby = 12; - optional SortNode sort = 13; - optional LimitNode limit = 14; - optional WindowAggNode windowAgg = 15; - optional ProjectionNode projection = 16; - optional EvalExprNode exprEval = 17; - optional UnionNode union = 18; - optional TableSubQueryNode tableSubQuery = 19; - optional PersistentStoreNode persistentStore = 20; - optional StoreTableNodeSpec storeTable = 21; - optional InsertNodeSpec insert = 22; - optional CreateTableNodeSpec createTable = 23; - optional RootNode root = 24; - optional SetSessionNode setSession = 25; - - optional CreateDatabaseNode createDatabase = 26; - optional DropDatabaseNode dropDatabase = 27; - optional DropTableNode dropTable = 28; - - optional AlterTablespaceNode alterTablespace = 29; - optional AlterTableNode alterTable = 30; - optional TruncateTableNode truncateTableNode = 31; - - optional CreateIndexNode createIndex = 32; - optional DropIndexNode dropIndex = 33; + optional IndexScanSpec indexScan = 7; + optional JoinNode join = 8; + optional FilterNode filter = 9; + optional GroupbyNode groupby = 10; + optional DistinctGroupbyNode distinctGroupby = 11; + optional SortNode sort = 12; + optional LimitNode limit = 13; + optional WindowAggNode windowAgg = 14; + optional ProjectionNode projection = 15; + optional EvalExprNode exprEval = 16; + optional UnionNode union = 17; + optional TableSubQueryNode tableSubQuery = 18; + optional PersistentStoreNode persistentStore = 19; + optional StoreTableNodeSpec storeTable = 20; + optional InsertNodeSpec insert = 21; + optional CreateTableNodeSpec createTable = 22; + optional RootNode root = 23; + optional SetSessionNode setSession = 24; + + optional CreateDatabaseNode createDatabase = 25; + optional DropDatabaseNode dropDatabase = 26; + optional DropTableNode dropTable = 27; + + optional AlterTablespaceNode alterTablespace = 28; + optional AlterTableNode alterTable = 29; + optional TruncateTableNode truncateTableNode = 30; + + optional CreateIndexNode createIndex = 31; + optional DropIndexNode dropIndex = 32; } message ScanNode { @@ -115,10 +114,6 @@ message ScanNode { required bool nameResolveBase = 7; } -message PartitionScanSpec { - repeated string paths = 1; -} - message IndexScanSpec { required SchemaProto keySchema = 1; required string indexPath = 2; @@ -231,10 +226,6 @@ enum JoinType { RIGHT_SEMI_JOIN = 9; } -message PartitionTableScanSpec { - repeated string paths = 1; -} - message PersistentStoreNode { optional int32 childSeq = 1; // CreateTableNode may not have any children. This should be improved at TAJO-1589. required string storageType = 2; diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java index 72df3f5c78..fcb4da58fe 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java @@ -20,6 +20,7 @@ public class BuiltinFragmentKinds { public static final String FILE = "FILE"; + public static final String PARTIION_FILE = "PARTITION_FILE"; public static final String HBASE = "HBASE"; public static final String JDBC = "JDBC"; public static final String HTTP = "EXAMPLE-HTTP"; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 2785de4ea4..352fc28291 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -441,6 +441,82 @@ public int compare(Map.Entry v1, Map.Entry v2) return new FileFragment(fragmentId, file, start, length, hosts); } + + /** + * Is the given filename splitable? Usually, true, but if the file is + * stream compressed, it will not be. + *

+ * FileInputFormat implementations can override this and return + * false to ensure that individual input files are never split-up + * so that Mappers process entire files. + * + * + * @param meta the metadata of target table + * @param schema the schema of target table + * @param path the file name to check + * @param partitionKeys keys of target partition + * @param status get the file length + * @return is this file isSplittable? + * @throws IOException + */ + protected boolean isSplittablePartitionedTable(TableMeta meta, Schema schema, Path path, String partitionKeys, + FileStatus status) throws IOException { + Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen(), partitionKeys); + Scanner scanner = getScanner(meta, schema, fragment, null); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; + } + + /** + * Build a fragment for partition table + * + * @param fragmentId fragment id + * @param file file path + * @param start offset + * @param length length + * @param hosts the list of hosts (names) hosting blocks + * @param partitionKeys partition keys + * @return FileFragment + */ + protected FileFragment makeSplitOfPartitionedTable(String fragmentId, Path file, long start, long length, + String[] hosts, String partitionKeys) { + return new FileFragment(fragmentId, file, start, length, hosts, partitionKeys); + } + + /** + * Build a fragment for partition table + * + * @param fragmentId fragment id + * @param file file path + * @param blockLocation location of block + * @param partitionKeys partition keys + * @return FileFragment + * @throws IOException + */ + protected FileFragment makeSplitOfPartitionedTable(String fragmentId, Path file, BlockLocation blockLocation + , String partitionKeys) throws IOException { + return new FileFragment(fragmentId, file, blockLocation, partitionKeys); + } + + /** + * Build a fragment for non splittable partition table + * + * @param fragmentId fragment id + * @param file file path + * @param start offset + * @param length length + * @param blkLocations locations of blocks + * @param partitionKeys partition keys + * @return FileFragment + * @throws IOException + */ + protected Fragment makeNonSplitOfPartitionedTable(String fragmentId, Path file, long start, long length, + BlockLocation[] blkLocations, String partitionKeys) throws IOException { + String[] hosts = getHosts(blkLocations); + return new FileFragment(fragmentId, file, start, length, hosts, partitionKeys); + } + /** * Get the minimum split size * @@ -465,19 +541,25 @@ private Integer[] getDiskIds(VolumeId[] volumeIds) { return diskIds; } + public List getSplits(String tableName, TableMeta meta, Schema schema, boolean requireSort, + Path... inputs) throws IOException { + return getSplits(tableName, meta, schema, requireSort, Optional.empty(), inputs); + } + /** * Generate the list of files and make them into FileSplits. * * @throws IOException */ - public List getSplits(String tableName, TableMeta meta, Schema schema, boolean requireSort, Path... inputs) - throws IOException { + public List getSplits(String tableName, TableMeta meta, Schema schema, boolean requireSort, + Optional partitionKeys, Path... inputs) throws IOException { // generate splits' List splits = Lists.newArrayList(); List volumeSplits = Lists.newArrayList(); List blockLocations = Lists.newArrayList(); + int i = 0; for (Path p : inputs) { ArrayList files = Lists.newArrayList(); if (fs.isFile(p)) { @@ -493,12 +575,18 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, if (length > 0) { // Get locations of blocks of file BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); - boolean splittable = isSplittable(meta, schema, path, file); + boolean splittable = partitionKeys.isPresent() ? + isSplittablePartitionedTable(meta, schema, path, partitionKeys.get()[i], file) + : isSplittable(meta, schema, path, file); if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { if (splittable) { for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); + if (partitionKeys.isPresent()) { + volumeSplits.add(makeSplitOfPartitionedTable(tableName, path, blockLocation, partitionKeys.get()[i])); + } else { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } } blockLocations.addAll(Arrays.asList(blkLocations)); @@ -507,10 +595,20 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, if (blockSize >= length) { blockLocations.addAll(Arrays.asList(blkLocations)); for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); + if (partitionKeys.isPresent()) { + volumeSplits.add(makeSplitOfPartitionedTable(tableName, path, blockLocation, + partitionKeys.get()[i])); + } else { + volumeSplits.add(makeSplit(tableName, path, blockLocation)); + } } } else { - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + if (partitionKeys.isPresent()) { + splits.add(makeNonSplitOfPartitionedTable(tableName, path, 0, length, blkLocations, + partitionKeys.get()[i])); + } else { + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } } } @@ -526,17 +624,32 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, // for s3 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, + if (partitionKeys.isPresent()) { + splits.add(makeSplitOfPartitionedTable(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts(), partitionKeys.get()[i])); + } else { + splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); + } bytesRemaining -= splitSize; } if (bytesRemaining > 0) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, + if (partitionKeys.isPresent()) { + splits.add(makeSplitOfPartitionedTable(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts(), partitionKeys.get()[i])); + } else { + splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts())); + } } } else { // Non splittable - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + if (partitionKeys.isPresent()) { + splits.add(makeNonSplitOfPartitionedTable(tableName, path, 0, length, blkLocations, + partitionKeys.get()[i])); + } else { + splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); + } } } } @@ -544,6 +657,7 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, if(LOG.isDebugEnabled()){ LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); } + i++; } // Combine original fileFragments with new VolumeId information @@ -553,6 +667,40 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, return splits; } + /** + * Get the list of hosts (hostname) hosting specified blocks + * + * + * @param blkLocations locations of blocks + * @return the list of hosts + * @throws IOException + */ + private String[] getHosts(BlockLocation[] blkLocations) throws IOException { + Map hostsBlockMap = new HashMap<>(); + for (BlockLocation blockLocation : blkLocations) { + for (String host : blockLocation.getHosts()) { + if (hostsBlockMap.containsKey(host)) { + hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); + } else { + hostsBlockMap.put(host, 1); + } + } + } + + List> entries = new ArrayList<>(hostsBlockMap.entrySet()); + Collections.sort(entries, (Map.Entry v1, Map.Entry v2) + -> v1.getValue().compareTo(v2.getValue())); + + String[] hosts = new String[blkLocations[0].getHosts().length]; + + for (int i = 0; i < hosts.length; i++) { + Map.Entry entry = entries.get((entries.size() - 1) - i); + hosts[i] = entry.getKey(); + } + + return hosts; + } + private void setVolumeMeta(List splits, final List blockLocations) throws IOException { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index 0ead6009a4..b5c40205c9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -32,24 +32,41 @@ */ public class FileFragment extends AbstractFileFragment { private Integer[] diskIds; // disk volume ids + private String partitionKeys; public FileFragment(String tableName, Path uri, BlockLocation blockLocation) throws IOException { - this(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null); + this(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null, null); } - public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, Integer[] diskIds) { + public FileFragment(String tableName, Path uri, BlockLocation blockLocation, String partitionKeys) + throws IOException { + this(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null, + partitionKeys); + } + + public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, Integer[] diskIds, + String partitionKeys) { super(BuiltinFragmentKinds.FILE, uri.toUri(), tableName, start, start + length, length, hosts); this.diskIds = diskIds; + this.partitionKeys = partitionKeys; } // Non splittable public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) { - this(tableName, uri, start, length, hosts, null); + this(tableName, uri, start, length, hosts, null, null); + } + + public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, String partitionKeys) { + this(tableName, uri, start, length, hosts, null, partitionKeys); } public FileFragment(String fragmentId, Path path, long start, long length) { - this(fragmentId, path, start, length, null, null); + this(fragmentId, path, start, length, null, null, null); + } + + public FileFragment(String fragmentId, Path path, long start, long length, String partitionKeys) { + this(fragmentId, path, start, length, null, null, partitionKeys); } /** @@ -68,6 +85,14 @@ public void setDiskIds(Integer[] diskIds){ this.diskIds = diskIds; } + public String getPartitionKeys() { + return this.partitionKeys; + } + + public void setPartitionKeys(String partitionKeys) { + this.partitionKeys = partitionKeys; + } + public Path getPath() { return new Path(uri); } @@ -91,20 +116,21 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hashCode(inputSourceId, uri, startKey, endKey, length, diskIds, hostNames); + return Objects.hashCode(inputSourceId, uri, startKey, endKey, length, diskIds, hostNames, partitionKeys); } @Override public Object clone() throws CloneNotSupportedException { FileFragment frag = (FileFragment) super.clone(); frag.diskIds = diskIds; + frag.partitionKeys = partitionKeys; return frag; } @Override public String toString() { - return "\"fragment\": {\"id\": \""+ inputSourceId +"\", \"path\": " - +getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": " - + getLength() + "}" ; + return "\"fragment\": {\"id\": \""+ inputSourceId +"\", \"path\": " + getPath() + + "\", \"start\": " + this.getStartKey() + ",\"length\": " + getLength() + + ",\"partitionKeys\": " + getPartitionKeys() + "}" ; } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java index 926e5febf5..ca106e96d0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java @@ -50,6 +50,11 @@ public FileFragmentProto serialize(FileFragment fragment) { if(fragment.hostNames != null) { builder.addAllHosts(fragment.hostNames); } + + if(fragment.getPartitionKeys() != null) { + builder.setPartitionKeys(fragment.getPartitionKeys()); + } + return builder.build(); } @@ -61,6 +66,8 @@ public FileFragment deserialize(FileFragmentProto proto) { proto.getStartOffset(), proto.getLength(), proto.getHostsList().toArray(new String[proto.getHostsCount()]), - proto.getDiskIdsList().toArray(new Integer[proto.getDiskIdsCount()])); + proto.getDiskIdsList().toArray(new Integer[proto.getDiskIdsCount()]), + proto.getPartitionKeys() + ); } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto index 0579f05aa9..c05a1c1069 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto +++ b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto @@ -31,4 +31,6 @@ message FileFragmentProto { required int64 length = 4; repeated string hosts = 5; repeated int32 disk_ids = 6; + // Partition Keys: country=KOREA/city=SEOUL + optional string partitionKeys = 7; }