diff --git a/CHANGES b/CHANGES index eec145c14e..8ab23b45c2 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + NEW FEATURES TAJO-1686: Allow Tajo to use Hive UDF. (jihoon) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 2b20907c44..b7da7cb84d 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -332,6 +332,11 @@ public static TableDesc newTableDesc(String tableName, Schema schema, TableMeta return new TableDesc(tableName, schema, meta, path.toUri()); } + public static TableDesc newTableDesc(String tableName, Schema schema, TableMeta meta, Path path + , PartitionMethodDesc partitionMethodDesc) { + return new TableDesc(tableName, schema, meta, path.toUri(), partitionMethodDesc); + } + public static TableDesc newTableDesc(TableDescProto proto) { return new TableDesc(proto); } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java index 8122bd59db..948567001c 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java @@ -51,32 +51,46 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Cloneable public TableDesc() { } + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI uri, boolean external) { + this(tableName, schema, meta, uri, null, external); + } + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, - @Nullable URI uri, boolean external) { + @Nullable URI uri, @Nullable PartitionMethodDesc partitionMethodDesc, boolean external) { this.tableName = tableName; this.schema = schema; this.meta = meta; this.uri = uri; + this.partitionMethodDesc = partitionMethodDesc; this.external = external; } public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI path) { - this(tableName, schema, meta, path, true); + this(tableName, schema, meta, path, null, true); } - + public TableDesc(String tableName, @Nullable Schema schema, TableMeta meta, @Nullable URI path, + @Nullable PartitionMethodDesc partitionMethodDesc) { + this(tableName, schema, meta, path, partitionMethodDesc, true); + } + public TableDesc(String tableName, @Nullable Schema schema, String dataFormat, KeyValueSet options, @Nullable URI path) { this(tableName, schema, new TableMeta(dataFormat, options), path); } - - public TableDesc(TableDescProto proto) { + + public TableDesc(String tableName, @Nullable Schema schema, String dataFormat, KeyValueSet options, + @Nullable URI path, @Nullable PartitionMethodDesc partitionMethodDesc) { + this(tableName, schema, new TableMeta(dataFormat, options), path, partitionMethodDesc); + } + + public TableDesc(TableDescProto proto) { this(proto.getTableName(), proto.hasSchema() ? SchemaFactory.newV1(proto.getSchema()) : null, - new TableMeta(proto.getMeta()), proto.hasPath() ? URI.create(proto.getPath()) : null, proto.getIsExternal()); + new TableMeta(proto.getMeta()), proto.hasPath() ? URI.create(proto.getPath()) : null, + proto.hasPartition() ?
new PartitionMethodDesc(proto.getPartition()) : null, + proto.getIsExternal()); if(proto.hasStats()) { this.stats = new TableStats(proto.getStats()); - } - if (proto.hasPartition()) { - this.partitionMethodDesc = new PartitionMethodDesc(proto.getPartition()); } } diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 6e3eaeae8a..0618b8dc57 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -232,6 +232,15 @@ public static enum ConfVars implements ConfigKey { // for RCFile HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()), + // S3 Configuration -------------------------------------------------- + S3_MAX_ERROR_RETRIES("tajo.s3.max-error-retries", 10), + S3_SSL_ENABLED("tajo.s3.ssl.enabled", true), + S3_CONNECT_TIMEOUT("tajo.s3.connect-timeout", "5m"), + S3_SOCKET_TIMEOUT("tajo.s3.socket-timeout", "5m"), + S3_MAX_CONNECTIONS("tajo.s3.max-connections", 500), + S3_USE_INSTANCE_CREDENTIALS("tajo.s3.use-instance-credentials", true), + S3_PIN_CLIENT_TO_CURRENT_REGION("tajo.s3.pin-client-to-current-region", false), + // RPC -------------------------------------------------------------------- // Internal RPC Client INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num", diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java index 95700d0a14..fd693d7f51 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java @@ -142,4 +142,42 @@ public static void cleanupAndthrowIfFailed(java.io.Closeable... closeables) thro throw ioe; } } + + public static String getCommonPrefix(Path... paths){ + String prefix = ""; + String[][] folders = new String[paths.length][]; + + // split on file separator + for(int i = 0; i < paths.length; i++){ + folders[i] = paths[i].toString().split("/"); + } + + for(int j = 0; j < folders[0].length; j++){ + // grab the next folder name in the first path + String thisFolder = folders[0][j]; + // assume all have matched in case there are no more paths + boolean allMatched = true; + + // look at the other paths + for(int i = 1; i < folders.length && allMatched; i++){ + // if this path has no folder at this depth + if(folders[i].length <= j){ + allMatched = false; + // stop looking because we've gone as far as we can + break; + } + // check if it matched + allMatched &= folders[i][j].equals(thisFolder); + } + // if they all matched this folder name + if(allMatched) { + // add it to the answer + prefix += thisFolder + "/"; + } else { + break; + } + } + + return prefix; + } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java new file mode 100644 index 0000000000..69fc0fec7f --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.*; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.parser.sql.SQLAnalyzer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.partition.PartitionPruningHandle; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.junit.*; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.*; + +public class TestPartitionedTableRewriter { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPartitionedTableRewriter"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + private FileSystem fs; + + final static String PARTITION_TABLE_NAME = "tb_partition"; + final static String MULTIPLE_PARTITION_TABLE_NAME = "tb_multiple_partition"; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + util.startCatalogCluster(); + catalog = util.getCatalogService(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + fs = FileSystem.get(conf); + + Schema schema = SchemaBuilder.builder() + .add("n_nationkey", TajoDataTypes.Type.INT8) + .add("n_name", TajoDataTypes.Type.TEXT) + .add("n_regionkey", TajoDataTypes.Type.INT8) + .build(); + + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, util.getConfiguration()); + + createTableWithOnePartitionKeyColumn(fs, schema, meta); + createTableWithMultiplePartitionKeyColumns(fs, schema, meta); + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + } + + private void createTableWithOnePartitionKeyColumn(FileSystem fs, Schema schema, + TableMeta meta) throws Exception { + Schema partSchema = SchemaBuilder.builder() + .add("key", TajoDataTypes.Type.TEXT) + .build(); + + PartitionMethodDesc partitionMethodDesc = + new 
PartitionMethodDesc(DEFAULT_DATABASE_NAME, PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key", partSchema); + + Path tablePath = new Path(testDir, PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + TableDesc desc = CatalogUtil.newTableDesc(DEFAULT_DATABASE_NAME + "." + PARTITION_TABLE_NAME, schema, meta, + tablePath, partitionMethodDesc); + catalog.createTable(desc); + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME + "." + PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/key=part123"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part456"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part789"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + private void createTableWithMultiplePartitionKeyColumns(FileSystem fs, + Schema schema, TableMeta meta) throws Exception { + Schema partSchema = SchemaBuilder.builder() + .add("key1", TajoDataTypes.Type.TEXT) + .add("key2", TajoDataTypes.Type.TEXT) + .add("key3", TajoDataTypes.Type.INT8) + .build(); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc("default", MULTIPLE_PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key1,key2,key3", partSchema); + + Path tablePath = new Path(testDir, MULTIPLE_PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + TableDesc desc = CatalogUtil.newTableDesc(DEFAULT_DATABASE_NAME + "." + MULTIPLE_PARTITION_TABLE_NAME, schema, + meta, tablePath, partitionMethodDesc); + catalog.createTable(desc); + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME + "." 
+ MULTIPLE_PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/key1=part123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=1"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=2"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789/key3=3"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + @Test + public void testFilterIncludePartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part456' ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Path[] filteredPaths = partitionPruningHandle.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertTrue(filteredPaths[0].toString().endsWith("key=part456")); + + String[] partitionKeys = partitionPruningHandle.getPartitionKeys(); + assertEquals(1, partitionKeys.length); + assertEquals("key=part456", partitionKeys[0]); + + assertEquals(10L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testWithoutAnyFilters() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SCAN, sortNode.getChild().getType()); + ScanNode scanNode = sortNode.getChild(); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle 
partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(3, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key=part123")); + assertTrue(partitionPathList.get(1).toString().endsWith("key=part456")); + assertTrue(partitionPathList.get(2).toString().endsWith("key=part789")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(3, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), "key=part123"); + assertEquals(partitionKeysList.get(1), "key=part456"); + assertEquals(partitionKeysList.get(2), "key=part789"); + + assertEquals(33L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonExistingPartitionValue() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part123456789'"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + assertEquals(0, partitionPruningHandle.getPartitionPaths().length); + assertEquals(0, partitionPruningHandle.getPartitionKeys().length); + + assertEquals(0L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonPartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE n_nationkey = 1"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = 
Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(3, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key=part123")); + assertTrue(partitionPathList.get(1).toString().endsWith("key=part456")); + assertTrue(partitionPathList.get(2).toString().endsWith("key=part789")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(3, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), "key=part123"); + assertEquals(partitionKeysList.get(1), "key=part456"); + assertEquals(partitionKeysList.get(2), "key=part789"); + + assertEquals(33L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeEveryPartitionKeyColumn() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part789' and key2 = 'supp789' and key3=3"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Path[] filteredPaths = partitionPruningHandle.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertTrue(filteredPaths[0].toString().endsWith("key1=part789/key2=supp789/key3=3")); + + String[] partitionKeys = partitionPruningHandle.getPartitionKeys(); + assertEquals(1, partitionKeys.length); + assertEquals("key1=part789/key2=supp789/key3=3", partitionKeys[0]); + + assertEquals(10L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeSomeOfPartitionKeyColumns() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and key2 = 'supp123' order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + 
PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(2, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key1=part123/key2=supp123/key3=1")); + assertTrue(partitionPathList.get(1).toString().endsWith("key1=part123/key2=supp123/key3=2")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(2, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), ("key1=part123/key2=supp123/key3=1")); + assertEquals(partitionKeysList.get(1), ("key1=part123/key2=supp123/key3=2")); + + assertEquals(23L, partitionPruningHandle.getTotalVolume()); + } + + @Test + public void testFilterIncludeNonPartitionKeyColumns() throws Exception { + Expr expr = analyzer.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and n_nationkey >= 2 order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + + PartitionPruningHandle partitionPruningHandle = rewriter.getPartitionPruningHandle(conf, scanNode); + assertNotNull(partitionPruningHandle); + + Stream partitionPathStream = Stream.of(partitionPruningHandle.getPartitionPaths()) + .sorted((path1, path2) -> path1.compareTo(path2)); + List partitionPathList = partitionPathStream.collect(Collectors.toList()); + assertEquals(2, partitionPathList.size()); + assertTrue(partitionPathList.get(0).toString().endsWith("key1=part123/key2=supp123/key3=1")); + assertTrue(partitionPathList.get(1).toString().endsWith("key1=part123/key2=supp123/key3=2")); + + Stream partitionKeysStream = Stream.of(partitionPruningHandle.getPartitionKeys()) + .sorted((keys1, keys2) -> keys1.compareTo(keys2)); + List partitionKeysList = partitionKeysStream.collect(Collectors.toList()); + assertEquals(2, partitionKeysList.size()); + assertEquals(partitionKeysList.get(0), ("key1=part123/key2=supp123/key3=1")); + assertEquals(partitionKeysList.get(1), ("key1=part123/key2=supp123/key3=2")); + + assertEquals(23L, partitionPruningHandle.getTotalVolume()); + } + +} \ No newline at end of file diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java index dff63c427b..39744f6a26 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java @@ -183,4 +183,28 @@ public void testBuildTupleFromPartitionPath() { assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); } + + @Test + public void testBuildTupleFromPartitionName() { + Schema schema = SchemaBuilder.builder() + .add("key1", Type.INT8) + .add("key2", Type.TEXT) + .build(); + + Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key1=123"); + assertNotNull(tuple); + assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); + assertEquals(DatumFactory.createNullDatum(), tuple.asDatum(1)); + + tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key1=123/key2=abc"); + assertNotNull(tuple); + assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0)); + assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); + + tuple = PartitionedTableRewriter.buildTupleFromPartitionKeys(schema, "key2=abc"); + assertNotNull(tuple); + assertEquals(DatumFactory.createNullDatum(), tuple.asDatum(0)); + assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1)); + + } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestPartitionFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestPartitionFileFragment.java new file mode 100644 index 0000000000..999817741f --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestPartitionFileFragment.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.storage.fragment.PartitionFileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.SortedSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestPartitionFileFragment { + private Path path; + + @Before + public final void setUp() throws Exception { + path = CommonTestingUtil.getTestDir(); + } + + @Test + public final void testGetAndSetFields() { + PartitionFileFragment fragment1 = new PartitionFileFragment("table1_1", new Path(path, "table0/col1=1"), + 0, 500, "col1=1"); + + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0/col1=1"), fragment1.getPath()); + assertEquals("col1=1", fragment1.getPartitionKeys()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testGetProtoAndRestore() { + PartitionFileFragment fragment = new PartitionFileFragment("table1_1", new Path(path, "table0/col1=1"), 0, + 500, "col1=1"); + + PartitionFileFragment fragment1 = FragmentConvertor.convert(PartitionFileFragment.class, fragment.getProto()); + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0/col1=1"), fragment1.getPath()); + assertEquals("col1=1", fragment1.getPartitionKeys()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testCompareTo() { + final int num = 10; + PartitionFileFragment[] tablets = new PartitionFileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new PartitionFileFragment("tablet0", new Path(path, "tablet0/col1=" + i), i * 500, (i+1) * 500 + , "col1=" + i); + } + + Arrays.sort(tablets); + + for (int i = 0; i < num; i++) { + assertEquals("col1=" + (num - i - 1), tablets[i].getPartitionKeys()); + } + } + + @Test + public final void testCompareTo2() { + final int num = 1860; + PartitionFileFragment[] tablets = new PartitionFileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new PartitionFileFragment("tablet1", new Path(path, "tablet/col1=" +i), (long)i * 6553500, + (long) (i+1) * 6553500, "col1=" + i); + } + + SortedSet sortedSet = Sets.newTreeSet(); + for (PartitionFileFragment frag : tablets) { + sortedSet.add(frag); + } + assertEquals(num, sortedSet.size()); + } +} diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan index 2f2ca890fb..7456b5238a 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) 
- => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -82,8 +82,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan index f1fa414673..46e0b4b7ee 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Hash_NoBroadcast.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -100,8 +100,8 @@ Block Id: eb_0000000000000_0000_000004 [LEAF] [q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.c.c_custkey (INT4), num=32) PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan index 2f2ca890fb..7456b5238a 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey 
(INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -82,8 +82,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan index f1fa414673..46e0b4b7ee 100644 --- a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testInnerAndOuterWithEmpty.1.Sort_NoBroadcast.plan @@ -6,8 +6,8 @@ JOIN(8)(LEFT_OUTER) => out schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} => in schema: {(3) default.a.l_orderkey (INT4), default.b.o_orderkey (INT4), default.c.c_custkey (INT4)} PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} JOIN(7)(INNER) @@ -100,8 +100,8 @@ Block Id: eb_0000000000000_0000_000004 [LEAF] [q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.c.c_custkey (INT4), num=32) PARTITIONS_SCAN(9) on default.customer_broad_parts as c + => filter: default.c.c_custkey (INT4) < 0 => target list: default.c.c_custkey (INT4) - => num of filtered paths: 0 => out schema: {(1) default.c.c_custkey (INT4)} => in schema: {(7) default.c.c_acctbal (FLOAT8), default.c.c_address (TEXT), default.c.c_comment (TEXT), default.c.c_mktsegment (TEXT), default.c.c_name (TEXT), default.c.c_nationkey (INT4), default.c.c_phone (TEXT)} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 87a6e74ba9..eb1b0cd184 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -24,7 +24,6 @@ import com.google.common.collect.ObjectArrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; 
import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; @@ -44,13 +43,10 @@ import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; -import org.apache.tajo.plan.serder.PlanProto.SortEnforce; import org.apache.tajo.plan.serder.PlanProto.SortedInputEnforce; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.FileTablespace; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TablespaceManager; -import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.unit.StorageUnit; @@ -61,7 +57,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Stack; @@ -896,12 +891,12 @@ private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack node) throws IOException { + FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); // check if an input is sorted in the same order to the subsequence sort operator. if (checkIfSortEquivalance(ctx, scanNode, node)) { - if (ctx.getTable(scanNode.getCanonicalName()) == null) { + if (fragments == null) { return new SeqScanExec(ctx, scanNode, null); } - FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); return new ExternalSortExec(ctx, (SortNode) node.peek(), scanNode, fragments); } else { Enforcer enforcer = ctx.getEnforcer(); @@ -915,31 +910,15 @@ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, St } } - if (scanNode instanceof PartitionedTableScanNode - && ((PartitionedTableScanNode)scanNode).getInputPaths() != null && - ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) { - - if (broadcastFlag) { - PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode; - List fileFragments = new ArrayList<>(); - - FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri()); - for (Path path : partitionedTableScanNode.getInputPaths()) { - fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path))); - } - - FragmentProto[] fragments = - FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])); - - ctx.addFragments(scanNode.getCanonicalName(), fragments); - return new PartitionMergeScanExec(ctx, scanNode, fragments); - } + if (scanNode.getTableDesc().hasPartition() && broadcastFlag && fragments != null) { + ctx.addFragments(scanNode.getCanonicalName(), fragments); + return new PartitionMergeScanExec(ctx, scanNode, fragments); } - if (ctx.getTable(scanNode.getCanonicalName()) == null) { + if (fragments == null) { return new SeqScanExec(ctx, scanNode, null); } - FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); + return new SeqScanExec(ctx, scanNode, fragments); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index d390740187..3fc03645c0 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -317,6 +317,7 @@ private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalE if (node instanceof RelationNode) { switch (node.getType()) { case INDEX_SCAN: + case PARTITIONS_SCAN: case SCAN: ScanNode scanNode = (ScanNode) node; if (scanNode.getTableDesc().getStats() == null) { @@ -326,20 +327,6 @@ private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalE } else { return scanNode.getTableDesc().getStats().getNumBytes(); } - case PARTITIONS_SCAN: - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; - if (pScanNode.getTableDesc().getStats() == null) { - // TODO - this case means that data is not located in HDFS. So, we need additional - // broadcast method. - return Long.MAX_VALUE; - } else { - // if there is no selected partition - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - return 0; - } else { - return pScanNode.getTableDesc().getStats().getNumBytes(); - } - } case TABLE_SUBQUERY: return estimateOutputVolumeInternal(((TableSubQueryNode) node).getSubQuery()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java index b13cb0f1a8..6513ac743b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java @@ -97,13 +97,6 @@ public static void replaceChild(LogicalNode newChild, ScanNode originalChild, Lo public static long getTableVolume(ScanNode scanNode) { if (scanNode.getTableDesc().hasStats()) { long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); - if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - scanBytes = 0L; - } - } - return scanBytes; } else { return -1; @@ -117,6 +110,7 @@ public static long computeDescendentVolume(LogicalNode node) { if (node instanceof RelationNode) { switch (node.getType()) { + case PARTITIONS_SCAN: case SCAN: ScanNode scanNode = (ScanNode) node; if (scanNode.getTableDesc().getStats() == null) { @@ -126,20 +120,6 @@ public static long computeDescendentVolume(LogicalNode node) { } else { return scanNode.getTableDesc().getStats().getNumBytes(); } - case PARTITIONS_SCAN: - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; - if (pScanNode.getTableDesc().getStats() == null) { - // TODO - this case means that data is not located in HDFS. So, we need additional - // broadcast method. 
- return Long.MAX_VALUE; - } else { - // if there is no selected partition - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - return 0; - } else { - return pScanNode.getTableDesc().getStats().getNumBytes(); - } - } case TABLE_SUBQUERY: return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery()); default: diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index dc48f3fb0f..f682595d20 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -39,6 +39,7 @@ import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.storage.fragment.PartitionFileFragment; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -97,11 +98,12 @@ private void rewriteColumnPartitionedTableSchema() throws IOException { Tuple partitionRow = null; if (fragments != null && fragments.length > 0) { - List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments); + List<PartitionFileFragment> partitionFileFragments = FragmentConvertor.convert(PartitionFileFragment.class, fragments); - // Get a partition key value from a given path - partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( - columnPartitionSchema, fileFragments.get(0).getPath(), false); + // Get a tuple from the first partition fragment using its partition keys + partitionRow = PartitionedTableRewriter.buildTupleFromPartitionKeys(columnPartitionSchema, + partitionFileFragments.get(0).getPartitionKeys()); } // Targets or search conditions may contain column references.
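Note on the SeqScanExec change above: the executor now derives the partition tuple from a fragment's partition-key string (e.g. "key1=123/key2=abc") via buildTupleFromPartitionKeys instead of parsing the file path. A minimal standalone sketch of that key-string parsing, assuming plain java.util only (PartitionKeyParser and parse are illustrative names, not Tajo API):

import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionKeyParser {
  // Splits "key1=123/key2=abc" into an ordered column -> value map.
  // Columns missing from the string are simply absent, which corresponds to
  // the null datums asserted in the TestTupleUtil cases earlier in this diff.
  public static Map<String, String> parse(String partitionKeys) {
    Map<String, String> values = new LinkedHashMap<>();
    for (String pair : partitionKeys.split("/")) {
      int eq = pair.indexOf('=');
      if (eq > 0) {
        values.put(pair.substring(0, eq), pair.substring(eq + 1));
      }
    }
    return values;
  }

  public static void main(String[] args) {
    System.out.println(parse("key1=123"));          // {key1=123}
    System.out.println(parse("key1=123/key2=abc")); // {key1=123, key2=abc}
  }
}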
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 22024e4c75..6cffc7c76c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -567,7 +567,8 @@ public GetQueryResultDataResponse getQueryResultData(RpcController controller, G queryId, scanNode, Integer.MAX_VALUE, - codecType); + codecType, + context.getCatalog()); queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index 6a2214191d..12081d8711 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -511,8 +511,8 @@ public void alterTable(TajoMaster.MasterContext context, final QueryContext quer long numBytes = 0L; if (fs.exists(partitionPath)) { - ContentSummary summary = fs.getContentSummary(partitionPath); - numBytes = summary.getLength(); + Tablespace tablespace = TablespaceManager.get(desc.getUri()); + numBytes = tablespace.calculateSize(partitionPath); } catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(), @@ -623,6 +623,7 @@ public void repairPartition(TajoMaster.MasterContext context, final QueryContext } // Find missing partitions from CatalogStore + Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()); List targetPartitions = new ArrayList<>(); for(Path filteredPath : filteredPaths) { @@ -631,7 +632,7 @@ public void repairPartition(TajoMaster.MasterContext context, final QueryContext // if there is partition column in the path if (startIdx > -1) { - PartitionDescProto targetPartition = getPartitionDesc(tablePath, filteredPath, fs); + PartitionDescProto targetPartition = getPartitionDesc(tablespace, tablePath, filteredPath); if (!existingPartitionNames.contains(targetPartition.getPartitionName())) { if (LOG.isDebugEnabled()) { LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName()); @@ -657,7 +658,8 @@ public void repairPartition(TajoMaster.MasterContext context, final QueryContext LOG.info("Total added partitions to CatalogStore: " + targetPartitions.size()); } - private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath, FileSystem fs) throws IOException { + private PartitionDescProto getPartitionDesc(Tablespace tablespace, Path tablePath, Path partitionPath) + throws IOException { String partitionName = StringUtils.unescapePathName(partitionPath.toString()); int startIndex = partitionName.indexOf(tablePath.toString()) + tablePath.toString().length(); @@ -679,9 +681,7 @@ private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath, } builder.setPath(partitionPath.toString()); - - ContentSummary contentSummary = fs.getContentSummary(partitionPath); - builder.setNumBytes(contentSummary.getLength()); + builder.setNumBytes(tablespace.calculateSize(partitionPath)); return builder.build(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java index c16e95bae1..aaebe11800 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java @@ -120,9 +120,6 @@ public LogicalNode visitPartitionedTableScan(PlanShapeFixerContext context, Logi throws TajoException { super.visitPartitionedTableScan(context, plan, block, node, stack); context.childNumbers.push(1); - Path[] inputPaths = node.getInputPaths(); - Arrays.sort(inputPaths); - node.setInputPaths(inputPaths); if (node.hasTargets()) { node.setTargets(sortTargets(node.getTargets())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index a1728ec2ca..d231e48648 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -24,10 +24,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TaskId; +import org.apache.tajo.*; +import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableDesc; @@ -80,10 +78,12 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc final private Optional codecType; private MemoryRowBlock rowBlock; private Future nextFetch; + private CatalogService catalog; public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService, TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, - int maxRow, Optional codecType) throws IOException { + int maxRow, Optional codecType, + CatalogService catalog) throws IOException { this.asyncTaskService = asyncTaskService; this.tajoConf = tajoConf; this.sessionId = sessionId; @@ -93,6 +93,7 @@ public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService, this.maxRow = maxRow; this.rowEncoder = RowStoreUtil.createEncoder(scanNode.getOutSchema()); this.codecType = codecType; + this.catalog = catalog; } public void init() throws IOException, TajoException { @@ -105,7 +106,8 @@ private void initSeqScanExec() throws IOException, TajoException { List fragments = Lists.newArrayList(); if (tableDesc.hasPartition()) { FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class); - fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc)); + fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc + , catalog, tajoConf)); } else { fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode.getQual())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index f0a6b7685a..190b421ed7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -310,7 +310,8 @@ public void execSimpleQuery(QueryContext queryContext, Session session, String q queryInfo.getQueryId(), scanNode, maxRow, - Optional.empty()); + Optional.empty(), + context.getCatalog()); 
queryResultScanner.init(); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index d57040e3ff..0b675676c8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -23,8 +23,6 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; @@ -47,6 +45,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.error.Errors.SerializedException; import org.apache.tajo.exception.ErrorUtil; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.master.event.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; @@ -539,8 +538,8 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) { List partitions = query.getPartitions(); if (partitions != null) { - // Set contents length and file count to PartitionDescProto by listing final output directories. - List finalPartitions = getPartitionsWithContentsSummary(query.systemConf, + // Find each partition volume by listing all partitions. + List finalPartitions = getPartitionsWithContentsSummary(queryContext, finalOutputDir, partitions); String databaseName, simpleTableName; @@ -571,16 +570,14 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { return QueryState.QUERY_SUCCEEDED; } - private List getPartitionsWithContentsSummary(TajoConf conf, Path outputDir, - List partitions) throws IOException { + private List getPartitionsWithContentsSummary(QueryContext queryContext, + Path outputDir, List partitions) throws IOException { List finalPartitions = new ArrayList<>(); - FileSystem fileSystem = outputDir.getFileSystem(conf); for (PartitionDescProto partition : partitions) { PartitionDescProto.Builder builder = partition.toBuilder(); Path partitionPath = new Path(outputDir, partition.getPath()); - ContentSummary contentSummary = fileSystem.getContentSummary(partitionPath); - builder.setNumBytes(contentSummary.getLength()); + builder.setNumBytes(calculateSize(queryContext, partitionPath)); finalPartitions.add(builder.build()); } return finalPartitions; @@ -687,7 +684,7 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo TableStats aggregated = query.aggregateTableStatsOfTerminalBlock(); resultTableDesc.setStats(aggregated); } else { - stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); + stats.setNumBytes(calculateSize(queryContext, finalOutputDir)); resultTableDesc.setStats(stats); } @@ -730,7 +727,7 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo TableStats aggregated = query.aggregateTableStatsOfTerminalBlock(); tableDescTobeCreated.setStats(aggregated); } else { - stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); + stats.setNumBytes(calculateSize(queryContext, finalOutputDir)); tableDescTobeCreated.setStats(stats); } @@ -773,7 +770,7 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo TableStats aggregated = query.aggregateTableStatsOfTerminalBlock(); 
finalTable.setStats(aggregated); } else { - long volume = getTableVolume(query.systemConf, finalOutputDir); + long volume = calculateSize(queryContext, finalOutputDir); stats.setNumBytes(volume); finalTable.setStats(stats); } @@ -791,10 +788,9 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo } } - public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(systemConf); - ContentSummary directorySummary = fs.getContentSummary(tablePath); - return directorySummary.getLength(); + public static long calculateSize(QueryContext queryContext, Path path) throws IOException { + Tablespace space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")); + return space.calculateSize(path); } public static class StageCompletedTransition implements SingleArcTransition { diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index ba051a3115..8c70b9c0e3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -32,6 +32,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.PhysicalPlannerImpl; import org.apache.tajo.engine.planner.RangePartitionAlgorithm; @@ -42,11 +43,11 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil; import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.exception.UndefinedTableException; +import org.apache.tajo.exception.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.logical.SortNode.SortPurpose; +import org.apache.tajo.plan.partition.PartitionPruningHandle; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; @@ -383,29 +384,21 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster if (broadcastFragments != null) { //In this phase a ScanNode has a single fragment. 
//If there are more than one data files, that files should be added to fragments or partition path - for (ScanNode eachScan: broadcastScans) { - - Path[] partitionScanPaths = null; TableDesc tableDesc = masterContext.getTableDesc(eachScan); Tablespace space = TablespaceManager.get(tableDesc.getUri()); + Collection scanFragments = null; if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { - - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; - partitionScanPaths = partitionScan.getInputPaths(); - // set null to inputPaths in getFragmentsFromPartitionedTable() - getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc); - partitionScan.setInputPaths(partitionScanPaths); - + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + scanFragments = getFragmentsFromPartitionedTable(space, eachScan, tableDesc, catalog, conf); } else { + scanFragments = space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual()); + } - Collection scanFragments = - space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual()); - if (scanFragments != null) { - rightFragments.addAll(scanFragments); - } - + if (scanFragments != null) { + rightFragments.addAll(scanFragments); } } } @@ -472,17 +465,20 @@ public static Map>> merge * It creates a number of fragments for all partitions. */ public static List getFragmentsFromPartitionedTable(Tablespace tsHandler, - ScanNode scan, - TableDesc table) throws IOException { + ScanNode scan, TableDesc table, CatalogService catalog, TajoConf conf) throws IOException, TajoException { Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace"); if (!(scan instanceof PartitionedTableScanNode)) { throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type."); } List fragments = Lists.newArrayList(); PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; - fragments.addAll(((FileTablespace) tsHandler).getSplits( - scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths())); - partitionsScan.setInputPaths(null); + partitionsScan.init(scan); + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + PartitionPruningHandle pruningHandle = rewriter.getPartitionPruningHandle(conf, partitionsScan); + FileTablespace tablespace = (FileTablespace) tsHandler; + fragments.addAll(tablespace.getPartitionSplits(scan.getCanonicalName(), table.getMeta(), table.getSchema() + , pruningHandle)); return fragments; } @@ -504,9 +500,9 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch // Broadcast table // all fragments or paths assigned every Large table's scan task. // -> PARTITIONS_SCAN - // . add all partition paths to node's inputPaths variable + // . add all PartitionFileFragments to broadcastFragments // -> SCAN - // . add all fragments to broadcastFragments + // . 
add all FileFragments to broadcastFragments Collection baseFragments = null; List broadcastFragments = new ArrayList<>(); for (int i = 0; i < scans.length; i++) { @@ -514,16 +510,12 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch TableDesc desc = stage.getContext().getTableDesc(scan); Collection scanFragments; - Path[] partitionScanPaths = null; - Tablespace space = TablespaceManager.get(desc.getUri()); - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scan; - partitionScanPaths = partitionScan.getInputPaths(); - // set null to inputPaths in getFragmentsFromPartitionedTable() - scanFragments = getFragmentsFromPartitionedTable(space, scan, desc); + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + scanFragments = getFragmentsFromPartitionedTable(space, scan, desc, catalog, conf); } else { scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual()); } @@ -532,13 +524,7 @@ if (i == baseScanId) { baseFragments = scanFragments; } else { - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; - // PhisicalPlanner make PartitionMergeScanExec when table is boradcast table and inputpaths is not empty - partitionScan.setInputPaths(partitionScanPaths); - } else { - broadcastFragments.addAll(scanFragments); - } + broadcastFragments.addAll(scanFragments); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index fed75cd7fe..5cbb203b02 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -28,10 +28,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; import org.apache.tajo.*; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; @@ -52,7 +49,6 @@ import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; @@ -1195,8 +1191,12 @@ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOExceptio // // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN. if (scan.getType() == NodeType.PARTITIONS_SCAN) { + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TajoConf conf = stage.getContext().getQueryContext().getConf(); + // Partition pruning now happens inside this call, using the catalog and the query configuration.
- fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table); + fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table, catalog, + conf); } else { fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan.getQual()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java index 7d5c78ac7b..abbfd33cde 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -144,7 +144,8 @@ private static NonForwardQueryResultScanner getNonForwardQueryResultScanner( queryId, scanNode, Integer.MAX_VALUE, - Optional.empty()); + Optional.empty(), + masterContext.getCatalog()); resultScanner.init(); session.addNonForwardQueryResultScanner(resultScanner); } diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index 7280e1f2fc..5afe67a924 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -146,6 +146,7 @@ run cp -r $ROOT/tajo-sql-parser/target/tajo-sql-parser-${project.version}/* . run cp -r $ROOT/tajo-storage/tajo-storage-jdbc/target/tajo-storage-jdbc-${project.version}.jar . run cp -r $ROOT/tajo-storage/tajo-storage-pgsql/target/tajo-storage-pgsql-${project.version}.jar . + run cp -r $ROOT/tajo-storage/tajo-storage-s3/target/tajo-storage-s3-${project.version}.jar . run cp -r $ROOT/tajo-pullserver/target/tajo-pullserver-${project.version}.jar . run cp -r $ROOT/tajo-metrics/target/tajo-metrics-${project.version}.jar . run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar . @@ -156,7 +157,11 @@ run mkdir -p lib run cp -r $ROOT/tajo-storage/tajo-storage-hdfs/target/lib/hive-*.jar lib/ - + + run mkdir aws_s3 + run cp -r $ROOT/tajo-storage/tajo-storage-s3/target/lib/*.jar aws_s3/ + + run mkdir -p share/jdbc-dist run cp -r $ROOT/tajo-jdbc/target/tajo-jdbc-${project.version}-jar-with-dependencies.jar ./share/jdbc-dist/tajo-jdbc-${project.version}.jar diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo index 8dffe9ac29..30b2f91b75 100755 --- a/tajo-dist/src/main/bin/tajo +++ b/tajo-dist/src/main/bin/tajo @@ -336,6 +336,17 @@ fi # Hive Home Configuration End ############################################################################## +############################################################################## +# Find and Set AWS S3 CLASSPATH +############################################################################## + +AWS_S3_LIB=$TAJO_HOME/aws_s3 + +if [ -d ${AWS_S3_LIB} ]; then + for f in ${AWS_S3_LIB}/*.jar; do + CLASSPATH=${CLASSPATH}:$f; + done +fi ############################################################################## # Find and Set Tajo CLASSPATH diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java index 617688228f..a03b169241 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java @@ -18,125 +18,21 @@ package org.apache.tajo.plan.logical; -import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.TableDesc; -import 
org.apache.tajo.plan.PlanString; -import org.apache.tajo.plan.Target; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.util.TUtil; - -import java.util.ArrayList; - public class PartitionedTableScanNode extends ScanNode { - @Expose Path [] inputPaths; - public PartitionedTableScanNode(int pid) { super(pid, NodeType.PARTITIONS_SCAN); } - public void init(ScanNode scanNode, Path[] inputPaths) { + public void init(ScanNode scanNode) { tableDesc = scanNode.tableDesc; setInSchema(scanNode.getInSchema()); setOutSchema(scanNode.getOutSchema()); this.qual = scanNode.qual; this.targets = scanNode.targets; - this.inputPaths = inputPaths; if (scanNode.hasAlias()) { alias = scanNode.alias; } } - public void setInputPaths(Path [] paths) { - this.inputPaths = paths; - } - - public Path [] getInputPaths() { - return inputPaths; - } - - @Override - public int hashCode() { - return Objects.hashCode(this.tableDesc, this.qual, this.targets); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PartitionedTableScanNode) { - PartitionedTableScanNode other = (PartitionedTableScanNode) obj; - - boolean eq = super.equals(other); - eq = eq && TUtil.checkEquals(this.tableDesc, other.tableDesc); - eq = eq && TUtil.checkEquals(this.qual, other.qual); - eq = eq && TUtil.checkEquals(this.targets, other.targets); - eq = eq && TUtil.checkEquals(this.inputPaths, other.inputPaths); - - return eq; - } - - return false; - } - - @Override - public Object clone() throws CloneNotSupportedException { - PartitionedTableScanNode unionScan = (PartitionedTableScanNode) super.clone(); - - unionScan.tableDesc = (TableDesc) this.tableDesc.clone(); - - if (hasQual()) { - unionScan.qual = (EvalNode) this.qual.clone(); - } - - if (hasTargets()) { - unionScan.targets = new ArrayList<>(); - for (Target t : targets) { - unionScan.targets.add((Target) t.clone()); - } - } - - unionScan.inputPaths = inputPaths; - - return unionScan; - } - - @Override - public void preOrder(LogicalNodeVisitor visitor) { - visitor.visit(this); - } - - public void postOrder(LogicalNodeVisitor visitor) { - visitor.visit(this); - } - - @Override - public PlanString getPlanString() { - PlanString planStr = new PlanString(this).appendTitle(" on " + getTableName()); - if (hasAlias()) { - planStr.appendTitle(" as ").appendTitle(alias); - } - - if (hasQual()) { - planStr.addExplan("filter: ").appendExplain(this.qual.toString()); - } - - if (hasTargets()) { - planStr.addExplan("target list: ").appendExplain(StringUtils.join(targets, ", ")); - } - - planStr.addDetail("out schema: ").appendDetail(getOutSchema().toString()); - planStr.addDetail("in schema: ").appendDetail(getInSchema().toString()); - - if (inputPaths != null) { - planStr.addExplan("num of filtered paths: ").appendExplain(""+ inputPaths.length); - int i = 0; - for (Path path : inputPaths) { - planStr.addDetail((i++) + ": ").appendDetail(path.toString()); - } - } - - return planStr; - } } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java b/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java new file mode 100644 index 0000000000..bed50a54cb --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/partition/PartitionPruningHandle.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.partition; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.plan.expr.EvalNode; + +import java.util.Map; + +/** + * Holds the results of partition pruning: the pruned partition paths, their partition keys, and the total data volume. + * + */ +public class PartitionPruningHandle { + private Path[] partitionPaths; + private String[] partitionKeys; + private long totalVolume; + private EvalNode[] conjunctiveForms; + + public PartitionPruningHandle(Path[] partitionPaths, String[] partitionKeys, long totalVolume) { + this.partitionPaths = partitionPaths; + this.partitionKeys = partitionKeys; + this.totalVolume = totalVolume; + } + + public Path[] getPartitionPaths() { + return partitionPaths; + } + + public String[] getPartitionKeys() { + return partitionKeys; + } + + public long getTotalVolume() { + return totalVolume; + } + + public boolean hasConjunctiveForms() { + return this.conjunctiveForms != null; + } + + public void setConjunctiveForms(EvalNode[] conjunctiveForms) { + this.conjunctiveForms = conjunctiveForms; + } +} \ No newline at end of file diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 32e41d3934..fb67c5bfa2 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -18,26 +18,27 @@ package org.apache.tajo.plan.rewrite.rules; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.*; import org.apache.tajo.OverridableConf; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; -import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.partition.PartitionPruningHandle; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.util.EvalNodeToExprConverter; @@ -55,7 +56,6 @@ public class PartitionedTableRewriter 
implements LogicalPlanRewriteRule { private CatalogService catalog; - private long totalVolume; private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class); @@ -91,6 +91,10 @@ public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoExc return plan; } + public void setCatalog(CatalogService catalog) { + this.catalog = catalog; + } + private static class PartitionPathFilter implements PathFilter { private Schema schema; @@ -123,11 +127,11 @@ public String toString() { } } - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - return findFilteredPaths(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); + return getPartitionPruningHandle(conf, tableName, partitionColumns, conjunctiveForms, tablePath, null); } /** @@ -140,13 +144,13 @@ public String toString() { * @return * @throws IOException */ - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath, ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - Path [] filteredPaths = null; - FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); + PartitionPruningHandle partitionPruningHandle = null; + FileSystem fs = tablePath.getFileSystem(conf); String [] splits = CatalogUtil.splitFQTableName(tableName); List partitions = null; @@ -154,17 +158,19 @@ public String toString() { if (conjunctiveForms == null) { partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } } else { if (catalog.existPartitions(splits[0], splits[1])) { PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); partitions = catalog.getPartitionsByAlgebra(request); - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } else { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } } } catch (UnsupportedException ue) { @@ -173,31 +179,43 @@ public String toString() { LOG.warn(ue.getMessage()); partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + partitionPruningHandle = getPartitionPruningHandleByFileSystem(partitionColumns, conjunctiveForms, fs, + tablePath); } else { - filteredPaths = 
findFilteredPathsByPartitionDesc(partitions); + partitionPruningHandle = getPartitionPruningHandleByCatalog(partitions); } scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms)); } - LOG.info("Filtered directory or files: " + filteredPaths.length); - return filteredPaths; + partitionPruningHandle.setConjunctiveForms(conjunctiveForms); + + LOG.info("Filtered directory or files: " + partitionPruningHandle.getPartitionPaths().length); + LOG.info("Filtered partition keys: " + partitionPruningHandle.getPartitionKeys().length); + + return partitionPruningHandle; } /** - * Build list of partition path by PartitionDescProto which is generated from CatalogStore. + * Build the lists of partition paths and partition keys from the PartitionDescProtos generated by the CatalogStore. * * @param partitions * @return */ - private Path[] findFilteredPathsByPartitionDesc(List partitions) { - Path [] filteredPaths = new Path[partitions.size()]; + private PartitionPruningHandle getPartitionPruningHandleByCatalog(List partitions) { + long totalVolume = 0L; + Path[] filteredPaths = new Path[partitions.size()]; + String[] partitionKeys = new String[partitions.size()]; for (int i = 0; i < partitions.size(); i++) { - PartitionDescProto partition = partitions.get(i); + CatalogProtos.PartitionDescProto partition = partitions.get(i); filteredPaths[i] = new Path(partition.getPath()); + partitionKeys[i] = partition.getPartitionName(); totalVolume += partition.getNumBytes(); } - return filteredPaths; + return new PartitionPruningHandle(filteredPaths, partitionKeys, totalVolume); } /** @@ -211,10 +229,14 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti * @return * @throws IOException */ - private Path [] findFilteredPathsFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, - FileSystem fs, Path tablePath) throws IOException{ + private PartitionPruningHandle getPartitionPruningHandleByFileSystem(Schema partitionColumns, + EvalNode [] conjunctiveForms, FileSystem fs, Path tablePath) throws IOException { Path [] filteredPaths = null; PathFilter [] filters; + int startIdx; + long totalVolume = 0L; + ContentSummary summary = null; + String[] partitionKeys = null; if (conjunctiveForms == null) { filters = buildAllAcceptingPathFilters(partitionColumns); @@ -229,7 +251,18 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti // Get all file status matched to a ith level path filter.
filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); } - return filteredPaths; + + // Get partition keys and volume from the list of partition directories + partitionKeys = new String[filteredPaths.length]; + for (int i = 0; i < partitionKeys.length; i++) { + Path path = filteredPaths[i]; + startIdx = path.toString().indexOf(getColumnPartitionPathPrefix(partitionColumns)); + partitionKeys[i] = path.toString().substring(startIdx); + summary = fs.getContentSummary(path); + totalVolume += summary.getLength(); + } + + return new PartitionPruningHandle(filteredPaths, partitionKeys, totalVolume); } /** @@ -329,14 +362,16 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( for (int i = 0; i < fileStatuses.length; i++) { FileStatus fileStatus = fileStatuses[i]; paths[i] = fileStatus.getPath(); - totalVolume += fileStatus.getLen(); } return paths; } - public Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException, - UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + public PartitionPruningHandle getPartitionPruningHandle(TajoConf conf, ScanNode scanNode) + throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { + long startTime = System.currentTimeMillis(); + PartitionPruningHandle pruningHandle = null; + TableDesc table = scanNode.getTableDesc(); PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod(); @@ -374,11 +409,18 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( } if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, + pruningHandle = getPartitionPruningHandle(conf, table.getName(), paritionValuesSchema, indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri()), scanNode); } else { // otherwise, we will get all partition paths. - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); + pruningHandle = getPartitionPruningHandle(conf, table.getName(), paritionValuesSchema, null, + new Path(table.getUri())); } + + long finishTime = System.currentTimeMillis(); + long elapsedMills = finishTime - startTime; + + LOG.info(String.format("Partition pruning: %d ms elapsed.", elapsedMills)); + return pruningHandle; } private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) { @@ -495,6 +537,49 @@ public static String getColumnPartitionPathPrefix(Schema partitionColumn) { return sb.toString(); } + /** + * Transforms a partition name into a tuple with the given partition column schema. + * Assume that a user gives the partition name 'country=KOREA/city=SEOUL'. + * + * The first datum of tuple : KOREA + * The second datum of tuple : SEOUL + * + * @param partitionColumnSchema The partition column schema + * @param partitionKeys The keys of partition + * @return The tuple built from the partition column values.
+ */ + public static Tuple buildTupleFromPartitionKeys(Schema partitionColumnSchema, String partitionKeys) { + Preconditions.checkNotNull(partitionColumnSchema); + Preconditions.checkNotNull(partitionKeys); + + String [] columnValues = partitionKeys.split("/"); + Preconditions.checkArgument(partitionColumnSchema.size() >= columnValues.length, + "Invalid partition keys: " + partitionKeys); + + Tuple tuple = new VTuple(partitionColumnSchema.size()); + + for (int i = 0; i < tuple.size(); i++) { + tuple.put(i, NullDatum.get()); + } + + for (int i = 0; i < columnValues.length; i++) { + String [] parts = columnValues[i].split("="); + if (parts.length == 2) { + int columnId = partitionColumnSchema.getColumnIdByName(parts[0]); + Column keyColumn = partitionColumnSchema.getColumn(columnId); + + if (parts[1].equals(StorageConstants.DEFAULT_PARTITION_NAME)) { + tuple.put(columnId, DatumFactory.createNullDatum()); + } else { + tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), + StringUtils.unescapePathName(parts[1]))); + } + } + } + + return tuple; + } + private final class Rewriter extends BasicLogicalPlanVisitor { @Override public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalPlan.QueryBlock block, @@ -505,24 +590,19 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP return null; } - try { - Path [] filteredPaths = findFilteredPartitionPaths(queryContext, scanNode); - plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions"); - PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); - rewrittenScanNode.init(scanNode, filteredPaths); - rewrittenScanNode.getTableDesc().getStats().setNumBytes(totalVolume); + PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); + rewrittenScanNode.init(scanNode); - // if it is topmost node, set it as the rootnode of this block. 
+ if (stack.empty() || block.getRoot().equals(scanNode)) { + block.setRoot(rewrittenScanNode); + } else { + PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode); } + + block.registerNode(rewrittenScanNode); return null; } } + } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 4b47e4ac87..ff18a16fbb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -20,7 +20,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.Path; import org.apache.tajo.OverridableConf; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.annotation.Nullable; @@ -116,7 +115,7 @@ public int compare(PlanProto.LogicalNode o1, PlanProto.LogicalNode o2) { current = convertUnion(nodeMap, protoNode); break; case PARTITIONS_SCAN: - current = convertPartitionScan(context, evalContext, protoNode); + current = convertPartitionedTableScan(context, evalContext, protoNode); break; case SCAN: current = convertScan(context, evalContext, protoNode); @@ -410,6 +409,13 @@ private static ScanNode convertScan(OverridableConf context, EvalContext evalCon return scan; } + private static PartitionedTableScanNode convertPartitionedTableScan(OverridableConf context, EvalContext evalContext, + PlanProto.LogicalNode protoNode) { + PartitionedTableScanNode partitionedTableScan = new PartitionedTableScanNode(protoNode.getNodeId()); + fillScanNode(context, evalContext, protoNode, partitionedTableScan); + return partitionedTableScan; + } + private static void fillScanNode(OverridableConf context, EvalContext evalContext, PlanProto.LogicalNode protoNode, ScanNode scan) { PlanProto.ScanNode scanProto = protoNode.getScan(); @@ -452,20 +458,6 @@ private static IndexScanNode convertIndexScan(OverridableConf context, EvalConte return indexScan; } - private static PartitionedTableScanNode convertPartitionScan(OverridableConf context, EvalContext evalContext, - PlanProto.LogicalNode protoNode) { - PartitionedTableScanNode partitionedScan = new PartitionedTableScanNode(protoNode.getNodeId()); - fillScanNode(context, evalContext, protoNode, partitionedScan); - - PlanProto.PartitionScanSpec partitionScanProto = protoNode.getPartitionScan(); - Path [] paths = new Path[partitionScanProto.getPathsCount()]; - for (int i = 0; i < partitionScanProto.getPathsCount(); i++) { - paths[i] = new Path(partitionScanProto.getPaths(i)); - } - partitionedScan.setInputPaths(paths); - return partitionedScan; - } - private static TableSubQueryNode convertTableSubQuery(OverridableConf context, EvalContext evalContext, Map nodeMap, PlanProto.LogicalNode protoNode) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index e7247688f8..677fc8d7ff 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -19,7 +19,6 @@ package org.apache.tajo.plan.serder; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.Path; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -41,8 +40,6 @@ import 
org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.ProtoUtil; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Stack; @@ -476,21 +473,9 @@ public LogicalNode visitPartitionedTableScan(SerializeContext context, LogicalPl throws TajoException { PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node); - - PlanProto.PartitionScanSpec.Builder partitionScan = PlanProto.PartitionScanSpec.newBuilder(); - List pathStrs = new ArrayList<>(); - if (node.getInputPaths() != null) { - for (Path p : node.getInputPaths()) { - pathStrs.add(p.toString()); - } - partitionScan.addAllPaths(pathStrs); - } - PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); nodeBuilder.setScan(scanBuilder); - nodeBuilder.setPartitionScan(partitionScan); context.treeBuilder.addNodes(nodeBuilder); - return node; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java index 9ded584815..d576379cc6 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java @@ -134,13 +134,6 @@ private static boolean isSimpleRelationNode(LogicalNode node) { private static long getTableVolume(ScanNode scanNode) { if (scanNode.getTableDesc().hasStats()) { long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); - if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - scanBytes = 0L; - } - } - return scanBytes; } else { return -1; diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index c50429f613..0465e89d9d 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -73,36 +73,35 @@ message LogicalNode { optional SchemaProto out_schema = 5; optional ScanNode scan = 6; - optional PartitionScanSpec partitionScan = 7; - optional IndexScanSpec indexScan = 8; - optional JoinNode join = 9; - optional FilterNode filter = 10; - optional GroupbyNode groupby = 11; - optional DistinctGroupbyNode distinctGroupby = 12; - optional SortNode sort = 13; - optional LimitNode limit = 14; - optional WindowAggNode windowAgg = 15; - optional ProjectionNode projection = 16; - optional EvalExprNode exprEval = 17; - optional UnionNode union = 18; - optional TableSubQueryNode tableSubQuery = 19; - optional PersistentStoreNode persistentStore = 20; - optional StoreTableNodeSpec storeTable = 21; - optional InsertNodeSpec insert = 22; - optional CreateTableNodeSpec createTable = 23; - optional RootNode root = 24; - optional SetSessionNode setSession = 25; - - optional CreateDatabaseNode createDatabase = 26; - optional DropDatabaseNode dropDatabase = 27; - optional DropTableNode dropTable = 28; - - optional AlterTablespaceNode alterTablespace = 29; - optional AlterTableNode alterTable = 30; - optional TruncateTableNode truncateTableNode = 31; - - optional CreateIndexNode createIndex = 32; - optional DropIndexNode dropIndex = 33; + optional IndexScanSpec indexScan = 7; + optional JoinNode join = 8; + optional FilterNode filter = 9; + optional GroupbyNode groupby = 10; + optional DistinctGroupbyNode distinctGroupby = 11; + optional SortNode sort = 12; + optional LimitNode limit = 13; + optional 
WindowAggNode windowAgg = 14; + optional ProjectionNode projection = 15; + optional EvalExprNode exprEval = 16; + optional UnionNode union = 17; + optional TableSubQueryNode tableSubQuery = 18; + optional PersistentStoreNode persistentStore = 19; + optional StoreTableNodeSpec storeTable = 20; + optional InsertNodeSpec insert = 21; + optional CreateTableNodeSpec createTable = 22; + optional RootNode root = 23; + optional SetSessionNode setSession = 24; + + optional CreateDatabaseNode createDatabase = 25; + optional DropDatabaseNode dropDatabase = 26; + optional DropTableNode dropTable = 27; + + optional AlterTablespaceNode alterTablespace = 28; + optional AlterTableNode alterTable = 29; + optional TruncateTableNode truncateTableNode = 30; + + optional CreateIndexNode createIndex = 31; + optional DropIndexNode dropIndex = 32; } message ScanNode { @@ -115,10 +114,6 @@ message ScanNode { required bool nameResolveBase = 7; } -message PartitionScanSpec { - repeated string paths = 1; -} - message IndexScanSpec { required SchemaProto keySchema = 1; required string indexPath = 2; @@ -231,10 +226,6 @@ enum JoinType { RIGHT_SEMI_JOIN = 9; } -message PartitionTableScanSpec { - repeated string paths = 1; -} - message PersistentStoreNode { optional int32 childSeq = 1; // CreateTableNode may not have any children. This should be improved at TAJO-1589. required string storageType = 2; diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 00e6d75a12..a917d37ed3 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -101,6 +101,8 @@ public String toString() { public abstract long getTableVolume(TableDesc table, Optional filter) throws UnsupportedException; + public abstract long calculateSize(Path path) throws IOException; + /** * if {@link StorageProperty#isArbitraryPathAllowed} is true, * the storage allows arbitrary path accesses. In this case, the storage must provide the root URI. 
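For orientation, a hedged end-to-end sketch of how the pieces added in this patch fit together on the query master side. The method names (setCatalog, getPartitionPruningHandle, getPartitionSplits, calculateSize) are the ones introduced above; the local variable names and surrounding wiring are illustrative, and exception handling is elided.

    // Prune partitions through the catalog (falling back to the file system),
    // then turn the surviving partition paths into fragments.
    PartitionedTableRewriter rewriter = new PartitionedTableRewriter();
    rewriter.setCatalog(catalog);  // CatalogService obtained from the worker context
    PartitionPruningHandle handle = rewriter.getPartitionPruningHandle(conf, partitionsScan);
    List<Fragment> fragments = fileTablespace.getPartitionSplits(
        partitionsScan.getCanonicalName(), table.getMeta(), table.getSchema(), handle);
    // Volumes are likewise resolved per tablespace rather than via FileSystem directly:
    long bytes = TablespaceManager.get(table.getUri()).calculateSize(new Path(table.getUri()));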
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 132ceff0ae..e8f77b4dac 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -1118,4 +1118,10 @@ public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws Ta } } } + + @Override + public long calculateSize(Path path) throws IOException { + FileSystem fs = path.getFileSystem(conf); + return fs.getContentSummary(path).getLength(); + } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index ad4574909d..0597eb9312 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -40,8 +40,10 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.partition.PartitionPruningHandle; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.PartitionFileFragment; import org.apache.tajo.util.Bytes; import javax.annotation.Nullable; @@ -127,13 +129,13 @@ protected void storageInit() throws IOException { @Override public long getTableVolume(TableDesc table, Optional filter) throws UnsupportedException { Path path = new Path(table.getUri()); - ContentSummary summary; + long totalVolume = 0L; try { - summary = fs.getContentSummary(path); + totalVolume = calculateSize(path); } catch (IOException e) { throw new TajoInternalError(e); } - return summary.getLength(); + return totalVolume; } @Override @@ -245,6 +247,7 @@ public static FileFragment[] splitNG(Configuration conf, String tableName, Table return tablets; } + @Override public long calculateSize(Path tablePath) throws IOException { FileSystem fs = tablePath.getFileSystem(conf); long totalSize = 0; @@ -404,33 +407,7 @@ protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blo // for Non Splittable. 
eg, compressed gzip TextFile protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, BlockLocation[] blkLocations) throws IOException { - - Map hostsBlockMap = new HashMap<>(); - for (BlockLocation blockLocation : blkLocations) { - for (String host : blockLocation.getHosts()) { - if (hostsBlockMap.containsKey(host)) { - hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); - } else { - hostsBlockMap.put(host, 1); - } - } - } - - List> entries = new ArrayList<>(hostsBlockMap.entrySet()); - Collections.sort(entries, new Comparator>() { - - @Override - public int compare(Map.Entry v1, Map.Entry v2) { - return v1.getValue().compareTo(v2.getValue()); - } - }); - - String[] hosts = new String[blkLocations[0].getHosts().length]; - - for (int i = 0; i < hosts.length; i++) { - Map.Entry entry = entries.get((entries.size() - 1) - i); - hosts[i] = entry.getKey(); - } + String[] hosts = getHosts(blkLocations); return new FileFragment(fragmentId, file, start, length, hosts); } @@ -546,6 +523,221 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, return splits; } + /** + * Get the list of hosts (host names) hosting the specified blocks + * + * + * @param blkLocations locations of blocks + * @return the list of hosts + * @throws IOException + */ + private String[] getHosts(BlockLocation[] blkLocations) throws IOException { + Map hostsBlockMap = new HashMap<>(); + for (BlockLocation blockLocation : blkLocations) { + for (String host : blockLocation.getHosts()) { + if (hostsBlockMap.containsKey(host)) { + hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); + } else { + hostsBlockMap.put(host, 1); + } + } + } + + List> entries = new ArrayList<>(hostsBlockMap.entrySet()); + Collections.sort(entries, (Map.Entry v1, Map.Entry v2) + -> v1.getValue().compareTo(v2.getValue())); + + String[] hosts = new String[blkLocations[0].getHosts().length]; + + for (int i = 0; i < hosts.length; i++) { + Map.Entry entry = entries.get((entries.size() - 1) - i); + hosts[i] = entry.getKey(); + } + + return hosts; + } + + //////////////////////////////////////////////////////////////////////////////// + // The code below splits partitioned tables. + //////////////////////////////////////////////////////////////////////////////// + + + /** + * Is the given filename splittable? Usually true, but if the file is + * stream compressed, it will not be. +

+ * FileInputFormat implementations can override this and return + * false to ensure that individual input files are never split-up + * so that Mappers process entire files. + * + * + * @param meta the metadata of target table + * @param schema the schema of target table + * @param path the file name to check + * @param partitionKeys keys of target partition + * @param status the file status, used to get the file length + * @return whether this file is splittable + * @throws IOException + */ + protected boolean isSplittablePartitionFragment(TableMeta meta, Schema schema, Path path, String partitionKeys, + FileStatus status) throws IOException { + Fragment fragment = new PartitionFileFragment(path.getName(), path, 0, status.getLen(), partitionKeys); + Scanner scanner = getScanner(meta, schema, fragment, null); + boolean split = scanner.isSplittable(); + scanner.close(); + return split; + } + + /** + * Build a fragment for a partitioned table + * + * @param fragmentId fragment id + * @param file file path + * @param start offset + * @param length length + * @param hosts the list of hosts (names) hosting blocks + * @param partitionKeys partition keys + * @return PartitionFileFragment + */ + protected PartitionFileFragment getSplittablePartitionFragment(String fragmentId, Path file, long start, long length, + String[] hosts, String partitionKeys) { + return new PartitionFileFragment(fragmentId, file, start, length, hosts, partitionKeys); + } + + /** + * Build a fragment for a partitioned table + * + * @param fragmentId fragment id + * @param file file path + * @param blockLocation location of block + * @param partitionKeys partition keys + * @return PartitionFileFragment + * @throws IOException + */ + protected PartitionFileFragment getSplittablePartitionFragment(String fragmentId, Path file, BlockLocation blockLocation + , String partitionKeys) throws IOException { + return new PartitionFileFragment(fragmentId, file, blockLocation, partitionKeys); + } + + /** + * Build a fragment for a non-splittable partitioned table + * + * @param fragmentId fragment id + * @param file file path + * @param start offset + * @param length length + * @param blkLocations locations of blocks + * @param partitionKeys partition keys + * @return PartitionFileFragment + * @throws IOException + */ + protected Fragment getNonSplittablePartitionFragment(String fragmentId, Path file, long start, long length, + BlockLocation[] blkLocations, String partitionKeys) throws IOException { + String[] hosts = getHosts(blkLocations); + return new PartitionFileFragment(fragmentId, file, start, length, hosts, partitionKeys); + } + + /** + * Build the list of fragments for a partitioned table + * + * @param tableName table name + * @param meta all meta information for scanning a fragmented table + * @param schema table schema + * @param pruningHandle the partition pruning results (partition paths and keys) + * @return the list of PartitionFileFragment + * @throws IOException + */ + public List getPartitionSplits(String tableName, TableMeta meta, Schema schema + , PartitionPruningHandle pruningHandle) throws IOException { + long startTime = System.currentTimeMillis(); + + // generate splits + List splits = Lists.newArrayList(); + List volumeSplits = Lists.newArrayList(); + List blockLocations = Lists.newArrayList(); + + int i = 0; + Path[] inputs = pruningHandle.getPartitionPaths(); + String[] partitionKeys = pruningHandle.getPartitionKeys(); + for (Path p : inputs) { + ArrayList files = Lists.newArrayList(); + if (fs.isFile(p)) { + files.addAll(Lists.newArrayList(fs.getFileStatus(p))); + } else { + files.addAll(listStatus(p)); + } + + for (FileStatus file : 
files) { + Path path = file.getPath(); + long length = file.getLen(); + if (length > 0) { + // Get locations of blocks of file + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + boolean splittable = isSplittablePartitionFragment(meta, schema, path, partitionKeys[i], file); + if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { + + if (splittable) { + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(getSplittablePartitionFragment(tableName, path, blockLocation, partitionKeys[i])); + } + blockLocations.addAll(Arrays.asList(blkLocations)); + + } else { // Non splittable + long blockSize = blkLocations[0].getLength(); + if (blockSize >= length) { + blockLocations.addAll(Arrays.asList(blkLocations)); + for (BlockLocation blockLocation : blkLocations) { + volumeSplits.add(getSplittablePartitionFragment(tableName, path, blockLocation, partitionKeys[i])); + } + } else { + splits.add(getNonSplittablePartitionFragment(tableName, path, 0, length, blkLocations, + partitionKeys[i])); + } + } + + } else { + if (splittable) { + + long minSize = Math.max(getMinSplitSize(), 1); + + long blockSize = file.getBlockSize(); // the S3N REST API reports a block size, but returns only one block location + long splitSize = Math.max(minSize, blockSize); + long bytesRemaining = length; + + // split by splitSize (needed for S3, which reports a single synthetic block location) + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(getSplittablePartitionFragment(tableName, path, length - bytesRemaining, splitSize, + blkLocations[blkIndex].getHosts(), partitionKeys[i])); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(getSplittablePartitionFragment(tableName, path, length - bytesRemaining, bytesRemaining, + blkLocations[blkIndex].getHosts(), partitionKeys[i])); + } + } else { // Non splittable + splits.add(getNonSplittablePartitionFragment(tableName, path, 0, length, blkLocations, partitionKeys[i])); + } + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Average # of splits per partition: " + splits.size() / (i+1)); + } + i++; + } + + // Combine original fileFragments with new VolumeId information + setVolumeMeta(volumeSplits, blockLocations); + splits.addAll(volumeSplits); + LOG.info("Total # of splits: " + splits.size()); + + long finishTime = System.currentTimeMillis(); + long elapsedMills = finishTime - startTime; + LOG.info(String.format("Split for partition table: %d ms elapsed.", elapsedMills)); + return splits; + } + private void setVolumeMeta(List splits, final List blockLocations) throws IOException { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index 8f18c7a0f0..c584aee72e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -106,6 +106,10 @@ public String[] getHosts() { return hosts; } + public void setHosts(String[] hosts) { + this.hosts = hosts; + } + /** * Get the list of Disk Ids * Unknown disk is -1. 
Others 0 ~ N @@ -127,6 +131,10 @@ public String getTableName() { return this.tableName; } + public void setTableName(String tableName) { + this.tableName = tableName; + } + public Path getPath() { return this.uri; } @@ -149,6 +157,10 @@ public long getLength() { return this.length; } + public void setLength(long length) { + this.length = length; + } + @Override public boolean isEmpty() { return this.length <= 0; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/PartitionFileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/PartitionFileFragment.java new file mode 100644 index 0000000000..3a20fb5264 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/PartitionFileFragment.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.fragment; + +import com.google.common.base.Objects; +import com.google.gson.annotations.Expose; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.storage.StorageFragmentProtos.PartitionFileFragmentProto; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +public class PartitionFileFragment extends FileFragment implements Cloneable { + + @Expose private String partitionKeys; // required + + public PartitionFileFragment(ByteString raw) throws InvalidProtocolBufferException { + super(raw); + PartitionFileFragmentProto.Builder builder = PartitionFileFragmentProto.newBuilder(); + builder.mergeFrom(raw); + this.partitionKeys = builder.build().getPartitionKeys(); + } + + public PartitionFileFragment(String tableName, Path uri, BlockLocation blockLocation, + String partitionKeys) throws IOException { + super(tableName, uri, blockLocation); + this.partitionKeys = partitionKeys; + } + + public PartitionFileFragment(String tableName, Path uri, long start, long length, String[] hosts, + String partitionKeys) { + super(tableName, uri, start, length, hosts); + this.partitionKeys = partitionKeys; + } + + public PartitionFileFragment(String fragmentId, Path path, long start, long length, String partitionKeys) { + super(fragmentId, path, start, length); + this.partitionKeys = partitionKeys; + } + + public String getPartitionKeys() { + return partitionKeys; + } + + public void setPartitionKeys(String partitionKeys) { + this.partitionKeys = partitionKeys; + } + + @Override + public int hashCode() { + return 
Objects.hashCode(getTableName(), getPath(), getStartKey(), getLength(), getPartitionKeys()); + } + + @Override + public Object clone() throws CloneNotSupportedException { + PartitionFileFragment frag = (PartitionFileFragment) super.clone(); + frag.setTableName(getTableName()); + frag.setPath(getPath()); + frag.setDiskIds(getDiskIds()); + frag.setHosts(getHosts()); + frag.setPartitionKeys(getPartitionKeys()); + + return frag; + } + + @Override + public String toString() { + return "\"fragment\": {\"id\": \""+ getTableName() +"\", \"path\": " + +getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": " + + getLength() + "\", \"partitionKeys\":" + getPartitionKeys() + "}" ; + } + + @Override + public FragmentProto getProto() { + PartitionFileFragmentProto.Builder builder = PartitionFileFragmentProto.newBuilder(); + builder.setId(getTableName()); + builder.setStartOffset(this.startOffset); + builder.setLength(this.length); + builder.setPath(getPath().toString()); + if(getDiskIds() != null) { + List idList = new ArrayList<>(); + for(int eachId: getDiskIds()) { + idList.add(eachId); + } + builder.addAllDiskIds(idList); + } + + if (getHosts() != null) { + builder.addAllHosts(Arrays.asList(getHosts())); + } + + if (partitionKeys != null) { + builder.setPartitionKeys(this.partitionKeys); + } + + FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); + fragmentBuilder.setId(getTableName()); + fragmentBuilder.setDataFormat(BuiltinStorages.TEXT); + fragmentBuilder.setContents(builder.buildPartial().toByteString()); + return fragmentBuilder.build(); + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto index 0579f05aa9..d9869f071d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto +++ b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto @@ -32,3 +32,14 @@ message FileFragmentProto { repeated string hosts = 5; repeated int32 disk_ids = 6; } + +message PartitionFileFragmentProto { + required string id = 1; + required string path = 2; + required int64 start_offset = 3; + required int64 length = 4; + repeated string hosts = 5; + repeated int32 disk_ids = 6; + // Partition Keys: country=KOREA/city=SEOUL + required string partitionKeys = 7; +} diff --git a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java index 8386f74995..2bfc641fb3 100644 --- a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java +++ b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java @@ -19,6 +19,8 @@ package org.apache.tajo.storage.pgsql; import net.minidev.json.JSONObject; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.*; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.TajoRuntimeException; @@ -95,4 +97,11 @@ public long getTableVolume(TableDesc table, Optional filter) { throw new TajoInternalError(e); } } + + @Override + public long calculateSize(Path path) throws IOException { + FileSystem fs = path.getFileSystem(conf); + return fs.getContentSummary(path).getLength(); + } + } diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml index a9a541aed1..9a78814301 100644 --- 
a/tajo-storage/tajo-storage-s3/pom.xml +++ b/tajo-storage/tajo-storage-s3/pom.xml @@ -34,6 +34,11 @@ UTF-8 UTF-8 + 0.122 + 1.7.4 + 2.8.2 + 11.0.2 + 2.2.3 @@ -97,6 +102,27 @@ true + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + runtime + ${project.build.directory}/lib + false + false + true + + + + @@ -105,16 +131,182 @@ org.apache.tajo tajo-common provided + + + org.apache.hadoop + hadoop-common + + + com.google.protobuf + protobuf-java + + + commons-logging + commons-logging + + + commons-logging + commons-logging-api + + + commons-lang + commons-lang + + + com.google.guava + guava + + + com.google.code.gson + gson + + + io.netty + netty-buffer + + + org.iq80.snappy + snappy + + org.apache.tajo tajo-storage-common provided + + + org.apache.tajo + tajo-common + + + org.apache.tajo + tajo-catalog-common + + + org.apache.tajo + tajo-plan + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + com.google.protobuf + protobuf-java + + + io.netty + netty-buffer + + org.apache.tajo tajo-storage-hdfs provided + + + io.netty + netty-transport + + + io.netty + netty-codec + + + io.netty + netty-codec-http + + + org.apache.tajo + tajo-common + + + org.apache.tajo + tajo-catalog-common + + + org.apache.tajo + tajo-plan + + + org.apache.tajo + tajo-storage-common + + + org.apache.avro + trevni-core + + + org.apache.avro + trevni-avro + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + com.google.protobuf + protobuf-java + + + com.twitter + parquet-column + + + com.twitter + parquet-hadoop + + + com.twitter + parquet-format + + + io.netty + netty-buffer + + + com.facebook.presto + presto-orc + + + + + + org.apache.tajo + tajo-catalog-common + provided + + + org.apache.tajo + tajo-common + + + org.apache.hadoop + hadoop-common + + + com.google.protobuf + protobuf-java + + @@ -122,56 +314,320 @@ hadoop-common provided + + com.google.code.gson + gson + + + com.jcraft + jsch + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.htrace + htrace-core + zookeeper org.apache.zookeeper - slf4j-api - org.slf4j + org.apache.commons + commons-compress - jersey-json - com.sun.jersey + org.apache.ant + ant + + + org.apache.commons + commons-compress + org.apache.hadoop hadoop-hdfs provided - commons-el - commons-el + org.apache.hadoop + hadoop-annotations + + + org.apache.hadoop + hadoop-auth - tomcat - jasper-runtime + org.apache.hadoop + hadoop-common - tomcat - jasper-compiler + com.google.guava + guava org.mortbay.jetty - jsp-2.1-jetty + jetty - com.sun.jersey.jersey-test-framework - jersey-test-framework-grizzly2 + org.mortbay.jetty + jetty-util + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-server + + + commons-cli + commons-cli + + + commons-codec + commons-codec + + + commons-io + commons-io + + + commons-lang + commons-lang + + + commons-logging + commons-logging + + + commons-daemon + commons-daemon + + + log4j + log4j + + + com.google.protobuf + protobuf-java + + + javax.servlet + servlet-api + + + org.slf4j + slf4j-log4j12 + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + xmlenc + xmlenc - netty-all io.netty + netty-all + + + xerces + xercesImpl + + + org.apache.htrace + htrace-core + + + org.fusesource.leveldbjni + leveldbjni-all + + + org.apache.tajo + 
tajo-plan + provided + + + + com.fasterxml.jackson.core + jackson-databind + test + ${jackson2.version} + + + com.fasterxml.jackson.core + jackson-annotations + test + ${jackson2.version} + + junit junit test + + + io.airlift + units + ${airlft.version} + + + javax.validation + validation-api + + + com.google.guava + guava + + + com.fasterxml.jackson.core + jackson-annotations + + + + + + org.weakref + jmxutils + 1.18 + + + + com.amazonaws + aws-java-sdk + ${aws-java-sdk.version} + + + commons-logging + commons-logging + + + org.apache.httpcomponents + httpclient + + + joda-time + joda-time + + + javax.mail + mail + + + org.freemarker + freemarker + + + org.springframework + spring-beans + + + org.springframework + spring-core + + + org.springframework + spring-context + + + org.springframework + spring-test + + + org.aspectj + aspectjrt + + + junit + junit + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + + + + org.apache.httpcomponents + httpclient + 4.5 + + + org.apache.httpcomponents + httpcore + + + commons-logging + commons-logging + + + commons-codec + commons-codec + + + + + + org.apache.httpcomponents + httpcore + 4.4.1 + + + junit + junit + + + + + + org.testng + testng + 6.9.6 + test + + + + joda-time + joda-time + ${joda-time.version} + compile + + + + javax.validation + validation-api + 1.1.0.Final + + diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java index 4bcdb60a68..15f1edefbc 100644 --- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java @@ -18,14 +18,434 @@ package org.apache.tajo.storage.s3; +import java.io.IOException; import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.iterable.S3Objects; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.catalog.*; +import org.apache.tajo.plan.partition.PartitionPruningHandle; import org.apache.tajo.storage.FileTablespace; import net.minidev.json.JSONObject; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.FileUtil; + +import static 
com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.nullToEmpty;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;

 public class S3TableSpace extends FileTablespace {
+  private final static Log LOG = LogFactory.getLog(S3TableSpace.class);
+
+  private AmazonS3 s3;
+  private boolean useInstanceCredentials;
+
+  public static final String ENDPOINT = "fs.s3a.endpoint";
+  private static final DataSize BLOCK_SIZE = new DataSize(32, MEGABYTE);
+  protected static final double SPLIT_SLOP = 1.1; // 10% slop
+
   public S3TableSpace(String spaceName, URI uri, JSONObject config) {
     super(spaceName, uri, config);
   }
+
+  @Override
+  public void init(TajoConf tajoConf) throws IOException {
+    super.init(tajoConf);
+
+    this.blocksMetadataEnabled = false;
+
+    int maxErrorRetries = conf.getIntVar(TajoConf.ConfVars.S3_MAX_ERROR_RETRIES);
+    boolean sslEnabled = conf.getBoolVar(TajoConf.ConfVars.S3_SSL_ENABLED);
+
+    Duration connectTimeout = Duration.valueOf(conf.getVar(TajoConf.ConfVars.S3_CONNECT_TIMEOUT));
+    Duration socketTimeout = Duration.valueOf(conf.getVar(TajoConf.ConfVars.S3_SOCKET_TIMEOUT));
+    int maxConnections = conf.getIntVar(TajoConf.ConfVars.S3_MAX_CONNECTIONS);
+
+    this.useInstanceCredentials = conf.getBoolVar(TajoConf.ConfVars.S3_USE_INSTANCE_CREDENTIALS);
+
+    ClientConfiguration configuration = new ClientConfiguration()
+      .withMaxErrorRetry(maxErrorRetries)
+      .withProtocol(sslEnabled ? Protocol.HTTPS : Protocol.HTTP)
+      .withConnectionTimeout(Ints.checkedCast(connectTimeout.toMillis()))
+      .withSocketTimeout(Ints.checkedCast(socketTimeout.toMillis()))
+      .withMaxConnections(maxConnections);
+
+    this.s3 = createAmazonS3Client(uri, conf, configuration);
+
+    if (s3 != null) {
+      String endPoint = conf.getTrimmed(ENDPOINT, "");
+      try {
+        // Check whether a custom endpoint is configured
+        if (!endPoint.isEmpty()) {
+          s3.setEndpoint(endPoint);
+        }
+      } catch (IllegalArgumentException e) {
+        String msg = "Incorrect endpoint: " + e.getMessage();
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg, e);
+      }
+
+      LOG.info("AmazonS3Client is initialized.");
+    }
+  }
+
+  private AmazonS3Client createAmazonS3Client(URI uri, Configuration hadoopConfig, ClientConfiguration clientConfig) {
+    AWSCredentialsProvider credentials = getAwsCredentialsProvider(uri, hadoopConfig);
+    return new AmazonS3Client(credentials, clientConfig);
+  }
+
+  private AWSCredentialsProvider getAwsCredentialsProvider(URI uri, Configuration conf) {
+    // First, try credentials from the URI or static properties
+    try {
+      return new StaticCredentialsProvider(getAwsCredentials(uri, conf));
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    if (useInstanceCredentials) {
+      return new InstanceProfileCredentialsProvider();
+    }
+
+    throw new RuntimeException("S3 credentials not configured");
+  }
+
+  private static AWSCredentials getAwsCredentials(URI uri, Configuration conf) {
+    TajoS3Credentials credentials = new TajoS3Credentials();
+    credentials.initialize(uri, conf);
+    return new BasicAWSCredentials(credentials.getAccessKey(), credentials.getSecretAccessKey());
+  }
+
+  @Override
+  public long calculateSize(Path path) throws IOException {
+    String key = keyFromPath(path);
+    if (!key.isEmpty()) {
+      key += "/";
+    }
+
+    Iterable<S3ObjectSummary> objectSummaries = S3Objects.withPrefix(s3, uri.getHost(), key);
+    Stream<S3ObjectSummary> objectStream = StreamSupport.stream(objectSummaries.spliterator(), false);
+    long totalBucketSize = objectStream.mapToLong(object -> object.getSize()).sum();
+    objectStream.close();
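+    // Note: instead of delegating to FileSystem#getContentSummary() (as the
+    // PgSQLTablespace override in this patch does), the table volume is the
+    // sum of the sizes of all S3 objects under the table prefix;
+    // S3Objects.withPrefix() transparently pages through the listing, fetching
+    // at most 1000 keys per ListObjects call.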
+    return totalBucketSize;
+  }
+
+  private String keyFromPath(Path path) {
+    checkArgument(path.isAbsolute(), "Path is not absolute: %s", path);
+    String key = nullToEmpty(path.toUri().getPath());
+    if (key.startsWith("/")) {
+      key = key.substring(1);
+    }
+    if (key.endsWith("/")) {
+      key = key.substring(0, key.length() - 1);
+    }
+    return key;
+  }
+
+  @Override
+  public List<Fragment> getPartitionSplits(String tableName, TableMeta meta, Schema schema,
+      PartitionPruningHandle pruningHandle) throws IOException {
+    long startTime = System.currentTimeMillis();
+    List<Fragment> splits = Lists.newArrayList();
+
+    // Generate the list of FileStatuses and partition keys
+    Path[] paths = pruningHandle.getPartitionPaths();
+    if (paths.length == 0) {
+      return splits;
+    }
+
+    // Prepare a partition map which keeps the index of each partition path
+    Map<Path, Integer> partitionPathMap = Maps.newHashMap();
+    for (int i = 0; i < pruningHandle.getPartitionKeys().length; i++) {
+      partitionPathMap.put(pruningHandle.getPartitionPaths()[i], i);
+    }
+
+    // Get the common prefix of the partition paths
+    String commonPrefix = FileUtil.getCommonPrefix(paths);
+
+    // Generate splits
+    if (pruningHandle.hasConjunctiveForms()) {
+      splits.addAll(getFragmentsByMarker(meta, schema, tableName, new Path(commonPrefix), pruningHandle,
+        partitionPathMap));
+    } else {
+      splits.addAll(getFragmentsByListingAllObjects(meta, schema, tableName, new Path(commonPrefix), pruningHandle,
+        partitionPathMap));
+    }
+
+    LOG.info("Total # of splits: " + splits.size());
+
+    long finishTime = System.currentTimeMillis();
+    long elapsedMills = finishTime - startTime;
+    LOG.info(String.format("Split for partitioned table: %d ms elapsed.", elapsedMills));
+
+    return splits;
+  }
+
+  /**
+   * Generates fragments using the marker parameter of the prefix-listing API.
+   *
+   * @param path path to be listed
+   * @param pruningHandle partition pruning results
+   * @throws IOException
+   */
+  private List<Fragment> getFragmentsByMarker(TableMeta meta, Schema schema, String tableName, Path path,
+      PartitionPruningHandle pruningHandle, Map<Path, Integer> partitionPathMap) throws IOException {
+    List<Fragment> splits = Lists.newArrayList();
+    long startTime = System.currentTimeMillis();
+    ObjectListing objectListing;
+    Path previousPartition = null, nextPartition = null;
+    int callCount = 0, i = 0;
+    boolean isFirst = true, isFinished = false, isAccepted = false;
+
+    int partitionCount = pruningHandle.getPartitionPaths().length;
+
+    // List S3 objects using the AWS API
+    String prefix = keyFromPath(path);
+    if (!prefix.isEmpty()) {
+      prefix += "/";
+    }
+
+    ListObjectsRequest request = new ListObjectsRequest()
+      .withBucketName(uri.getHost())
+      .withPrefix(prefix);
+
+    do {
+      isAccepted = false;
+
+      // Get the next chunk of at most 1000 objects
+      objectListing = s3.listObjects(request);
+      int objectsCount = objectListing.getObjectSummaries().size();
+
+      // Get the first and last object summaries of the current chunk
+      S3ObjectSummary firstBucket = objectListing.getObjectSummaries().get(0);
+      Path firstPath = getPathFromBucket(firstBucket);
+      Path firstPartition = isFile(firstBucket) ? firstPath.getParent() : firstPath;
+
+      S3ObjectSummary lastBucket = objectListing.getObjectSummaries().get(objectsCount - 1);
+      Path lastPath = getPathFromBucket(lastBucket);
+      Path lastPartition = isFile(lastBucket) ? lastPath.getParent() : lastPath;
+
+      // Check whether the current chunk includes a target partition.
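+      // Example: S3 lists keys in lexicographic order, so if the pruned
+      // targets are .../country=KOREA/city=SEOUL and .../country=USA/city=NYC,
+      // a chunk whose [firstPartition, lastPartition] window sorts entirely
+      // before the next target partition can be skipped without inspecting its
+      // objects; the compareTo() checks below implement this pruning.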
+      if (isFirst) {
+        nextPartition = pruningHandle.getPartitionPaths()[0];
+        if (nextPartition.compareTo(firstPartition) <= 0 || nextPartition.compareTo(lastPartition) <= 0) {
+          isAccepted = true;
+        }
+      } else {
+        if (previousPartition.compareTo(firstPartition) <= 0 || nextPartition.compareTo(firstPartition) <= 0
+          || previousPartition.compareTo(lastPartition) <= 0 || nextPartition.compareTo(lastPartition) <= 0) {
+          isAccepted = true;
+        }
+      }
+
+      // Generate fragments.
+      if (isAccepted) {
+        for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
+          Optional<Path> bucketPath = getValidPathFromBucket(summary);
+
+          if (bucketPath.isPresent()) {
+            Path partitionPath = bucketPath.get().getParent();
+
+            // If the partition is found in the partition map, add its splits to the final list.
+            if (partitionPathMap.containsKey(partitionPath)) {
+              FileStatus file = getFileStatusFromBucket(summary, bucketPath.get());
+              String partitionKey = getPartitionKey(pruningHandle, partitionPathMap, partitionPath);
+              computePartitionSplits(file, meta, schema, tableName, partitionKey, splits);
+              previousPartition = partitionPath;
+
+              int index = partitionPathMap.get(previousPartition);
+              if ((index + 1) < partitionCount) {
+                nextPartition = pruningHandle.getPartitionPaths()[index + 1];
+              } else {
+                nextPartition = null;
+              }
+
+              if (isFirst) {
+                isFirst = false;
+              }
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Average # of splits per partition: " + splits.size() / (i + 1));
+              }
+              i++;
+            } else {
+              // If the next target partition can still appear in this chunk, move on to
+              // the next object; otherwise stop scanning the chunk.
+              if (nextPartition != null && nextPartition.compareTo(lastPartition) <= 0) {
+                continue;
+              } else {
+                if (previousPartition != null && nextPartition == null) {
+                  isFinished = true;
+                }
+                break;
+              }
+            }
+          }
+        }
+      }
+
+      request.setMarker(objectListing.getNextMarker());
+      callCount++;
+    } while (objectListing.isTruncated() && !isFinished);
+
+    long finishTime = System.currentTimeMillis();
+    long elapsedMills = finishTime - startTime;
+    LOG.info(String.format("List S3Objects: %d ms elapsed. API call count: %d", elapsedMills, callCount));
+
+    return splits;
+  }
+
+  /**
+   * Generates fragments by listing all objects without a marker.
+   *
+   * @param path path to be listed
+   * @param pruningHandle partition pruning results
+   * @throws IOException
+   */
+  private List<Fragment> getFragmentsByListingAllObjects(TableMeta meta, Schema schema, String tableName, Path path,
+      PartitionPruningHandle pruningHandle, Map<Path, Integer> partitionPathMap) throws IOException {
+    List<Fragment> splits = Lists.newArrayList();
+    long startTime = System.currentTimeMillis();
+
+    String prefix = keyFromPath(path);
+    if (!prefix.isEmpty()) {
+      prefix += "/";
+    }
+
+    int i = 0;
+    Iterable<S3ObjectSummary> objectSummaries = S3Objects.withPrefix(s3, uri.getHost(), prefix);
+    for (S3ObjectSummary summary : objectSummaries) {
+      if (summary.getSize() > 0 && !summary.getKey().endsWith("/")) {
+        Path bucketPath = getPathFromBucket(summary);
+        String fileName = bucketPath.getName();
+
+        if (!fileName.startsWith("_") && !fileName.startsWith(".")) {
+          Path partitionPath = bucketPath.getParent();
+          if (partitionPathMap.containsKey(partitionPath)) {
+            FileStatus file = getFileStatusFromBucket(summary, bucketPath);
+            String partitionKey = getPartitionKey(pruningHandle, partitionPathMap, partitionPath);
+            computePartitionSplits(file, meta, schema, tableName, partitionKey, splits);
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Average # of splits per partition: " + splits.size() / (i + 1));
+            }
+            i++;
+          }
+        }
+      }
+    }
+
+    long finishTime = System.currentTimeMillis();
+    long elapsedMills = finishTime - startTime;
+    LOG.info(String.format("List S3Objects: %d ms elapsed", elapsedMills));
+
+    return splits;
+  }
+
+  private String getPartitionKey(PartitionPruningHandle pruningHandle, Map<Path, Integer> partitionPathMap,
+      Path partitionPath) {
+    int index = partitionPathMap.get(partitionPath);
+    return pruningHandle.getPartitionKeys()[index];
+  }
+
+  private void computePartitionSplits(FileStatus file, TableMeta meta, Schema schema, String tableName,
+      String partitionKey, List<Fragment> splits) throws IOException {
+    Path path = file.getPath();
+    long length = file.getLen();
+
+    // Get the locations of the blocks of the file
+    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+    boolean splittable = isSplittablePartitionFragment(meta, schema, path, partitionKey, file);
+    if (splittable) {
+      long minSize = Math.max(getMinSplitSize(), 1);
+      long blockSize = file.getBlockSize(); // the S3 REST API reports a block size, but there is only one block location
+      long splitSize = Math.max(minSize, blockSize);
+      long bytesRemaining = length;
+
+      // for S3
+      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(getSplittablePartitionFragment(tableName, path, length - bytesRemaining, splitSize,
+          blkLocations[blkIndex].getHosts(), partitionKey));
+        bytesRemaining -= splitSize;
+      }
+      if (bytesRemaining > 0) {
+        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(getSplittablePartitionFragment(tableName, path, length - bytesRemaining, bytesRemaining,
+          blkLocations[blkIndex].getHosts(), partitionKey));
+      }
+    } else { // Non splittable
+      splits.add(getNonSplittablePartitionFragment(tableName, path, 0, length, blkLocations, partitionKey));
+    }
+  }
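+  // Split arithmetic example: with the fixed 32MB BLOCK_SIZE reported for S3
+  // objects, a 100MB object becomes 32MB splits at offsets 0, 32MB, and 64MB
+  // plus a 4MB tail split, whereas a 35MB object remains a single 35MB split
+  // because 35/32 <= SPLIT_SLOP (1.1).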
+  private Optional<Path> getValidPathFromBucket(S3ObjectSummary summary) {
+    if (isFile(summary)) {
+      Path bucketPath = getPathFromBucket(summary);
+      if (!bucketPath.getName().startsWith("_") && !bucketPath.getName().startsWith(".")) {
+        return Optional.of(bucketPath);
+      }
+    }
+    return Optional.empty();
+  }
+
+  private boolean isFile(S3ObjectSummary summary) {
+    return summary.getSize() > 0 && !summary.getKey().endsWith("/");
+  }
+
+  private Path getPathFromBucket(S3ObjectSummary summary) {
+    String bucketName = summary.getBucketName();
+    return new Path(uri.getScheme() + "://" + bucketName + "/" + summary.getKey());
+  }
+
+  private FileStatus getFileStatusFromBucket(S3ObjectSummary summary, Path path) {
+    return new FileStatus(summary.getSize(), false, 1, BLOCK_SIZE.toBytes(),
+      summary.getLastModified().getTime(), path);
+  }
+
+  @VisibleForTesting
+  public AmazonS3 getAmazonS3Client() {
+    return s3;
+  }
+}
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Credentials.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Credentials.java
new file mode 100644
index 0000000000..138e0de41c
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Credentials.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Extracts AWS credentials from the filesystem URI or configuration
+ * (borrowed from the hadoop-aws package).
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TajoS3Credentials { + + private String accessKey; + private String secretAccessKey; + + /** + * @throws IllegalArgumentException if credentials for S3 cannot be + * determined. + */ + public void initialize(URI uri, Configuration conf) { + if (uri.getHost() == null) { + throw new IllegalArgumentException("Invalid hostname in URI " + uri); + } + + String userInfo = uri.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretAccessKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } + + String scheme = uri.getScheme(); + String accessKeyProperty = String.format("fs.%s.awsAccessKeyId", scheme); + String secretAccessKeyProperty = + String.format("fs.%s.awsSecretAccessKey", scheme); + if (accessKey == null) { + accessKey = conf.getTrimmed(accessKeyProperty); + } + if (secretAccessKey == null) { + secretAccessKey = conf.getTrimmed(secretAccessKeyProperty); + } + if (accessKey == null && secretAccessKey == null) { + throw new IllegalArgumentException("AWS " + + "Access Key ID and Secret Access " + + "Key must be specified as the " + + "username or password " + + "(respectively) of a " + scheme + + " URL, or by setting the " + + accessKeyProperty + " or " + + secretAccessKeyProperty + + " properties (respectively)."); + } else if (accessKey == null) { + throw new IllegalArgumentException("AWS " + + "Access Key ID must be specified " + + "as the username of a " + scheme + + " URL, or by setting the " + + accessKeyProperty + " property."); + } else if (secretAccessKey == null) { + throw new IllegalArgumentException("AWS " + + "Secret Access Key must be " + + "specified as the password of a " + + scheme + " URL, or by setting the " + + secretAccessKeyProperty + + " property."); + } + + } + + public String getAccessKey() { + return accessKey; + } + + public String getSecretAccessKey() { + return secretAccessKey; + } +} \ No newline at end of file diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java index 2d0677885c..293d39e40e 100644 --- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java @@ -18,6 +18,11 @@ package org.apache.tajo.storage.s3; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.google.common.base.Throwables; import net.minidev.json.JSONObject; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.TablespaceManager; @@ -26,9 +31,12 @@ import org.junit.Test; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; +import static com.google.common.base.Preconditions.checkArgument; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class TestS3TableSpace { @@ -41,6 +49,8 @@ public static void setUp() throws Exception { TajoConf tajoConf = new TajoConf(); tajoConf.set("fs.s3.impl", MockS3FileSystem.class.getName()); + tajoConf.set("fs.s3a.access.key", "test_access_key_id"); + tajoConf.set("fs.s3a.secret.key", 
"test_secret_access_key"); tablespace.init(tajoConf); TablespaceManager.addTableSpaceForTest(tablespace); @@ -59,4 +69,33 @@ public void testTablespaceHandler() throws Exception { assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace); assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString()); } + + @Test + public void testInstanceCredentialsEnabled() throws Exception { + assertTrue((TablespaceManager.getByName(SPACENAME)) instanceof S3TableSpace); + S3TableSpace tableSpace = TablespaceManager.getByName(SPACENAME); + + assertNotNull(tableSpace.getAmazonS3Client()); + assertTrue((tableSpace.getAmazonS3Client()) instanceof AmazonS3Client); + + assertTrue(getAwsCredentialsProvider(tableSpace.getAmazonS3Client()) + instanceof InstanceProfileCredentialsProvider); + } + + private AWSCredentialsProvider getAwsCredentialsProvider(AmazonS3 s3) { + return getFieldValue(s3, "awsCredentialsProvider", AWSCredentialsProvider.class); + } + + @SuppressWarnings("unchecked") + private T getFieldValue(Object instance, String name, Class type) { + try { + Field field = instance.getClass().getDeclaredField(name); + checkArgument(field.getType() == type, "expected %s but found %s", type, field.getType()); + field.setAccessible(true); + return (T) field.get(instance); + } + catch (ReflectiveOperationException e) { + throw Throwables.propagate(e); + } + } }