diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 22dd5a4850..5cd99f686d 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -104,6 +104,11 @@ public static TableDesc newTableDesc(String tableName, Schema schema, TableMeta return new TableDesc(tableName, schema, meta, path.toUri()); } + public static TableDesc newTableDesc(String tableName, Schema schema, TableMeta meta, Path path + , PartitionMethodDesc partitionMethodDesc) { + return new TableDesc(tableName, schema, meta, path.toUri(), partitionMethodDesc); + } + public static TableDesc newTableDesc(TableDescProto proto) { return new TableDesc(proto); } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java index 8122bd59db..948567001c 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java @@ -51,32 +51,46 @@ public class TableDesc implements ProtoObject, GsonObject, Clone public TableDesc() { } + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI uri, boolean external) { + this(tableName, schema, meta, uri, null, external); + } + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, - @Nullable URI uri, boolean external) { + @Nullable URI uri, @Nullable PartitionMethodDesc partitionMethodDesc, boolean external) { this.tableName = tableName; this.schema = schema; this.meta = meta; this.uri = uri; + this.partitionMethodDesc = partitionMethodDesc; this.external = external; } public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI path) { - this(tableName, schema, meta, path, true); + this(tableName, schema, meta, path, null, true); } - + + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI path, + @Nullable PartitionMethodDesc partitionMethodDesc) { + this(tableName, schema, meta, path, partitionMethodDesc, true); + } + public TableDesc(String tableName, @Nullable Schema schema, String dataFormat, KeyValueSet options, @Nullable URI path) { this(tableName, schema, new TableMeta(dataFormat, options), path); } - - public TableDesc(TableDescProto proto) { + + public TableDesc(String tableName, @Nullable Schema schema, String dataFormat, KeyValueSet options, + @Nullable URI path, @Nullable PartitionMethodDesc partitionMethodDesc) { + this(tableName, schema, new TableMeta(dataFormat, options), path, partitionMethodDesc); + } + + public TableDesc(TableDescProto proto) { this(proto.getTableName(), proto.hasSchema() ? SchemaFactory.newV1(proto.getSchema()) : null, - new TableMeta(proto.getMeta()), proto.hasPath() ? URI.create(proto.getPath()) : null, proto.getIsExternal()); + new TableMeta(proto.getMeta()), proto.hasPath() ? URI.create(proto.getPath()) : null, + proto.hasPartition() ? new PartitionMethodDesc(proto.getPartition()) : null, + proto.getIsExternal()); if(proto.hasStats()) { this.stats = new TableStats(proto.getStats()); - } - if (proto.hasPartition()) { - this.partitionMethodDesc = new PartitionMethodDesc(proto.getPartition()); } } diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml index d8484613fb..86e9338979 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml @@ -33,6 +33,7 @@ UTF-8 UTF-8 + 3.4 @@ -163,7 +164,7 @@ org.apache.hive hive-metastore - ${hive.version} + ${hive.version} provided @@ -369,6 +370,12 @@ parquet-hadoop-bundle ${parquet.version} + + org.antlr + antlr-runtime + ${hive.antlr.version} + test + diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java index 1260371c77..20d6d5ced5 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java @@ -253,7 +253,7 @@ public void testTableWithNullValue() throws Exception { } - // TODO: This should be added at TAJO-1891 + @Test public void testAddTableByPartition() throws Exception { TableMeta meta = new TableMeta("TEXT", new KeyValueSet()); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index 6583d4eb01..0f6d8a7dfb 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -452,11 +452,6 @@ public ReturnState alterTable(RpcController controller, AlterTableDescProto prot return errInsufficientPrivilege("alter a table in database '" + split[0] + "'"); } - // TODO: This should be removed at TAJO-1891 - if (proto.getAlterTableType() == CatalogProtos.AlterTableType.ADD_PARTITION) { - return errFeatureNotImplemented("ADD PARTTIION"); - } - wlock.lock(); try { diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index f0cb2086a9..135f9594c0 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -784,7 +784,7 @@ public final void testAddAndDeleteTablePartitionByRange() throws Exception { assertFalse(catalog.existsTable(tableName)); } - // TODO: This should be added at TAJO-1891 + @Test public final void testAddAndDeleteTablePartitionByColumn() throws Exception { Schema schema = SchemaBuilder.builder() .add("id", Type.INT4) diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java index cab2e49044..27cbab23da 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java @@ -165,7 +165,7 @@ public void testTable() throws Exception { } } - // TODO: This should be added at TAJO-1891 + @Test public void testTablePartition() throws Exception { ////////////////////////////////////////////////////////////////////////////// // Test add partition diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java index 3b90e218a3..96cb8dbc28 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java @@ -159,7 +159,7 @@ public void testUpdateTableStatsOfUndefinedTable() throws Exception { build()); } - // TODO: This should be added at TAJO-1891 + @Test public void testAddPartitionWithWrongUri() throws Exception { // TODO: currently, wrong uri does not occur any exception. String partitionName = "DaTe=/=AaA"; @@ -173,7 +173,7 @@ public void testAddPartitionWithWrongUri() throws Exception { catalog.alterTable(alterTableDesc); } - // TODO: This should be added at TAJO-1891 + @Test(expected = DuplicatePartitionException.class) public void testAddDuplicatePartition() throws Exception { String partitionName = "DaTe=bBb/dAtE=AaA"; PartitionDesc partitionDesc = CatalogTestingUtil.buildPartitionDesc(partitionName); @@ -196,7 +196,7 @@ public void testAddDuplicatePartition() throws Exception { catalog.alterTable(alterTableDesc); } - // TODO: This should be added at TAJO-1891 + @Test(expected = UndefinedTableException.class) public void testAddPartitionToUndefinedTable() throws Exception { String partitionName = "DaTe=bBb/dAtE=AaA"; PartitionDesc partitionDesc = CatalogTestingUtil.buildPartitionDesc(partitionName); @@ -209,19 +209,6 @@ public void testAddPartitionToUndefinedTable() throws Exception { catalog.alterTable(alterTableDesc); } - // TODO: This should be removed at TAJO-1891 - @Test(expected = NotImplementedException.class) - public void testAddPartitionNotimplementedException() throws Exception { - String partitionName = "DaTe=/=AaA"; - PartitionDesc partitionDesc = CatalogTestingUtil.buildPartitionDesc(partitionName); - - AlterTableDesc alterTableDesc = new AlterTableDesc(); - alterTableDesc.setTableName(IdentifierUtil.buildFQName("TestDatabase1", "TestPartition1")); - alterTableDesc.setPartitionDesc(partitionDesc); - alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); - - catalog.alterTable(alterTableDesc); - } @Test(expected = UndefinedPartitionException.class) public void testDropUndefinedPartition() throws Exception { diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java index 7eb50bbaec..9f9feb578e 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java @@ -193,14 +193,10 @@ private static void dumpDatabase(TajoClient client, String databaseName, PrintWr writer.write("--\n"); writer.write(String.format("-- Table Partitions: %s%n", tableName)); writer.write("--\n"); - // TODO: This should be improved at TAJO-1891 -// List partitionProtos = client.getPartitionsOfTable(fqName); -// for (PartitionDescProto eachPartitionProto : partitionProtos) { -// writer.write(DDLBuilder.buildDDLForAddPartition(table, eachPartitionProto)); -// } - writer.write(String.format("ALTER TABLE %s REPAIR PARTITION;", - IdentifierUtil.denormalizeIdentifier(databaseName) + "." + IdentifierUtil.denormalizeIdentifier(tableName))); - + List partitionProtos = client.getPartitionsOfTable(fqName); + for (CatalogProtos.PartitionDescProto eachPartitionProto : partitionProtos) { + writer.write(DDLBuilder.buildDDLForAddPartition(table, eachPartitionProto)); + } writer.write("\n\n"); } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java index 57a6aa3ffe..8f3cb6f03d 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java @@ -119,16 +119,17 @@ public void testPartitionsDump() throws Exception { + " partition by column(\"col3\" int4, \"col4\" int4)" ); - // TODO: This should be improved at TAJO-1891 -// executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName3\"" + -// " ADD PARTITION (\"col3\" = 1 , \"col4\" = 2)"); -// executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName4\"" + -// " ADD PARTITION (\"col3\" = 'tajo' , \"col4\" = '2015-09-01')"); + executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName3\"" + + " ADD PARTITION (\"col3\" = 1 , \"col4\" = 2)"); + executeString("create table \"" + getCurrentDatabase() + "\".\"TableName4\"" + " (\"col1\" int4, \"col2\" int4) " + " partition by column(\"col3\" TEXT, \"col4\" date)" ); + executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName4\"" + + " ADD PARTITION (\"col3\" = 'tajo' , \"col4\" = '2015-09-01')"); + try { UserRoleInfo userInfo = UserRoleInfo.getCurrentUser(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index 47c5a35639..f3194fd47c 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -547,19 +547,7 @@ public void testResultRowNumWhenSelectingOnPartitionedTable() throws Exception { } } - // TODO: This should be removed at TAJO-1891 @Test - public void testAddPartitionNotimplementedException() throws Exception { - String tableName = IdentifierUtil.normalizeIdentifier("testAddPartitionNotimplementedException"); - tajoCli.executeScript("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8)"); - tajoCli.executeScript("alter table " + tableName + " add partition (key2 = 0.1)"); - - String consoleResult; - consoleResult = new String(out.toByteArray()); - assertOutputResult(consoleResult); - } - - // TODO: This should be added at TAJO-1891 public void testAlterTableAddDropPartition() throws Exception { String tableName = IdentifierUtil.normalizeIdentifier("testAlterTableAddPartition"); @@ -586,8 +574,10 @@ public void testAlterTableAddDropPartition() throws Exception { tajoCli.executeScript("alter table " + tableName + " drop partition (col3 = 0.1, col4 = 10)"); - String consoleResult = new String(out.toByteArray()); - assertOutputResult(consoleResult); + String stdoutResult = new String(out.toByteArray()); + assertOutputResult(stdoutResult); + String stdErrResult = new String(err.toByteArray()); + assertErrorResult(stdErrResult, false); } public static class TajoCliOutputTestFormatter extends DefaultTajoCliOutputFormatter { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 142b2c3836..b323a329a9 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -1233,7 +1233,7 @@ public final void testAlterTableRepairPartiton() throws TajoException { "ALTER TABLE partitioned_table DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' )", //1 }; - // TODO: This should be added at TAJO-1891 + @Test public final void testAddPartitionAndDropPartition() throws TajoException { String tableName = IdentifierUtil.normalizeIdentifier("partitioned_table"); String qualifiedTableName = IdentifierUtil.buildFQName(DEFAULT_DATABASE_NAME, tableName); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java new file mode 100644 index 0000000000..291e65ab6c --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java @@ -0,0 +1,656 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.*; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionDesc; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionType; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.parser.sql.SQLAnalyzer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.partition.PartitionPruningHandle; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.junit.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.*; + +public class TestPartitionedTableRewriter { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPartitionedTableRewriter"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + private FileSystem fs; + + final static String PARTITION_TABLE_NAME = "tb_partition"; + final static String MULTIPLE_PARTITION_TABLE_NAME = "tb_multiple_partition"; + final static String ARBITRARY_PARTITION_TABLE_NAME = "tb_arbitrary_partition"; + final static private Path[] ARBITRARY_PATH = new Path[3]; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + util.startCatalogCluster(); + catalog = util.getCatalogService(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + fs = FileSystem.get(conf); + + Schema schema = SchemaBuilder.builder() + .add("n_nationkey", TajoDataTypes.Type.INT8) + .add("n_name", TajoDataTypes.Type.TEXT) + .add("n_regionkey", TajoDataTypes.Type.INT8) + .build(); + + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, util.getConfiguration()); + + createTableWithOnePartitionKeyColumn(fs, schema, meta); + createTableWithMultiplePartitionKeyColumns(fs, schema, meta); + createTableIncludeArbitraryDirectories(fs, schema, meta); + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + } + + private void createTableWithOnePartitionKeyColumn(FileSystem fs, Schema schema, + TableMeta meta) throws Exception { + Schema partSchema = SchemaBuilder.builder() + .add("key", TajoDataTypes.Type.TEXT) + .build(); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc(DEFAULT_DATABASE_NAME, PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key", partSchema); + + Path tablePath = new Path(testDir, PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + TableDesc desc = CatalogUtil.newTableDesc(DEFAULT_DATABASE_NAME + "." + PARTITION_TABLE_NAME, schema, meta, + tablePath, partitionMethodDesc); + catalog.createTable(desc); + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME + "." + PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/key=part123"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part456"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part789"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + private void createTableWithMultiplePartitionKeyColumns(FileSystem fs, + Schema schema, TableMeta meta) throws Exception { + Schema partSchema = SchemaBuilder.builder() + .add("key1", TajoDataTypes.Type.TEXT) + .add("key2", TajoDataTypes.Type.TEXT) + .add("key3", TajoDataTypes.Type.INT8) + .build(); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc("default", MULTIPLE_PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key1,key2,key3", partSchema); + + Path tablePath = new Path(testDir, MULTIPLE_PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + TableDesc desc = CatalogUtil.newTableDesc(DEFAULT_DATABASE_NAME + "." + MULTIPLE_PARTITION_TABLE_NAME, schema, + meta, tablePath, partitionMethodDesc); + catalog.createTable(desc); + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME + "." + MULTIPLE_PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/key1=part123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=1"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=2"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789/key3=3"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + private void createTableIncludeArbitraryDirectories(FileSystem fs, + Schema schema, TableMeta meta) throws Exception { + Schema partSchema = SchemaBuilder.builder() + .add("year", TajoDataTypes.Type.TEXT) + .add("month", TajoDataTypes.Type.TEXT) + .add("day", TajoDataTypes.Type.TEXT) + .build(); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc(DEFAULT_DATABASE_NAME, ARBITRARY_PARTITION_TABLE_NAME, + PartitionType.COLUMN, "year,month,day", partSchema); + + Path tablePath = new Path(testDir, ARBITRARY_PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + TableDesc desc = CatalogUtil.newTableDesc(DEFAULT_DATABASE_NAME + "." + ARBITRARY_PARTITION_TABLE_NAME, schema, + meta, tablePath, partitionMethodDesc); + catalog.createTable(desc); + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME + "." + PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/2016"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/2016/3"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/2016/3/1"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + catalog.alterTable(getAlterTableDesc("year=2016/month=3/day=1", path)); + + ARBITRARY_PATH[0] = path; + + path = new Path(tableDesc.getUri().toString() + "/2016/3/2"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + catalog.alterTable(getAlterTableDesc("year=2016/month=3/day=2", path)); + + ARBITRARY_PATH[1] = path; + + path = new Path(tableDesc.getUri().toString() + "/2015"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/2015/3"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/2015/3/1"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + + catalog.alterTable(getAlterTableDesc("year=2015/month=3/day=1", path)); + + ARBITRARY_PATH[2] = path; + + List partitions = catalog.getPartitionsOfTable(DEFAULT_DATABASE_NAME, + ARBITRARY_PARTITION_TABLE_NAME); + assertEquals(3L, partitions.size()); + } + + private AlterTableDesc getAlterTableDesc(String partitionName, Path path) throws IOException { + AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(DEFAULT_DATABASE_NAME + "." + ARBITRARY_PARTITION_TABLE_NAME); + alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); + + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setPartitionName(partitionName); + + String[] partitionNames = partitionName.split("/"); + + List partitionKeyList = new ArrayList<>(); + for (String partition : partitionNames) { + String[] splits = partition.split("="); + String columnName = "", partitionValue = ""; + if (splits.length == 2) { + columnName = splits[0]; + partitionValue = splits[1]; + } else if (splits.length == 1) { + if (partition.charAt(0) == '=') { + partitionValue = splits[0]; + } else { + columnName = ""; + } + } + + CatalogProtos.PartitionKeyProto.Builder builder = CatalogProtos.PartitionKeyProto.newBuilder(); + builder.setColumnName(columnName); + builder.setPartitionValue(partitionValue); + partitionKeyList.add(builder.build()); + } + + partitionDesc.setPartitionKeys(partitionKeyList); + partitionDesc.setPath(path.toString()); + partitionDesc.setNumBytes(fs.getFileStatus(path).getLen()); + + alterTableDesc.setPartitionDesc(partitionDesc); + + return alterTableDesc; + } + + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + @Test + public void testFilterIncludePartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part456' ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Path[] filteredPaths = partitionPruningHandle.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertTrue(filteredPaths[0].toString().endsWith("key=part456")); + + String[] partitionKeys = partitionPruningHandle.getPartitionKeys(); + assertEquals(1, partitionKeys.length); + assertEquals("key=part456", partitionKeys[0]); + + assertEquals(10L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testWithoutAnyFilters() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SCAN, sortNode.getChild().getType()); + ScanNode scanNode = sortNode.getChild(); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(3, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key=part123")); + assertTrue(partitionPathList.get(1).toString().endsWith("key=part456")); + assertTrue(partitionPathList.get(2).toString().endsWith("key=part789")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(3, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), "key=part123"); + assertEquals(partitionKeysList.get(1), "key=part456"); + assertEquals(partitionKeysList.get(2), "key=part789"); + + assertEquals(33L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonExistingPartitionValue() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part123456789'"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + assertEquals(0, partitionPruningHandle.getPartitionPaths().length); + assertEquals(0, partitionPruningHandle.getPartitionKeys().length); + + assertEquals(0L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonPartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE n_nationkey = 1"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(3, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key=part123")); + assertTrue(partitionPathList.get(1).toString().endsWith("key=part456")); + assertTrue(partitionPathList.get(2).toString().endsWith("key=part789")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(3, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), "key=part123"); + assertEquals(partitionKeysList.get(1), "key=part456"); + assertEquals(partitionKeysList.get(2), "key=part789"); + + assertEquals(33L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeEveryPartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part789' and key2 = 'supp789' and key3=3"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Path[] filteredPaths = partitionPruningHandle.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertTrue(filteredPaths[0].toString().endsWith("key1=part789/key2=supp789/key3=3")); + + String[] partitionKeys = partitionPruningHandle.getPartitionKeys(); + assertEquals(1, partitionKeys.length); + assertEquals("key1=part789/key2=supp789/key3=3", partitionKeys[0]); + + assertEquals(10L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeSomeOfPartitionKeyColumns() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and key2 = 'supp123' order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(2, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key1=part123/key2=supp123/key3=1")); + assertTrue(partitionPathList.get(1).toString().endsWith("key1=part123/key2=supp123/key3=2")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(2, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), ("key1=part123/key2=supp123/key3=1")); + assertEquals(partitionKeysList.get(1), ("key1=part123/key2=supp123/key3=2")); + + assertEquals(23L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonPartitionKeyColumns() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and n_nationkey >= 2 order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(2, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key1=part123/key2=supp123/key3=1")); + assertTrue(partitionPathList.get(1).toString().endsWith("key1=part123/key2=supp123/key3=2")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(2, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), ("key1=part123/key2=supp123/key3=1")); + assertEquals(partitionKeysList.get(1), ("key1=part123/key2=supp123/key3=2")); + + assertEquals(23L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterWithArbitraryDirectories1() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + ARBITRARY_PARTITION_TABLE_NAME + + " WHERE year = '2016' ORDER BY year, month, day"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(2, partitionPathList.size()); + assertEquals(partitionPathList.get(0), ARBITRARY_PATH[0]); + assertEquals(partitionPathList.get(1), ARBITRARY_PATH[1]); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(2, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), "year=2016/month=3/day=1"); + assertEquals(partitionKeysList.get(1), "year=2016/month=3/day=2"); + + assertEquals(272L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterWithArbitraryDirectories2() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + ARBITRARY_PARTITION_TABLE_NAME + + " WHERE year = '2016' and month = '3' and day = '2' "); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Path[] filteredPaths = partitionPruningHandle.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertEquals(ARBITRARY_PATH[1], filteredPaths[0]); + + String[] partitionKeys = partitionPruningHandle.getPartitionKeys(); + assertEquals(1, partitionKeys.length); + assertEquals("year=2016/month=3/day=2", partitionKeys[0]); + + assertEquals(136L, partitionPruningHandle.getTotalVolume()); + } + +} \ No newline at end of file diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java index 30490f4a70..b0b3f10017 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java @@ -88,7 +88,7 @@ public final void testAlterTableSetProperty() throws Exception { cleanupQuery(after_res); } - // TODO: This should be added at TAJO-1891 + @Test public final void testAlterTableAddPartition() throws Exception { executeDDL("create_partitioned_table.sql", null); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 103a0f6525..d3f6ecc256 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -1927,4 +1927,124 @@ public final void testPartitionWithInOperator() throws Exception { res.close(); } + @Test + public void testArbitraryPartitionPath() throws Exception { + FileSystem fs = FileSystem.get(conf); + Path path = null; + ContentSummary contentSummary = null; + + PartitionDescProto partitionDescProto = null; + ResultSet res = null; + String result = null, expectedResult = null; + + String tableName = IdentifierUtil.normalizeIdentifier("testAbnormalDirectories"); + if (nodeType == NodeType.INSERT) { + executeString( + "create table " + tableName + " (col1 int4, col2 float8) partition by column(key int4) ").close(); + executeString( + "insert overwrite into " + tableName + " select l_orderkey, l_quantity, l_partkey " + + " from lineitem where l_partkey > 2").close(); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 float8) partition by column(key int4) " + + " as select l_orderkey, l_quantity, l_partkey from lineitem where l_partkey > 2").close(); + } + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + + // Check the number of rows + res = executeString("SELECT count(*) as cnt FROM " + tableName); + result = resultSetToString(res); + expectedResult = "cnt\n" + + "-------------------------------\n" + + "1\n"; + res.close(); + assertEquals(expectedResult, result); + + // Make arbitrary partition path + path = new Path(tableDesc.getUri().toString() + "/part12345"); + + executeString("INSERT INTO LOCATION '" + path + "'" + + " select l_orderkey, l_quantity, l_partkey from lineitem where l_partkey = 1"); + + executeString("alter table " + tableName + " add partition (key = 1)" + + " location '" + path.toString() + "'").close(); + + partitionDescProto = catalog.getPartition(getCurrentDatabase(), tableName, "key=1"); + contentSummary = fs.getContentSummary(path); + assertEquals(contentSummary.getLength(), partitionDescProto.getNumBytes()); + assertEquals(path.toString(), partitionDescProto.getPath()); + + path = new Path(tableDesc.getUri().toString() + "/part6789"); + + executeString("INSERT INTO LOCATION '" + path + "'" + + " select l_orderkey, l_quantity, l_partkey from lineitem where l_partkey = 2"); + + executeString("alter table " + tableName + " add partition (key = 2)" + + " location '" + path.toString() + "'").close(); + + partitionDescProto = catalog.getPartition(getCurrentDatabase(), tableName, "key=2"); + contentSummary = fs.getContentSummary(path); + assertEquals(contentSummary.getLength(), partitionDescProto.getNumBytes()); + assertEquals(path.toString(), partitionDescProto.getPath()); + + // Check the number of rows + res = executeString("SELECT count(*) as cnt FROM " + tableName); + result = resultSetToString(res); + expectedResult = "cnt\n" + + "-------------------------------\n" + + "5\n"; + res.close(); + assertEquals(expectedResult, result); + + // Equal operator + res = executeString("SELECT * FROM " + tableName + " where key = 1 order by col2"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,17.0,1\n" + + "1,36.0,1\n" ; + res.close(); + assertEquals(expectedResult, result); + + // Less than equals + res = executeString("SELECT * FROM " + tableName + " where key <= 2 order by key, col2"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,17.0,1\n" + + "1,36.0,1\n" + + "2,38.0,2\n" + + "3,45.0,2\n" ; + res.close(); + assertEquals(expectedResult, result); + + // In + res = executeString("SELECT * FROM " + tableName + " where key in (1, 3) order by key, col2"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,17.0,1\n" + + "1,36.0,1\n" + + "3,49.0,3\n" ; + res.close(); + assertEquals(expectedResult, result); + + // Join + res = executeString( + " select col1,col2,key, p_partkey, p_name from \"" + getCurrentDatabase() + "\"." + tableName + + ", part where key = " + "p_partkey and col1 >= 2 order by key, col2"); + + result = resultSetToString(res); + expectedResult = "col1,col2,key,p_partkey,p_name\n" + + "-------------------------------\n" + + "2,38.0,2,2,blush thistle blue yellow saddle\n" + + "3,45.0,2,2,blush thistle blue yellow saddle\n" + + "3,49.0,3,3,spring green yellow purple cornsilk\n" ; + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + } + } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java index dff63c427b..39744f6a26 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java @@ -183,4 +183,28 @@ public void testBuildTupleFromPartitionPath() { assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); } + + @Test + public void testBuildTupleFromPartitionName() { + Schema schema = SchemaBuilder.builder() + .add("key1", Type.INT8) + .add("key2", Type.TEXT) + .build(); + + Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key1=123"); + assertNotNull(tuple); + assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); + assertEquals(DatumFactory.createNullDatum(), tuple.asDatum(1)); + + tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key1=123/key2=abc"); + assertNotNull(tuple); + assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); + assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); + + tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key2=abc"); + assertNotNull(tuple); + assertEquals(DatumFactory.createNullDatum(), tuple.asDatum(0)); + assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); + + } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java b/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java index e2968a8664..3a577e824f 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java @@ -75,12 +75,7 @@ public Collection getResourceFiles(String subdir) throws URISyntaxExceptio new Predicate() { @Override public boolean apply(@Nullable FileStatus input) { - // TODO: This should be removed at TAJO-1891 - if (input.getPath().getName().indexOf("add_partition") > -1) { - return false; - } else { - return input.isFile(); - } + return input.isFile(); } } ); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestPartitionFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestPartitionFileFragment.java new file mode 100644 index 0000000000..999817741f --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestPartitionFileFragment.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.storage.fragment.PartitionFileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.SortedSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestPartitionFileFragment { + private Path path; + + @Before + public final void setUp() throws Exception { + path = CommonTestingUtil.getTestDir(); + } + + @Test + public final void testGetAndSetFields() { + PartitionFileFragment fragment1 = new PartitionFileFragment("table1_1", new Path(path, "table0/col1=1"), + 0, 500, "col1=1"); + + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0/col1=1"), fragment1.getPath()); + assertEquals("col1=1", fragment1.getPartitionKeys()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testGetProtoAndRestore() { + PartitionFileFragment fragment = new PartitionFileFragment("table1_1", new Path(path, "table0/col1=1"), 0, + 500, "col1=1"); + + PartitionFileFragment fragment1 = FragmentConvertor.convert(PartitionFileFragment.class, fragment.getProto()); + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0/col1=1"), fragment1.getPath()); + assertEquals("col1=1", fragment1.getPartitionKeys()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testCompareTo() { + final int num = 10; + PartitionFileFragment[] tablets = new PartitionFileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new PartitionFileFragment("tablet0", new Path(path, "tablet0/col1=" + i), i * 500, (i+1) * 500 + , "col1=" + i); + } + + Arrays.sort(tablets); + + for (int i = 0; i < num; i++) { + assertEquals("col1=" + (num - i - 1), tablets[i].getPartitionKeys()); + } + } + + @Test + public final void testCompareTo2() { + final int num = 1860; + PartitionFileFragment[] tablets = new PartitionFileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new PartitionFileFragment("tablet1", new Path(path, "tablet/col1=" +i), (long)i * 6553500, + (long) (i+1) * 6553500, "col1=" + i); + } + + SortedSet sortedSet = Sets.newTreeSet(); + for (PartitionFileFragment frag : tablets) { + sortedSet.add(frag); + } + assertEquals(num, sortedSet.size()); + } +} diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan index 2f2ca890fb..7456b5238a 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -82,8 +82,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan index f1fa414673..46e0b4b7ee 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -100,8 +100,8 @@ Block Id: eb_0000000000000_0000_000004 [LEAF] [q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.c.c_custkey (INT4), num=32) PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan index 2f2ca890fb..7456b5238a 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -82,8 +82,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan index f1fa414673..46e0b4b7ee 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -100,8 +100,8 @@ Block Id: eb_0000000000000_0000_000004 [LEAF] [q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.c.c_custkey (INT4), num=32) PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.err b/tajo-core-tests/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.err new file mode 100644 index 0000000000..fbd0c7ec54 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.err @@ -0,0 +1,2 @@ +ERROR: 'key2' column is not a partition key +ERROR: partition 'key=0.1' does not exist \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result index fdb620d724..b729091011 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result @@ -1,8 +1,6 @@ OK -ERROR: 'key2' column is not a partition key OK OK -ERROR: partition 'key=0.1' does not exist OK OK OK diff --git a/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result b/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result index 23b42aa546..69d85f0792 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result @@ -17,7 +17,8 @@ CREATE TABLE "TestTajoDump"."TableName3" (col1 INT4, col2 INT4) USING TEXT WITH -- -- Table Partitions: TableName3 -- -ALTER TABLE "TestTajoDump"."TableName3" REPAIR PARTITION; +ALTER TABLE "TestTajoDump"."TableName3" ADD IF NOT EXISTS PARTITION (col3=1,col4=2) LOCATION '${partition.path1}/col3=1/col4=2'; + @@ -29,4 +30,4 @@ CREATE TABLE "TestTajoDump"."TableName4" (col1 INT4, col2 INT4) USING TEXT WITH -- -- Table Partitions: TableName4 -- -ALTER TABLE "TestTajoDump"."TableName4" REPAIR PARTITION; \ No newline at end of file +ALTER TABLE "TestTajoDump"."TableName4" ADD IF NOT EXISTS PARTITION (col3='tajo',col4='2015-09-01') LOCATION '${partition.path2}/col3=tajo/col4=2015-09-01'; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 87a6e74ba9..eb1b0cd184 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -24,7 +24,6 @@ import com.google.common.collect.ObjectArrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; @@ -44,13 +43,10 @@ import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; -import org.apache.tajo.plan.serder.PlanProto.SortEnforce; import org.apache.tajo.plan.serder.PlanProto.SortedInputEnforce; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.FileTablespace; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TablespaceManager; -import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.unit.StorageUnit; @@ -61,7 +57,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Stack; @@ -896,12 +891,12 @@ private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack node) throws IOException { + FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); // check if an input is sorted in the same order to the subsequence sort operator. if (checkIfSortEquivalance(ctx, scanNode, node)) { - if (ctx.getTable(scanNode.getCanonicalName()) == null) { + if (fragments == null) { return new SeqScanExec(ctx, scanNode, null); } - FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); return new ExternalSortExec(ctx, (SortNode) node.peek(), scanNode, fragments); } else { Enforcer enforcer = ctx.getEnforcer(); @@ -915,31 +910,15 @@ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, St } } - if (scanNode instanceof PartitionedTableScanNode - && ((PartitionedTableScanNode)scanNode).getInputPaths() != null && - ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) { - - if (broadcastFlag) { - PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode; - List fileFragments = new ArrayList<>(); - - FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri()); - for (Path path : partitionedTableScanNode.getInputPaths()) { - fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path))); - } - - FragmentProto[] fragments = - FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])); - - ctx.addFragments(scanNode.getCanonicalName(), fragments); - return new PartitionMergeScanExec(ctx, scanNode, fragments); - } + if (scanNode.getTableDesc().hasPartition() && broadcastFlag && fragments != null) { + ctx.addFragments(scanNode.getCanonicalName(), fragments); + return new PartitionMergeScanExec(ctx, scanNode, fragments); } - if (ctx.getTable(scanNode.getCanonicalName()) == null) { + if (fragments == null) { return new SeqScanExec(ctx, scanNode, null); } - FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); + return new SeqScanExec(ctx, scanNode, fragments); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index d390740187..3fc03645c0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -317,6 +317,7 @@ private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalE if (node instanceof RelationNode) { switch (node.getType()) { case INDEX_SCAN: + case PARTITIONS_SCAN: case SCAN: ScanNode scanNode = (ScanNode) node; if (scanNode.getTableDesc().getStats() == null) { @@ -326,20 +327,6 @@ private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalE } else { return scanNode.getTableDesc().getStats().getNumBytes(); } - case PARTITIONS_SCAN: - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; - if (pScanNode.getTableDesc().getStats() == null) { - // TODO - this case means that data is not located in HDFS. So, we need additional - // broadcast method. - return Long.MAX_VALUE; - } else { - // if there is no selected partition - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - return 0; - } else { - return pScanNode.getTableDesc().getStats().getNumBytes(); - } - } case TABLE_SUBQUERY: return estimateOutputVolumeInternal(((TableSubQueryNode) node).getSubQuery()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java index b13cb0f1a8..6513ac743b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java @@ -97,13 +97,6 @@ public static void replaceChild(LogicalNode newChild, ScanNode originalChild, Lo public static long getTableVolume(ScanNode scanNode) { if (scanNode.getTableDesc().hasStats()) { long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); - if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - scanBytes = 0L; - } - } - return scanBytes; } else { return -1; @@ -117,6 +110,7 @@ public static long computeDescendentVolume(LogicalNode node) { if (node instanceof RelationNode) { switch (node.getType()) { + case PARTITIONS_SCAN: case SCAN: ScanNode scanNode = (ScanNode) node; if (scanNode.getTableDesc().getStats() == null) { @@ -126,20 +120,6 @@ public static long computeDescendentVolume(LogicalNode node) { } else { return scanNode.getTableDesc().getStats().getNumBytes(); } - case PARTITIONS_SCAN: - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; - if (pScanNode.getTableDesc().getStats() == null) { - // TODO - this case means that data is not located in HDFS. So, we need additional - // broadcast method. - return Long.MAX_VALUE; - } else { - // if there is no selected partition - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - return 0; - } else { - return pScanNode.getTableDesc().getStats().getNumBytes(); - } - } case TABLE_SUBQUERY: return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery()); default: diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index dc48f3fb0f..f682595d20 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -39,6 +39,7 @@ import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.storage.fragment.PartitionFileFragment; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -97,11 +98,12 @@ private void rewriteColumnPartitionedTableSchema() throws IOException { Tuple partitionRow = null; if (fragments != null && fragments.length > 0) { - List fileFragments = FragmentConvertor.convert(FileFragment.class, fragments); + List partitionFileFragments = FragmentConvertor.convert(PartitionFileFragment + .class, fragments); - // Get a partition key value from a given path - partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( - columnPartitionSchema, fileFragments.get(0).getPath(), false); + // Get tuple from first partition fragment using parition keys + partitionRow = PartitionedTableRewriter.buildTupleFromPartitionKeys(columnPartitionSchema, + partitionFileFragments.get(0).getPartitionKeys()); } // Targets or search conditions may contain column references. diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index d423c041f1..de8f98f4e9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -568,7 +568,8 @@ public GetQueryResultDataResponse getQueryResultData(RpcController controller, G queryId, scanNode, Integer.MAX_VALUE, - codecType); + codecType, + context.getCatalog()); queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java index c16e95bae1..aaebe11800 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java @@ -120,9 +120,6 @@ public LogicalNode visitPartitionedTableScan(PlanShapeFixerContext context, Logi throws TajoException { super.visitPartitionedTableScan(context, plan, block, node, stack); context.childNumbers.push(1); - Path[] inputPaths = node.getInputPaths(); - Arrays.sort(inputPaths); - node.setInputPaths(inputPaths); if (node.hasTargets()) { node.setTargets(sortTargets(node.getTargets())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index a1728ec2ca..d231e48648 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -24,10 +24,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TaskId; +import org.apache.tajo.*; +import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableDesc; @@ -80,10 +78,12 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc final private Optional codecType; private MemoryRowBlock rowBlock; private Future nextFetch; + private CatalogService catalog; public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService, TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, - int maxRow, Optional codecType) throws IOException { + int maxRow, Optional codecType, + CatalogService catalog) throws IOException { this.asyncTaskService = asyncTaskService; this.tajoConf = tajoConf; this.sessionId = sessionId; @@ -93,6 +93,7 @@ public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService, this.maxRow = maxRow; this.rowEncoder = RowStoreUtil.createEncoder(scanNode.getOutSchema()); this.codecType = codecType; + this.catalog = catalog; } public void init() throws IOException, TajoException { @@ -105,7 +106,8 @@ private void initSeqScanExec() throws IOException, TajoException { List fragments = Lists.newArrayList(); if (tableDesc.hasPartition()) { FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class); - fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc)); + fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc + , catalog, tajoConf)); } else { fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode.getQual())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 58f4ec013f..0186d16785 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -311,7 +311,8 @@ public void execSimpleQuery(QueryContext queryContext, Session session, String q queryInfo.getQueryId(), scanNode, maxRow, - Optional.empty()); + Optional.empty(), + context.getCatalog()); queryResultScanner.init(); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index ba051a3115..1a0a03db52 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -32,6 +32,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.PhysicalPlannerImpl; import org.apache.tajo.engine.planner.RangePartitionAlgorithm; @@ -42,11 +43,11 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil; import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.exception.UndefinedTableException; +import org.apache.tajo.exception.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.logical.SortNode.SortPurpose; +import org.apache.tajo.plan.partition.PartitionPruningHandle; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; @@ -383,29 +384,21 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster if (broadcastFragments != null) { //In this phase a ScanNode has a single fragment. //If there are more than one data files, that files should be added to fragments or partition path - for (ScanNode eachScan: broadcastScans) { - - Path[] partitionScanPaths = null; TableDesc tableDesc = masterContext.getTableDesc(eachScan); Tablespace space = TablespaceManager.get(tableDesc.getUri()); + Collection scanFragments = null; if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { - - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; - partitionScanPaths = partitionScan.getInputPaths(); - // set null to inputPaths in getFragmentsFromPartitionedTable() - getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc); - partitionScan.setInputPaths(partitionScanPaths); - + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + scanFragments = getFragmentsFromPartitionedTable(space, eachScan, tableDesc, catalog, conf); } else { + scanFragments = space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual()); + } - Collection scanFragments = - space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual()); - if (scanFragments != null) { - rightFragments.addAll(scanFragments); - } - + if (scanFragments != null) { + rightFragments.addAll(scanFragments); } } } @@ -472,17 +465,22 @@ public static Map>> merge * It creates a number of fragments for all partitions. */ public static List getFragmentsFromPartitionedTable(Tablespace tsHandler, - ScanNode scan, - TableDesc table) throws IOException { + ScanNode scan, TableDesc table, CatalogService catalog, TajoConf conf) throws IOException, TajoException { Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace"); if (!(scan instanceof PartitionedTableScanNode)) { throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type."); } List fragments = Lists.newArrayList(); PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; - fragments.addAll(((FileTablespace) tsHandler).getSplits( - scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths())); - partitionsScan.setInputPaths(null); + partitionsScan.init(scan); + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + PartitionPruningHandle pruningHandle = rewriter.getPartitionPruningHandle(conf, partitionsScan); + + FileTablespace tablespace = (FileTablespace) tsHandler; + fragments.addAll(tablespace.getPartitionSplits(scan.getCanonicalName(), table.getMeta(), table.getSchema() + , pruningHandle.getPartitionKeys(), pruningHandle.getPartitionPaths())); + return fragments; } @@ -504,9 +502,9 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch // Broadcast table // all fragments or paths assigned every Large table's scan task. // -> PARTITIONS_SCAN - // . add all partition paths to node's inputPaths variable + // . add all PartitionFileFragments to broadcastFragments // -> SCAN - // . add all fragments to broadcastFragments + // . add all FileFragments to broadcastFragments Collection baseFragments = null; List broadcastFragments = new ArrayList<>(); for (int i = 0; i < scans.length; i++) { @@ -514,16 +512,12 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch TableDesc desc = stage.getContext().getTableDesc(scan); Collection scanFragments; - Path[] partitionScanPaths = null; - Tablespace space = TablespaceManager.get(desc.getUri()); - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scan; - partitionScanPaths = partitionScan.getInputPaths(); - // set null to inputPaths in getFragmentsFromPartitionedTable() - scanFragments = getFragmentsFromPartitionedTable(space, scan, desc); + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + scanFragments = getFragmentsFromPartitionedTable(space, scan, desc, catalog, conf); } else { scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual()); } @@ -532,13 +526,7 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch if (i == baseScanId) { baseFragments = scanFragments; } else { - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; - // PhisicalPlanner make PartitionMergeScanExec when table is boradcast table and inputpaths is not empty - partitionScan.setInputPaths(partitionScanPaths); - } else { - broadcastFragments.addAll(scanFragments); - } + broadcastFragments.addAll(scanFragments); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index fed75cd7fe..5cbb203b02 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -28,10 +28,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; import org.apache.tajo.*; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; @@ -52,7 +49,6 @@ import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; @@ -1195,8 +1191,12 @@ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOExceptio // // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN. if (scan.getType() == NodeType.PARTITIONS_SCAN) { + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + // After calling this method, partition paths are removed from the physical plan. - fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table); + fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table, catalog, + conf); } else { fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan.getQual()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java index 7d5c78ac7b..abbfd33cde 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -144,7 +144,8 @@ private static NonForwardQueryResultScanner getNonForwardQueryResultScanner( queryId, scanNode, Integer.MAX_VALUE, - Optional.empty()); + Optional.empty(), + masterContext.getCatalog()); resultScanner.init(); session.addNonForwardQueryResultScanner(resultScanner); } diff --git a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst index 47e74922b9..dc3aeb2141 100644 --- a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst +++ b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst @@ -65,6 +65,19 @@ SET PROPERTY This statement will allow users to change a table property. +======================== +ADD PARTITION +======================== + +*Synopsis* + +.. code-block:: sql + + CREATE INDEX [ name ] ON table_name [ USING method ] + ( { column_name | ( expression ) } [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] ) + [ WHERE predicate ] + + ======================== DROP PARTITION ======================== diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index c202642db9..c2ed3d34d7 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -593,7 +593,7 @@ public void testSortWithDateTime() throws Exception { } } - // TODO: This should be added at TAJO-1891 + @Test public void testAlterTableAddPartition() throws Exception { Statement stmt = null; ResultSet resultSet = null; diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java index 617688228f..a03b169241 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java @@ -18,125 +18,21 @@ package org.apache.tajo.plan.logical; -import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.plan.PlanString; -import org.apache.tajo.plan.Target; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.util.TUtil; - -import java.util.ArrayList; - public class PartitionedTableScanNode extends ScanNode { - @Expose Path [] inputPaths; - public PartitionedTableScanNode(int pid) { super(pid, NodeType.PARTITIONS_SCAN); } - public void init(ScanNode scanNode, Path[] inputPaths) { + public void init(ScanNode scanNode) { tableDesc = scanNode.tableDesc; setInSchema(scanNode.getInSchema()); setOutSchema(scanNode.getOutSchema()); this.qual = scanNode.qual; this.targets = scanNode.targets; - this.inputPaths = inputPaths; if (scanNode.hasAlias()) { alias = scanNode.alias; } } - public void setInputPaths(Path [] paths) { - this.inputPaths = paths; - } - - public Path [] getInputPaths() { - return inputPaths; - } - - @Override - public int hashCode() { - return Objects.hashCode(this.tableDesc, this.qual, this.targets); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PartitionedTableScanNode) { - PartitionedTableScanNode other = (PartitionedTableScanNode) obj; - - boolean eq = super.equals(other); - eq = eq && TUtil.checkEquals(this.tableDesc, other.tableDesc); - eq = eq && TUtil.checkEquals(this.qual, other.qual); - eq = eq && TUtil.checkEquals(this.targets, other.targets); - eq = eq && TUtil.checkEquals(this.inputPaths, other.inputPaths); - - return eq; - } - - return false; - } - - @Override - public Object clone() throws CloneNotSupportedException { - PartitionedTableScanNode unionScan = (PartitionedTableScanNode) super.clone(); - - unionScan.tableDesc = (TableDesc) this.tableDesc.clone(); - - if (hasQual()) { - unionScan.qual = (EvalNode) this.qual.clone(); - } - - if (hasTargets()) { - unionScan.targets = new ArrayList<>(); - for (Target t : targets) { - unionScan.targets.add((Target) t.clone()); - } - } - - unionScan.inputPaths = inputPaths; - - return unionScan; - } - - @Override - public void preOrder(LogicalNodeVisitor visitor) { - visitor.visit(this); - } - - public void postOrder(LogicalNodeVisitor visitor) { - visitor.visit(this); - } - - @Override - public PlanString getPlanString() { - PlanString planStr = new PlanString(this).appendTitle(" on " + getTableName()); - if (hasAlias()) { - planStr.appendTitle(" as ").appendTitle(alias); - } - - if (hasQual()) { - planStr.addExplan("filter: ").appendExplain(this.qual.toString()); - } - - if (hasTargets()) { - planStr.addExplan("target list: ").appendExplain(StringUtils.join(targets, ", ")); - } - - planStr.addDetail("out schema: ").appendDetail(getOutSchema().toString()); - planStr.addDetail("in schema: ").appendDetail(getInSchema().toString()); - - if (inputPaths != null) { - planStr.addExplan("num of filtered paths: ").appendExplain(""+ inputPaths.length); - int i = 0; - for (Path path : inputPaths) { - planStr.addDetail((i++) + ": ").appendDetail(path.toString()); - } - } - - return planStr; - } } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java b/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java new file mode 100644 index 0000000000..9271786efa --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.partition; + +import org.apache.hadoop.fs.Path; + +/** + * This includes result informs of partition pruning. + * + */ +public class PartitionPruningHandle { + private Path[] partitionPaths; + private String[] partitionKeys; + private long totalVolume; + + public PartitionPruningHandle(Path[] partitionPaths, String[] partitionKeys, long totalVolume) { + this.partitionPaths = partitionPaths; + this.partitionKeys = partitionKeys; + this.totalVolume = totalVolume; + } + + public Path[] getPartitionPaths() { + return partitionPaths; + } + + public String[] getPartitionKeys() { + return partitionKeys; + } + + public long getTotalVolume() { + return totalVolume; + } +} \ No newline at end of file diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 54b4844875..246d77d247 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -18,26 +18,27 @@ package org.apache.tajo.plan.rewrite.rules; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.*; import org.apache.tajo.OverridableConf; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; -import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.partition.PartitionPruningHandle; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.util.EvalNodeToExprConverter; @@ -56,7 +57,6 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { private CatalogService catalog; - private long totalVolume; private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class); @@ -92,6 +92,10 @@ public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoExc return plan; } + public void setCatalog(CatalogService catalog) { + this.catalog = catalog; + } + private static class PartitionPathFilter implements PathFilter { private Schema schema; @@ -124,11 +128,11 @@ public String toString() { } } - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - return findFilteredPaths(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); + return getPartitionPruningHandle(conf, tableName, partitionColumns, conjunctiveForms, tablePath, null); } /** @@ -141,13 +145,13 @@ public String toString() { * @return * @throws IOException */ - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath, ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - Path [] filteredPaths = null; - FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); + PartitionPruningHandle partitionPruningHandle = null; + FileSystem fs = tablePath.getFileSystem(conf); String [] splits = IdentifierUtil.splitFQTableName(tableName); List partitions = null; @@ -155,17 +159,19 @@ public String toString() { if (conjunctiveForms == null) { partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } } else { if (catalog.existPartitions(splits[0], splits[1])) { PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); partitions = catalog.getPartitionsByAlgebra(request); - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } else { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } } } catch (UnsupportedException ue) { @@ -174,31 +180,36 @@ public String toString() { LOG.warn(ue.getMessage()); partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms)); } + LOG.info("Filtered directory or files: " + partitionPruningHandle.getPartitionPaths().length); + LOG.info("Filtered partition keys: " + partitionPruningHandle.getPartitionKeys().length); - LOG.info("Filtered directory or files: " + filteredPaths.length); - return filteredPaths; + return partitionPruningHandle; } /** - * Build list of partition path by PartitionDescProto which is generated from CatalogStore. + * Build list of partition path and partition keys by PartitionDescProto which is generated from CatalogStore. * * @param partitions * @return */ - private Path[] findFilteredPathsByPartitionDesc(List partitions) { - Path [] filteredPaths = new Path[partitions.size()]; + private PartitionPruningHandle getPartitionPruningHandleByCatalog(List partitions) { + long totalVolume = 0L; + Path[] filteredPaths = new Path[partitions.size()]; + String[] partitionKeys = new String[partitions.size()]; for (int i = 0; i < partitions.size(); i++) { - PartitionDescProto partition = partitions.get(i); + CatalogProtos.PartitionDescProto partition = partitions.get(i); filteredPaths[i] = new Path(partition.getPath()); + partitionKeys[i] = partition.getPartitionName(); totalVolume += partition.getNumBytes(); } - return filteredPaths; + return new PartitionPruningHandle(filteredPaths, partitionKeys, totalVolume); } /** @@ -212,10 +223,14 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti * @return * @throws IOException */ - private Path [] findFilteredPathsFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, - FileSystem fs, Path tablePath) throws IOException{ + private PartitionPruningHandle getPartitionPruningHandleByFileSystem(Schema partitionColumns, + EvalNode [] conjunctiveForms, FileSystem fs, Path tablePath) throws IOException{ Path [] filteredPaths = null; PathFilter [] filters; + int startIdx; + long totalVolume = 0L; + ContentSummary summary = null; + String[] partitionKeys = null; if (conjunctiveForms == null) { filters = buildAllAcceptingPathFilters(partitionColumns); @@ -230,7 +245,18 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti // Get all file status matched to a ith level path filter. filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); } - return filteredPaths; + + // Get partition keys and volume from the list of partition directories + partitionKeys = new String[filteredPaths.length]; + for (int i = 0; i < partitionKeys.length; i++) { + Path path = filteredPaths[i]; + startIdx = path.toString().indexOf(getColumnPartitionPathPrefix(partitionColumns)); + partitionKeys[i] = path.toString().substring(startIdx); + summary = fs.getContentSummary(path); + totalVolume += summary.getLength(); + } + + return new PartitionPruningHandle(filteredPaths, partitionKeys, totalVolume); } /** @@ -330,14 +356,16 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( for (int i = 0; i < fileStatuses.length; i++) { FileStatus fileStatus = fileStatuses[i]; paths[i] = fileStatus.getPath(); - totalVolume += fileStatus.getLen(); } return paths; } - public Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException, - UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + public PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, ScanNode scanNode) + throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { + long startTime = System.currentTimeMillis(); + PartitionPruningHandle pruningHandle = null; + TableDesc table = scanNode.getTableDesc(); PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod(); @@ -375,11 +403,18 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( } if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, + pruningHandle = getPartitionPruningHandle(conf, table.getName(), paritionValuesSchema, indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri()), scanNode); } else { // otherwise, we will get all partition paths. - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); + pruningHandle = getPartitionPruningHandle(conf, table.getName(), paritionValuesSchema, null, + new Path(table.getUri())); } + + long finishTime = System.currentTimeMillis(); + long elapsedMills = finishTime - startTime; + + LOG.info(String.format("Partition pruning: %d ms elapsed.", elapsedMills)); + return pruningHandle; } private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) { @@ -496,6 +531,49 @@ public static String getColumnPartitionPathPrefix(Schema partitionColumn) { return sb.toString(); } + /** + * This transforms a partition name into a tupe with a given partition column schema. When a file path + * Assume that an user gives partition name 'country=KOREA/city=SEOUL'. + * + * The first datum of tuple : KOREA + * The second datum of tuple : SEOUL + * + * @param partitionColumnSchema The partition column schema + * @param partitionKeys The keys of partition + * @return The tuple transformed from a column values part. + */ + public static Tuple buildTupleFromPartitionKeys(Schema partitionColumnSchema, String partitionKeys) { + Preconditions.checkNotNull(partitionColumnSchema); + Preconditions.checkNotNull(partitionKeys); + + String [] columnValues = partitionKeys.split("/"); + Preconditions.checkArgument(partitionColumnSchema.size() >= columnValues.length, + "Invalid Partition Keys :" + partitionKeys); + + Tuple tuple = new VTuple(partitionColumnSchema.size()); + + for (int i = 0; i < tuple.size(); i++) { + tuple.put(i, NullDatum.get()); + } + + for (int i = 0; i < columnValues.length; i++) { + String [] parts = columnValues[i].split("="); + if (parts.length == 2) { + int columnId = partitionColumnSchema.getColumnIdByName(parts[0]); + Column keyColumn = partitionColumnSchema.getColumn(columnId); + + if (parts[1].equals(StorageConstants.DEFAULT_PARTITION_NAME)){ + tuple.put(columnId, DatumFactory.createNullDatum()); + } else { + tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), + StringUtils.unescapePathName(parts[1]))); + } + } + } + + return tuple; + } + private final class Rewriter extends BasicLogicalPlanVisitor { @Override public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalPlan.QueryBlock block, @@ -506,23 +584,17 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP return null; } - try { - Path [] filteredPaths = findFilteredPartitionPaths(queryContext, scanNode); - plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions"); - PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); - rewrittenScanNode.init(scanNode, filteredPaths); - rewrittenScanNode.getTableDesc().getStats().setNumBytes(totalVolume); + PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); + rewrittenScanNode.init(scanNode); - // if it is topmost node, set it as the rootnode of this block. - if (stack.empty() || block.getRoot().equals(scanNode)) { - block.setRoot(rewrittenScanNode); - } else { - PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode); - } - block.registerNode(rewrittenScanNode); - } catch (IOException e) { - throw new TajoInternalError("Partitioned Table Rewrite Failed: \n" + e.getMessage()); + // if it is topmost node, set it as the rootnode of this block. + if (stack.empty() || block.getRoot().equals(scanNode)) { + block.setRoot(rewrittenScanNode); + } else { + PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode); } + + block.registerNode(rewrittenScanNode); return null; } } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 4b47e4ac87..ff18a16fbb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -20,7 +20,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.Path; import org.apache.tajo.OverridableConf; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.annotation.Nullable; @@ -116,7 +115,7 @@ public int compare(PlanProto.LogicalNode o1, PlanProto.LogicalNode o2) { current = convertUnion(nodeMap, protoNode); break; case PARTITIONS_SCAN: - current = convertPartitionScan(context, evalContext, protoNode); + current = convertPartitionedTableScan(context, evalContext, protoNode); break; case SCAN: current = convertScan(context, evalContext, protoNode); @@ -410,6 +409,13 @@ private static ScanNode convertScan(OverridableConf context, EvalContext evalCon return scan; } + private static PartitionedTableScanNode convertPartitionedTableScan(OverridableConf context, EvalContext evalContext, + PlanProto.LogicalNode protoNode) { + PartitionedTableScanNode partitionedTableScan = new PartitionedTableScanNode(protoNode.getNodeId()); + fillScanNode(context, evalContext, protoNode, partitionedTableScan); + return partitionedTableScan; + } + private static void fillScanNode(OverridableConf context, EvalContext evalContext, PlanProto.LogicalNode protoNode, ScanNode scan) { PlanProto.ScanNode scanProto = protoNode.getScan(); @@ -452,20 +458,6 @@ private static IndexScanNode convertIndexScan(OverridableConf context, EvalConte return indexScan; } - private static PartitionedTableScanNode convertPartitionScan(OverridableConf context, EvalContext evalContext, - PlanProto.LogicalNode protoNode) { - PartitionedTableScanNode partitionedScan = new PartitionedTableScanNode(protoNode.getNodeId()); - fillScanNode(context, evalContext, protoNode, partitionedScan); - - PlanProto.PartitionScanSpec partitionScanProto = protoNode.getPartitionScan(); - Path [] paths = new Path[partitionScanProto.getPathsCount()]; - for (int i = 0; i < partitionScanProto.getPathsCount(); i++) { - paths[i] = new Path(partitionScanProto.getPaths(i)); - } - partitionedScan.setInputPaths(paths); - return partitionedScan; - } - private static TableSubQueryNode convertTableSubQuery(OverridableConf context, EvalContext evalContext, Map nodeMap, PlanProto.LogicalNode protoNode) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index e7247688f8..677fc8d7ff 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -19,7 +19,6 @@ package org.apache.tajo.plan.serder; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.Path; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -41,8 +40,6 @@ import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.ProtoUtil; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Stack; @@ -476,21 +473,9 @@ public LogicalNode visitPartitionedTableScan(SerializeContext context, LogicalPl throws TajoException { PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node); - - PlanProto.PartitionScanSpec.Builder partitionScan = PlanProto.PartitionScanSpec.newBuilder(); - List pathStrs = new ArrayList<>(); - if (node.getInputPaths() != null) { - for (Path p : node.getInputPaths()) { - pathStrs.add(p.toString()); - } - partitionScan.addAllPaths(pathStrs); - } - PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); nodeBuilder.setScan(scanBuilder); - nodeBuilder.setPartitionScan(partitionScan); context.treeBuilder.addNodes(nodeBuilder); - return node; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java index 9ded584815..d576379cc6 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java @@ -134,13 +134,6 @@ private static boolean isSimpleRelationNode(LogicalNode node) { private static long getTableVolume(ScanNode scanNode) { if (scanNode.getTableDesc().hasStats()) { long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); - if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - scanBytes = 0L; - } - } - return scanBytes; } else { return -1; diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java index 2b197ff0ec..bac3a1cecc 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java @@ -354,14 +354,4 @@ public Expr visitInsert(Context context, Stack stack, Insert expr) throws return expr; } - - // TODO: This should be removed at TAJO-1891 - @Override - public Expr visitAlterTable(Context context, Stack stack, AlterTable expr) throws TajoException { - if (expr.getAlterTableOpType() == AlterTableOpType.ADD_PARTITION) { - context.state.addVerification(new NotImplementedException("ADD PARTITION")); - } - - return expr; - } } diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index c50429f613..0465e89d9d 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -73,36 +73,35 @@ message LogicalNode { optional SchemaProto out_schema = 5; optional ScanNode scan = 6; - optional PartitionScanSpec partitionScan = 7; - optional IndexScanSpec indexScan = 8; - optional JoinNode join = 9; - optional FilterNode filter = 10; - optional GroupbyNode groupby = 11; - optional DistinctGroupbyNode distinctGroupby = 12; - optional SortNode sort = 13; - optional LimitNode limit = 14; - optional WindowAggNode windowAgg = 15; - optional ProjectionNode projection = 16; - optional EvalExprNode exprEval = 17; - optional UnionNode union = 18; - optional TableSubQueryNode tableSubQuery = 19; - optional PersistentStoreNode persistentStore = 20; - optional StoreTableNodeSpec storeTable = 21; - optional InsertNodeSpec insert = 22; - optional CreateTableNodeSpec createTable = 23; - optional RootNode root = 24; - optional SetSessionNode setSession = 25; - - optional CreateDatabaseNode createDatabase = 26; - optional DropDatabaseNode dropDatabase = 27; - optional DropTableNode dropTable = 28; - - optional AlterTablespaceNode alterTablespace = 29; - optional AlterTableNode alterTable = 30; - optional TruncateTableNode truncateTableNode = 31; - - optional CreateIndexNode createIndex = 32; - optional DropIndexNode dropIndex = 33; + optional IndexScanSpec indexScan = 7; + optional JoinNode join = 8; + optional FilterNode filter = 9; + optional GroupbyNode groupby = 10; + optional DistinctGroupbyNode distinctGroupby = 11; + optional SortNode sort = 12; + optional LimitNode limit = 13; + optional WindowAggNode windowAgg = 14; + optional ProjectionNode projection = 15; + optional EvalExprNode exprEval = 16; + optional UnionNode union = 17; + optional TableSubQueryNode tableSubQuery = 18; + optional PersistentStoreNode persistentStore = 19; + optional StoreTableNodeSpec storeTable = 20; + optional InsertNodeSpec insert = 21; + optional CreateTableNodeSpec createTable = 22; + optional RootNode root = 23; + optional SetSessionNode setSession = 24; + + optional CreateDatabaseNode createDatabase = 25; + optional DropDatabaseNode dropDatabase = 26; + optional DropTableNode dropTable = 27; + + optional AlterTablespaceNode alterTablespace = 28; + optional AlterTableNode alterTable = 29; + optional TruncateTableNode truncateTableNode = 30; + + optional CreateIndexNode createIndex = 31; + optional DropIndexNode dropIndex = 32; } message ScanNode { @@ -115,10 +114,6 @@ message ScanNode { required bool nameResolveBase = 7; } -message PartitionScanSpec { - repeated string paths = 1; -} - message IndexScanSpec { required SchemaProto keySchema = 1; required string indexPath = 2; @@ -231,10 +226,6 @@ enum JoinType { RIGHT_SEMI_JOIN = 9; } -message PartitionTableScanSpec { - repeated string paths = 1; -} - message PersistentStoreNode { optional int32 childSeq = 1; // CreateTableNode may not have any children. This should be improved at TAJO-1589. required string storageType = 2; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 2aa2b91025..95f9069e5d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -43,6 +43,7 @@ import org.apache.tajo.schema.IdentifierUtil; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.PartitionFileFragment; import org.apache.tajo.util.Bytes; import javax.annotation.Nullable; @@ -405,33 +406,7 @@ protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blo // for Non Splittable. eg, compressed gzip TextFile protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, BlockLocation[] blkLocations) throws IOException { - - Map hostsBlockMap = new HashMap<>(); - for (BlockLocation blockLocation : blkLocations) { - for (String host : blockLocation.getHosts()) { - if (hostsBlockMap.containsKey(host)) { - hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); - } else { - hostsBlockMap.put(host, 1); - } - } - } - - List> entries = new ArrayList<>(hostsBlockMap.entrySet()); - Collections.sort(entries, new Comparator>() { - - @Override - public int compare(Map.Entry v1, Map.Entry v2) { - return v1.getValue().compareTo(v2.getValue()); - } - }); - - String[] hosts = new String[blkLocations[0].getHosts().length]; - - for (int i = 0; i < hosts.length; i++) { - Map.Entry entry = entries.get((entries.size() - 1) - i); - hosts[i] = entry.getKey(); - } + String[] hosts = getHosts(blkLocations); return new FileFragment(fragmentId, file, start, length, hosts); } @@ -547,6 +522,219 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, return splits; } + /** + * Get the list of hosts (hostname) hosting specified blocks + * + * + * @param blkLocations locations of blocks + * @return the list of hosts + * @throws IOException + */ + private String[] getHosts(BlockLocation[] blkLocations) throws IOException { + Map hostsBlockMap = new HashMap<>(); + for (BlockLocation blockLocation : blkLocations) { + for (String host : blockLocation.getHosts()) { + if (hostsBlockMap.containsKey(host)) { + hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); + } else { + hostsBlockMap.put(host, 1); + } + } + } + + List> entries = new ArrayList<>(hostsBlockMap.entrySet()); + Collections.sort(entries, (Map.Entry v1, Map.Entry v2) + -> v1.getValue().compareTo(v2.getValue())); + + String[] hosts = new String[blkLocations[0].getHosts().length]; + + for (int i = 0; i < hosts.length; i++) { + Map.Entry entry = entries.get((entries.size() - 1) - i); + hosts[i] = entry.getKey(); + } + + return hosts; + } + + //////////////////////////////////////////////////////////////////////////////// + // The below code is for splitting partitioned table. + //////////////////////////////////////////////////////////////////////////////// + + + /** + * Is the given filename splitable? Usually, true, but if the file is + * stream compressed, it will not be. + *

+ * FileInputFormat implementations can override this and return + * false to ensure that individual input files are never split-up + * so that Mappers process entire files. + * + * + * @param meta the metadata of target table + * @param schema the schema of target table + * @param path the file name to check + * @param partitionKeys keys of target partition + * @param status get the file length + * @return is this file isSplittable? + * @throws IOException + */ + protected boolean isSplittablePartitionFragment(TableMeta meta, Schema schema, Path path, String partitionKeys, + FileStatus status) throws IOException { + Fragment fragment = new PartitionFileFragment(path.getName(), path, 0, status.getLen(), partitionKeys); + Scanner scanner = getScanner(meta, schema, fragment, null); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; + } + + /** + * Build a fragment for partition table + * + * @param fragmentId fragment id + * @param file file path + * @param start offset + * @param length length + * @param hosts the list of hosts (names) hosting blocks + * @param partitionKeys partition keys + * @return PartitionFileFragment + */ + protected PartitionFileFragment getSplittablePartitionFragment(String fragmentId, Path file, long start, long length, + String[] hosts, String partitionKeys) { + return new PartitionFileFragment(fragmentId, file, start, length, hosts, partitionKeys); + } + + /** + * Build a fragment for partition table + * + * @param fragmentId fragment id + * @param file file path + * @param blockLocation location of block + * @param partitionKeys partition keys + * @return PartitionFileFragment + * @throws IOException + */ + protected PartitionFileFragment getSplittablePartitionFragment(String fragmentId, Path file, BlockLocation blockLocation + , String partitionKeys) throws IOException { + return new PartitionFileFragment(fragmentId, file, blockLocation, partitionKeys); + } + + /** + * Build a fragment for non splittable partition table + * + * @param fragmentId fragment id + * @param file file path + * @param start offset + * @param length length + * @param blkLocations locations of blocks + * @param partitionKeys partition keys + * @return PartitionFileFragment + * @throws IOException + */ + protected Fragment getNonSplittablePartitionFragment(String fragmentId, Path file, long start, long length, + BlockLocation[] blkLocations, String partitionKeys) throws IOException { + String[] hosts = getHosts(blkLocations); + return new PartitionFileFragment(fragmentId, file, start, length, hosts, partitionKeys); + } + + /** + * Build the list of fragments for partition table + * + * @param tableName table name + * @param meta all meta information for scanning a fragmented table + * @param schema table schema + * @return the list of PartitionFileFragment + * @throws IOException + */ + public List getPartitionSplits(String tableName, TableMeta meta, Schema schema, String[] partitionKeys, + Path... inputs) throws IOException { + long startTime = System.currentTimeMillis(); + + // generate splits' + List splits = Lists.newArrayList(); + List volumeSplits = Lists.newArrayList(); + List blockLocations = Lists.newArrayList(); + + int i = 0; + for (Path p : inputs) { + ArrayList files = Lists.newArrayList(); + if (fs.isFile(p)) { + files.addAll(Lists.newArrayList(fs.getFileStatus(p))); + } else { + files.addAll(listStatus(p)); + } + + for (FileStatus file : files) { + Path path = file.getPath(); + long length = file.getLen(); + if (length > 0) { + // Get locations of blocks of file + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + boolean splittable = isSplittablePartitionFragment(meta, schema, path, partitionKeys[i], file); + if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { + + if (splittable) { + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(getSplittablePartitionFragment(tableName, path, blockLocation, partitionKeys[i])); + } + blockLocations.addAll(Arrays.asList(blkLocations)); + + } else { // Non splittable + long blockSize = blkLocations[0].getLength(); + if (blockSize >= length) { + blockLocations.addAll(Arrays.asList(blkLocations)); + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(getSplittablePartitionFragment(tableName, path, blockLocation, partitionKeys[i])); + } + } else { + splits.add(getNonSplittablePartitionFragment(tableName, path, 0, length, blkLocations, + partitionKeys[i])); + } + } + + } else { + if (splittable) { + + long minSize = Math.max(getMinSplitSize(), 1); + + long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one + long splitSize = Math.max(minSize, blockSize); + long bytesRemaining = length; + + // for s3 + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(getSplittablePartitionFragment(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts(), partitionKeys[i])); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(getSplittablePartitionFragment(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts(), partitionKeys[i])); + } + } else { // Non splittable + splits.add(getNonSplittablePartitionFragment(tableName, path, 0, length, blkLocations, partitionKeys[i])); + } + } + } + } + if (LOG.isDebugEnabled()){ + LOG.debug("# of average splits per partition: " + splits.size() / (i+1)); + } + i++; + } + + // Combine original fileFragments with new VolumeId information + setVolumeMeta(volumeSplits, blockLocations); + splits.addAll(volumeSplits); + LOG.info("Total # of splits: " + splits.size()); + + long finishTime = System.currentTimeMillis(); + long elapsedMills = finishTime - startTime; + LOG.info(String.format("Split for partition table :%d ms elapsed.", elapsedMills)); + return splits; + } + private void setVolumeMeta(List splits, final List blockLocations) throws IOException { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index 8f18c7a0f0..c584aee72e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -106,6 +106,10 @@ public String[] getHosts() { return hosts; } + public void setHosts(String[] hosts) { + this.hosts = hosts; + } + /** * Get the list of Disk Ids * Unknown disk is -1. Others 0 ~ N @@ -127,6 +131,10 @@ public String getTableName() { return this.tableName; } + public void setTableName(String tableName) { + this.tableName = tableName; + } + public Path getPath() { return this.uri; } @@ -149,6 +157,10 @@ public long getLength() { return this.length; } + public void setLength(long length) { + this.length = length; + } + @Override public boolean isEmpty() { return this.length <= 0; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/PartitionFileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/PartitionFileFragment.java new file mode 100644 index 0000000000..3a20fb5264 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/PartitionFileFragment.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.fragment; + +import com.google.common.base.Objects; +import com.google.gson.annotations.Expose; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.storage.StorageFragmentProtos.PartitionFileFragmentProto; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +public class PartitionFileFragment extends FileFragment implements Cloneable { + + @Expose private String partitionKeys; // required + + public PartitionFileFragment(ByteString raw) throws InvalidProtocolBufferException { + super(raw); + PartitionFileFragmentProto.Builder builder = PartitionFileFragmentProto.newBuilder(); + builder.mergeFrom(raw); + this.partitionKeys = builder.build().getPartitionKeys(); + } + + public PartitionFileFragment(String tableName, Path uri, BlockLocation blockLocation, + String partitionKeys) throws IOException { + super(tableName, uri, blockLocation); + this.partitionKeys = partitionKeys; + } + + public PartitionFileFragment(String tableName, Path uri, long start, long length, String[] hosts, + String partitionKeys) { + super(tableName, uri, start, length, hosts); + this.partitionKeys = partitionKeys; + } + + public PartitionFileFragment(String fragmentId, Path path, long start, long length, String partitionKeys) { + super(fragmentId, path, start, length); + this.partitionKeys = partitionKeys; + } + + public String getPartitionKeys() { + return partitionKeys; + } + + public void setPartitionKeys(String partitionKeys) { + this.partitionKeys = partitionKeys; + } + + @Override + public int hashCode() { + return Objects.hashCode(getTableName(), getPath(), getStartKey(), getLength(), getPartitionKeys()); + } + + @Override + public Object clone() throws CloneNotSupportedException { + PartitionFileFragment frag = (PartitionFileFragment) super.clone(); + frag.setTableName(getTableName()); + frag.setPath(getPath()); + frag.setDiskIds(getDiskIds()); + frag.setHosts(getHosts()); + frag.setPartitionKeys(getPartitionKeys()); + + return frag; + } + + @Override + public String toString() { + return "\"fragment\": {\"id\": \""+ getTableName() +"\", \"path\": " + +getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": " + + getLength() + "\", \"partitionKeys\":" + getPartitionKeys() + "}" ; + } + + @Override + public FragmentProto getProto() { + PartitionFileFragmentProto.Builder builder = PartitionFileFragmentProto.newBuilder(); + builder.setId(getTableName()); + builder.setStartOffset(this.startOffset); + builder.setLength(this.length); + builder.setPath(getPath().toString()); + if(getDiskIds() != null) { + List idList = new ArrayList<>(); + for(int eachId: getDiskIds()) { + idList.add(eachId); + } + builder.addAllDiskIds(idList); + } + + if (getHosts() != null) { + builder.addAllHosts(Arrays.asList(getHosts())); + } + + if (partitionKeys != null) { + builder.setPartitionKeys(this.partitionKeys); + } + + FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); + fragmentBuilder.setId(getTableName()); + fragmentBuilder.setDataFormat(BuiltinStorages.TEXT); + fragmentBuilder.setContents(builder.buildPartial().toByteString()); + return fragmentBuilder.build(); + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto index 0579f05aa9..d9869f071d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto +++ b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto @@ -32,3 +32,14 @@ message FileFragmentProto { repeated string hosts = 5; repeated int32 disk_ids = 6; } + +message PartitionFileFragmentProto { + required string id = 1; + required string path = 2; + required int64 start_offset = 3; + required int64 length = 4; + repeated string hosts = 5; + repeated int32 disk_ids = 6; + // Partition Keys: country=KOREA/city=SEOUL + required string partitionKeys = 7; +}