AggregationExec.java
@@ -60,8 +60,8 @@ public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
     for (EvalNode aggFunction : aggFunctions) {
       aggFunction.bind(inSchema);
     }

BNLJoinExec.java
@@ -68,6 +68,11 @@ public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
     outputTuple = new VTuple(outSchema.size());
   }
 
+  @Override
+  public void init(boolean rescan) throws IOException {
+    init(rescan, true);
+  }
+
   public Tuple next() throws IOException {
 
     if (leftTupleSlots.isEmpty()) {

BSTIndexScanExec.java
@@ -68,8 +68,8 @@ public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode ,
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
     progress = 0.0f;
     if (qual != null) {
       qual.bind(inSchema);

BinaryPhysicalExec.java
@@ -47,13 +47,16 @@ public PhysicalExec getRightChild() {
     return rightChild;
   }
 
-  @Override
-  public void init() throws IOException {
-    leftChild.init();
-    rightChild.init();
+  protected void init(boolean leftRescan, boolean rightRescan) throws IOException {
+    leftChild.init(leftRescan);
+    rightChild.init(rightRescan);
     progress = 0.0f;
+    super.init(leftRescan || rightRescan);
+  }
 
-    super.init();
+  @Override
+  public void init(boolean needsRescan) throws IOException {
+    init(needsRescan, needsRescan);
   }
 
   @Override
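
The new signature threads a rescan hint through the operator tree: a parent that may call rescan() later passes true, and each operator can use the flag to decide whether to keep its buffered state alive. The following is a minimal, self-contained sketch of that contract with hypothetical class names; it is not Tajo code and only mirrors the idea of the patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the init(needsRescan) contract; not part of the patch.
interface Operator {
  void init(boolean needsRescan) throws IOException; // true: a parent may call rescan() later
  Integer next() throws IOException;                 // null means end of input
  void rescan() throws IOException;
  void close() throws IOException;
}

// An operator that materializes its input and can replay it, but only keeps
// the buffer alive when the parent announced that a rescan may follow.
class BufferingOperator implements Operator {
  private final List<Integer> input;
  private List<Integer> buffer;
  private boolean keepForRescan;
  private int pos;

  BufferingOperator(List<Integer> input) {
    this.input = input;
  }

  @Override
  public void init(boolean needsRescan) throws IOException {
    keepForRescan = needsRescan;
    buffer = new ArrayList<>(input);
    pos = 0;
  }

  @Override
  public Integer next() throws IOException {
    if (buffer != null && pos < buffer.size()) {
      return buffer.get(pos++);
    }
    if (!keepForRescan) {
      close(); // end of input and no rescan expected: release the buffer eagerly
    }
    return null;
  }

  @Override
  public void rescan() throws IOException {
    pos = 0;
  }

  @Override
  public void close() throws IOException {
    buffer = null;
  }
}

A binary operator forwards the hint to both children and, as in the new BinaryPhysicalExec.init(boolean, boolean) above, reports itself as rescannable to its own superclass when either child is.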

ColPartitionStoreExec.java
@@ -123,8 +123,8 @@ public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, Ph
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
 
     storeTablePath = context.getOutputPath();
 

CommonJoinExec.java
@@ -18,51 +18,91 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import com.google.common.collect.Lists;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.plan.expr.AlgebraicUtil;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
 import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.List;
 
 // common join exec except HashLeftOuterJoinExec
 public abstract class CommonJoinExec extends BinaryPhysicalExec {
 
   // from logical plan
   protected JoinNode plan;
   protected final boolean hasJoinQual;
+  protected final boolean hasJoinFilter;
 
-  protected EvalNode joinQual;
+  protected EvalNode joinQual; // ex) a.id = b.id
+  protected EvalNode joinFilter; // ex) a > 10
 
   // projection
   protected Projector projector;
 
   public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
                         PhysicalExec inner) {
+    this(context, plan, outer, inner, false);
+  }
+
+  public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                        PhysicalExec inner, boolean extractJoinFilter) {
     super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
         plan.getOutSchema(), outer, inner);
     this.plan = plan;
-    this.joinQual = plan.getJoinQual();
-    this.hasJoinQual = plan.hasJoinQual();
+    if (plan.hasJoinQual() && extractJoinFilter) {
+      Pair<EvalNode, EvalNode> extracted = extractJoinConditions(plan.getJoinQual());
+      joinQual = extracted.getFirst();
+      joinFilter = extracted.getSecond();
+    } else {
+      joinQual = plan.getJoinQual();
+    }
+    this.hasJoinQual = joinQual != null;
+    this.hasJoinFilter = joinFilter != null;
 
     // for projection
    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
   }
 
+  private Pair<EvalNode, EvalNode> extractJoinConditions(EvalNode joinQual) {
+    List<EvalNode> joinQuals = Lists.newArrayList();
+    List<EvalNode> joinFilters = Lists.newArrayList();
+    for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(joinQual)) {
+      if (EvalTreeUtil.isJoinQual(eachQual, true)) {
+        joinQuals.add(eachQual);
+      } else {
+        joinFilters.add(eachQual);
+      }
+    }
+
+    return new Pair<EvalNode, EvalNode>(
+        joinQuals.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinQuals),
+        joinFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinFilters)
+    );
+  }
+
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean leftRescan, boolean rightRescan) throws IOException {
+    super.init(leftRescan, rightRescan);
     if (hasJoinQual) {
       joinQual.bind(inSchema);
     }
+    if (hasJoinFilter) {
+      joinFilter.bind(inSchema);
+    }
   }
 
   @Override
   protected void compile() {
     if (hasJoinQual) {
       joinQual = context.getPrecompiledEval(inSchema, joinQual);
     }
+    // compile filter?
   }
 
   public JoinNode getPlan() {
@@ -74,6 +114,7 @@ public void close() throws IOException {
     super.close();
     plan = null;
     joinQual = null;
+    joinFilter = null;
     projector = null;
   }
 }
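
The new constructor splits a join predicate, taken in conjunctive normal form, into equi-join conditions (joinQual) and remaining filter conditions (joinFilter). As a rough, self-contained illustration of that split with string predicates and hypothetical table aliases t1 and t2 (the real code relies on AlgebraicUtil and EvalTreeUtil.isJoinQual instead):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of extractJoinConditions; not Tajo code.
class JoinConditionSplitExample {
  // A conjunct counts as a join condition here if it equates a column of t1 with a column of t2.
  static boolean isJoinQual(String conjunct) {
    return conjunct.matches("\\s*t1\\.\\w+\\s*=\\s*t2\\.\\w+\\s*");
  }

  public static void main(String[] args) {
    // CNF form of: t1.id = t2.id AND t1.score > 10
    List<String> conjuncts = Arrays.asList("t1.id = t2.id", "t1.score > 10");

    List<String> joinQuals = new ArrayList<>();
    List<String> joinFilters = new ArrayList<>();
    for (String conjunct : conjuncts) {
      if (isJoinQual(conjunct)) {
        joinQuals.add(conjunct);
      } else {
        joinFilters.add(conjunct);
      }
    }

    System.out.println("joinQual   = " + String.join(" AND ", joinQuals));   // t1.id = t2.id
    System.out.println("joinFilter = " + String.join(" AND ", joinFilters)); // t1.score > 10
  }
}

For a plain inner join the distinction is harmless, but keeping the filter separate lets a subclass that passes extractJoinFilter = true evaluate joinQual and joinFilter at different points of its algorithm.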

DistinctGroupbyFirstAggregationExec.java
@@ -107,8 +107,8 @@ public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, DistinctG
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
 
     // finding grouping column index
     Column[] groupingColumns = plan.getGroupingColumns();

DistinctGroupbyHashAggregationExec.java
@@ -57,8 +57,8 @@ public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGr
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
 
     List<Integer> distinctGroupingKeyIdList = new ArrayList<Integer>();
     for (Column col: plan.getGroupingColumns()) {

DistinctGroupbySecondAggregationExec.java
@@ -93,8 +93,8 @@ public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, Distinct
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
     numGroupingColumns = plan.getGroupingColumns().length;
 
     List<GroupbyNode> groupbyNodes = plan.getSubPlans();

DistinctGroupbySortAggregationExec.java
@@ -22,7 +22,6 @@
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.plan.logical.DistinctGroupbyNode;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
@@ -66,9 +65,12 @@ public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, Dist
     for(int i = 0; i < resultColumnIds.length; i++) {
       resultColumnIdIndexes[resultColumnIds[i]] = i;
     }
+  }
 
-    for (SortAggregateExec eachExec: aggregateExecs) {
-      eachExec.init();
+  @Override
+  public void init(boolean needsScan) throws IOException {
+    for (int i = 0; i < aggregateExecs.length; i++) {
+      aggregateExecs[i].init(i < groupbyNodeNum);
     }
   }
 
@@ -171,10 +173,6 @@ public void close() throws IOException {
     }
   }
 
-  @Override
-  public void init() throws IOException {
-  }
-
   @Override
   public void rescan() throws IOException {
     finished = false;

DistinctGroupbyThirdAggregationExec.java
@@ -58,8 +58,8 @@ public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctG
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
 
     numGroupingColumns = plan.getGroupingColumns().length;
     resultTupleLength = numGroupingColumns;

EvalExprExec.java
@@ -37,8 +37,8 @@ public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
   }
 
   @Override
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
     progress = 0.0f;
     for (Target target : plan.getTargets()) {
       target.getEvalTree().bind(inSchema);

ExternalSortExec.java
@@ -149,9 +149,9 @@ public void setSortBufferBytesNum(int sortBufferBytesNum) {
     this.sortBufferBytesNum = sortBufferBytesNum;
   }
 
-  public void init() throws IOException {
+  public void init(boolean needsRescan) throws IOException {
     inputStats = new TableStats();
-    super.init();
+    super.init(needsRescan);
   }
 
   public SortNode getPlan() {
@@ -279,8 +279,7 @@ public Tuple next() throws IOException {
       info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
 
       if (memoryResident) { // if all sorted data reside in a main-memory table.
-        TupleSorter sorter = getSorter(inMemoryTable);
-        result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes);
+        result = new MemTableScanner();
       } else { // if input data exceeds main-memory at least once
 
         try {
@@ -533,25 +532,26 @@ private Scanner createKWayMergerInternal(final Scanner [] sources, final int sta
     }
   }
 
-  private static class MemTableScanner extends AbstractScanner {
-    final Iterable<Tuple> iterable;
-    final long sortAndStoredBytes;
-    final int totalRecords;
+  private class MemTableScanner extends AbstractScanner {
+    Iterable<Tuple> iterable; // beware: this can hold references to tuples inside inMemoryTable
+    int totalRecords;
 
     Iterator<Tuple> iterator;
     // for input stats
     float scannerProgress;
     int numRecords;
     TableStats scannerTableStats;
 
-    public MemTableScanner(Iterable<Tuple> iterable, int length, long inBytes) {
-      this.iterable = iterable;
-      this.totalRecords = length;
-      this.sortAndStoredBytes = inBytes;
+    public MemTableScanner() {
+      this.iterable = getSorter(inMemoryTable).sort();
+      this.totalRecords = inMemoryTable.size();
     }
 
     @Override
     public void init() throws IOException {
+      if (iterable == null) {
+        throw new IllegalStateException("Backing memory is released already");
+      }
      iterator = iterable.iterator();
 
       scannerProgress = 0.0f;
@@ -566,12 +566,14 @@ public void init() throws IOException {
 
     @Override
     public Tuple next() throws IOException {
-      if (iterator.hasNext()) {
+      if (iterator != null && iterator.hasNext()) {
         numRecords++;
         return iterator.next();
-      } else {
-        return null;
       }
+      if (!parentNeedsRescan) {
+        close();
+      }
+      return null;
     }
 
     @Override
@@ -582,6 +584,8 @@ public void reset() throws IOException {
     @Override
     public void close() throws IOException {
       iterator = null;
+      iterable = null;
+      inMemoryTable.clear();
       scannerProgress = 1.0f;
     }
 
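
The MemTableScanner changes above make the in-memory sort result releasable: the scanner now builds its sorted view directly from the shared inMemoryTable, refuses to re-init once that memory is gone, and, when the parent does not need a rescan, closes itself at end of input and clears the backing table. A rough, self-contained sketch of that lifecycle with hypothetical names (the real scanner additionally tracks progress and input stats):

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of a scanner that frees its backing rows as soon as
// they can no longer be needed; not Tajo code.
class ReleasableMemScannerExample {
  private List<String> rows;                 // backing memory, shared with the sort buffer
  private Iterator<String> iterator;
  private final boolean parentNeedsRescan;

  ReleasableMemScannerExample(List<String> unsorted, boolean parentNeedsRescan) {
    this.rows = unsorted;
    this.parentNeedsRescan = parentNeedsRescan;
  }

  void init() {
    if (rows == null) {
      throw new IllegalStateException("Backing memory is released already");
    }
    Collections.sort(rows);                  // sort in place over the shared buffer
    iterator = rows.iterator();
  }

  String next() {
    if (iterator != null && iterator.hasNext()) {
      return iterator.next();
    }
    if (!parentNeedsRescan) {
      close();                               // end of input and no rescan coming: free the rows
    }
    return null;
  }

  void reset() {
    if (rows == null) {
      throw new IllegalStateException("Backing memory is released already");
    }
    iterator = rows.iterator();
  }

  void close() {
    iterator = null;
    if (rows != null) {
      rows.clear();                          // mirrors inMemoryTable.clear() in close() above
      rows = null;
    }
  }
}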

HashBasedColPartitionStoreExec.java
@@ -44,8 +44,8 @@ public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode
     super(context, plan, child);
   }
 
-  public void init() throws IOException {
-    super.init();
+  public void init(boolean needsRescan) throws IOException {
+    super.init(needsRescan);
   }
 
   private Appender getAppender(String partition) throws IOException {