Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e98cf2c
TAJO-1925: Improve hive compatibility with TIMESTAMP partition column.
blrunner Nov 5, 2015
36e9ec4
Remove debug logs
blrunner Nov 5, 2015
88d1335
Add more descriptions for added codes
blrunner Nov 5, 2015
9856dda
Add more description for PartitionFilterAlgebraVisitor::visitTimestam…
blrunner Nov 5, 2015
2743bfb
Remove unnecessary codes
blrunner Nov 5, 2015
359d988
Implement ColPartitionStoreExec::encodeTimestamp
blrunner Nov 6, 2015
2e09493
Escape the path name of timestamp partition
blrunner Nov 6, 2015
84c0164
Apply user's timezone to partition pruning.
blrunner Nov 6, 2015
0dad060
Apply UTC timezone for casting operation.
blrunner Nov 6, 2015
491d685
Escape partition and unescape partition values
blrunner Nov 6, 2015
2d4e0e4
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Nov 6, 2015
ad644c1
Fix SecondsFraction padding bug
blrunner Nov 8, 2015
fb5d7de
Fix hive compatibility and remove cast operator for int8
blrunner Nov 9, 2015
b33e2a0
Add a description for hive compatibility
blrunner Nov 9, 2015
f77289b
Fix a typo
blrunner Nov 9, 2015
4d27894
Remove unused codes
blrunner Nov 9, 2015
0094dd0
Fix repair partition bug
blrunner Nov 9, 2015
46f3055
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Nov 10, 2015
799add9
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Nov 17, 2015
b4f20b2
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Nov 18, 2015
ff1629f
Add a configuration for timestamp transform
blrunner Nov 18, 2015
f7ce4e1
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Nov 24, 2015
1099f47
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Dec 3, 2015
ffabd0a
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Dec 7, 2015
3d72dc8
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Dec 14, 2015
683873b
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Dec 30, 2015
0e19ed3
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Jan 13, 2016
16e649f
Merge branch 'TAJO-1925' of https://github.com/blrunner/tajo into TAJ…
blrunner Feb 16, 2016
bb78731
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Feb 17, 2016
f251e91
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Feb 17, 2016
8e865d9
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Feb 19, 2016
5b840d5
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Feb 22, 2016
8cb896d
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 7, 2016
ee43042
Trigger for travis CI build
blrunner Mar 7, 2016
b85cbc3
Trigger for travis CI build
blrunner Mar 8, 2016
eb2b886
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
dd5bc07
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
bf97e29
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.IsNullPredicate;
Expand Down Expand Up @@ -965,6 +966,12 @@ private String getFilter(String databaseName, String tableName, List<ColumnProto
PartitionFilterAlgebraVisitor visitor = new PartitionFilterAlgebraVisitor();
visitor.setIsHiveCatalog(true);

if (conf.get(SessionVars.TIMEZONE.getConfVars().keyname()) != null) {
visitor.setTimezoneId(conf.get(SessionVars.TIMEZONE.getConfVars().keyname()));
} else {
visitor.setTimezoneId(TimeZone.getDefault().getID());
}

Expr[] filters = AlgebraicUtil.getRearrangedCNFExpressions(databaseName + "." + tableName, partitionColumns, exprs);

StringBuffer sb = new StringBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
import org.apache.tajo.annotation.Nullable;
Expand Down Expand Up @@ -2181,9 +2182,6 @@ public List<PartitionDescProto> getPartitionsByAlgebra(PartitionsByAlgebraProto
case DATE:
pstmt.setDate(currentIndex, (Date) parameter.getSecond());
break;
case TIMESTAMP:
pstmt.setTimestamp(currentIndex, (Timestamp) parameter.getSecond());
break;
case TIME:
pstmt.setTime(currentIndex, (Time) parameter.getSecond());
break;
Expand Down Expand Up @@ -2262,6 +2260,12 @@ private Pair<String, List<PartitionFilterSet>> getSelectStatementAndPartitionFil
PartitionFilterAlgebraVisitor visitor = new PartitionFilterAlgebraVisitor();
visitor.setIsHiveCatalog(false);

if (conf.get(SessionVars.TIMEZONE.getConfVars().keyname()) != null) {
visitor.setTimezoneId(conf.get(SessionVars.TIMEZONE.getConfVars().keyname()));
} else {
visitor.setTimezoneId(TimeZone.getDefault().getID());
}

Expr[] filters = AlgebraicUtil.getRearrangedCNFExpressions(tableName, partitionColumns, exprs);

StringBuffer sb = new StringBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ public static enum ConfVars implements ConfigKey {

// Partition
PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE("tajo.partition.dynamic.bulk-insert.batch-size", 1000),
PARTITION_TIMESTAMP_TRAMSFORM_DATEFORMAT("tajo.partition.timestamp.transform.dateformat", true),


/////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ public byte[] asTextBytes() {
public Datum equalsTo(Datum datum) {
if (datum.type() == TajoDataTypes.Type.TIME) {
return timestamp == datum.asInt8() ? BooleanDatum.TRUE : BooleanDatum.FALSE;
} else if(datum.type() == TajoDataTypes.Type.TIMESTAMP) {
TimestampDatum another = (TimestampDatum) datum;
return timestamp == another.timestamp ? BooleanDatum.TRUE : BooleanDatum.FALSE;
} else if (datum.isNull()) {
return datum;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ public final void testAlterTableRepairPartition() throws Exception {
assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3")));

List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitionsOfTable(getCurrentDatabase(),
simpleTableName);

executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.*;

import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
Expand Down Expand Up @@ -1562,18 +1563,56 @@ public final void testTimestampPartitionColumn() throws Exception {

executeString(
"insert overwrite into " + tableName
+ " select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem");
+ " select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem " +
" where l_orderkey != 2");
} else {
executeString(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(key timestamp) "
+ " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem");
+ " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem " +
" where l_orderkey != 2");
}

executeString(
"insert overwrite into " + tableName
+ " select l_orderkey, l_partkey, TIMESTAMP '1997-01-28 02:50:08.037' from lineitem " +
" where l_orderkey = 2");

assertTrue(client.existTable(tableName));

List<PartitionDescProto> partitions = catalog.getPartitionsOfTable(DEFAULT_DATABASE_NAME, tableName);
assertEquals(5, partitions.size());

// Equals
res = executeString("SELECT * FROM " + tableName + " WHERE key = TIMESTAMP '1993-11-09 00:00:00.0'");

expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"3,3,1993-11-09 00:00:00\n";

assertEquals(expectedResult, resultSetToString(res));
res.close();

res = executeString("SELECT * FROM " + tableName + " WHERE key = to_timestamp(760147200)");

expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"3,2,1994-02-02 00:00:00\n" ;

assertEquals(expectedResult, resultSetToString(res));
res.close();

res = executeString("SELECT * FROM " + tableName + " WHERE key = TIMESTAMP '1997-01-28 02:50:08.037'");

expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"2,2,1997-01-28 02:50:08.037\n";

assertEquals(expectedResult, resultSetToString(res));
res.close();

// LessThanOrEquals
res = executeString("SELECT * FROM " + tableName
+ " WHERE key <= to_timestamp('1995-09-01', 'YYYY-MM-DD') order by col1, col2, key");
+ " WHERE key <= TIMESTAMP '1995-09-01 00:00:00' order by col1, col2, key");

expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
Expand All @@ -1585,8 +1624,8 @@ public final void testTimestampPartitionColumn() throws Exception {

// LessThan and GreaterThan
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > to_timestamp('1993-01-01', 'YYYY-MM-DD') and " +
"key < to_timestamp('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc");
+ " WHERE key > TIMESTAMP '1993-01-01 00:00:00' and " +
"key < TIMESTAMP '1996-01-01 00:00:00' order by col1, col2, key desc");

expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
Expand All @@ -1598,8 +1637,8 @@ public final void testTimestampPartitionColumn() throws Exception {

// Between
res = executeString("SELECT * FROM " + tableName
+ " WHERE key between to_timestamp('1993-01-01', 'YYYY-MM-DD') " +
"and to_timestamp('1997-01-01', 'YYYY-MM-DD') order by col1, col2, key desc");
+ " WHERE key between TIMESTAMP '1993-01-01 00:00:00' " +
"and TIMESTAMP '1997-01-01 00:00:00' order by col1, col2, key desc");

expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tajo.engine.util;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.TajoDataTypes.Type;
Expand All @@ -33,6 +34,8 @@
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.junit.Test;

import java.util.TimeZone;

import static org.junit.Assert.*;

public class TestTupleUtil {
Expand Down Expand Up @@ -140,47 +143,48 @@ public final void testGetPartitions() {

@Test
public void testBuildTupleFromPartitionPath() {
String timezoneId = TimeZone.getDefault().getID();

Schema schema = new Schema();
schema.addColumn("key1", Type.INT8);
schema.addColumn("key2", Type.TEXT);

Path path = new Path("hdfs://tajo/warehouse/partition_test/");
Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId);
assertNull(tuple);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId);
assertNull(tuple);

path = new Path("hdfs://tajo/warehouse/partition_test/key1=123");
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId);
assertNotNull(tuple);
assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0));
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId);
assertNotNull(tuple);
assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0));

path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/part-0000"); // wrong cases;
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId);
assertNull(tuple);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId);
assertNull(tuple);

path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc");
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId);
assertNotNull(tuple);
assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0));
assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1));
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId);
assertNotNull(tuple);
assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0));
assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1));


path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc/part-0001");
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true, timezoneId);
assertNull(tuple);

tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false, timezoneId);
assertNotNull(tuple);
assertEquals(DatumFactory.createInt8(123), tuple.asDatum(0));
assertEquals(DatumFactory.createText("abc"), tuple.asDatum(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@
import org.apache.tajo.storage.*;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.datetime.DateTimeFormat;
import org.apache.tajo.util.datetime.DateTimeUtil;
import org.apache.tajo.util.datetime.TimeMeta;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
import java.util.TimeZone;

public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
private static Log LOG = LogFactory.getLog(ColPartitionStoreExec.class);
Expand Down Expand Up @@ -184,14 +188,48 @@ private void addPartition(String partition) throws IOException {
// In CTAS, the uri would be null. So, it get the uri from staging directory.
int endIndex = storeTablePath.toString().indexOf(FileTablespace.TMP_STAGING_DIR_PREFIX);
String outputPath = storeTablePath.toString().substring(0, endIndex);
builder.setPath(outputPath + partition);
builder.setPath(outputPath + partition);
} else {
builder.setPath(this.plan.getUri().toString() + "/" + partition);
}

context.addPartition(builder.build());
}

/**
 * Converts a timestamp partition value to a formatted string for Hive
 * compatibility, rendered in the user's timezone.
 *
 * @param tm the TimeMeta holding the timestamp fields
 *           (NOTE(review): assumed to be in UTC before conversion — confirm)
 * @return an escaped "yyyy-MM-dd HH24:MI:SS" rendering followed by a
 *         fractional-seconds suffix (".0" when there is no fraction)
 */
protected String encodeTimestamp(TimeMeta tm) {
  StringBuilder sb = new StringBuilder();

  // Prefer the session timezone when one was set; otherwise fall back to the
  // JVM default, mirroring the lookup done elsewhere (e.g. SeqScanExec).
  TimeZone tz;
  if (context.getQueryContext().containsKey(SessionVars.TIMEZONE)) {
    tz = TimeZone.getTimeZone(context.getQueryContext().get(SessionVars.TIMEZONE));
  } else {
    tz = TimeZone.getDefault();
  }
  DateTimeUtil.toUserTimezone(tm, tz);

  // Escape the rendered date-time so it is safe to embed in a partition path.
  sb.append(StringUtils.escapePathName(DateTimeFormat.to_char(tm, "yyyy-MM-dd HH24:MI:SS")));

  sb.append(".");
  if (tm.fsecs == 0) {
    // Hive renders a zero fraction as a single "0" (e.g. "... 00:00:00.0").
    sb.append("0");
  } else {
    // fsecs appears to hold microseconds (divided by 1000 to get
    // milliseconds) — TODO confirm against TimeMeta. Always zero-pad the
    // millisecond value to three digits: the previous code appended
    // single-digit values unpadded, which turned e.g. 5 ms into ".5"
    // (i.e. 500 ms) and produced ambiguous partition paths.
    int millis = tm.fsecs / 1000;
    sb.append(org.apache.commons.lang.StringUtils.leftPad(String.valueOf(millis), 3, '0'));
  }
  return sb.toString();
}

public void openAppender(int suffixId) throws IOException {
Path actualFilePath = lastFileName;
if (suffixId > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@

package org.apache.tajo.engine.planner.physical;

import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.TimestampDatum;
import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple;
import org.apache.tajo.plan.logical.StoreTableNode;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.datetime.DateTimeFormat;
import org.apache.tajo.util.datetime.DateTimeUtil;
import org.apache.tajo.util.datetime.TimeMeta;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.sql.Timestamp;
import java.util.*;

/**
* This class is a physical operator to store at column partitioned table.
Expand Down Expand Up @@ -62,7 +66,12 @@ private Appender getAppender(ComparableTuple partitionKey, Tuple tuple) throws I
}
sb.append(keyNames[i]).append('=');
Datum datum = tuple.asDatum(keyIds[i]);
sb.append(StringUtils.escapePathName(datum.asChars()));

if (datum.type() == TajoDataTypes.Type.TIMESTAMP) {
sb.append(encodeTimestamp(tuple.getTimeDate(keyIds[i])));
} else {
sb.append(StringUtils.escapePathName(datum.asChars()));
}
}
appender = getNextPartitionAppender(sb.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner.physical;

import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
Expand All @@ -44,6 +45,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;


public class SeqScanExec extends ScanExec {
Expand Down Expand Up @@ -100,9 +102,16 @@ private void rewriteColumnPartitionedTableSchema() throws IOException {
if (fragments != null && fragments.length > 0) {
List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);

String timezoneId = null;
if (context.getQueryContext().containsKey(SessionVars.TIMEZONE)) {
timezoneId = context.getQueryContext().get(SessionVars.TIMEZONE);
} else {
timezoneId = TimeZone.getDefault().getID();
}

// Get a partition key value from a given path
partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath(
columnPartitionSchema, fileFragments.get(0).getPath(), false);
columnPartitionSchema, fileFragments.get(0).getPath(), false, timezoneId);
}

// Targets or search conditions may contain column references.
Expand Down
Loading