From c40da963a0b0b213cf7d9c7e000729f65df63d67 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 12 Jan 2016 18:57:28 +0900 Subject: [PATCH] TAJO-2054: Print physical operators in worker webUI. --- .../engine/planner/physical/PhysicalExec.java | 5 +++ .../planner/physical/PhysicalPlanUtil.java | 39 +++++++++++++++++++ .../engine/planner/physical/SeqScanExec.java | 4 +- .../org/apache/tajo/worker/TaskHistory.java | 17 ++++++++ .../java/org/apache/tajo/worker/TaskImpl.java | 10 +++++ tajo-core/src/main/proto/ResourceProtos.proto | 1 + .../resources/webapps/worker/taskhistory.jsp | 1 + 7 files changed, 75 insertions(+), 2 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index c70e1ff985..c612893c87 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -90,4 +90,9 @@ protected Path getExecutorTmpDir() { public TableStats getInputStats() { return null; } + + @Override + public String toString() { + return String.format("%s", getClass().getSimpleName()); + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index d1dfe40430..a861edff18 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,6 +42,7 @@ import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.TUtil; import java.io.IOException; import java.util.ArrayList; @@ -253,4 +255,41 @@ public static void setNullCharIfNecessary(QueryContext context, PersistentStoreN } } } + + /** + * A pretty print string of a physical operator and its descendant operators. + * + */ + public static void printPhysicalPlan(PhysicalExec exec, StringBuilder builder, int depth) { + + String pad = " "; + if (exec instanceof UnaryPhysicalExec) { + UnaryPhysicalExec operator = TUtil.checkTypeAndGet(exec, UnaryPhysicalExec.class); + + builder.append(StringUtils.repeat(pad, depth)).append(operator.toString()); + builder.append("\n"); + + if (operator.getChild() != null) { + printPhysicalPlan(operator.getChild(), builder, ++depth); + } + } else if (exec instanceof BinaryPhysicalExec) { + BinaryPhysicalExec operator = TUtil.checkTypeAndGet(exec, BinaryPhysicalExec.class); + + builder.append(StringUtils.repeat(pad, depth)).append(operator.getClass().getSimpleName()); + builder.append("\n"); + + if (operator.getLeftChild() != null) { + printPhysicalPlan(operator.getLeftChild(), builder, depth + 1); + } + + if (operator.getRightChild() != null) { + printPhysicalPlan(operator.getRightChild(), builder, depth + 1); + } + } else if (exec == null) { + return; + } else { + builder.append(StringUtils.repeat(pad, depth)).append(exec.toString()); + builder.append("\n"); + } + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 3ddad1e5dc..9cfc681d4c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -363,9 +363,9 @@ public TableStats getInputStats() { @Override public String toString() { if (scanner != null) { - return "SeqScanExec:" + plan + "," + scanner.getClass().getName(); + return String.format("%s:%s(%s)", getClass().getSimpleName(), scanner.getClass().getSimpleName(), plan); } else { - return "SeqScanExec:" + plan; + return String.format("%s(%s)", getClass().getSimpleName(), plan); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java index 52b0d0bd4b..42b2b09532 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java @@ -46,6 +46,7 @@ public class TaskHistory implements ProtoObject, History { private CatalogProtos.TableStatsProto outputStats; private String outputPath; private String workingPath; + private String physicalPlan; private int finishedFetchCount; private int totalFetchCount; @@ -90,6 +91,10 @@ public TaskHistory(TaskHistoryProto proto) { this.totalFetchCount = proto.getTotalFetchCount(); } + if(proto.hasPhysicalPlan()) { + this.physicalPlan = proto.getPhysicalPlan(); + } + this.fetcherHistories = proto.getFetcherHistoriesList(); } @@ -134,6 +139,10 @@ public TaskHistoryProto getProto() { builder.setFinishedFetchCount(finishedFetchCount); } + if(physicalPlan != null) { + builder.setPhysicalPlan(physicalPlan); + } + builder.addAllFetcherHistories(fetcherHistories); return builder.build(); } @@ -214,6 +223,14 @@ public void setOutputStats(CatalogProtos.TableStatsProto outputStats) { this.outputStats = outputStats; } + public String getPhysicalPlan() { + return physicalPlan; + } + + public void setPhysicalPlan(String physicalPlan) { + this.physicalPlan = physicalPlan; + } + @Override public HistoryType getHistoryType() { return HistoryType.TASK; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 6d9639cdd1..cea9ea3021 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -39,6 +39,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.physical.PhysicalExec; +import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.ipc.QueryMasterProtocol; @@ -102,6 +103,7 @@ public class TaskImpl implements Task { private TupleComparator sortComp = null; private final int maxUrlLength; + private String physicalPlan; public TaskImpl(final TaskRequest request, final ExecutionBlockContext executionBlockContext) throws IOException { @@ -417,6 +419,10 @@ public void run() throws Exception { this.executor = executionBlockContext.getTQueryEngine().createPlan(context, plan); this.executor.init(); + StringBuilder builder = new StringBuilder(); + PhysicalPlanUtil.printPhysicalPlan(executor, builder, 0); + this.physicalPlan = builder.toString(); + while(!context.isStopped() && executor.next() != null) { } } @@ -535,6 +541,10 @@ public TaskHistory createTaskHistory() { } taskHistory.setFinishedFetchCount(i); } + + if(physicalPlan != null) { + taskHistory.setPhysicalPlan(physicalPlan); + } } catch (Exception e) { LOG.warn(e.getMessage(), e); } diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto index 3643a97613..361a1407f6 100644 --- a/tajo-core/src/main/proto/ResourceProtos.proto +++ b/tajo-core/src/main/proto/ResourceProtos.proto @@ -309,4 +309,5 @@ message TaskHistoryProto { optional int32 finished_fetch_count = 10; optional int32 total_fetch_count = 11; repeated FetcherHistoryProto fetcher_histories = 12; + optional string physical_plan = 13; } diff --git a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp index 5dd52d3d2c..816e5486f9 100644 --- a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp +++ b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp @@ -63,6 +63,7 @@

Tajo Worker: <%=tajoWorker.getWorkerContext().getWorkerName()%>


Task Detail: <%=request.getParameter("taskAttemptId")%>

+
<%= taskHistory.getPhysicalPlan() == null ? "" : taskHistory.getPhysicalPlan()%>
ID<%=request.getParameter("taskAttemptId")%>
State<%=taskHistory.getState()%>