diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index bd1b88fa6b..b4b3b25d44 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -163,6 +163,8 @@ public static Datum createFromInt4(DataType type, int val) { return new Int4Datum(val); case DATE: return new DateDatum(val); + case INET4: + return new Inet4Datum(val); default: throw new UnsupportedOperationException("Cannot create " + type.getType().name() + " datum from INT4"); } diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java index 89e72ed092..4ce828c9f1 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; @@ -50,6 +51,11 @@ public boolean contains(int fieldId) { return false; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return TajoDataTypes.Type.NULL_TYPE; + } + @Override public boolean isNull(int fieldid) { return true; diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/ReadableTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/ReadableTuple.java new file mode 100644 index 0000000000..18446082e8 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/storage/ReadableTuple.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; + +public interface ReadableTuple { + + TajoDataTypes.Type type(int fieldId); + + boolean isNull(int fieldId); + + boolean getBool(int fieldId); + + byte getByte(int fieldId); + + char getChar(int fieldId); + + byte[] getBytes(int fieldId); + + short getInt2(int fieldId); + + int getInt4(int fieldId); + + long getInt8(int fieldId); + + float getFloat4(int fieldId); + + double getFloat8(int fieldId); + + String getText(int fieldId); + + Datum getProtobufDatum(int fieldId); + + Datum getInterval(int fieldId); + + char[] getUnicodeChars(int fieldId); + + Datum get(int fieldId); +} diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java index aec784fc52..da8210fb4f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java @@ -20,14 +20,12 @@ import org.apache.tajo.datum.Datum; -public interface Tuple extends Cloneable { +public interface Tuple extends Cloneable, ReadableTuple { int size(); boolean contains(int fieldid); - boolean isNull(int fieldid); - @SuppressWarnings("unused") boolean isNotNull(int fieldid); @@ -41,37 +39,9 @@ public interface Tuple extends Cloneable { void put(Datum[] values); - Datum get(int fieldId); - void setOffset(long offset); - - long getOffset(); - - boolean getBool(int fieldId); - byte getByte(int fieldId); - - char getChar(int fieldId); - - byte [] getBytes(int fieldId); - - short getInt2(int fieldId); - - int getInt4(int fieldId); - - long getInt8(int fieldId); - - float getFloat4(int fieldId); - - double getFloat8(int fieldId); - - String getText(int fieldId); - - Datum getProtobufDatum(int fieldId); - - Datum getInterval(int fieldId); - - char [] getUnicodeChars(int fieldId); + long getOffset(); Tuple clone() throws CloneNotSupportedException; diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java index 5e839b7e58..77252cc4ee 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage; import com.google.gson.annotations.Expose; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.Inet4Datum; import org.apache.tajo.datum.IntervalDatum; @@ -54,6 +55,11 @@ public boolean contains(int fieldId) { return values[fieldId] != null; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return values[fieldId] == null ? TajoDataTypes.Type.NULL_TYPE : values[fieldId].type(); + } + @Override public boolean isNull(int fieldid) { return values[fieldid] == null || values[fieldid].isNull(); diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java b/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java index 708eae919a..c968fa0692 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java @@ -121,6 +121,9 @@ public class ClassSize { /** Overhead for KeyValueSkipListSet */ public static final int KEYVALUE_SKIPLIST_SET; + /** Overhead for BitSet */ + public static final int BITSET; + /* Are we running on jdk7? */ private static final boolean JDK7; static { @@ -212,6 +215,8 @@ public class ClassSize { TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2); KEYVALUE_SKIPLIST_SET = align(OBJECT + REFERENCE); + + BITSET = align(ARRAY + Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN); } /** diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java deleted file mode 100644 index 9940608a2a..0000000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java +++ /dev/null @@ -1,414 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.planner.physical; - -import com.google.common.primitives.Booleans; -import com.google.common.primitives.Doubles; -import com.google.common.primitives.Floats; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; -import com.google.common.primitives.Shorts; -import com.google.common.primitives.UnsignedInts; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; - -import java.util.Arrays; -import java.util.BitSet; - -/** - * Extract raw level values (primitive or String/byte[]) from each of key columns for compare - */ -public class ComparableVector { - - protected final Tuple[] tuples; // source tuples - protected final TupleVector[] vectors; // values of key columns - protected final int[] keyIndex; - - public ComparableVector(int length, SortSpec[] sortKeys, int[] keyIndex) { - tuples = new Tuple[length]; - vectors = new TupleVector[sortKeys.length]; - for (int i = 0; i < vectors.length; i++) { - TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType(); - boolean nullFirst = sortKeys[i].isNullFirst(); - boolean ascending = sortKeys[i].isAscending(); - boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending; - vectors[i] = new TupleVector(vectorType(type), tuples.length, nullInvert, ascending); - } - this.keyIndex = keyIndex; - } - - public int compare(final int i1, final int i2) { - for (TupleVector vector : vectors) { - int compare = vector.compare(i1, i2); - if (compare != 0) { - return compare; - } - } - return 0; - } - - public void set(int index, Tuple tuple) { - for (int i = 0; i < vectors.length; i++) { - vectors[i].set(index, tuple, keyIndex[i]); - } - } - - protected static class TupleVector { - - private final int type; - private final BitSet nulls; - private final boolean nullInvert; - private final boolean ascending; - - private boolean[] booleans; - private byte[] bits; - private short[] shorts; - private int[] ints; - private long[] longs; - private float[] floats; - private double[] doubles; - private byte[][] bytes; - - private int index; - - private TupleVector(int type, int length, boolean nullInvert, boolean ascending) { - this.type = type; - this.nulls = new BitSet(length); - this.nullInvert = nullInvert; - this.ascending = ascending; - switch (type) { - case 0: booleans = new boolean[length]; break; - case 1: bits = new byte[length]; break; - case 2: shorts = new short[length]; break; - case 3: ints = new int[length]; break; - case 4: longs = new long[length]; break; - case 5: floats = new float[length]; break; - case 6: doubles = new double[length]; break; - case 7: bytes = new byte[length][]; break; - case 8: ints = new int[length]; break; - case -1: break; - default: - throw new IllegalArgumentException(); - } - } - - protected final void append(Tuple tuple, int field) { - set(index++, tuple, field); - } - - protected final void set(int index, Tuple tuple, int field) { - if (tuple.isNull(field)) { - nulls.set(index); - return; - } - nulls.clear(index); - switch (type) { - case 0: booleans[index] = tuple.getBool(field); break; - case 1: bits[index] = tuple.getByte(field); break; - case 2: shorts[index] = tuple.getInt2(field); break; - case 3: ints[index] = tuple.getInt4(field); break; - case 4: longs[index] = tuple.getInt8(field); break; - case 5: floats[index] = tuple.getFloat4(field); break; - case 6: doubles[index] = tuple.getFloat8(field); break; - case 7: bytes[index] = tuple.getBytes(field); break; - case 8: ints[index] = tuple.getInt4(field); break; - default: - throw new IllegalArgumentException(); - } - } - - protected final int compare(int index1, int index2) { - final boolean n1 = nulls.get(index1); - final boolean n2 = nulls.get(index2); - if (n1 && n2) { - return 0; - } - if (n1 ^ n2) { - int compVal = n1 ? 1 : -1; - return nullInvert ? -compVal : compVal; - } - int compare; - switch (type) { - case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break; - case 1: compare = bits[index1] - bits[index2]; break; - case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break; - case 3: compare = Ints.compare(ints[index1], ints[index2]); break; - case 4: compare = Longs.compare(longs[index1], longs[index2]); break; - case 5: compare = Floats.compare(floats[index1], floats[index2]); break; - case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break; - case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break; - case 8: compare = UnsignedInts.compare(ints[index1], ints[index2]); break; - default: - throw new IllegalArgumentException(); - } - return ascending ? compare : -compare; - } - } - - public static class ComparableTuple { - - private final TupleType[] keyTypes; - private final int[] keyIndex; - private final Object[] keys; - - public ComparableTuple(Schema schema, int[] keyIndex) { - this(tupleTypes(schema, keyIndex), keyIndex); - } - - public ComparableTuple(Schema schema, int start, int end) { - this(schema, toKeyIndex(start, end)); - } - - private ComparableTuple(TupleType[] keyTypes, int[] keyIndex) { - this.keyTypes = keyTypes; - this.keyIndex = keyIndex; - this.keys = new Object[keyIndex.length]; - } - - public int size() { - return keyIndex.length; - } - - public void set(Tuple tuple) { - for (int i = 0; i < keyTypes.length; i++) { - final int field = keyIndex[i]; - if (tuple.isNull(field)) { - keys[i] = null; - continue; - } - switch (keyTypes[i]) { - case BOOLEAN: keys[i] = tuple.getBool(field); break; - case BIT: keys[i] = tuple.getByte(field); break; - case INT1: - case INT2: keys[i] = tuple.getInt2(field); break; - case INT4: - case DATE: - case INET4: keys[i] = tuple.getInt4(field); break; - case INT8: - case TIME: - case TIMESTAMP: keys[i] = tuple.getInt8(field); break; - case FLOAT4: keys[i] = tuple.getFloat4(field); break; - case FLOAT8: keys[i] = tuple.getFloat8(field); break; - case TEXT: - case CHAR: - case BLOB: keys[i] = tuple.getBytes(field); break; - case DATUM: keys[i] = tuple.get(field); break; - default: - throw new IllegalArgumentException(); - } - } - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ComparableTuple) { - ComparableTuple other = (ComparableTuple)obj; - for (int i = 0; i < keys.length; i++) { - final boolean n1 = keys[i] == null; - final boolean n2 = other.keys[i] == null; - if (n1 && n2) { - continue; - } - if (n1 ^ n2) { - return false; - } - switch (keyTypes[i]) { - case TEXT: - case CHAR: - case BLOB: if (!Arrays.equals((byte[])keys[i], (byte[])other.keys[i])) return false; continue; - default: if (!keys[i].equals(other.keys[i])) return false; continue; - } - } - return true; - } - return false; - } - - public boolean equals(Tuple tuple) { - for (int i = 0; i < keys.length; i++) { - final int field = keyIndex[i]; - final boolean n1 = keys[i] == null; - final boolean n2 = tuple.isNull(field); - if (n1 && n2) { - continue; - } - if (n1 ^ n2) { - return false; - } - switch (keyTypes[i]) { - case BOOLEAN: if ((Boolean)keys[i] != tuple.getBool(field)) return false; continue; - case BIT: if ((Byte)keys[i] != tuple.getByte(field)) return false; continue; - case INT1: - case INT2: if ((Short)keys[i] != tuple.getInt2(field)) return false; continue; - case INT4: - case DATE: - case INET4: if ((Integer)keys[i] != tuple.getInt4(field)) return false; continue; - case INT8: - case TIME: - case TIMESTAMP: if ((Long)keys[i] != tuple.getInt8(field)) return false; continue; - case FLOAT4: if ((Float)keys[i] != tuple.getFloat4(field)) return false; continue; - case FLOAT8: if ((Double)keys[i] != tuple.getFloat8(field)) return false; continue; - case TEXT: - case CHAR: - case BLOB: if (!Arrays.equals((byte[])keys[i], tuple.getBytes(field))) return false; continue; - case DATUM: if (!keys[i].equals(tuple.get(field))) return false; continue; - } - } - return true; - } - - @Override - public int hashCode() { - int result = 1; - for (Object key : keys) { - int hash = key == null ? 0 : - key instanceof byte[] ? Arrays.hashCode((byte[])key) : key.hashCode(); - result = 31 * result + hash; - } - return result; - } - - public ComparableTuple copy() { - ComparableTuple copy = emptyCopy(); - System.arraycopy(keys, 0, copy.keys, 0, keys.length); - return copy; - } - - public ComparableTuple emptyCopy() { - return new ComparableTuple(keyTypes, keyIndex); - } - - public VTuple toVTuple() { - VTuple vtuple = new VTuple(keyIndex.length); - for (int i = 0; i < keyIndex.length; i++) { - vtuple.put(i, toDatum(i)); - } - return vtuple; - } - - public Datum toDatum(int i) { - if (keys[i] == null) { - return NullDatum.get(); - } - switch (keyTypes[i]) { - case NULL_TYPE: return NullDatum.get(); - case BOOLEAN: return DatumFactory.createBool((Boolean) keys[i]); - case BIT: return DatumFactory.createBit((Byte)keys[i]); - case INT1: - case INT2: return DatumFactory.createInt2((Short) keys[i]); - case INT4: return DatumFactory.createInt4((Integer) keys[i]); - case DATE: return DatumFactory.createDate((Integer) keys[i]); - case INET4: return DatumFactory.createInet4((Integer) keys[i]); - case INT8: return DatumFactory.createInt8((Long) keys[i]); - case TIME: return DatumFactory.createTime((Long) keys[i]); - case TIMESTAMP: return DatumFactory.createTimestamp((Long) keys[i]); - case FLOAT4: return DatumFactory.createFloat4((Float) keys[i]); - case FLOAT8: return DatumFactory.createFloat8((Double) keys[i]); - case TEXT: return DatumFactory.createText((byte[]) keys[i]); - case CHAR: return DatumFactory.createChar((byte[]) keys[i]); - case BLOB: return DatumFactory.createBlob((byte[]) keys[i]); - case DATUM: return (Datum)keys[i]; - default: - throw new IllegalArgumentException(); - } - } - } - - public static boolean isVectorizable(SortSpec[] sortKeys) { - if (sortKeys.length == 0) { - return false; - } - for (SortSpec spec : sortKeys) { - try { - vectorType(spec.getSortKey().getDataType().getType()); - } catch (Exception e) { - return false; - } - } - return true; - } - - private static int vectorType(TajoDataTypes.Type type) { - switch (type) { - case BOOLEAN: return 0; - case BIT: return 1; - case INT1: case INT2: return 2; - case INT4: case DATE: return 3; - case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4; - case FLOAT4: return 5; - case FLOAT8: return 6; - case TEXT: case CHAR: case BLOB: return 7; - case INET4: return 8; - case NULL_TYPE: return -1; - } - // todo - throw new UnsupportedException(type.name()); - } - - private static TupleType[] tupleTypes(Schema schema, int[] keyIndex) { - TupleType[] types = new TupleType[keyIndex.length]; - for (int i = 0; i < keyIndex.length; i++) { - types[i] = tupleType(schema.getColumn(keyIndex[i]).getDataType().getType()); - } - return types; - } - - private static TupleType tupleType(TajoDataTypes.Type type) { - switch (type) { - case BOOLEAN: return TupleType.BOOLEAN; - case BIT: return TupleType.BIT; - case INT1: return TupleType.INT1; - case INT2: return TupleType.INT2; - case INT4: return TupleType.INT4; - case DATE: return TupleType.DATE; - case INT8: return TupleType.INT8; - case TIME: return TupleType.TIME; - case TIMESTAMP: return TupleType.TIMESTAMP; - case FLOAT4: return TupleType.FLOAT4; - case FLOAT8: return TupleType.FLOAT8; - case TEXT: return TupleType.TEXT; - case CHAR: return TupleType.CHAR; - case BLOB: return TupleType.BLOB; - case INET4: return TupleType.INET4; - case NULL_TYPE: return TupleType.NULL_TYPE; - default: return TupleType.DATUM; - } - } - - private static int[] toKeyIndex(int start, int end) { - int[] keyIndex = new int[end - start]; - for (int i = 0; i < keyIndex.length; i++) { - keyIndex[i] = start + i; - } - return keyIndex; - } - - private static enum TupleType { - NULL_TYPE, BOOLEAN, BIT, INT1, INT2, INT4, DATE, INET4, INT8, TIME, TIMESTAMP, - FLOAT4, FLOAT8, TEXT, CHAR, BLOB, DATUM - } -} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 3da296cdd8..e5abc55433 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -28,12 +28,16 @@ import org.apache.hadoop.io.IOUtils; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.PhysicalPlanningException; import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.storage.*; @@ -41,6 +45,9 @@ import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.ClassSize; +import org.apache.tajo.util.ComparableVector; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; @@ -81,7 +88,7 @@ public class ExternalSortExec extends SortExec { /** If there are available multiple cores, it tries parallel merge. */ private ExecutorService executorService; /** used for in-memory sort of each chunk. */ - private List inMemoryTable; + private MemoryCache inMemoryTable; /** temporal dir */ private final Path sortTmpDir; /** It enables round-robin disks allocation */ @@ -120,7 +127,6 @@ private ExternalSortExec(final TaskAttemptContext context, final SortNode plan) this.sortBufferBytesNum = context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB; this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM); this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum); - this.inMemoryTable = new ArrayList(100000); this.sortTmpDir = getExecutorTmpDir(); localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); @@ -161,24 +167,24 @@ public SortNode getPlan() { /** * Sort a tuple block and store them into a chunk file */ - private Path sortAndStoreChunk(int chunkId, List tupleBlock) + private Path sortAndStoreChunk(int chunkId, MemoryCache cache) throws IOException { TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW); - int rowNum = tupleBlock.size(); + int rowNum = cache.size(); long sortStart = System.currentTimeMillis(); - Iterable sorted = getSorter(tupleBlock).sort(); + cache.sortTuples(); long sortEnd = System.currentTimeMillis(); long chunkWriteStart = System.currentTimeMillis(); Path outputPath = getChunkPathForWrite(0, chunkId); final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); appender.init(); - for (Tuple t : sorted) { + for (ReadableTuple t : cache) { appender.addTuple(t); } appender.close(); - tupleBlock.clear(); + cache.reset(); long chunkWriteEnd = System.currentTimeMillis(); @@ -197,27 +203,30 @@ private Path sortAndStoreChunk(int chunkId, List tupleBlock) */ private List sortAndStoreAllChunks() throws IOException { Tuple tuple; - long memoryConsumption = 0; List chunkPaths = TUtil.newList(); + // TODO - calculate average tuple size from scan exec + int limit = estimateLimit(inSchema, sortBufferBytesNum); + inMemoryTable = new MemoryCache(limit, sortSpecs, comparator.getSortKeyIds(), sortBufferBytesNum); + int chunkId = 0; long runStartTime = System.currentTimeMillis(); while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start - Tuple vtuple = new VTuple(tuple); - inMemoryTable.add(vtuple); - memoryConsumption += MemoryUtil.calculateMemorySize(vtuple); - if (memoryConsumption > sortBufferBytesNum) { + if (!inMemoryTable.addTuple(tuple)) { long runEndTime = System.currentTimeMillis(); info(LOG, chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec"); runStartTime = runEndTime; - info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " bytes"); + if (inMemoryTable.memoryConsumption > sortBufferBytesNum) { + info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " bytes"); + } memoryResident = false; chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable)); - memoryConsumption = 0; + inMemoryTable.reset(); + inMemoryTable.addTuple(tuple); chunkId++; // When the volume of sorting data once exceed the size of sort buffer, @@ -235,13 +244,15 @@ private List sortAndStoreAllChunks() throws IOException { } } - if (!memoryResident && !inMemoryTable.isEmpty()) { // if there are at least one or more input tuples + if (!memoryResident && inMemoryTable.size() > 0) { // if there are at least one or more input tuples // check if data exceeds a sort buffer. If so, it store the remain data into a chunk. long start = System.currentTimeMillis(); int rowNum = inMemoryTable.size(); chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable)); long end = System.currentTimeMillis(); info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)"); + inMemoryTable.reset(); + inMemoryTable = null; } // get total loaded (or stored) bytes and total row numbers @@ -279,8 +290,7 @@ public Tuple next() throws IOException { info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec"); if (memoryResident) { // if all sorted data reside in a main-memory table. - TupleSorter sorter = getSorter(inMemoryTable); - result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes); + result = new MemTableScanner(); } else { // if input data exceeds main-memory at least once try { @@ -498,11 +508,9 @@ private boolean checkIfCanBeUnbalancedMerged(int remainInputNum, int outputNum) */ private Scanner createFinalMerger(List inputs) throws IOException { if (inputs.size() == 1) { - this.result = getFileScanner(inputs.get(0)); - } else { - this.result = createKWayMerger(inputs, 0, inputs.size()); + return getFileScanner(inputs.get(0)); } - return result; + return createKWayMerger(inputs, 0, inputs.size()); } private Scanner getFileScanner(FileFragment frag) throws IOException { @@ -533,45 +541,41 @@ private Scanner createKWayMergerInternal(final Scanner [] sources, final int sta } } - private static class MemTableScanner extends AbstractScanner { - final Iterable iterable; - final long sortAndStoredBytes; - final int totalRecords; + private class MemTableScanner extends AbstractScanner { - Iterator iterator; + private final int[] mapping; // for input stats - float scannerProgress; - int numRecords; - TableStats scannerTableStats; + private float scannerProgress; - public MemTableScanner(Iterable iterable, int length, long inBytes) { - this.iterable = iterable; - this.totalRecords = length; - this.sortAndStoredBytes = inBytes; + private TableStats scannerTableStats; + + private transient int index; + + public MemTableScanner() { + this.mapping = inMemoryTable.sortTuples(); + LOG.info("Using memory scanner, used only " + + FileUtil.humanReadableByteCount(inMemoryTable.memoryConsumption, false)); } @Override public void init() throws IOException { - iterator = iterable.iterator(); + index = 0; scannerProgress = 0.0f; - numRecords = 0; // it will be returned as the final stats scannerTableStats = new TableStats(); scannerTableStats.setNumBytes(sortAndStoredBytes); scannerTableStats.setReadBytes(sortAndStoredBytes); - scannerTableStats.setNumRows(totalRecords); + scannerTableStats.setNumRows(inMemoryTable.size()); } @Override public Tuple next() throws IOException { - if (iterator.hasNext()) { - numRecords++; - return iterator.next(); - } else { - return null; + if (index < mapping.length) { + return inMemoryTable.toVTuple(mapping[index++]); } + return null; } @Override @@ -581,15 +585,13 @@ public void reset() throws IOException { @Override public void close() throws IOException { - iterator = null; scannerProgress = 1.0f; } @Override public float getProgress() { - if (iterator != null && numRecords > 0) { - return (float)numRecords / (float)totalRecords; - + if (mapping.length > 0) { + return (float)index / (float)mapping.length; } else { // if an input is empty return scannerProgress; } @@ -794,15 +796,13 @@ public void close() throws IOException { } } - if(inMemoryTable != null){ - inMemoryTable.clear(); + if (inMemoryTable != null) { + inMemoryTable.reset(); inMemoryTable = null; } - if(executorService != null){ - executorService.shutdown(); - executorService = null; - } + executorService.shutdown(); + executorService = null; plan = null; super.close(); @@ -810,7 +810,10 @@ public void close() throws IOException { @Override public void rescan() throws IOException { - if (result != null) { + // parents should propagate if they needs rescanning + if (result instanceof MemoryCache) { + sorted = false; + } else if (result != null) { result.reset(); } super.rescan(); @@ -834,4 +837,122 @@ public TableStats getInputStats() { return inputStats; } } + + private int estimateLimit(Schema schema, long allocation) { + int allocated = 0; + List columns = schema.getColumns(); + allocation -= ClassSize.ARRAY * 3 * columns.size() * Bytes.SIZEOF_INT; + for (Column column : columns) { + allocation -= ClassSize.ARRAY + ClassSize.REFERENCE * 12 + ClassSize.BITSET; + switch(column.getDataType().getType()) { + case BOOLEAN: + case BIT: allocated += 1; continue; + case INT1: + case INT2: allocated += 2; continue; + case INET4: + case DATE: + case INT4: allocated += 4; continue; + case INT8: + case TIME: + case TIMESTAMP: allocated += 8; continue; + case FLOAT4: allocated += 4; continue; + case FLOAT8: allocated += 8; continue; + default: allocated += ClassSize.OBJECT; + } + } + return (int)(allocation / allocated); + } + + private class MemoryCache extends VectorizedSorter implements Iterable { + + private final long memoryLimit; + private final long initialSize; + private final TajoDataTypes.Type[] types; + private long memoryConsumption; + + public MemoryCache(int limit, SortSpec[] sortKeys, int[] keyIndex, long memoryLimit) { + super(limit, inSchema, sortKeys, keyIndex); + this.memoryLimit = memoryLimit; + this.types = new TajoDataTypes.Type[inSchema.size()]; + for (int i = 0; i < inSchema.size(); i++) { + types[i] = inSchema.getColumn(i).getDataType().getType(); + } + int allocated = ClassSize.ARRAY * 3 * inSchema.size() * Bytes.SIZEOF_INT; + for (TajoDataTypes.Type type : types) { + allocated += ClassSize.ARRAY + ClassSize.REFERENCE * 12 + ClassSize.BITSET; + switch(type) { + case BOOLEAN: + case BIT: allocated += limit; continue; + case INT1: + case INT2: allocated += 2 * limit; continue; + case INET4: + case DATE: + case INT4: allocated += 4 * limit; continue; + case INT8: + case TIME: + case TIMESTAMP: allocated += 8 * limit; continue; + case FLOAT4: allocated += 4 * limit; continue; + case FLOAT8: allocated += 8 * limit; continue; + } + } + initialSize = memoryConsumption = allocated; + } + + @Override + public boolean addTuple(Tuple tuple) { + memoryConsumption += MemoryUtil.calculateVectorMemorySize(tuple); + if (memoryConsumption <= memoryLimit) { + return super.addTuple(tuple); + } + return false; + } + + @Override + public void reset() { + super.reset(); + memoryConsumption = initialSize; + } + + @Override + public Iterator iterator() { + return newIterator(); + } + + public Iterator newIterator() { + final int[] mapping = mapping(); + return new Iterator() { + private int index = -1; + private final TupleIterator iterator = new TupleIterator(); + public boolean hasNext() { return index + 1 < mapping.length; } + public ReadableTuple next() { + index++; + return iterator; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + + class TupleIterator implements ReadableTuple { + public TajoDataTypes.Type type(int fieldId) { return types[fieldId]; } + public boolean isNull(int fieldId) { return MemoryCache.this.isNull(mapping[index], fieldId); } + public boolean getBool(int fieldId) { return MemoryCache.this.getBool(mapping[index], fieldId); } + public byte getByte(int fieldId) { return MemoryCache.this.getByte(mapping[index], fieldId); } + public char getChar(int fieldId) { return MemoryCache.this.getChar(mapping[index], fieldId); } + public byte[] getBytes(int fieldId) { return MemoryCache.this.getBytes(mapping[index], fieldId); } + public short getInt2(int fieldId) { return MemoryCache.this.getInt2(mapping[index], fieldId); } + public int getInt4(int fieldId) { return MemoryCache.this.getInt4(mapping[index], fieldId); } + public long getInt8(int fieldId) { return MemoryCache.this.getInt8(mapping[index], fieldId); } + public float getFloat4(int fieldId) { return MemoryCache.this.getFloat4(mapping[index], fieldId); } + public double getFloat8(int fieldId) { return MemoryCache.this.getFloat8(mapping[index], fieldId); } + public String getText(int fieldId) { return MemoryCache.this.getText(mapping[index], fieldId); } + public Datum getProtobufDatum(int fieldId) { return MemoryCache.this.getProtobufDatum(mapping[index], fieldId); } + public Datum getInterval(int fieldId) { return MemoryCache.this.getInterval(mapping[index], fieldId); } + public char[] getUnicodeChars(int fieldId) { return MemoryCache.this.getUnicodeChars(mapping[index], fieldId); } + public Datum get(int fieldId) { return MemoryCache.this.get(mapping[index], fieldId); } + } + }; + } + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java index 28be9de670..ec72251fcb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java @@ -22,6 +22,7 @@ import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.ComparableVector; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -42,7 +43,7 @@ public SortExec(TaskAttemptContext context, Schema inSchema, protected TupleSorter getSorter(List tupleSlots) { if (!tupleSlots.isEmpty() && ComparableVector.isVectorizable(sortSpecs)) { - return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds()); + return new VectorizedTupleSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds()); } return new TupleSorter.DefaultSorter(tupleSlots, comparator); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java index 18d853f1f5..8f51b73f20 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java @@ -20,31 +20,41 @@ import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.ComparableVector; -import java.util.Iterator; -import java.util.List; +import java.util.Arrays; /** * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting * Uses indirection for efficient swapping */ -public class VectorizedSorter extends ComparableVector implements IndexedSortable, TupleSorter { +public class VectorizedSorter extends ComparableVector implements IndexedSortable { private final int[] mappings; // index indirection - public VectorizedSorter(List source, SortSpec[] sortKeys, int[] keyIndex) { - super(source.size(), sortKeys, keyIndex); - source.toArray(tuples); // wish it's array list - mappings = new int[tuples.length]; - for (int i = 0; i < tuples.length; i++) { - for (int j = 0; j < keyIndex.length; j++) { - vectors[j].append(tuples[i], keyIndex[j]); - } - mappings[i] = i; + private int counter; + + public VectorizedSorter(int limit, SortSpec[] sortKeys, int[] keyIndex) { + super(limit, sortKeys, keyIndex); + mappings = new int[limit]; + } + + public VectorizedSorter(int limit, Schema schema, SortSpec[] sortKeys, int[] keyIndex) { + super(limit, schema, sortKeys, keyIndex); + mappings = new int[limit]; + } + + public boolean addTuple(Tuple tuple) { + if (counter >= mappings.length) { + return false; } + append(tuple); + mappings[counter] = counter; + counter++; + return true; } @Override @@ -59,19 +69,24 @@ public void swap(int i1, int i2) { mappings[i2] = v1; } + public int[] sortTuples() { + if (counter > 0) { + new QuickSort().sort(this, 0, counter); + } + return mapping(); + } + + public int size() { + return counter; + } + @Override - public Iterable sort() { - new QuickSort().sort(this, 0, mappings.length); - return new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - int index; - public boolean hasNext() { return index < mappings.length; } - public Tuple next() { return tuples[mappings[index++]]; } - public void remove() { throw new UnsupportedException(); } - }; - } - }; + public void reset() { + super.reset(); + counter = 0; + } + + public int[] mapping() { + return Arrays.copyOf(mappings, counter); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedTupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedTupleSorter.java new file mode 100644 index 0000000000..3f4a5958f8 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedTupleSorter.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; + +import java.util.Iterator; +import java.util.List; + +public class VectorizedTupleSorter extends VectorizedSorter implements TupleSorter { + + private final Tuple[] tuples; // source tuples + + public VectorizedTupleSorter(List source, SortSpec[] sortKeys, int[] keyIndex) { + super(source.size(), sortKeys, keyIndex); + tuples = source.toArray(new Tuple[source.size()]); // wish it's array list + for (Tuple tuple : tuples) { + addTuple(tuple); + } + } + + @Override + public Iterable sort() { + final int[] mappings = sortTuples(); + return new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int index; + public boolean hasNext() { return index < mappings.length; } + public Tuple next() { return tuples[mappings[index++]]; } + public void remove() { throw new UnsupportedException(); } + }; + } + }; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 8e9e343c0c..e728f7b8f0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -699,7 +699,11 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")"); - ranges = partitioner.partition(determinedTaskNum); + if (determinedTaskNum > 1) { + ranges = partitioner.partition(determinedTaskNum); + } else { + ranges = new TupleRange[] {mergedRange}; + } if (ranges == null) { throw new NullPointerException("ranges is null on " + stage.getId() + " stage."); } @@ -710,10 +714,8 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); if (LOG.isDebugEnabled()) { - if (ranges != null) { - for (TupleRange eachRange : ranges) { - LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); - } + for (TupleRange eachRange : ranges) { + LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); } } } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index 0ef8294f25..aa13743793 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; @@ -50,7 +51,7 @@ import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestExternalSortExec { private TajoConf conf; @@ -119,12 +120,23 @@ public void tearDown() throws Exception { }; @Test - public final void testNext() throws IOException, PlanningException { + public final void testMergeScan() throws IOException, PlanningException { + runTest(36, 2); + } + + @Test + public final void testMemoryScan() throws IOException, PlanningException { + runTest(200, 2); + } + + private void runTest(long buffer, int fanout) throws IOException, PlanningException { FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = new Path(testDir, TestExternalSortExec.class.getName()); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); + ctx.getQueryContext().setInt(TajoConf.ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, fanout); + ctx.getQueryContext().setLong(SessionVars.EXTSORT_BUFFER_SIZE, buffer); ctx.setEnforcer(new Enforcer()); Expr expr = analyzer.parse(QUERIES[0]); LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); @@ -136,17 +148,13 @@ public final void testNext() throws IOException, PlanningException { ProjectionExec proj = (ProjectionExec) exec; // TODO - should be planed with user's optimization hint - ExternalSortExec extSort; if (!(proj.getChild() instanceof ExternalSortExec)) { UnaryPhysicalExec sortExec = proj.getChild(); SeqScanExec scan = sortExec.getChild(); - extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan); + ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan); proj.setChild(extSort); - } else { - extSort = proj.getChild(); } - extSort.setSortBufferBytesNum(1024*1024); Tuple tuple; Tuple preVal = null; @@ -162,8 +170,8 @@ public final void testNext() throws IOException, PlanningException { while ((tuple = exec.next()) != null) { curVal = tuple; - if (preVal != null) { - assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); + if (preVal != null && comparator.compare(preVal, curVal) > 0) { + fail("prev: " + preVal + ", but cur: " + curVal); // making string takes times } preVal = new VTuple(curVal); cnt++; @@ -177,8 +185,8 @@ public final void testNext() throws IOException, PlanningException { cnt = 0; while ((tuple = exec.next()) != null) { curVal = tuple; - if (preVal != null) { - assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); + if (preVal != null && comparator.compare(preVal, curVal) > 0) { + fail("prev: " + preVal + ", but cur: " + curVal); } preVal = curVal; cnt++; diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java index 9cc477ad76..4f9daabc26 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java @@ -92,7 +92,7 @@ public final void testSortBench() { } long start = System.currentTimeMillis(); - VectorizedSorter sorter = new VectorizedSorter(target, sortKeys, keyIndices); + TupleSorter sorter = new VectorizedTupleSorter(target, sortKeys, keyIndices); Iterator iterator = sorter.sort().iterator(); String[] result1 = new String[SAMPLING]; diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java index 30fc9eec36..52d036fbaf 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java @@ -18,6 +18,7 @@ package org.apache.tajo.jdbc; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.NullDatum; @@ -48,6 +49,11 @@ public boolean contains(int fieldid) { return false; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return get(fieldId).type(); + } + @Override public boolean isNull(int fieldid) { return values.get(fieldid) == null || values.get(fieldid).isNull(); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java index 8b7e2e06a8..9f29e8a994 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java @@ -22,6 +22,7 @@ package org.apache.tajo.storage; import com.google.common.base.Preconditions; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; @@ -69,6 +70,11 @@ public boolean contains(int fieldId) { } } + @Override + public TajoDataTypes.Type type(int fieldId) { + return get(fieldId).type(); + } + @Override public boolean isNull(int fieldid) { return get(fieldid).isNull(); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java index bfbe478a02..fe46ef8bb4 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.NullDatum; @@ -67,6 +68,11 @@ public boolean contains(int fieldid) { return textBytes[fieldid] != null || values[fieldid] != null; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return get(fieldId).type(); + } + @Override public boolean isNull(int fieldid) { return get(fieldid).isNull(); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java index 16477cd8ef..e5de5ce037 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java @@ -65,6 +65,12 @@ public class MemoryUtil { /** Overhead for an TimestampDatum */ public static final long TIMESTAMP_DATUM; + /** Overhead for an IntervalDatum */ + public static final long INTERVAL_DATUM; + + /** Overhead for an ProtobufDatum */ + public static final long PROTOBUF_DATUM; + static { NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false); @@ -93,6 +99,30 @@ public class MemoryUtil { TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false); TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false); + + INTERVAL_DATUM = ClassSize.estimateBase(IntervalDatum.class, false); + + PROTOBUF_DATUM = ClassSize.estimateBase(ProtobufDatum.class, false); + } + + public static long calculateVectorMemorySize(Tuple tuple) { + long total = 0; + for (int i = 0; i < tuple.size(); i++) { + switch (tuple.type(i)) { + case TEXT: + case BLOB: + case CHAR: + total += ClassSize.ARRAY + tuple.get(i).size(); + continue; + case INTERVAL: + total += INTERVAL_DATUM; + continue; + case PROTOBUF: + total += PROTOBUF_DATUM + tuple.get(i).size(); + continue; + } + } + return total; } public static long calculateMemorySize(Tuple tuple) { @@ -157,6 +187,14 @@ public static long calculateMemorySize(Tuple tuple) { total += TIMESTAMP_DATUM; break; + case INTERVAL: + total += INTERVAL_DATUM; + break; + + case PROTOBUF: + total += PROTOBUF_DATUM + datum.size(); + break; + default: break; } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 33db7982ac..34bc8462b2 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -110,6 +110,7 @@ public Tuple toTuple(byte [] bytes) { case INT4: case DATE: + case INET4: int i_ = bb.getInt(); tuple.put(i, DatumFactory.createFromInt4(type, i_)); break; @@ -149,11 +150,6 @@ public Tuple toTuple(byte [] bytes) { tuple.put(i, DatumFactory.createBlob(_bytes)); break; - case INET4: - byte [] _ipv4 = new byte[4]; - bb.get(_ipv4); - tuple.put(i, DatumFactory.createInet4(_ipv4)); - break; case INET6: // TODO - to be implemented throw new UnsupportedException(type.getType().name()); @@ -198,40 +194,40 @@ public byte[] toBytes(Tuple tuple) { nullFlags.set(i); break; case BOOLEAN: - bb.put(tuple.get(i).asByte()); + bb.put(tuple.getBool(i) ? (byte)0 : (byte)1); break; case BIT: - bb.put(tuple.get(i).asByte()); + bb.put(tuple.getByte(i)); break; case CHAR: - bb.put(tuple.get(i).asByte()); + bb.put(tuple.getByte(i)); break; case INT2: - bb.putShort(tuple.get(i).asInt2()); + bb.putShort(tuple.getInt2(i)); break; case INT4: - bb.putInt(tuple.get(i).asInt4()); + bb.putInt(tuple.getInt4(i)); break; case INT8: - bb.putLong(tuple.get(i).asInt8()); + bb.putLong(tuple.getInt8(i)); break; case FLOAT4: - bb.putFloat(tuple.get(i).asFloat4()); + bb.putFloat(tuple.getFloat4(i)); break; case FLOAT8: - bb.putDouble(tuple.get(i).asFloat8()); + bb.putDouble(tuple.getFloat8(i)); break; case TEXT: - byte[] _string = tuple.get(i).asByteArray(); + byte[] _string = tuple.getBytes(i); bb.putInt(_string.length); bb.put(_string); break; case DATE: - bb.putInt(tuple.get(i).asInt4()); + bb.putInt(tuple.getInt4(i)); break; case TIME: case TIMESTAMP: - bb.putLong(tuple.get(i).asInt8()); + bb.putLong(tuple.getInt8(i)); break; case INTERVAL: IntervalDatum interval = (IntervalDatum) tuple.get(i); @@ -239,16 +235,15 @@ public byte[] toBytes(Tuple tuple) { bb.putLong(interval.getMilliSeconds()); break; case BLOB: - byte[] bytes = tuple.get(i).asByteArray(); + byte[] bytes = tuple.getBytes(i); bb.putInt(bytes.length); bb.put(bytes); break; case INET4: - byte[] ipBytes = tuple.get(i).asByteArray(); - bb.put(ipBytes); + bb.putInt(tuple.getInt4(i)); break; - case INET6: - bb.put(tuple.get(i).asByteArray()); + case INET6: + bb.put(tuple.getBytes(i)); break; default: throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); @@ -268,7 +263,7 @@ public byte[] toBytes(Tuple tuple) { } // Note that, NULL values are treated separately - private int estimateTupleDataSize(Tuple tuple) { + private int estimateTupleDataSize(ReadableTuple tuple) { int size = 0; Column col; @@ -289,6 +284,7 @@ private int estimateTupleDataSize(Tuple tuple) { break; case DATE: case INT4: + case INET4: case FLOAT4: size += 4; break; @@ -303,11 +299,10 @@ private int estimateTupleDataSize(Tuple tuple) { break; case TEXT: case BLOB: - size += (4 + tuple.get(i).asByteArray().length); + size += (4 + tuple.getBytes(i).length); break; - case INET4: case INET6: - size += tuple.get(i).asByteArray().length; + size += tuple.getBytes(i).length; break; default: throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java index a2c08deca8..f5416bc811 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -21,12 +21,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.util.ComparableVector; /** * This class is not thread-safe. @@ -34,8 +35,7 @@ public class TableStatistics { private static final Log LOG = LogFactory.getLog(TableStatistics.class); private Schema schema; - private Tuple minValues; - private Tuple maxValues; + private MinMaxStats minMaxValues; private long [] numNulls; private long numRows = 0; private long numBytes = 0; @@ -44,20 +44,14 @@ public class TableStatistics { public TableStatistics(Schema schema) { this.schema = schema; - minValues = new VTuple(schema.size()); - maxValues = new VTuple(schema.size()); + this.minMaxValues = new MinMaxStats(); numNulls = new long[schema.size()]; comparable = new boolean[schema.size()]; - DataType type; for (int i = 0; i < schema.size(); i++) { - type = schema.getColumn(i).getDataType(); - if (type.getType() == Type.PROTOBUF) { - comparable[i] = false; - } else { - comparable[i] = true; - } + DataType type = schema.getColumn(i).getDataType(); + comparable[i] = type.getType() != Type.PROTOBUF; } } @@ -81,22 +75,12 @@ public long getNumBytes() { return this.numBytes; } - public void analyzeField(int idx, Datum datum) { - if (datum instanceof NullDatum) { - numNulls[idx]++; - return; - } + public void analyzeField(ReadableTuple tuple) { + minMaxValues.update(tuple); + } - if (comparable[idx]) { - if (!maxValues.contains(idx) || - maxValues.get(idx).compareTo(datum) < 0) { - maxValues.put(idx, datum); - } - if (!minValues.contains(idx) || - minValues.get(idx).compareTo(datum) > 0) { - minValues.put(idx, datum); - } - } + public void analyzeNullField() { + minMaxValues.updateNull(); } public TableStats getTableStat() { @@ -106,18 +90,8 @@ public TableStats getTableStat() { for (int i = 0; i < schema.size(); i++) { columnStats = new ColumnStats(schema.getColumn(i)); columnStats.setNumNulls(numNulls[i]); - if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { - columnStats.setMinValue(minValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); - } - if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { - columnStats.setMaxValue(maxValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); - } + columnStats.setMinValue(minMaxValues.getMinDatum(i)); + columnStats.setMaxValue(minMaxValues.getMaxDatum(i)); stat.addColumnStat(columnStats); } @@ -126,4 +100,65 @@ public TableStats getTableStat() { return stat; } + + private class MinMaxStats extends ComparableVector { + + private static final int MAX_INDEX = 0; + private static final int MIN_INDEX = 1; + private static final int TMP_INDEX = 2; + + private final boolean[] warned; + private final Type[] types; + + public MinMaxStats() { + super(3, schema); + this.types = SchemaUtil.toTypes(schema); + this.warned = new boolean[types.length]; + setNull(MAX_INDEX); + setNull(MIN_INDEX); + } + + public void update(ReadableTuple tuple) { + set(TMP_INDEX, tuple); + for (int i = 0; i < types.length; i++) { + if (tuple.isNull(i)) { + numNulls[i]++; + continue; + } + if (warned[i] || !comparable[i]) { + continue; + } + if (tuple.type(i) != types[i]) { + LOG.warn("Wrong statistics column type (" + tuple.type(i) + + ", expected=" + types[i] + ")"); + warned[i] = true; + continue; + } + if (vectors[i].isNull(MAX_INDEX) || vectors[i].compare(MAX_INDEX, TMP_INDEX) < 0) { + vectors[i].set(MAX_INDEX, tuple, i); + } + if (vectors[i].isNull(MIN_INDEX) || vectors[i].compare(MIN_INDEX, TMP_INDEX) > 0) { + vectors[i].set(MIN_INDEX, tuple, i); + } + } + } + + public void updateNull() { + for (int i = 0; i < types.length; i++) { + numNulls[i]++; + } + } + + public Datum getMaxDatum(int field) { + return getDatum(MAX_INDEX, field); + } + + public Datum getMinDatum(int field) { + return getDatum(MIN_INDEX, field); + } + + public Datum getDatum(int index, int field) { + return vectors[field].isNull(index) ? null : vectors[field].toDatum(index); + } + } } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java index 33f9f1cf22..08b285c17e 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java @@ -21,6 +21,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.*; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; @@ -74,6 +75,11 @@ public boolean contains(int fieldid) { return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return types[fieldId].getType(); + } + @Override public boolean isNull(int fieldid) { return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java index b742e6d8e8..e453b8969d 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -22,6 +22,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.*; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; @@ -63,6 +64,11 @@ public int size() { return types.length; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return types[fieldId].getType(); + } + public ByteBuffer nioBuffer() { return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice(); } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/util/ComparableTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/util/ComparableTuple.java new file mode 100644 index 0000000000..21386019ec --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/util/ComparableTuple.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import com.google.common.primitives.Booleans; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; +import com.google.common.primitives.UnsignedInts; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.storage.ReadableTuple; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.ComparableVector.TupleType; + +import java.util.Arrays; + +public class ComparableTuple implements ReadableTuple, Comparable { + + private final TupleType[] keyTypes; + private final int[] keyIndex; + private final Object[] keys; + + public ComparableTuple(Schema schema, int[] keyIndex) { + this(ComparableVector.tupleTypes(schema, keyIndex), keyIndex); + } + + public ComparableTuple(Schema schema, int start, int end) { + this(schema, ComparableVector.toIndex(start, end)); + } + + private ComparableTuple(TupleType[] keyTypes, int[] keyIndex) { + this.keyTypes = keyTypes; + this.keyIndex = keyIndex; + this.keys = new Object[keyIndex.length]; + } + + public int size() { + return keyIndex.length; + } + + public void set(Tuple tuple) { + for (int i = 0; i < keyTypes.length; i++) { + final int field = keyIndex[i]; + if (tuple.isNull(field)) { + keys[i] = null; + continue; + } + switch (keyTypes[i]) { + case BOOLEAN: keys[i] = tuple.getBool(field); break; + case BIT: keys[i] = tuple.getByte(field); break; + case INT1: + case INT2: keys[i] = tuple.getInt2(field); break; + case INT4: + case DATE: + case INET4: keys[i] = tuple.getInt4(field); break; + case INT8: + case TIME: + case TIMESTAMP: keys[i] = tuple.getInt8(field); break; + case FLOAT4: keys[i] = tuple.getFloat4(field); break; + case FLOAT8: keys[i] = tuple.getFloat8(field); break; + case TEXT: + case CHAR: + case BLOB: keys[i] = tuple.getBytes(field); break; + case DATUM: keys[i] = tuple.get(field); break; + default: + throw new IllegalArgumentException(); + } + } + } + + @Override + public boolean equals(Object obj) { + ComparableTuple other = (ComparableTuple) obj; + for (int i = 0; i < keys.length; i++) { + final boolean n1 = keys[i] == null; + final boolean n2 = other.keys[i] == null; + if (n1 && n2) { + continue; + } + if (n1 ^ n2) { + return false; + } + switch (keyTypes[i]) { + case TEXT: + case CHAR: + case BLOB: if (!Arrays.equals((byte[]) keys[i], (byte[]) other.keys[i])) return false; continue; + default: if (!keys[i].equals(other.keys[i])) return false; continue; + } + } + return true; + } + + public boolean equals(Tuple tuple) { + for (int i = 0; i < keys.length; i++) { + final int field = keyIndex[i]; + final boolean n1 = keys[i] == null; + final boolean n2 = tuple.isNull(field); + if (n1 && n2) { + continue; + } + if (n1 ^ n2) { + return false; + } + switch (keyTypes[i]) { + case BOOLEAN: if ((Boolean) keys[i] != tuple.getBool(field)) return false; continue; + case BIT: if ((Byte) keys[i] != tuple.getByte(field)) return false; continue; + case INT1: + case INT2: if ((Short) keys[i] != tuple.getInt2(field)) return false; continue; + case INT4: + case DATE: + case INET4: if ((Integer) keys[i] != tuple.getInt4(field)) return false; continue; + case INT8: + case TIME: + case TIMESTAMP: if ((Long) keys[i] != tuple.getInt8(field)) return false; continue; + case FLOAT4: if ((Float) keys[i] != tuple.getFloat4(field)) return false; continue; + case FLOAT8: if ((Double) keys[i] != tuple.getFloat8(field)) return false; continue; + case TEXT: + case CHAR: + case BLOB: if (!Arrays.equals((byte[]) keys[i], tuple.getBytes(field))) return false; continue; + case DATUM: if (!keys[i].equals(tuple.get(field))) return false; continue; + } + } + return true; + } + + public int compareTo(ComparableTuple o) { + for (int i = 0; i < keys.length; i++) { + final boolean n1 = keys[i] == null; + final boolean n2 = o.keys[i] == null; + if (n1 && n2) { + continue; + } + if (n1 ^ n2) { + return n1 ? 1 : -1; + } + int compare; + switch (keyTypes[i]) { + case BOOLEAN: compare = Booleans.compare((Boolean) keys[i], (Boolean) o.keys[i]); break; + case BIT: compare = (Byte)keys[i] - (Byte)o.keys[i]; break; + case INT1: + case INT2: compare = Shorts.compare((Short)keys[i], (Short)o.keys[i]); break; + case INT4: + case DATE: compare = Ints.compare((Integer)keys[i], (Integer)o.keys[i]); break; + case INET4: compare = UnsignedInts.compare((Integer)keys[i], (Integer)o.keys[i]); break; + case INT8: + case TIME: + case TIMESTAMP: compare = Longs.compare((Long)keys[i], (Long)o.keys[i]); break; + case FLOAT4: compare = Floats.compare((Float)keys[i], (Float)o.keys[i]); break; + case FLOAT8: compare = Doubles.compare((Double)keys[i], (Double)o.keys[i]); break; + case CHAR: + case TEXT: + case BLOB: compare = TextDatum.COMPARATOR.compare((byte[])keys[i], (byte[])o.keys[i]); break; + case DATUM: compare = ((Datum)keys[i]).compareTo((Datum)o.keys[i]); break; + default: + throw new IllegalArgumentException(); + } + if (compare != 0) { + return compare; + } + } + return 0; + } + + @Override + public int hashCode() { + int result = 1; + for (Object key : keys) { + int hash = key == null ? 0 : + key instanceof byte[] ? Arrays.hashCode((byte[]) key) : key.hashCode(); + result = 31 * result + hash; + } + return result; + } + + public ComparableTuple copy() { + ComparableTuple copy = emptyCopy(); + System.arraycopy(keys, 0, copy.keys, 0, keys.length); + return copy; + } + + public ComparableTuple emptyCopy() { + return new ComparableTuple(keyTypes, keyIndex); + } + + public VTuple toVTuple() { + VTuple vtuple = new VTuple(keyIndex.length); + for (int i = 0; i < keyIndex.length; i++) { + vtuple.put(i, toDatum(i)); + } + return vtuple; + } + + public Datum toDatum(int i) { + if (keys[i] == null) { + return NullDatum.get(); + } + switch (keyTypes[i]) { + case NULL_TYPE: return NullDatum.get(); + case BOOLEAN: return DatumFactory.createBool((Boolean) keys[i]); + case BIT: return DatumFactory.createBit((Byte) keys[i]); + case INT1: + case INT2: return DatumFactory.createInt2((Short) keys[i]); + case INT4: return DatumFactory.createInt4((Integer) keys[i]); + case DATE: return DatumFactory.createDate((Integer) keys[i]); + case INET4: return DatumFactory.createInet4((Integer) keys[i]); + case INT8: return DatumFactory.createInt8((Long) keys[i]); + case TIME: return DatumFactory.createTime((Long) keys[i]); + case TIMESTAMP: return DatumFactory.createTimestamp((Long) keys[i]); + case FLOAT4: return DatumFactory.createFloat4((Float) keys[i]); + case FLOAT8: return DatumFactory.createFloat8((Double) keys[i]); + case TEXT: return DatumFactory.createText((byte[]) keys[i]); + case CHAR: return DatumFactory.createChar((byte[]) keys[i]); + case BLOB: return DatumFactory.createBlob((byte[]) keys[i]); + case DATUM: return (Datum) keys[i]; + default: + throw new IllegalArgumentException(); + } + } + + @Override + public TajoDataTypes.Type type(int fieldId) { + TajoDataTypes.Type type = ComparableVector.dataType(keyTypes[fieldId]); + return type == null ? ((Datum) keys[fieldId]).type() : type; + } + + @Override + public boolean isNull(int fieldId) { + return keys[fieldId] == null; + } + + @Override + public boolean getBool(int fieldId) { + return (Boolean)keys[fieldId]; + } + + @Override + public byte getByte(int fieldId) { + return (Byte)keys[fieldId]; + } + + @Override + public char getChar(int fieldId) { + return getText(fieldId).charAt(0); + } + + @Override + public byte[] getBytes(int fieldId) { + return (byte[])keys[fieldId]; + } + + @Override + public short getInt2(int fieldId) { + return (Short)keys[fieldId]; + } + + @Override + public int getInt4(int fieldId) { + return (Integer)keys[fieldId]; + } + + @Override + public long getInt8(int fieldId) { + return (Long)keys[fieldId]; + } + + @Override + public float getFloat4(int fieldId) { + return (Float)keys[fieldId]; + } + + @Override + public double getFloat8(int fieldId) { + return (Double)keys[fieldId]; + } + + @Override + public String getText(int fieldId) { + return new String((byte[])keys[fieldId], TextDatum.DEFAULT_CHARSET); + } + + @Override + public Datum getProtobufDatum(int fieldId) { + return (Datum)keys[fieldId]; + } + + @Override + public Datum getInterval(int fieldId) { + return (Datum)keys[fieldId]; + } + + @Override + public char[] getUnicodeChars(int fieldId) { + return StringUtils.convertBytesToChars((byte[]) keys[fieldId], TextDatum.DEFAULT_CHARSET); + } + + @Override + public Datum get(int fieldId) { + return toDatum(fieldId); + } +} + + diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/util/ComparableVector.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/util/ComparableVector.java new file mode 100644 index 0000000000..0d1cfe07b3 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/util/ComparableVector.java @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import com.google.common.primitives.Booleans; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; +import com.google.common.primitives.UnsignedInts; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.storage.ReadableTuple; +import org.apache.tajo.storage.VTuple; + +import java.util.Arrays; +import java.util.BitSet; + +/** + * Extract raw level values (primitive or String/byte[]) from each of key columns for compare + */ +public class ComparableVector { + + protected final TupleVector[] vectors; // values of key columns + protected final int[] setIndex; + protected final int[] compareIndex; + + public ComparableVector(int rows, Schema schema, SortSpec[] sortKeys, int[] keyIndex) { + vectors = new TupleVector[schema.size()]; + + for (int i = 0; i < vectors.length; i++) { + Column column = schema.getColumn(i); + TajoDataTypes.Type type = column.getDataType().getType(); + int index = Ints.indexOf(keyIndex, i); + if (index >= 0) { + boolean nullFirst = sortKeys[index].isNullFirst(); + boolean ascending = sortKeys[index].isAscending(); + boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending; + vectors[i] = new TupleVector(tupleType(type), rows, nullInvert, ascending); + } else { + vectors[i] = new TupleVector(tupleType(type), rows); + } + } + this.setIndex = toIndex(0, vectors.length); + this.compareIndex = keyIndex; + } + + public ComparableVector(int rows, SortSpec[] sortKeys, int[] keyIndex) { + vectors = new TupleVector[sortKeys.length]; + for (int i = 0; i < vectors.length; i++) { + TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType(); + boolean nullFirst = sortKeys[i].isNullFirst(); + boolean ascending = sortKeys[i].isAscending(); + boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending; + vectors[i] = new TupleVector(tupleType(type), rows, nullInvert, ascending); + } + this.setIndex = keyIndex; + this.compareIndex = toIndex(0, vectors.length); + } + + public ComparableVector(int rows, Schema schema) { + vectors = new TupleVector[schema.size()]; + for (int i = 0; i < vectors.length; i++) { + Column column = schema.getColumn(i); + TajoDataTypes.Type type = column.getDataType().getType(); + vectors[i] = new TupleVector(tupleType(type), rows, false, true); + } + this.setIndex = this.compareIndex = toIndex(0, vectors.length); + } + + public int compare(final int i1, final int i2) { + for (int index : compareIndex) { + int compare = vectors[index].compare(i1, i2); + if (compare != 0) { + return compare; + } + } + return 0; + } + + public void set(int index, ReadableTuple tuple) { + for (int i = 0; i < vectors.length; i++) { + vectors[i].set(index, tuple, setIndex[i]); + } + } + + public void append(ReadableTuple tuple) { + for (int i = 0; i < vectors.length; i++) { + vectors[i].append(tuple, setIndex[i]); + } + } + + public void setNull(int index) { + for (TupleVector vector : vectors) { + vector.nulls.set(index); + } + } + + public void reset() { + for (TupleVector vector : vectors) { + vector.index = 0; + vector.nulls.clear(); + if (vector.bytes != null) { + Arrays.fill(vector.bytes, null); + } + if (vector.data != null) { + Arrays.fill(vector.data, null); + } + } + } + + public boolean isNull(int index, int fieldId) { + return vectors[fieldId].isNull(index); + } + + public boolean getBool(int index, int fieldId) { + return vectors[fieldId].booleans[index]; + } + + public byte getByte(int index, int fieldId) { + return vectors[fieldId].bits[index]; + } + + public char getChar(int index, int fieldId) { + return new String(vectors[fieldId].bytes[index], TextDatum.DEFAULT_CHARSET).charAt(0); + } + + public byte[] getBytes(int index, int fieldId) { + return vectors[fieldId].bytes[index]; + } + + public short getInt2(int index, int fieldId) { + return vectors[fieldId].shorts[index]; + } + + public int getInt4(int index, int fieldId) { + return vectors[fieldId].ints[index]; + } + + public long getInt8(int index, int fieldId) { + return vectors[fieldId].longs[index]; + } + + public float getFloat4(int index, int fieldId) { + return vectors[fieldId].floats[index]; + } + + public double getFloat8(int index, int fieldId) { + return vectors[fieldId].doubles[index]; + } + + public String getText(int index, int fieldId) { + return new String(vectors[fieldId].bytes[index], TextDatum.DEFAULT_CHARSET); + } + + public Datum getProtobufDatum(int index, int fieldId) { + return vectors[fieldId].data[fieldId]; + } + + public Datum getInterval(int index, int fieldId) { + return vectors[fieldId].data[fieldId]; + } + + public char[] getUnicodeChars(int index, int fieldId) { + return StringUtils.convertBytesToChars(vectors[fieldId].bytes[index], TextDatum.DEFAULT_CHARSET); + } + + public Datum get(int index, int fieldId) { + return vectors[fieldId].toDatum(index); + } + + public static class TupleVector { + + private final TupleType type; + private final BitSet nulls; + private final boolean nullInvert; + private final boolean ascending; + + private boolean[] booleans; + private byte[] bits; + private short[] shorts; + private int[] ints; + private long[] longs; + private float[] floats; + private double[] doubles; + private byte[][] bytes; + private Datum[] data; + + private int index; // appending index + + private TupleVector(TupleType type, int length) { + this(type, length, false, true); + } + + private TupleVector(TupleType type, int length, boolean nullInvert, boolean ascending) { + this.type = type; + this.nulls = new BitSet(length); + this.nullInvert = nullInvert; + this.ascending = ascending; + switch (type) { + case NULL_TYPE: break; + case BOOLEAN: booleans = new boolean[length]; break; + case BIT: bits = new byte[length]; break; + case INT1: + case INT2: shorts = new short[length]; break; + case INT4: + case DATE: + case INET4: ints = new int[length]; break; + case INT8: + case TIME: + case TIMESTAMP: longs = new long[length]; break; + case FLOAT4: floats = new float[length]; break; + case FLOAT8: doubles = new double[length]; break; + case CHAR: + case TEXT: + case BLOB: bytes = new byte[length][]; break; + case DATUM: data = new Datum[length]; break; + default: + throw new IllegalArgumentException(); + } + } + + public final void append(ReadableTuple tuple, int field) { + set(index++, tuple, field); + } + + public final void set(int index, ReadableTuple tuple, int field) { + if (tuple.isNull(field)) { + nulls.set(index); + return; + } + nulls.clear(index); + switch (type) { + case BOOLEAN: booleans[index] = tuple.getBool(field); break; + case BIT: bits[index] = tuple.getByte(field); break; + case INT1: + case INT2: shorts[index] = tuple.getInt2(field); break; + case INT4: + case DATE: + case INET4: ints[index] = tuple.getInt4(field); break; + case INT8: + case TIME: + case TIMESTAMP: longs[index] = tuple.getInt8(field); break; + case FLOAT4: floats[index] = tuple.getFloat4(field); break; + case FLOAT8: doubles[index] = tuple.getFloat8(field); break; + case TEXT: + case CHAR: + case BLOB: bytes[index] = tuple.getBytes(field); break; + case DATUM: data[index] = tuple.get(field); break; + default: + throw new IllegalArgumentException(); + } + } + + public final int compare(int index1, int index2) { + final boolean n1 = nulls.get(index1); + final boolean n2 = nulls.get(index2); + if (n1 && n2) { + return 0; + } + if (n1 ^ n2) { + int compVal = n1 ? 1 : -1; + return nullInvert ? -compVal : compVal; + } + int compare; + switch (type) { + case BOOLEAN: compare = Booleans.compare(booleans[index1], booleans[index2]); break; + case BIT: compare = bits[index1] - bits[index2]; break; + case INT1: + case INT2: compare = Shorts.compare(shorts[index1], shorts[index2]); break; + case INT4: + case DATE: compare = Ints.compare(ints[index1], ints[index2]); break; + case INET4: compare = UnsignedInts.compare(ints[index1], ints[index2]); break; + case INT8: + case TIME: + case TIMESTAMP: compare = Longs.compare(longs[index1], longs[index2]); break; + case FLOAT4: compare = Floats.compare(floats[index1], floats[index2]); break; + case FLOAT8: compare = Doubles.compare(doubles[index1], doubles[index2]); break; + case CHAR: + case TEXT: + case BLOB: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break; + case DATUM: compare = data[index1].compareTo(data[index2]); break; + default: + throw new IllegalArgumentException(); + } + return ascending ? compare : -compare; + } + + public boolean isNull(int index) { + return nulls.get(index); + } + + public Datum toDatum(int index) { + if (nulls.get(index)) { + return NullDatum.get(); + } + switch (type) { + case NULL_TYPE: return NullDatum.get(); + case BOOLEAN: return DatumFactory.createBool(booleans[index]); + case BIT: return DatumFactory.createBit(bits[index]); + case INT1: + case INT2: return DatumFactory.createInt2(shorts[index]); + case INT4: return DatumFactory.createInt4(ints[index]); + case DATE: return DatumFactory.createDate(ints[index]); + case INET4: return DatumFactory.createInet4(ints[index]); + case INT8: return DatumFactory.createInt8(longs[index]); + case TIME: return DatumFactory.createTime(longs[index]); + case TIMESTAMP: return DatumFactory.createTimestamp(longs[index]); + case FLOAT4: return DatumFactory.createFloat4(floats[index]); + case FLOAT8: return DatumFactory.createFloat8(doubles[index]); + case TEXT: return DatumFactory.createText(bytes[index]); + case CHAR: return DatumFactory.createChar(bytes[index]); + case BLOB: return DatumFactory.createBlob(bytes[index]); + case DATUM: return data[index]; + default: + throw new IllegalArgumentException(); + } + } + } + + public VTuple toVTuple(int index) { + VTuple vtuple = new VTuple(vectors.length); + for (int i = 0; i < vectors.length; i++) { + vtuple.put(i, vectors[i].toDatum(index)); + } + return vtuple; + } + + public static boolean isVectorizable(SortSpec[] sortKeys) { + return sortKeys.length != 0; + } + + static TupleType[] tupleTypes(Schema schema, int[] keyIndex) { + TupleType[] types = new TupleType[keyIndex.length]; + for (int i = 0; i < keyIndex.length; i++) { + types[i] = tupleType(schema.getColumn(keyIndex[i]).getDataType().getType()); + } + return types; + } + + static TupleType tupleType(TajoDataTypes.Type type) { + switch (type) { + case BOOLEAN: return TupleType.BOOLEAN; + case BIT: return TupleType.BIT; + case INT1: return TupleType.INT1; + case INT2: return TupleType.INT2; + case INT4: return TupleType.INT4; + case DATE: return TupleType.DATE; + case INT8: return TupleType.INT8; + case TIME: return TupleType.TIME; + case TIMESTAMP: return TupleType.TIMESTAMP; + case FLOAT4: return TupleType.FLOAT4; + case FLOAT8: return TupleType.FLOAT8; + case TEXT: return TupleType.TEXT; + case CHAR: return TupleType.CHAR; + case BLOB: return TupleType.BLOB; + case INET4: return TupleType.INET4; + case NULL_TYPE: return TupleType.NULL_TYPE; + default: return TupleType.DATUM; + } + } + + static TajoDataTypes.Type dataType(TupleType type) { + switch (type) { + case BOOLEAN: return TajoDataTypes.Type.BOOLEAN; + case BIT: return TajoDataTypes.Type.BIT; + case INT1: return TajoDataTypes.Type.INT1; + case INT2: return TajoDataTypes.Type.INT2; + case INT4: return TajoDataTypes.Type.INT4; + case DATE: return TajoDataTypes.Type.DATE; + case INT8: return TajoDataTypes.Type.INT8; + case TIME: return TajoDataTypes.Type.TIME; + case TIMESTAMP: return TajoDataTypes.Type.TIMESTAMP; + case FLOAT4: return TajoDataTypes.Type.FLOAT4; + case FLOAT8: return TajoDataTypes.Type.FLOAT8; + case TEXT: return TajoDataTypes.Type.TEXT; + case CHAR: return TajoDataTypes.Type.CHAR; + case BLOB: return TajoDataTypes.Type.BLOB; + case INET4: return TajoDataTypes.Type.INET4; + case NULL_TYPE: return TajoDataTypes.Type.NULL_TYPE; + default: return null; + } + } + + static int[] toIndex(int start, int end) { + int[] keyIndex = new int[end - start]; + for (int i = 0; i < keyIndex.length; i++) { + keyIndex[i] = start + i; + } + return keyIndex; + } + + static enum TupleType { + NULL_TYPE, BOOLEAN, BIT, INT1, INT2, INT4, DATE, INET4, INT8, TIME, TIMESTAMP, + FLOAT4, FLOAT8, TEXT, CHAR, BLOB, DATUM + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java index c1047d980a..57cbece2af 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -163,6 +163,10 @@ public void init() throws IOException { @Override public void addTuple(Tuple tuple) throws IOException { + if (isShuffle) { + // it is to calculate min/max values, and it is only used for the intermediate file. + stats.analyzeField(tuple); + } Datum datum; int rowBytes = 0; @@ -174,10 +178,6 @@ public void addTuple(Tuple tuple) throws IOException { os.write(delimiter); rowBytes += delimiter.length; } - if (isShuffle) { - // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, datum); - } } os.write(LF); rowBytes += 1; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 5213ba0502..b98ebd5eeb 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -622,7 +622,14 @@ public void writeRawVarint64(long value) throws IOException { @Override public void addTuple(Tuple t) throws IOException { + addTuple((ReadableTuple)t); + } + + public void addTuple(ReadableTuple t) throws IOException { + if (enabledStats) { + stats.analyzeField(t); + } if (buffer.remaining() < headerSize) { flushBuffer(); } @@ -633,9 +640,6 @@ public void addTuple(Tuple t) throws IOException { // reset the null flags nullFlags.clear(); for (int i = 0; i < schema.size(); i++) { - if (enabledStats) { - stats.analyzeField(i, t.get(i)); - } if (t.isNull(i)) { nullFlags.set(i); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java index 1ff6c4f9ea..d258b837aa 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java @@ -368,6 +368,9 @@ private void writeHeader() throws IOException { @Override public void addTuple(Tuple t) throws IOException { + if (enabledStats) { + stats.analyzeField(t); + } checkAndWriteSync(); Column col; @@ -375,9 +378,6 @@ public void addTuple(Tuple t) throws IOException { nullFlags.clear(); for (int i = 0; i < schema.size(); i++) { - if (enabledStats) { - stats.analyzeField(i, t.get(i)); - } if (t.isNull(i)) { nullFlags.set(i); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index da426ea6ec..8f214015c4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -137,12 +137,12 @@ private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { */ @Override public void addTuple(Tuple tuple) throws IOException { + if (enabledStats) { + stats.analyzeField(tuple); + } GenericRecord record = new GenericData.Record(avroSchema); for (int i = 0; i < schema.size(); ++i) { Column column = schema.getColumn(i); - if (enabledStats) { - stats.analyzeField(i, tuple.get(i)); - } Object value; Schema.Field avroField = avroFields.get(i); Schema.Type avroType = avroField.schema().getType(); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index 415c338734..61b59fdca9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -107,9 +107,7 @@ public long getOffset() throws IOException { @Override public void addTuple(Tuple tuple) throws IOException { if (enabledStats) { - for (int i = 0; i < schema.size(); ++i) { - stats.analyzeField(i, tuple.get(i)); - } + stats.analyzeField(tuple); } writer.write(tuple); if (enabledStats) { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index 44aabd453e..777d28281a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -889,24 +889,24 @@ public void addTuple(Tuple t) throws IOException { * @throws java.io.IOException */ public void append(Tuple tuple) throws IOException { + if (isShuffle) { + // it is to calculate min/max values, and it is only used for the intermediate file. + stats.analyzeField(tuple); + } int size = schema.size(); for (int i = 0; i < size; i++) { Datum datum = tuple.get(i); int length = columnBuffers[i].append(schema.getColumn(i), datum); columnBufferSize += length; - if (isShuffle) { - // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, datum); - } } if (size < columnNumber) { + if (isShuffle) { + stats.analyzeNullField(); + } for (int i = size; i < columnNumber; i++) { columnBuffers[i].append(schema.getColumn(i), NullDatum.get()); - if (isShuffle) { - stats.analyzeField(i, NullDatum.get()); - } } } @@ -1238,7 +1238,7 @@ public void init() throws IOException { try { in.close(); } catch (IOException e) { - if (LOG != null && LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Exception in closing " + in, e); } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index 404352c3e8..0fff2be1d5 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -159,6 +159,11 @@ public void init() throws IOException { @Override public void addTuple(Tuple tuple) throws IOException { + if (isShuffle) { + // it is to calculate min/max values, and it is only used for the intermediate file. + stats.analyzeField(tuple); + } + Datum datum; if (serde instanceof BinarySerializerDeserializer) { @@ -198,11 +203,6 @@ public void addTuple(Tuple tuple) throws IOException { } serde.serialize(schema.getColumn(j), datum, os, nullChars); - - if (isShuffle) { - // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(j, datum); - } } lasti = i + 1; nullByte = 0; @@ -221,12 +221,6 @@ public void addTuple(Tuple tuple) throws IOException { if (columnNum -1 > i) { os.write((byte) delimiter); } - - if (isShuffle) { - // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, datum); - } - } writer.append(EMPTY_KEY, new Text(os.toByteArray())); }