From 23a5e0111515ca51544480de53eeb331ff96e0c7 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 16 Mar 2016 14:53:43 +0900 Subject: [PATCH 1/3] TAJO-2096: Apply offheap to the join operator. --- .../java/org/apache/tajo/SessionVars.java | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 1 + .../java/org/apache/tajo/datum/BlobDatum.java | 21 +- .../org/apache/tajo/datum/BooleanDatum.java | 3 + .../java/org/apache/tajo/datum/CharDatum.java | 10 +- .../java/org/apache/tajo/datum/DateDatum.java | 19 +- .../org/apache/tajo/datum/Float4Datum.java | 7 +- .../org/apache/tajo/datum/Float8Datum.java | 7 +- .../org/apache/tajo/datum/Inet4Datum.java | 3 + .../java/org/apache/tajo/datum/Int2Datum.java | 7 +- .../java/org/apache/tajo/datum/Int4Datum.java | 7 +- .../java/org/apache/tajo/datum/Int8Datum.java | 7 +- .../org/apache/tajo/datum/IntervalDatum.java | 3 + .../org/apache/tajo/datum/ProtobufDatum.java | 3 + .../java/org/apache/tajo/datum/TextDatum.java | 9 +- .../java/org/apache/tajo/datum/TimeDatum.java | 3 + .../org/apache/tajo/datum/TimestampDatum.java | 5 +- .../org/apache/tajo/storage/BufferPool.java | 15 ++ .../tuple/memory/OffHeapRowBlockUtils.java | 31 +++ .../apache/tajo/tuple/memory/TupleList.java | 29 +++ .../apache/tajo/tuple/memory/UnSafeTuple.java | 47 +++- .../tajo/tuple/memory/UnSafeTupleList.java | 3 +- .../java/org/apache/tajo/util/MurmurHash.java | 215 ------------------ .../org/apache/tajo/util/MurmurHash3_32.java | 199 ++++++++++++++++ .../planner/physical/TestTupleSorter.java | 3 +- .../planner/physical/TestUnSafeTuple.java | 61 +++++ .../planner/physical/CommonHashJoinExec.java | 78 +++++-- .../planner/physical/CommonJoinExec.java | 3 + .../DistinctGroupbyHashAggregationExec.java | 11 +- .../physical/HashFullOuterJoinExec.java | 17 +- .../engine/planner/physical/HashJoinExec.java | 5 +- .../physical/HashLeftAntiJoinExec.java | 1 + .../physical/HashLeftOuterJoinExec.java | 1 + .../physical/HashLeftSemiJoinExec.java | 1 + .../{TupleList.java => HeapTupleList.java} | 19 +- .../engine/planner/physical/KeyTuple.java | 2 +- .../physical/MergeFullOuterJoinExec.java | 25 +- .../planner/physical/MergeJoinExec.java | 29 ++- .../planner/physical/ReferenceTupleList.java | 53 +++++ .../physical/RightOuterMergeJoinExec.java | 25 +- .../planner/physical/WindowAggExec.java | 13 +- .../apache/tajo/engine/utils/CacheHolder.java | 11 +- .../main/resources/webapps/worker/index.jsp | 6 +- 43 files changed, 689 insertions(+), 331 deletions(-) create mode 100644 tajo-common/src/main/java/org/apache/tajo/tuple/memory/TupleList.java delete mode 100644 tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java create mode 100644 tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java rename tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/{TupleList.java => HeapTupleList.java} (73%) create mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ReferenceTupleList.java diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index ba85549092..2f6180f03e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -142,6 +142,8 @@ public enum SessionVars implements ConfigKey { SORT_LIST_SIZE(ConfVars.$SORT_LIST_SIZE, "The initial size of list for in-memory sort", DEFAULT), JOIN_HASH_TABLE_SIZE(ConfVars.$JOIN_HASH_TABLE_SIZE, "The initial size of hash table for in-memory hash join", DEFAULT), + EXECUTOR_DIRECT_MEMORY_ENABLE(ConfVars.$EXECUTOR_MEMORY_DIRECT, + "If true, the executor data will be kept in direct memory", DEFAULT), // for index INDEX_ENABLED(ConfVars.$INDEX_ENABLED, "index scan enabled", DEFAULT), diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index c36f43bcae..a0410fa0c6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -367,6 +367,7 @@ public static enum ConfVars implements ConfigKey { $AGG_HASH_TABLE_SIZE("tajo.executor.aggregate.hash-table.size", 10000), $SORT_LIST_SIZE("tajo.executor.sort.list.size", 100000), $JOIN_HASH_TABLE_SIZE("tajo.executor.join.hash-table.size", 100000), + $EXECUTOR_MEMORY_DIRECT("tajo.executor.memory.direct", true), // for index $INDEX_ENABLED("tajo.query.index.enabled", false), diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java index 3de5135505..25513b9841 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java @@ -124,17 +124,20 @@ public int hashCode() { return bb.hashCode(); } - @Override - public boolean equals(Object obj) { - if (obj instanceof BlobDatum) { - BlobDatum other = (BlobDatum) obj; - initFromBytes(); - other.initFromBytes(); + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj instanceof BlobDatum) { + BlobDatum other = (BlobDatum) obj; + initFromBytes(); + other.initFromBytes(); return Arrays.equals(this.val, other.val); } - - return false; - } + + return false; + } @Override public Datum equalsTo(Datum datum) { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java index 596540f660..91ba349047 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java @@ -147,6 +147,9 @@ public int hashCode() { @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof BooleanDatum) { BooleanDatum other = (BooleanDatum) obj; return val == other.val; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java index 750930f0da..35f969aed4 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java @@ -21,6 +21,7 @@ import com.google.common.primitives.UnsignedBytes; import com.google.gson.annotations.Expose; import org.apache.tajo.exception.InvalidOperationException; +import org.apache.tajo.util.MurmurHash3_32; import java.util.Arrays; @@ -49,12 +50,12 @@ public CharDatum(byte [] bytes) { } public CharDatum(String val) { - this(val.getBytes()); + this(val.getBytes(TextDatum.DEFAULT_CHARSET)); } private String getString() { if (chars == null) { - chars = new String(bytes); + chars = new String(bytes, TextDatum.DEFAULT_CHARSET); } return chars; } @@ -115,11 +116,14 @@ public int size() { @Override public int hashCode() { - return getString().hashCode(); + return MurmurHash3_32.hash(bytes); } @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof CharDatum) { CharDatum other = (CharDatum) obj; return this.size == other.size && diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java index ac84e259dd..803f937e34 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java @@ -25,6 +25,7 @@ import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.datetime.DateTimeConstants.DateStyle; import org.apache.tajo.util.datetime.DateTimeFormat; import org.apache.tajo.util.datetime.DateTimeUtil; @@ -34,7 +35,7 @@ public class DateDatum extends Datum { public static final int SIZE = 4; // Dates are stored in UTC. - private int jdate; + private final int jdate; public DateDatum(int value) { super(TajoDataTypes.Type.DATE); @@ -231,10 +232,12 @@ public int compareTo(Datum datum) { @Override public boolean equals(Object obj) { - TimeMeta tm = asTimeMeta(); + if (this == obj) + return true; + if (obj instanceof DateDatum) { - TimeMeta another = ((DateDatum) obj).asTimeMeta(); - return tm.years == another.years && tm.monthOfYear == another.monthOfYear && tm.dayOfMonth == another.dayOfMonth; + DateDatum other = (DateDatum) obj; + return jdate == other.jdate; } else { return false; } @@ -242,12 +245,6 @@ public boolean equals(Object obj) { @Override public int hashCode() { - TimeMeta tm = asTimeMeta(); - int total = 157; - total = 23 * total + tm.years; - total = 23 * total + tm.monthOfYear; - total = 23 * total + tm.dayOfMonth; - - return total; + return MurmurHash3_32.hash(jdate); } } diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java index 5d56984c0b..99e8b54598 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java @@ -23,7 +23,7 @@ import org.apache.tajo.exception.InvalidValueForCastException; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.exception.TajoRuntimeException; -import org.apache.tajo.util.MurmurHash; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -107,11 +107,14 @@ public int size() { @Override public int hashCode() { - return MurmurHash.hash(val); + return MurmurHash3_32.hash(val); } @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof Float4Datum) { Float4Datum other = (Float4Datum) obj; return val == other.val; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java index fdd54956fd..eae8ec2681 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java @@ -22,7 +22,7 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.util.Bytes; -import org.apache.tajo.util.MurmurHash; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -96,10 +96,13 @@ public int size() { @Override public int hashCode() { - return MurmurHash.hash(val); + return MurmurHash3_32.hash(val); } public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof Float8Datum) { Float8Datum other = (Float8Datum) obj; return val == other.val; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java index ab1799bac4..de1009e3c3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java @@ -99,6 +99,9 @@ public int hashCode() { @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof Inet4Datum) { Inet4Datum other = (Inet4Datum) obj; return this.address == other.address; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java index 5629f4124a..8f4ab63afb 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java @@ -21,7 +21,7 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidOperationException; -import org.apache.tajo.util.MurmurHash; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -97,11 +97,14 @@ public int size() { @Override public int hashCode() { - return MurmurHash.hash(val); + return MurmurHash3_32.hash(val); } @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof Int2Datum) { Int2Datum other = (Int2Datum) obj; return val == other.val; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java index a45cb1e218..b64436e38e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java @@ -21,7 +21,7 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.InvalidOperationException; -import org.apache.tajo.util.MurmurHash; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -102,10 +102,13 @@ public int size() { @Override public int hashCode() { - return MurmurHash.hash(val); + return MurmurHash3_32.hash(val); } public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof Int4Datum) { Int4Datum other = (Int4Datum) obj; return val == other.val; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java index 86f19c761a..8434398d26 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java @@ -23,7 +23,7 @@ import org.apache.tajo.exception.InvalidValueForCastException; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.exception.TajoRuntimeException; -import org.apache.tajo.util.MurmurHash; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -109,11 +109,14 @@ public int size() { @Override public int hashCode() { - return MurmurHash.hash(val); + return MurmurHash3_32.hash(val); } @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof Int8Datum) { Int8Datum other = (Int8Datum) obj; return val == other.val; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java index e797b87da7..012878296e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java @@ -412,6 +412,9 @@ public Datum equalsTo(Datum datum) { @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof IntervalDatum) { return asInt8() == ((IntervalDatum)obj).asInt8(); } else { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java index 822a1e9467..702af8fdbc 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java @@ -58,6 +58,9 @@ public int hashCode() { @Override public boolean equals(Object object) { + if (this == object) + return true; + if (object instanceof ProtobufDatum) { ProtobufDatum another = (ProtobufDatum) object; return value.equals(another.value); diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java index df810fe414..7fd639b501 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java @@ -25,7 +25,7 @@ import org.apache.tajo.exception.InvalidValueForCastException; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.exception.TajoRuntimeException; -import org.apache.tajo.util.MurmurHash; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.StringUtils; import java.nio.charset.Charset; @@ -128,9 +128,12 @@ public int compareTo(Datum datum) { @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof TextDatum) { TextDatum o = (TextDatum) obj; - return COMPARATOR.compare(this.bytes, o.bytes) == 0; + return size() == o.size() && COMPARATOR.compare(this.bytes, o.bytes) == 0; } return false; @@ -152,7 +155,7 @@ public Datum equalsTo(Datum datum) { @Override public int hashCode() { - return MurmurHash.hash(bytes); + return MurmurHash3_32.hash(bytes); } @Override diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java index e70d7d5a47..c11f2e76de 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java @@ -179,6 +179,9 @@ public int compareTo(Datum datum) { @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof TimeDatum) { TimeDatum another = (TimeDatum) obj; return time == another.time; diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java index 5b4c152a51..4a96c0ab34 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java @@ -32,7 +32,7 @@ public class TimestampDatum extends Datum { public static final int SIZE = 8; - private long timestamp; + private final long timestamp; /** * @@ -204,6 +204,9 @@ public int compareTo(Datum datum) { @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof TimestampDatum) { TimestampDatum another = (TimestampDatum) obj; return timestamp == another.timestamp; diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 7c4e288bbd..97d81e1ec5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.FileUtil; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; @@ -188,4 +189,18 @@ public static BufferPoolMXBean getMappedBufferPool() { private static List getBufferPools() { return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); } + + public static String printDirectMemoryUsage() { + BufferPoolMXBean direct = BufferPool.getDirectBufferPool(); + return String.format("%s (USED)/%s (TOTAL)", + FileUtil.humanReadableByteCount(direct.getMemoryUsed(), false), + FileUtil.humanReadableByteCount(direct.getTotalCapacity(), false)); + } + + public static String printHeapMemoryUsage() { + return String.format("%s (FREE)/%s (TOTAL)/%s (MAX)", + FileUtil.humanReadableByteCount(Runtime.getRuntime().freeMemory(), false), + FileUtil.humanReadableByteCount(Runtime.getRuntime().totalMemory(), false), + FileUtil.humanReadableByteCount(Runtime.getRuntime().maxMemory(), false)); + } } diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java index 3f27763d90..64241ce2c5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java @@ -28,6 +28,7 @@ import org.apache.tajo.exception.ValueOutOfRangeException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.util.UnsafeUtil; import java.util.Arrays; import java.util.Collections; @@ -231,4 +232,34 @@ protected void writeField(int colIdx, Tuple tuple, RowWriter writer) { public static void convert(Tuple tuple, RowWriter writer) { tupleConverter.convert(tuple, writer); } + + public static boolean equals(long ptr1, int length1, long ptr2, int length2) { + if (length1 != length2) { + return false; + } + + final int longCount = length1 >>> 3; + final int byteCount = length1 & 7; + + int aIndex = 0; + int bIndex = 0; + + for (int i = longCount; i > 0; i --) { + if (UnsafeUtil.unsafe.getLong(ptr1) != UnsafeUtil.unsafe.getLong(ptr2)) { + return false; + } + ptr1 += 8; + ptr2 += 8; + } + + for (int i = byteCount; i > 0; i --) { + if (UnsafeUtil.unsafe.getByte(ptr1) != UnsafeUtil.unsafe.getByte(ptr2)) { + return false; + } + ptr1 ++; + ptr2 ++; + } + + return true; + } } diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/TupleList.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/TupleList.java new file mode 100644 index 0000000000..99a6f3ba44 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/TupleList.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.Deallocatable; + +import java.util.List; + +public interface TupleList extends List, Deallocatable { + + boolean addTuple(Tuple tuple); +} diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java index dcff801088..7c33deaf75 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java @@ -28,6 +28,7 @@ import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.MurmurHash3_32; import org.apache.tajo.util.SizeOf; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.UnsafeUtil; @@ -340,17 +341,59 @@ public Datum[] getValues() { @Override public int hashCode() { - return Arrays.hashCode(getValues()); + int[] hashCodes = new int[size()]; + + for (int i = 0; i < size(); i++) { + if (contains(i)) { + switch (types[i].getType()) { + case INT1: + case INT2: + case INT4: + case FLOAT4: + hashCodes[i] = MurmurHash3_32.hashUnsafeInt(getFieldAddr(i)); + break; + case INT8: + case FLOAT8: + hashCodes[i] = MurmurHash3_32.hashUnsafeLong(getFieldAddr(i)); + break; + case TEXT: + long pos = getFieldAddr(i); + int len = PlatformDependent.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + hashCodes[i] = MurmurHash3_32.hashUnsafeVariant(pos, len); + break; + default: + hashCodes[i] = asDatum(i).hashCode(); + break; + } + } else { + hashCodes[i] = NullDatum.get().hashCode(); + } + } + return Arrays.hashCode(hashCodes); } @Override public boolean equals(Object obj) { - if (obj instanceof Tuple) { + if (this == obj) + return true; + + if (obj instanceof UnSafeTuple) { + + UnSafeTuple other = (UnSafeTuple) obj; + int headerSize1 = SizeOf.SIZE_OF_INT + (size() * SizeOf.SIZE_OF_INT); + int headerSize2 = SizeOf.SIZE_OF_INT + (other.size() * SizeOf.SIZE_OF_INT); + + return size() == other.size() && OffHeapRowBlockUtils.equals(address() + headerSize1, getLength() - headerSize1, + other.address() + headerSize2, other.getLength() - headerSize2); + + } else if (obj instanceof Tuple) { Tuple other = (Tuple) obj; return Arrays.equals(getValues(), other.getValues()); } return false; } + @Override public String toString() { return VTuple.toDisplayString(getValues()); diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java index 4c4a6cbbd1..70b2bdd30e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java @@ -31,7 +31,7 @@ * The memory pages are automatically added, if memory of a page are exceeded. * This instance must be released */ -public class UnSafeTupleList extends ArrayList { +public class UnSafeTupleList extends ArrayList implements TupleList { private final DataType[] dataTypes; private List rowBlocks; @@ -59,6 +59,7 @@ public boolean add(UnSafeTuple tuple) { return addTuple(tuple); } + @Override public boolean addTuple(Tuple tuple) { int prevPos = currentRowBlock.getMemory().writerPosition(); diff --git a/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java b/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java deleted file mode 100644 index 4db4cf7185..0000000000 --- a/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.util; - -/** - * This class is borrowed from the following source code - * https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/hash/MurmurHash.java - */ -public class MurmurHash { - public static int hash(Object o) { - if (o == null) { - return 0; - } - if (o instanceof Long) { - return hashLong((Long) o); - } - if (o instanceof Integer) { - return hashLong((Integer) o); - } - if (o instanceof Double) { - return hashLong(Double.doubleToRawLongBits((Double) o)); - } - if (o instanceof Float) { - return hashLong(Float.floatToRawIntBits((Float) o)); - } - if (o instanceof String) { - return hash(((String) o).getBytes()); - } - if (o instanceof byte[]) { - return hash((byte[]) o); - } - return hash(o.toString()); - } - - public static int hash(byte[] data) { - return hash(data, data.length, -1); - } - - public static int hash(byte[] data, int seed) { - return hash(data, data.length, seed); - } - - public static int hash(byte[] data, int length, int seed) { - int m = 0x5bd1e995; - int r = 24; - - int h = seed ^ length; - - int len_4 = length >> 2; - - for (int i = 0; i < len_4; i++) { - int i_4 = i << 2; - int k = data[i_4 + 3]; - k = k << 8; - k = k | (data[i_4 + 2] & 0xff); - k = k << 8; - k = k | (data[i_4 + 1] & 0xff); - k = k << 8; - k = k | (data[i_4 + 0] & 0xff); - k *= m; - k ^= k >>> r; - k *= m; - h *= m; - h ^= k; - } - - // avoid calculating modulo - int len_m = len_4 << 2; - int left = length - len_m; - - if (left != 0) { - if (left >= 3) { - h ^= (int) data[length - 3] << 16; - } - if (left >= 2) { - h ^= (int) data[length - 2] << 8; - } - if (left >= 1) { - h ^= (int) data[length - 1]; - } - - h *= m; - } - - h ^= h >>> 13; - h *= m; - h ^= h >>> 15; - - return h; - } - - public static int hashLong(long data) { - int m = 0x5bd1e995; - int r = 24; - - int h = 0; - - int k = (int) data * m; - k ^= k >>> r; - h ^= k * m; - - k = (int) (data >> 32) * m; - k ^= k >>> r; - h *= m; - h ^= k * m; - - h ^= h >>> 13; - h *= m; - h ^= h >>> 15; - - return h; - } - - public static long hash64(Object o) { - if (o == null) { - return 0l; - } else if (o instanceof String) { - final byte[] bytes = ((String) o).getBytes(); - return hash64(bytes, bytes.length); - } else if (o instanceof byte[]) { - final byte[] bytes = (byte[]) o; - return hash64(bytes, bytes.length); - } - return hash64(o.toString()); - } - - // 64 bit implementation copied from here: https://github.com/tnm/murmurhash-java - - /** - * Generates 64 bit hash from byte array with default seed value. - * - * @param data byte array to hash - * @param length length of the array to hash - * @return 64 bit hash of the given string - */ - public static long hash64(final byte[] data, int length) { - return hash64(data, length, 0xe17a1465); - } - - - /** - * Generates 64 bit hash from byte array of the given length and seed. - * - * @param data byte array to hash - * @param length length of the array to hash - * @param seed initial seed value - * @return 64 bit hash of the given array - */ - public static long hash64(final byte[] data, int length, int seed) { - final long m = 0xc6a4a7935bd1e995L; - final int r = 47; - - long h = (seed & 0xffffffffl) ^ (length * m); - - int length8 = length / 8; - - for (int i = 0; i < length8; i++) { - final int i8 = i * 8; - long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) - + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24) - + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40) - + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); - - k *= m; - k ^= k >>> r; - k *= m; - - h ^= k; - h *= m; - } - - switch (length % 8) { - case 7: - h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; - case 6: - h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; - case 5: - h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; - case 4: - h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; - case 3: - h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; - case 2: - h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; - case 1: - h ^= (long) (data[length & ~7] & 0xff); - h *= m; - break; - default: - break; - } - - h ^= h >>> r; - h *= m; - h ^= h >>> r; - - return h; - } -} \ No newline at end of file diff --git a/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java b/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java new file mode 100644 index 0000000000..8275482102 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +/** + * This class is borrowed from following source codes and it is modified to adapt to tajo + * https://github.com/google/guava/blob/v19.0/guava/src/com/google/common/hash/Murmur3_32HashFunction.java + * https://github.com/yonik/java_util/blob/master/src/util/hash/MurmurHash3.java + */ +public class MurmurHash3_32 { + // Constants for 32 bit variant + private static final int C1 = 0xcc9e2d51; + private static final int C2 = 0x1b873593; + private static final int DEFAULT_SEED = 0; + + public static int hash(int input) { + return hashInt(input); + } + + public static int hash(long input) { + return hashLong(input); + } + + public static int hash(double input) { + return hashLong(Double.doubleToRawLongBits(input)); + } + + public static int hash(float input) { + return hashInt(Float.floatToRawIntBits(input)); + } + + public static int hash(String input) { + return hash(input.getBytes()); + } + + public static int hash(byte[] input) { + return hash(input, 0, input.length, DEFAULT_SEED); + } + + public static int hash(byte[] data, int offset, int len, int seed) { + int h1 = seed; + int roundedEnd = offset + (len & 0xfffffffc); // round down to 4 byte block + + for (int i = offset; i < roundedEnd; i += 4) { + // little endian load order + int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8) | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24); + + k1 = mixK1(k1); + h1 = mixH1(h1, k1); + } + + // tail + int k1 = 0; + + switch (len & 0x03) { + case 3: + k1 = (data[roundedEnd + 2] & 0xff) << 16; + // fallthrough + case 2: + k1 |= (data[roundedEnd + 1] & 0xff) << 8; + // fallthrough + case 1: + k1 |= (data[roundedEnd] & 0xff); + k1 *= C1; + k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15); + k1 *= C2; + h1 ^= k1; + } + + // finalization + h1 ^= len; + + // fmix(h1); + h1 ^= h1 >>> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >>> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >>> 16; + + return h1; + } + + public static int hashInt(int input) { + int k1 = mixK1(input); + int h1 = mixH1(DEFAULT_SEED, k1); + + return fmix(h1, 4); + } + + public static int hashLong(long input) { + int low = (int) input; + int high = (int) (input >>> 32); + + int k1 = mixK1(low); + int h1 = mixH1(DEFAULT_SEED, k1); + + k1 = mixK1(high); + h1 = mixH1(h1, k1); + + return fmix(h1, 8); + } + + + public static int hashUnsafeInt(long address) { + return hashInt(UnsafeUtil.unsafe.getInt(address)); + } + + public static int hashUnsafeLong(long address) { + return hashLong(UnsafeUtil.unsafe.getLong(address)); + } + + public static int hashUnsafeVariant(long address, int len) { + return hashUnsafeVariant(address, len, DEFAULT_SEED); + } + + public static int hashUnsafeVariant(long address, int len, int seed) { + int h1 = seed; + int roundedEnd = len & 0xfffffffc; // round down to 4 byte block + + for (int i = 0; i < roundedEnd; i += 4) { + // little endian load order + int k1 = UnsafeUtil.unsafe.getInt(address + i); + k1 = mixK1(k1); + h1 = mixH1(h1, k1); + } + + // tail + int k1 = 0; + long offset = address + roundedEnd; + + switch (len & 0x03) { + case 3: + k1 = (UnsafeUtil.unsafe.getByte(offset + 2) & 0xff) << 16; + // fallthrough + case 2: + k1 |= (UnsafeUtil.unsafe.getByte(offset + 1) & 0xff) << 8; + // fallthrough + case 1: + k1 |= (UnsafeUtil.unsafe.getByte(offset) & 0xff); + k1 *= C1; + k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15); + k1 *= C2; + h1 ^= k1; + } + + // finalization + h1 ^= len; + + // fmix(h1); + h1 ^= h1 >>> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >>> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >>> 16; + + return h1; + } + + private static int mixK1(int k1) { + k1 *= C1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= C2; + return k1; + } + + private static int mixH1(int h1, int k1) { + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + h1 = h1 * 5 + 0xe6546b64; + return h1; + } + + // Finalization mix - force all bits of a hash block to avalanche + private static int fmix(int h1, int length) { + h1 ^= length; + h1 ^= h1 >>> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >>> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >>> 16; + return h1; + } +} \ No newline at end of file diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java index 2f7330b09d..dc73872690 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java @@ -31,6 +31,7 @@ import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.TupleList; import org.junit.Test; import java.util.*; @@ -74,7 +75,7 @@ public final void testSortBench() { long[] time1 = new long[ITERATION]; long[] time2 = new long[ITERATION]; for(int iteration = 0; iteration < ITERATION; iteration++) { - TupleList target = new TupleList(tuples.length); + TupleList target = new HeapTupleList(tuples.length); target.addAll(Arrays.asList(Arrays.copyOf(tuples, tuples.length))); Set keys = new TreeSet<>(); for (int i = 0; i < MAX_SORT_KEY; i++) { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java index 48170f667d..224d4782fd 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java @@ -26,8 +26,10 @@ import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.UnSafeTuple; import org.apache.tajo.tuple.memory.UnSafeTupleList; import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.MurmurHash3_32; import org.junit.BeforeClass; import org.junit.Test; @@ -101,4 +103,63 @@ public final void testMemoryPageAndValidation() { unSafeTupleList.release(); } + + @Test + public final void testUnsafeHash() { + + Column col0 = new Column("col0", Type.INT4); + Column col1 = new Column("col1", Type.INT8); + Column col2 = new Column("col2", Type.FLOAT4); + Column col3 = new Column("col3", Type.FLOAT8); + Column col4 = new Column("col4", Type.TEXT); + + Schema schema = new Schema(new Column[]{col0, col1, col2, col3, col4}); + + Datum[] datums = new Datum[]{ + DatumFactory.createInt4(rnd.nextInt()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createFloat4(rnd.nextFloat()), + DatumFactory.createFloat8(rnd.nextDouble()), + DatumFactory.createText("test test")}; + Tuple tuple = new VTuple(datums); + + UnSafeTupleList unSafeTupleList = new UnSafeTupleList(SchemaUtil.toDataTypes(schema), 1, StorageUnit.KB); + unSafeTupleList.addTuple(tuple); + UnSafeTuple tuple1 = (UnSafeTuple) unSafeTupleList.get(0); + + assertEquals(tuple.hashCode(), tuple1.hashCode()); + + long address = tuple1.getFieldAddr(0); + assertEquals(tuple.asDatum(0).hashCode(), tuple1.asDatum(0).hashCode()); + assertEquals(MurmurHash3_32.hash(tuple.getInt4(0)), + MurmurHash3_32.hashUnsafeInt(address)); + assertEquals(MurmurHash3_32.hash(tuple.getInt4(0)), + MurmurHash3_32.hashUnsafeVariant(address, tuple1.asDatum(0).size())); + + address = tuple1.getFieldAddr(1); + assertEquals(tuple.asDatum(1).hashCode(), tuple1.asDatum(1).hashCode()); + assertEquals(MurmurHash3_32.hash(tuple.getInt8(1)), + MurmurHash3_32.hashUnsafeLong(address)); + assertEquals(MurmurHash3_32.hash(tuple.getInt8(1)), + MurmurHash3_32.hashUnsafeVariant(address, tuple1.asDatum(1).size())); + + address = tuple1.getFieldAddr(2); + assertEquals(tuple.asDatum(2).hashCode(), tuple1.asDatum(2).hashCode()); + assertEquals(MurmurHash3_32.hash(tuple.getFloat4(2)), + MurmurHash3_32.hashUnsafeInt(address)); + assertEquals(MurmurHash3_32.hash(tuple.getFloat4(2)), + MurmurHash3_32.hashUnsafeVariant(address, tuple1.asDatum(2).size())); + + address = tuple1.getFieldAddr(3); + assertEquals(tuple.asDatum(3).hashCode(), tuple1.asDatum(3).hashCode()); + assertEquals(MurmurHash3_32.hash(tuple.getFloat8(3)), + MurmurHash3_32.hashUnsafeLong(address)); + assertEquals(MurmurHash3_32.hash(tuple.getFloat8(3)), + MurmurHash3_32.hashUnsafeVariant(address, tuple1.asDatum(3).size())); + + address = tuple1.getFieldAddr(4) + 4;//header length; + assertEquals(MurmurHash3_32.hash(tuple1.getBytes(4)), + MurmurHash3_32.hashUnsafeVariant(address, tuple1.asDatum(4).size())); + unSafeTupleList.release(); + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java index 92a68bd345..e8feda41ff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java @@ -18,9 +18,13 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.engine.utils.CacheHolder; import org.apache.tajo.engine.utils.TableCacheKey; @@ -28,6 +32,9 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; +import org.apache.tajo.tuple.memory.UnSafeTupleList; +import org.apache.tajo.util.FileUtil; import org.apache.tajo.worker.ExecutionBlockSharedResource; import org.apache.tajo.worker.TaskAttemptContext; @@ -41,6 +48,7 @@ * @param Tuple collection type to load small relation onto in-memory */ public abstract class CommonHashJoinExec extends CommonJoinExec { + private static final Log LOG = LogFactory.getLog(CommonHashJoinExec.class); // temporal tuples and states for nested loop join protected boolean first = true; @@ -62,6 +70,8 @@ public abstract class CommonHashJoinExec extends CommonJoinExec { protected boolean finished; protected TableStats tableStatsOfCachedRightChild = null; + private TupleList cache; + private boolean flushCache; public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { super(context, plan, outer, inner); @@ -115,6 +125,7 @@ protected void loadRightToHashTable() throws IOException { loadRightFromCache(key); } else { this.tupleSlots = convert(buildRightToHashTable(), false); + this.flushCache = true; } first = false; @@ -123,13 +134,13 @@ protected void loadRightToHashTable() throws IOException { protected void loadRightFromCache(TableCacheKey key) throws IOException { ExecutionBlockSharedResource sharedResource = context.getSharedResource(); - CacheHolder> holder; + CacheHolder>> holder; synchronized (sharedResource.getLock()) { if (sharedResource.hasBroadcastCache(key)) { holder = sharedResource.getBroadcastCache(key); } else { - TupleMap built = buildRightToHashTable(); - holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), null); + TupleMap> built = buildRightToHashTable(); + holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), cache); sharedResource.addBroadcastCache(key, holder); } } @@ -137,7 +148,7 @@ protected void loadRightFromCache(TableCacheKey key) throws IOException { this.tupleSlots = convert(holder.getData(), true); } - protected TupleMap buildRightToHashTable() throws IOException { + protected TupleMap> buildRightToHashTable() throws IOException { if (isCrossJoin) { return buildRightToHashTableForCrossJoin(); } else { @@ -145,37 +156,70 @@ protected TupleMap buildRightToHashTable() throws IOException { } } - protected TupleMap buildRightToHashTableForCrossJoin() throws IOException { + protected TupleMap> buildRightToHashTableForCrossJoin() throws IOException { Tuple tuple; - TupleMap map = new TupleMap<>(1); - TupleList tuples = new TupleList(); + TupleMap> map = new TupleMap<>(1); + TajoDataTypes.DataType[] types = SchemaUtil.toDataTypes(rightSchema); + int size = context.getQueryContext().getInt(SessionVars.JOIN_HASH_TABLE_SIZE); + UnSafeTupleList unSafeTuples = null; + + if (directMemory) { + unSafeTuples = new UnSafeTupleList(types, size); + cache = unSafeTuples; + } else { + cache = new HeapTupleList(size); + } while (!context.isStopped() && (tuple = rightChild.next()) != null) { - tuples.add(tuple); + cache.addTuple(tuple); + } + map.put(null, cache); + + if (directMemory) { + LOG.info("Right table: " + + FileUtil.humanReadableByteCount(unSafeTuples.usedMem(), false) + " loaded"); } - map.put(null, tuples); return map; } - protected TupleMap buildRightToHashTableForNonCrossJoin() throws IOException { + protected TupleMap> buildRightToHashTableForNonCrossJoin() throws IOException { Tuple tuple; - TupleMap map = new TupleMap<>(context.getQueryContext().getInt(SessionVars.JOIN_HASH_TABLE_SIZE)); + + int size = context.getQueryContext().getInt(SessionVars.JOIN_HASH_TABLE_SIZE); + TupleMap> map = new TupleMap<>(size); KeyProjector keyProjector = new KeyProjector(rightSchema, rightKeyList); + TajoDataTypes.DataType[] types = SchemaUtil.toDataTypes(rightSchema); + UnSafeTupleList unSafeTuples = null; + + if (directMemory) { + unSafeTuples = new UnSafeTupleList(types, size); + cache = unSafeTuples; + } else { + cache = new HeapTupleList(size); + } + while (!context.isStopped() && (tuple = rightChild.next()) != null) { + // if source is scan or groupby, it needs not to be cloned + cache.addTuple(tuple); + KeyTuple keyTuple = keyProjector.project(tuple); TupleList newValue = map.get(keyTuple); if (newValue == null) { - map.put(keyTuple, newValue = new TupleList()); + map.put(keyTuple, newValue = new ReferenceTupleList(1)); } - // if source is scan or groupby, it needs not to be cloned - newValue.add(tuple); + + newValue.add(cache.get(cache.size() - 1)); + } + if (directMemory) { + LOG.info("Right table: " + + FileUtil.humanReadableByteCount(unSafeTuples.usedMem(), false) + " loaded"); } return map; } // todo: convert loaded data to cache condition - protected abstract TupleMap convert(TupleMap hashed, boolean fromCache) + protected abstract TupleMap convert(TupleMap> hashed, boolean fromCache) throws IOException; @Override @@ -189,6 +233,10 @@ public void rescan() throws IOException { public void close() throws IOException { super.close(); iterator = null; + if(flushCache && cache != null) { + cache.release(); + } + if (tupleSlots != null) { tupleSlots.clear(); tupleSlots = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java index 66531575ed..7768a2c063 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java @@ -21,6 +21,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; @@ -59,6 +60,7 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { protected final Schema rightSchema; protected final FrameTuple frameTuple; + protected final boolean directMemory; // projection protected Projector projector; @@ -82,6 +84,7 @@ public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec ou // for join this.frameTuple = new FrameTuple(); + this.directMemory = context.getQueryContext().getBool(SessionVars.EXECUTOR_DIRECT_MEMORY_ENABLE); } /** diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index 6607416e09..389cab2c99 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -32,6 +32,7 @@ import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -110,7 +111,7 @@ public void init() throws IOException { } } - TupleList currentAggregatedTuples = null; + TupleList currentAggregatedTuples = null; int currentAggregatedTupleIndex = 0; int currentAggregatedTupleSize = 0; @@ -148,13 +149,13 @@ public Tuple next() throws IOException { // Groupby_Key2 | Distinct1_Column_V3 | | | //-------------------------------------------------------------------------------------- - List tupleSlots = new ArrayList<>(); + List> tupleSlots = new ArrayList<>(); // aggregation with single grouping key for (HashAggregator hashAggregator : hashAggregators) { if (!hashAggregator.iterator.hasNext()) { nullCount++; - tupleSlots.add(new TupleList()); + tupleSlots.add(new HeapTupleList()); continue; } Entry> entry = hashAggregator.iterator.next(); @@ -208,7 +209,7 @@ public Tuple next() throws IOException { // currentAggregatedTuples has tuples which has same group key. if (currentAggregatedTuples == null) { - currentAggregatedTuples = new TupleList(); + currentAggregatedTuples = new HeapTupleList(); } else { currentAggregatedTuples.clear(); } @@ -421,7 +422,7 @@ public void initFetch() { } public TupleList aggregate(Map groupTuples) { - TupleList aggregatedTuples = new TupleList(); + TupleList aggregatedTuples = new HeapTupleList(); for (Entry entry : groupTuples.entrySet()) { Tuple groupbyKey = entry.getKey(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index 7020b9d098..91c9ac0468 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -21,6 +21,7 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.util.Pair; import org.apache.tajo.worker.TaskAttemptContext; @@ -29,7 +30,7 @@ import java.util.List; import java.util.Map; -public class HashFullOuterJoinExec extends CommonHashJoinExec> { +public class HashFullOuterJoinExec extends CommonHashJoinExec>> { private boolean finalLoop; // final loop for right unmatched private final List nullTupleList; @@ -44,7 +45,7 @@ public Iterator getUnmatchedRight() { return new Iterator() { - private Iterator> iterator1 = tupleSlots.values().iterator(); + private Iterator>> iterator1 = tupleSlots.values().iterator(); private Iterator iterator2; @Override @@ -53,7 +54,7 @@ public boolean hasNext() { return true; } for (iterator2 = null; !hasMore() && iterator1.hasNext();) { - Pair next = iterator1.next(); + Pair> next = iterator1.next(); if (!next.getFirst()) { iterator2 = next.getSecond().iterator(); } @@ -107,7 +108,7 @@ public Tuple next() throws IOException { continue; } // getting corresponding right - Pair hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); + Pair> hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); if (hashed == null) { iterator = nullTupleList.iterator(); continue; @@ -125,10 +126,10 @@ public Tuple next() throws IOException { } @Override - protected TupleMap> convert(TupleMap hashed, + protected TupleMap>> convert(TupleMap> hashed, boolean fromCache) throws IOException { - TupleMap> tuples = new TupleMap<>(hashed.size()); - for (Map.Entry entry : hashed.entrySet()) { + TupleMap>> tuples = new TupleMap<>(hashed.size()); + for (Map.Entry> entry : hashed.entrySet()) { // flag: initially false (whether this join key had at least one match on the counter part) tuples.putWihtoutKeyCopy(entry.getKey(), new Pair<>(false, entry.getValue())); } @@ -138,7 +139,7 @@ protected TupleMap> convert(TupleMap hashed, @Override public void rescan() throws IOException { super.rescan(); - for (Pair value : tupleSlots.values()) { + for (Pair> value : tupleSlots.values()) { value.setFirst(false); } finalLoop = false; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index e47e515832..517ecbee05 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -20,12 +20,13 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.Iterator; -public class HashJoinExec extends CommonHashJoinExec { +public class HashJoinExec extends CommonHashJoinExec> { public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, PhysicalExec rightExec) { @@ -33,7 +34,7 @@ public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left } @Override - protected TupleMap convert(TupleMap hashed, boolean fromCache) + protected TupleMap> convert(TupleMap> hashed, boolean fromCache) throws IOException { return fromCache ? new TupleMap<>(hashed) : hashed; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java index 746bdb9e2a..47d098d60a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java @@ -20,6 +20,7 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index b652c3c36c..ec793a8349 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java index 42b78e83d3..295825c753 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java @@ -20,6 +20,7 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HeapTupleList.java similarity index 73% rename from tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java rename to tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HeapTupleList.java index 71ccae1b98..8e7fbc46a9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HeapTupleList.java @@ -20,20 +20,21 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.TupleList; import java.util.ArrayList; /** - * In TupleList, input tuples are automatically cloned whenever the add() method is called. + * In HeapTupleList, input tuples are automatically cloned whenever the add() method is called. * This data structure is usually used in physical operators like hash join or hash aggregation. */ -public class TupleList extends ArrayList { +public class HeapTupleList extends ArrayList implements TupleList { - public TupleList() { + public HeapTupleList() { super(); } - public TupleList(int initialCapacity) { + public HeapTupleList(int initialCapacity) { super(initialCapacity); } @@ -41,4 +42,14 @@ public TupleList(int initialCapacity) { public boolean add(Tuple tuple) { return super.add(new VTuple(tuple)); } + + @Override + public boolean addTuple(Tuple tuple) { + return add(tuple); + } + + @Override + public void release() { + super.clear(); + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java index 7ee36e31e4..f0afcc9002 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java @@ -24,7 +24,7 @@ /** * KeyTuple is to keep its hash value in memory to avoid frequent expensive hash calculation. - * Datum.hashCode() uses MurmurHash, so its cost is not so cheap. + * Datum.hashCode() uses MurmurHash3_32, so its cost is not so cheap. * */ public class KeyTuple extends VTuple implements Cloneable { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java index 824fb0e200..eee056d1b9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java @@ -20,12 +20,15 @@ import com.google.common.base.Preconditions; import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.TupleList; +import org.apache.tajo.tuple.memory.UnSafeTupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -40,8 +43,8 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { private Tuple prevLeftTuple = null; private Tuple prevRightTuple = null; - private TupleList leftTupleSlots; - private TupleList rightTupleSlots; + private TupleList leftTupleSlots; + private TupleList rightTupleSlots; private JoinTupleComparator joincomparator = null; private TupleComparator[] tupleComparator = null; @@ -65,8 +68,14 @@ public MergeFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, Physica "but there is no join condition"); final int INITIAL_TUPLE_SLOT = context.getQueryContext().getInt(SessionVars.JOIN_HASH_TABLE_SIZE); - this.leftTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); - this.rightTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); + if (directMemory) { + this.leftTupleSlots = new UnSafeTupleList(SchemaUtil.toDataTypes(leftSchema), INITIAL_TUPLE_SLOT); + this.rightTupleSlots = new UnSafeTupleList(SchemaUtil.toDataTypes(rightSchema), INITIAL_TUPLE_SLOT); + } else { + this.leftTupleSlots = new HeapTupleList(INITIAL_TUPLE_SLOT); + this.rightTupleSlots = new HeapTupleList(INITIAL_TUPLE_SLOT); + } + SortSpec[][] sortSpecs = new SortSpec[2][]; sortSpecs[0] = leftSortKey; sortSpecs[1] = rightSortKey; @@ -222,7 +231,7 @@ public Tuple next() throws IOException { prevLeftTuple.put(leftTuple.getValues()); do { - leftTupleSlots.add(leftTuple); + leftTupleSlots.addTuple(leftTuple); leftTuple = leftChild.next(); if(leftTuple == null) { endLeft = true; @@ -234,7 +243,7 @@ public Tuple next() throws IOException { prevRightTuple.put(rightTuple.getValues()); do { - rightTupleSlots.add(rightTuple); + rightTupleSlots.addTuple(rightTuple); rightTuple = rightChild.next(); if(rightTuple == null) { endRight = true; @@ -304,8 +313,8 @@ public void rescan() throws IOException { @Override public void close() throws IOException { super.close(); - leftTupleSlots.clear(); - rightTupleSlots.clear(); + leftTupleSlots.release(); + rightTupleSlots.release(); leftTupleSlots = null; rightTupleSlots = null; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index 80c10f6dbd..d2ed13f1e2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -20,11 +20,14 @@ import com.google.common.base.Preconditions; import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.TupleList; +import org.apache.tajo.tuple.memory.UnSafeTupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -39,10 +42,10 @@ public class MergeJoinExec extends CommonJoinExec { private final Tuple prevOuterTuple; private final Tuple prevInnerTuple; - private TupleList outerTupleSlots; - private TupleList innerTupleSlots; - private Iterator outerIterator; - private Iterator innerIterator; + private TupleList outerTupleSlots; + private TupleList innerTupleSlots; + private Iterator outerIterator; + private Iterator innerIterator; private JoinTupleComparator joincomparator = null; private TupleComparator [] tupleComparator = null; @@ -57,8 +60,14 @@ public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec out final int INITIAL_TUPLE_SLOT = context.getQueryContext().getInt(SessionVars.JOIN_HASH_TABLE_SIZE); - this.outerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); - this.innerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); + if (directMemory) { + this.outerTupleSlots = new UnSafeTupleList(SchemaUtil.toDataTypes(outer.getSchema()), INITIAL_TUPLE_SLOT); + this.innerTupleSlots = new UnSafeTupleList(SchemaUtil.toDataTypes(inner.getSchema()), INITIAL_TUPLE_SLOT); + } else { + this.outerTupleSlots = new HeapTupleList(INITIAL_TUPLE_SLOT); + this.innerTupleSlots = new HeapTupleList(INITIAL_TUPLE_SLOT); + } + SortSpec[][] sortSpecs = new SortSpec[2][]; sortSpecs[0] = outerSortKey; sortSpecs[1] = innerSortKey; @@ -110,7 +119,7 @@ public Tuple next() throws IOException { prevOuterTuple.put(outerTuple.getValues()); do { - outerTupleSlots.add(outerTuple); + outerTupleSlots.addTuple(outerTuple); outerTuple = leftChild.next(); if (outerTuple == null) { end = true; @@ -122,7 +131,7 @@ public Tuple next() throws IOException { prevInnerTuple.put(innerTuple.getValues()); do { - innerTupleSlots.add(innerTuple); + innerTupleSlots.addTuple(innerTuple); innerTuple = rightChild.next(); if (innerTuple == null) { end = true; @@ -159,8 +168,8 @@ public void rescan() throws IOException { public void close() throws IOException { super.close(); - outerTupleSlots.clear(); - innerTupleSlots.clear(); + outerTupleSlots.release(); + innerTupleSlots.release(); outerTupleSlots = null; innerTupleSlots = null; outerIterator = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ReferenceTupleList.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ReferenceTupleList.java new file mode 100644 index 0000000000..68f9a6dea9 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ReferenceTupleList.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; + +import java.util.ArrayList; + +/** + * In ReferenceTupleList, input tuples are not cloned whenever the add() method is called. + */ +public class ReferenceTupleList extends ArrayList implements TupleList { + + public ReferenceTupleList() { + super(); + } + + public ReferenceTupleList(int initialCapacity) { + super(initialCapacity); + } + + @Override + public boolean add(Tuple tuple) { + return super.add(tuple); + } + + @Override + public boolean addTuple(Tuple tuple) { + return add(tuple); + } + + @Override + public void release() { + super.clear(); + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index 706ec3e728..1f7f568feb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -20,12 +20,15 @@ import com.google.common.base.Preconditions; import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.TupleList; +import org.apache.tajo.tuple.memory.UnSafeTupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -39,8 +42,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { private Tuple nextLeft = null; private Tuple nullPaddedTuple; - private TupleList leftTupleSlots; - private TupleList innerTupleSlots; + private TupleList leftTupleSlots; + private TupleList innerTupleSlots; private JoinTupleComparator joinComparator = null; private TupleComparator [] tupleComparator = null; @@ -60,8 +63,14 @@ public RightOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, Physic "but there is no join condition"); final int INITIAL_TUPLE_SLOT = context.getQueryContext().getInt(SessionVars.JOIN_HASH_TABLE_SIZE); - this.leftTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); - this.innerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); + if (directMemory) { + this.leftTupleSlots = new UnSafeTupleList(SchemaUtil.toDataTypes(outer.getSchema()), INITIAL_TUPLE_SLOT); + this.innerTupleSlots = new UnSafeTupleList(SchemaUtil.toDataTypes(inner.getSchema()), INITIAL_TUPLE_SLOT); + } else { + this.leftTupleSlots = new HeapTupleList(INITIAL_TUPLE_SLOT); + this.innerTupleSlots = new HeapTupleList(INITIAL_TUPLE_SLOT); + } + SortSpec[][] sortSpecs = new SortSpec[2][]; sortSpecs[0] = outerSortKey; sortSpecs[1] = innerSortKey; @@ -221,7 +230,7 @@ public Tuple next() throws IOException { prevLeftTuple.put(leftTuple.getValues()); } do { - leftTupleSlots.add(leftTuple); + leftTupleSlots.addTuple(leftTuple); leftTuple = leftChild.next(); if( leftTuple == null) { endOuter = true; @@ -235,7 +244,7 @@ public Tuple next() throws IOException { prevRightTuple.put(rightTuple.getValues()); } do { - innerTupleSlots.add(rightTuple); + innerTupleSlots.addTuple(rightTuple); rightTuple = rightChild.next(); if(rightTuple == null) { endInner = true; @@ -330,8 +339,8 @@ public void rescan() throws IOException { @Override public void close() throws IOException { super.close(); - leftTupleSlots.clear(); - innerTupleSlots.clear(); + leftTupleSlots.release(); + innerTupleSlots.release(); leftTupleSlots = null; innerTupleSlots = null; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java index 44845e7de7..718840e833 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java @@ -32,6 +32,7 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -80,9 +81,9 @@ enum WindowState { // Transient state boolean firstTime = true; - TupleList evaluatedTuples = null; - TupleList accumulatedInTuples = null; - TupleList nextAccumulatedInTuples = null; + TupleList evaluatedTuples = null; + TupleList accumulatedInTuples = null; + TupleList nextAccumulatedInTuples = null; WindowState state = WindowState.NEW_WINDOW; Iterator tupleInFrameIterator = null; @@ -244,7 +245,7 @@ public Tuple next() throws IOException { private void initWindow() { if (firstTime) { - accumulatedInTuples = new TupleList(); + accumulatedInTuples = new HeapTupleList(); contexts = new FunctionContext[functionNum]; for(int evalIdx = 0; evalIdx < functionNum; evalIdx++) { @@ -273,14 +274,14 @@ private void accumulatingWindow(Tuple currentKey, Tuple inTuple) { } private void preAccumulatingNextWindow(Tuple inTuple) { - nextAccumulatedInTuples = new TupleList(); + nextAccumulatedInTuples = new HeapTupleList(); nextAccumulatedInTuples.add(inTuple); } private void evaluationWindowFrame() { TupleComparator comp; - evaluatedTuples = new TupleList(); + evaluatedTuples = new HeapTupleList(); for (Tuple inTuple : accumulatedInTuples) { for (int c = 0; c < nonFunctionColumnNum; c++) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java index fcef055a08..b795f92e89 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java @@ -21,8 +21,9 @@ import org.apache.tajo.QueryId; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.physical.ScanExec; -import org.apache.tajo.engine.planner.physical.TupleList; import org.apache.tajo.engine.planner.physical.TupleMap; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.TupleList; import org.apache.tajo.util.Deallocatable; import org.apache.tajo.worker.TaskAttemptContext; @@ -50,19 +51,19 @@ public interface CacheHolder { * This is a cache-holder for a join table * It will release when execution block is finished */ - class BroadcastCacheHolder implements CacheHolder> { - private TupleMap data; + class BroadcastCacheHolder implements CacheHolder>> { + private TupleMap> data; private Deallocatable rowBlock; private TableStats tableStats; - public BroadcastCacheHolder(TupleMap data, TableStats tableStats, Deallocatable rowBlock){ + public BroadcastCacheHolder(TupleMap> data, TableStats tableStats, Deallocatable rowBlock){ this.data = data; this.tableStats = tableStats; this.rowBlock = rowBlock; } @Override - public TupleMap getData() { + public TupleMap> getData() { return data; } diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp index fa000128af..e3b0f23ec9 100644 --- a/tajo-core/src/main/resources/webapps/worker/index.jsp +++ b/tajo-core/src/main/resources/webapps/worker/index.jsp @@ -26,6 +26,7 @@ <%@ page import="org.apache.tajo.worker.TajoWorker" %> <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.List" %> +<%@ page import="org.apache.tajo.storage.BufferPool" %> <% TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -47,9 +48,8 @@

Tajo Worker: <%=tajoWorker.getWorkerContext().getWorkerName()%>


- - - + + From a7bed3028c4574a84bfcbd9fa6ea3d3dab74b752 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 16 Mar 2016 18:55:04 +0900 Subject: [PATCH 2/3] add doc and fix broken tests --- .../org/apache/tajo/util/MurmurHash3_32.java | 8 ++---- .../planner/physical/TestUnSafeTuple.java | 2 +- .../testWindowWithAggregation6.sql | 2 +- .../TestTajoCli/testHelpSessionVars.result | 1 + .../sphinx/configuration/tajo-site-xml.rst | 26 +++++++++++++++++++ 5 files changed, 31 insertions(+), 8 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java b/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java index 8275482102..75f16de9e5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash3_32.java @@ -77,9 +77,7 @@ public static int hash(byte[] data, int offset, int len, int seed) { // fallthrough case 1: k1 |= (data[roundedEnd] & 0xff); - k1 *= C1; - k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15); - k1 *= C2; + k1 = mixK1(k1); h1 ^= k1; } @@ -153,9 +151,7 @@ public static int hashUnsafeVariant(long address, int len, int seed) { // fallthrough case 1: k1 |= (UnsafeUtil.unsafe.getByte(offset) & 0xff); - k1 *= C1; - k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15); - k1 *= C2; + k1 = mixK1(k1); h1 ^= k1; } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java index 224d4782fd..cb23d87066 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java @@ -125,7 +125,7 @@ public final void testUnsafeHash() { UnSafeTupleList unSafeTupleList = new UnSafeTupleList(SchemaUtil.toDataTypes(schema), 1, StorageUnit.KB); unSafeTupleList.addTuple(tuple); - UnSafeTuple tuple1 = (UnSafeTuple) unSafeTupleList.get(0); + UnSafeTuple tuple1 = unSafeTupleList.get(0); assertEquals(tuple.hashCode(), tuple1.hashCode()); diff --git a/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation6.sql b/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation6.sql index 1a9d46bf42..1783aedcf0 100644 --- a/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation6.sql +++ b/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation6.sql @@ -1,7 +1,7 @@ select l_orderkey, count(*) as cnt, - row_number() over (order by count(*) desc) row_num + row_number() over (order by count(*) desc, l_orderkey) row_num from lineitem group by diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index 46e6b763da..484d1cac35 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -41,6 +41,7 @@ Available Session Variables: \set AGG_HASH_TABLE_SIZE [int value] - The initial size of list for in-memory aggregation \set SORT_LIST_SIZE [int value] - The initial size of list for in-memory sort \set JOIN_HASH_TABLE_SIZE [int value] - The initial size of hash table for in-memory hash join +\set EXECUTOR_DIRECT_MEMORY_ENABLE [true or false] - If true, the executor data will be kept in direct memory \set INDEX_ENABLED [true or false] - index scan enabled \set INDEX_SELECTIVITY_THRESHOLD [real value] - the selectivity threshold for index scan \set PARTITION_NO_RESULT_OVERWRITE_ENABLED [true or false] - If true, a partitioned table is overwritten even if a sub query leads to no result. Otherwise, the table data will be kept if there is no result diff --git a/tajo-docs/src/main/sphinx/configuration/tajo-site-xml.rst b/tajo-docs/src/main/sphinx/configuration/tajo-site-xml.rst index 2280897089..f1a83854e7 100644 --- a/tajo-docs/src/main/sphinx/configuration/tajo-site-xml.rst +++ b/tajo-docs/src/main/sphinx/configuration/tajo-site-xml.rst @@ -6,10 +6,36 @@ You can add more configurations in the ``tajo-site.xml`` file. Note that you sho If you are looking for the configurations for the master and the worker, please refer to :doc:`tajo_master_configuration` and :doc:`worker_configuration`. Also, catalog configurations are found here :doc:`catalog_configuration`. +========================= +Common Query Settings +========================= + +.. _tajo.executor.memory.direct: + +"""""""""""""""""""""""""""""""""""""" +`tajo.executor.memory.direct` +"""""""""""""""""""""""""""""""""""""" + +A flag to enable or disable the use of direct memory. + + * Property value type: Boolean + * Default value: true + * Example + +.. code-block:: xml + + + tajo.executor.memory.direct + true + + + ========================= Join Query Settings ========================= +.. _tajo.dist-query.join.auto-broadcast: + """""""""""""""""""""""""""""""""""""" `tajo.dist-query.join.auto-broadcast` """""""""""""""""""""""""""""""""""""" From dd90ac10147db08c521a02954c314610102abb04 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 17 Mar 2016 10:44:21 +0900 Subject: [PATCH 3/3] fix broken test --- .../java/org/apache/tajo/engine/query/TestWindowQuery.java | 6 +++--- .../queries/TestWindowQuery/testWindowWithAggregation4.sql | 2 ++ .../TestWindowQuery/testWindowWithAggregation4.result | 5 ++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java index 9993992aa9..f09d53749d 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java @@ -204,10 +204,10 @@ public final void testWindowWithAggregation3() throws Exception { } @Test + @Option(sort = true) + @SimpleTest() public final void testWindowWithAggregation4() throws Exception { - ResultSet res = executeQuery(); - assertResultSet(res); - cleanupQuery(res); + runSimpleTests(); } @Test diff --git a/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation4.sql b/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation4.sql index af695ba973..5a954ddacb 100644 --- a/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation4.sql +++ b/tajo-core-tests/src/test/resources/queries/TestWindowQuery/testWindowWithAggregation4.sql @@ -4,5 +4,7 @@ select row_number() over (order by count(*) desc) row_num from lineitem +where + l_orderkey != 1 group by l_orderkey \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result index fffa2dd79e..a5de93570f 100644 --- a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result +++ b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result @@ -1,5 +1,4 @@ l_orderkey,cnt,row_num ------------------------------- -1,2,1 -3,2,2 -2,1,3 \ No newline at end of file +2,1,2 +3,2,1
MaxHeap: <%=Runtime.getRuntime().maxMemory()/1024/1024%> MB
TotalHeap: <%=Runtime.getRuntime().totalMemory()/1024/1024%> MB
FreeHeap: <%=Runtime.getRuntime().freeMemory()/1024/1024%> MB
Heap memory: <%= BufferPool.printHeapMemoryUsage() %>
Direct memory: <%= BufferPool.printDirectMemoryUsage() %>
Available Resource: <%= tajoWorker.getWorkerContext().getNodeResourceManager().getAvailableResource() %>
Running Tasks: <%= tajoWorker.getWorkerContext().getTaskManager().getRunningTasks() %>
Configuration:detail...