diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
index 52c5bf0..f078861 100644
--- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
+++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -22,6 +22,7 @@ import java.net.URI
 import com.qubole.sparklens.analyzer._
 import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
 import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
+import com.qubole.sparklens.pluggable.{ComplimentaryMetrics, DriverMetrics}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkConf
@@ -48,6 +49,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
   protected val stageIDToJobID  = new mutable.HashMap[Int, Long]
   protected val failedStages    = new ListBuffer[String]
   protected val appMetrics      = new AggregateMetrics()
+  val pluggableMetricsMap       = new mutable.HashMap[String, ComplimentaryMetrics]()
+
   private def hostCount():Int = hostMap.size
 
@@ -144,11 +147,18 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
     //println(s"Application ${applicationStart.appId} started at ${applicationStart.time}")
     appInfo.applicationID = applicationStart.appId.getOrElse("NA")
     appInfo.startTime = applicationStart.time
+    pluggableMetricsMap("driverMetrics") = new DriverMetrics()
+    pluggableMetricsMap.foreach(x =>
+      x._2.onApplicationStart(applicationStart)
+    )
   }
 
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
     //println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}")
     appInfo.endTime = applicationEnd.time
+    pluggableMetricsMap.foreach(x =>
+      x._2.onApplicationEnd(applicationEnd)
+    )
     val appContext = new AppContext(appInfo,
       appMetrics,
@@ -157,7 +167,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
       jobMap,
       jobSQLExecIDMap,
       stageMap,
-      stageIDToJobID)
+      stageIDToJobID,
+      pluggableMetricsMap)
 
     asyncReportingEnabled(sparkConf) match {
       case true => {
diff --git a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
index b7a2fc4..61a3900 100644
--- a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
+++ b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
@@ -125,7 +125,8 @@ class QuboleNotebookListener(sparkConf: SparkConf) extends QuboleJobListener(spa
       jobMap,
       jobSQLExecIDMap,
       stageMap,
-      stageIDToJobID)
+      stageIDToJobID,
+      pluggableMetricsMap)
 
     val out = new mutable.StringBuilder()
diff --git a/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala
index 56f7584..25c70a3 100644
--- a/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala
+++ b/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala
@@ -34,6 +34,12 @@ class SimpleAppAnalyzer extends AppAnalyzer {
       "task-level granularity and aggregated across the app (all tasks, stages, and jobs).\n")
     ac.appMetrics.print("Application Metrics", out)
     out.println("\n")
+
+    ac.pluggableMetricsMap.foreach(x => {
+      x._2.print(x._1, out)
+      out.println("\n")
+    })
+
     out.println("\n")
     out.toString()
   }
 }
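For context (not part of the diff): the listener is wired in through Spark's standard
listener mechanism, so the new pluggableMetricsMap is populated on the driver as soon
as onApplicationStart fires. A minimal sketch using the standard Spark API:

    import org.apache.spark.SparkConf

    // Register the sparklens listener; QuboleJobListener then drives every
    // collector held in pluggableMetricsMap through the application lifecycle.
    val conf = new SparkConf()
      .set("spark.extraListeners", "com.qubole.sparklens.QuboleJobListener")
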
diff --git a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala
index 99a2a79..8cf7a17 100644
--- a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala
+++ b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala
@@ -21,62 +21,14 @@ import java.util.Locale
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.TaskInfo
+
+import com.qubole.sparklens.common.MetricsHelper._
+
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.JValue
 
 import scala.collection.mutable
 
-/*
-Keeps track of min max sum mean and variance for any metric at any level
-Reference to incremental updates of variance:
-https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_Online_algorithm
- */
-
-class AggregateValue {
-  var value: Long = 0L
-  var min: Long = Long.MaxValue
-  var max: Long = Long.MinValue
-  var mean: Double = 0.0
-  var variance: Double = 0.0
-  var m2: Double = 0.0
-
-  override def toString(): String = {
-    s"""{
-       | "value": ${value},
-       | "min": ${min},
-       | "max": ${max},
-       | "mean": ${mean},
-       | "m2": ${m2}
-       | "variance": ${variance}
-       }""".stripMargin
-  }
-
-  def getMap(): Map[String, Any] = {
-    Map("value" -> value,
-      "min" -> min,
-      "max" -> max,
-      "mean" -> mean,
-      "m2" -> m2,
-      "variance" -> variance)
-  }
-}
-
-object AggregateValue {
-  def getValue(json: JValue): AggregateValue = {
-    implicit val formats = DefaultFormats
-
-    val value = new AggregateValue
-    value.value = (json \ "value").extract[Long]
-    value.min = (json \ "min").extract[Long]
-    value.max = (json \ "max").extract[Long]
-    value.mean = (json \ "mean").extract[Double]
-    value.variance = (json \ "variance").extract[Double]
-    //making it optional for backward compatibility with sparklens.json files
-    value.m2 = (json \ "m2").extractOrElse[Double](0.0)
-    value
-  }
-}
-
 class AggregateMetrics() {
   var count = 0L
   val map = new mutable.HashMap[AggregateMetrics.Metric, AggregateValue]()
@@ -103,45 +55,6 @@ class AggregateMetrics() {
 
   @transient val numberFormatter = java.text.NumberFormat.getIntegerInstance
 
-  def bytesToString(size: Long): String = {
-    val TB = 1L << 40
-    val GB = 1L << 30
-    val MB = 1L << 20
-    val KB = 1L << 10
-
-    val (value, unit) = {
-      if (Math.abs(size) >= 1*TB) {
-        (size.asInstanceOf[Double] / TB, "TB")
-      } else if (Math.abs(size) >= 1*GB) {
-        (size.asInstanceOf[Double] / GB, "GB")
-      } else if (Math.abs(size) >= 1*MB) {
-        (size.asInstanceOf[Double] / MB, "MB")
-      } else {
-        (size.asInstanceOf[Double] / KB, "KB")
-      }
-    }
-    "%.1f %s".formatLocal(Locale.US, value, unit)
-  }
-
-  def toMillis(size:Long): String = {
-    val MS = 1000000L
-    val SEC = 1000 * MS
-    val MT = 60 * SEC
-    val HR = 60 * MT
-
-    val (value, unit) = {
-      if (size >= 1*HR) {
-        (size.asInstanceOf[Double] / HR, "hh")
-      } else if (size >= 1*MT) {
-        (size.asInstanceOf[Double] / MT, "mm")
-      } else if (size >= 1*SEC) {
-        (size.asInstanceOf[Double] / SEC, "ss")
-      } else {
-        (size.asInstanceOf[Double] / MS, "ms")
-      }
-    }
-    "%.1f %s".formatLocal(Locale.US, value, unit)
-  }
 
   def formatNanoTime(x: (AggregateMetrics.Metric, AggregateValue), sb: mutable.StringBuilder): Unit = {
     sb.append(f"        ${x._1}%-30s${toMillis(x._2.value)}%20s${toMillis(x._2.min)}%15s${toMillis(x._2.max)}%15s${toMillis(x._2.mean.toLong)}%20s")
@@ -202,8 +115,8 @@ class AggregateMetrics() {
     count += 1
   }
 
-  def print(caption: String, sb: mutable.StringBuilder):Unit = {
-    sb.append(s" AggregateMetrics (${caption}) total measurements ${count} ")
+  def print(caption: String, sb: mutable.StringBuilder): Unit = {
+    sb.append(s" AggregateMetrics (${caption}) total measurements ${count} ")
       .append("\n")
     sb.append(f"                NAME                        SUM                MIN           MAX                MEAN         ")
       .append("\n")
diff --git a/src/main/scala/com/qubole/sparklens/common/AggregateValue.scala b/src/main/scala/com/qubole/sparklens/common/AggregateValue.scala
new file mode 100644
index 0000000..5d85290
--- /dev/null
+++ b/src/main/scala/com/qubole/sparklens/common/AggregateValue.scala
@@ -0,0 +1,56 @@
+package com.qubole.sparklens.common
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JValue
+
+/*
+Keeps track of min, max, sum, mean and variance for any metric at any level.
+Reference for incremental updates of variance:
+https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_Online_algorithm
+ */
+
+class AggregateValue {
+  var value: Long = 0L
+  var min: Long = Long.MaxValue
+  var max: Long = Long.MinValue
+  var mean: Double = 0.0
+  var variance: Double = 0.0
+  var m2: Double = 0.0
+
+  override def toString(): String = {
+    s"""{
+       | "value": ${value},
+       | "min": ${min},
+       | "max": ${max},
+       | "mean": ${mean},
+       | "m2": ${m2},
+       | "variance": ${variance}
+       }""".stripMargin
+  }
+
+  def getMap(): Map[String, Any] = {
+    Map("value" -> value,
+      "min" -> min,
+      "max" -> max,
+      "mean" -> mean,
+      "m2" -> m2,
+      "variance" -> variance)
+  }
+}
+
+object AggregateValue {
+  def getValue(json: JValue): AggregateValue = {
+    implicit val formats = DefaultFormats
+
+    val value = new AggregateValue
+    value.value = (json \ "value").extract[Long]
+    value.min = (json \ "min").extract[Long]
+    value.max = (json \ "max").extract[Long]
+    value.mean = (json \ "mean").extract[Double]
+    value.variance = (json \ "variance").extract[Double]
+    // m2 is optional for backward compatibility with older sparklens.json files
+    value.m2 = (json \ "m2").extractOrElse[Double](0.0)
+    value
+  }
+}
diff --git a/src/main/scala/com/qubole/sparklens/common/AppContext.scala b/src/main/scala/com/qubole/sparklens/common/AppContext.scala
index d5263b8..22d7e92 100644
--- a/src/main/scala/com/qubole/sparklens/common/AppContext.scala
+++ b/src/main/scala/com/qubole/sparklens/common/AppContext.scala
@@ -17,6 +17,7 @@ package com.qubole.sparklens.common
 
 import com.qubole.sparklens.timespan._
+import com.qubole.sparklens.pluggable.ComplimentaryMetrics
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.Serialization
@@ -31,7 +32,8 @@ case class AppContext(appInfo: ApplicationInfo,
                       jobMap: mutable.HashMap[Long, JobTimeSpan],
                       jobSQLExecIdMap:mutable.HashMap[Long, Long],
                       stageMap: mutable.HashMap[Int, StageTimeSpan],
-                      stageIDToJobID: mutable.HashMap[Int, Long]) {
+                      stageIDToJobID: mutable.HashMap[Int, Long],
+                      pluggableMetricsMap: mutable.HashMap[String, ComplimentaryMetrics]) {
 
   def filterByStartAndEndTime(startTime: Long, endTime: Long): AppContext = {
     new AppContext(appInfo,
@@ -48,7 +50,8 @@ case class AppContext(appInfo: ApplicationInfo,
       stageMap
         .filter(x => x._2.startTime >= startTime &&
           x._2.endTime <= endTime),
-      stageIDToJobID)
+      stageIDToJobID,
+      pluggableMetricsMap)
   }
 
   override def toString(): String = {
@@ -61,7 +64,8 @@ case class AppContext(appInfo: ApplicationInfo,
       "jobMap" -> AppContext.getMap(jobMap),
       "jobSQLExecIdMap" -> jobSQLExecIdMap,
       "stageMap" -> AppContext.getMap(stageMap),
-      "stageIDToJobID" -> stageIDToJobID
+      "stageIDToJobID" -> stageIDToJobID,
+      "pluggableMetricsMap" -> ComplimentaryMetrics.getMap(pluggableMetricsMap)
     )
     Serialization.writePretty(map)
   }
@@ -134,7 +138,8 @@ object AppContext {
       JobTimeSpan.getTimeSpan((json \ "jobMap").extract[Map[String, JValue]]),
       getJobSQLExecIdMap(json, new mutable.HashMap[Long, Long]),
       StageTimeSpan.getTimeSpan((json \ "stageMap").extract[Map[String, JValue]]),
-      getJobToStageMap((json \ "stageIDToJobID").extract[Map[Int, JValue]])
+      getJobToStageMap((json \ "stageIDToJobID").extract[Map[Int, JValue]]),
+      ComplimentaryMetrics.getMetricsMap((json \ "pluggableMetricsMap").extract[Map[String, JValue]])
     )
   }
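AggregateValue only stores the accumulators; the Welford update referenced in its
header comment lives with the callers (AggregateMetrics). A minimal sketch of that
update, shown only to make the m2/variance fields concrete (welfordUpdate is an
illustrative name, not part of the patch):

    import com.qubole.sparklens.common.AggregateValue

    // count is the number of samples seen so far, including this one.
    def welfordUpdate(agg: AggregateValue, sample: Long, count: Long): Unit = {
      agg.value += sample
      agg.min = math.min(agg.min, sample)
      agg.max = math.max(agg.max, sample)
      val delta = sample - agg.mean
      agg.mean += delta / count
      agg.m2 += delta * (sample - agg.mean)  // uses the already-updated mean
      agg.variance = agg.m2 / count
    }
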
diff --git a/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala b/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala
new file mode 100644
index 0000000..0c1f730
--- /dev/null
+++ b/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.qubole.sparklens.common
+
+import java.util.Locale
+
+object MetricsHelper {
+
+  def bytesToString(size: Long): String = {
+    val TB = 1L << 40
+    val GB = 1L << 30
+    val MB = 1L << 20
+    val KB = 1L << 10
+
+    val (value, unit) = {
+      if (Math.abs(size) >= 1*TB) {
+        (size.asInstanceOf[Double] / TB, "TB")
+      } else if (Math.abs(size) >= 1*GB) {
+        (size.asInstanceOf[Double] / GB, "GB")
+      } else if (Math.abs(size) >= 1*MB) {
+        (size.asInstanceOf[Double] / MB, "MB")
+      } else {
+        (size.asInstanceOf[Double] / KB, "KB")
+      }
+    }
+    "%.1f %s".formatLocal(Locale.US, value, unit)
+  }
+
+  def toMillis(size:Long): String = {
+    val MS = 1000000L
+    val SEC = 1000 * MS
+    val MT = 60 * SEC
+    val HR = 60 * MT
+
+    val (value, unit) = {
+      if (size >= 1*HR) {
+        (size.asInstanceOf[Double] / HR, "hh")
+      } else if (size >= 1*MT) {
+        (size.asInstanceOf[Double] / MT, "mm")
+      } else if (size >= 1*SEC) {
+        (size.asInstanceOf[Double] / SEC, "ss")
+      } else {
+        (size.asInstanceOf[Double] / MS, "ms")
+      }
+    }
+    "%.1f %s".formatLocal(Locale.US, value, unit)
+  }
+}
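A quick sanity check on MetricsHelper's units: bytesToString takes a byte count,
while toMillis, despite its name, takes nanoseconds (MS is defined as 1,000,000 ns).
Illustrative values only:

    import com.qubole.sparklens.common.MetricsHelper

    MetricsHelper.bytesToString(3L << 30)         // 3 GiB  -> "3.0 GB"
    MetricsHelper.toMillis(5L * 1000000)          // 5 ms   -> "5.0 ms"
    MetricsHelper.toMillis(90L * 1000 * 1000000)  // 90 s   -> "1.5 mm"
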
diff --git a/src/main/scala/com/qubole/sparklens/pluggable/ComplimentaryMetrics.scala b/src/main/scala/com/qubole/sparklens/pluggable/ComplimentaryMetrics.scala
new file mode 100644
index 0000000..d250495
--- /dev/null
+++ b/src/main/scala/com/qubole/sparklens/pluggable/ComplimentaryMetrics.scala
@@ -0,0 +1,70 @@
+package com.qubole.sparklens.pluggable
+
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart}
+import org.json4s.{DefaultFormats, MappingException}
+import org.json4s.JsonAST.JValue
+
+import scala.collection.mutable
+
+trait ComplimentaryMetrics {
+  def getMap(): Map[String, _ <: Any] = {
+    throw new NotImplementedError("getMap() method is not implemented.")
+  }
+
+  def getObject(json: JValue): ComplimentaryMetrics = {
+    throw new NotImplementedError("getObject() method is not implemented.")
+  }
+
+  def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
+    throw new NotImplementedError("onApplicationStart() method is not implemented.")
+  }
+
+  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+    throw new NotImplementedError("onApplicationEnd() method is not implemented.")
+  }
+
+  def print(caption: String, sb: mutable.StringBuilder): Unit = {
+    throw new NotImplementedError("print() method is not implemented.")
+  }
+}
+
+object ComplimentaryMetrics {
+
+  /**
+   * Returns the object extending [[ComplimentaryMetrics]] that matches the input string.
+   */
+  def fromString(value: String): ComplimentaryMetrics = {
+    value.toLowerCase match {
+      case "drivermetrics" => DriverMetrics
+      case _ => throw new Exception(s"Object ${value} not found.")
+    }
+  }
+
+  /**
+   * Used for extracting the pluggableMetricsMap in [[com.qubole.sparklens.QuboleJobListener]]
+   * to construct [[com.qubole.sparklens.common.AppContext]] from the JSON.
+   */
+  def getMetricsMap(json: Map[String, JValue]): mutable.HashMap[String, ComplimentaryMetrics] = {
+    val metricsMap = new mutable.HashMap[String, ComplimentaryMetrics]
+    try {
+      implicit val formats = DefaultFormats
+      json.keys.foreach(key => {
+        metricsMap.put(key, fromString(key).getObject(json(key)))
+      })
+    } catch {
+      case e: Exception if !e.isInstanceOf[MappingException] =>
+        throw e
+    }
+    metricsMap
+  }
+
+  /**
+   * Used for converting the pluggableMetricsMap in [[com.qubole.sparklens.QuboleJobListener]]
+   * to a formatted map which is then dumped in the JSON file/printed on the console.
+   */
+  def getMap(metricsMap: mutable.HashMap[String, _ <: ComplimentaryMetrics]): Map[String, Any] = {
+    metricsMap.keys.map(key => (key.toString, metricsMap(key).getMap)).toMap
+  }
+}
\ No newline at end of file
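Note that fromString only knows "drivermetrics", so a new collector also needs a case
there before it can be restored from a saved sparklens.json; registering it in the
listener's pluggableMetricsMap is enough for live runs. A hedged sketch of a custom
implementation (CallbackCountMetrics is a hypothetical name, not part of this patch):

    import scala.collection.mutable
    import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart}
    import com.qubole.sparklens.pluggable.ComplimentaryMetrics

    // Hypothetical collector: counts lifecycle callbacks, just to show the hook points.
    class CallbackCountMetrics extends ComplimentaryMetrics {
      private var starts = 0L
      private var ends = 0L

      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit =
        starts += 1
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit =
        ends += 1
      override def getMap(): Map[String, Any] =
        Map("starts" -> starts, "ends" -> ends)
      override def print(caption: String, sb: mutable.StringBuilder): Unit =
        sb.append(s" CallbackCountMetrics (${caption}) starts=${starts} ends=${ends}\n")
    }
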
diff --git a/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala b/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala
new file mode 100644
index 0000000..4036c22
--- /dev/null
+++ b/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala
@@ -0,0 +1,169 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.qubole.sparklens.pluggable
+
+import java.lang.management.ManagementFactory
+import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
+
+import com.qubole.sparklens.common.AggregateValue
+import com.qubole.sparklens.common.MetricsHelper._
+import javax.management.ObjectName
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart}
+import org.json4s
+
+import scala.collection.mutable
+
+class DriverMetrics extends ComplimentaryMetrics {
+
+  private val ProcessCpuTime = "ProcessCpuTime"
+
+  val map = new mutable.HashMap[DriverMetrics.Metric, AggregateValue]()
+  @transient val formatterMap = new mutable.HashMap[DriverMetrics.Metric,
+    ((DriverMetrics.Metric, AggregateValue), mutable.StringBuilder) => Unit]()
+
+  formatterMap(DriverMetrics.driverHeapMax) = formatStaticBytes
+  formatterMap(DriverMetrics.driverMaxHeapCommitted) = formatStaticBytes
+  formatterMap(DriverMetrics.driverMaxHeapUsed) = formatStaticBytes
+  formatterMap(DriverMetrics.driverCPUTime) = formatStaticMillisTime
+  formatterMap(DriverMetrics.driverGCTime) = formatStaticMillisTime
+  formatterMap(DriverMetrics.driverGCCount) = formatCount
+
+  private val threadExecutor: ScheduledExecutorService =
+    Executors.newSingleThreadScheduledExecutor()
+
+  val updateDriverMemMetrics = new Runnable {
+    def run() = {
+      val memUsage = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
+      updateMetric(DriverMetrics.driverHeapMax, memUsage.getMax)
+      updateMetric(DriverMetrics.driverMaxHeapCommitted, memUsage.getCommitted)
+      updateMetric(DriverMetrics.driverMaxHeapUsed, memUsage.getUsed)
+    }
+  }
+
+  def collectGCMetrics(): Unit = {
+    val operatingSystemObjectName = ObjectName.getInstance("java.lang:type=OperatingSystem")
+    // getAttribute returns the attribute's value directly (a java.lang.Long here)
+    updateMetric(DriverMetrics.driverCPUTime,
+      ManagementFactory.getPlatformMBeanServer
+        .getAttribute(operatingSystemObjectName, ProcessCpuTime).asInstanceOf[Long])
+
+    var gcCount: Long = 0
+    var gcTime: Long = 0
+    val iter = ManagementFactory.getGarbageCollectorMXBeans.iterator()
+    while (iter.hasNext) {
+      val current = iter.next()
+      gcCount += current.getCollectionCount
+      gcTime += current.getCollectionTime
+    }
+    updateMetric(DriverMetrics.driverGCTime, gcTime)
+    updateMetric(DriverMetrics.driverGCCount, gcCount)
+  }
+
+  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
+    scheduleMetricsCollection()
+  }
+
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+    collectGCMetrics()
+    terminateMetricsCollection()
+  }
+
+  // Start a thread to collect the driver JVM memory stats every 10 seconds
+  def scheduleMetricsCollection(): Unit = {
+    threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 10, TimeUnit.SECONDS)
+  }
+
+  def terminateMetricsCollection(): Unit = {
+    threadExecutor.shutdown()
+  }
+
+  def updateMetric(metric: DriverMetrics.Metric, newValue: Long): Unit = {
+    val aggregateValue = map.getOrElseUpdate(metric, new AggregateValue)
+    // Keep the high-water mark observed so far for this metric.
+    aggregateValue.value = math.max(aggregateValue.value, newValue)
+  }
+
+  override def getMap(): Map[String, Any] = {
+    Map("map" -> map.keys.map(key => (key.toString, map(key).getMap())).toMap)
+  }
+
+  def formatStaticMillisTime(x: (DriverMetrics.Metric, AggregateValue),
+                             sb: mutable.StringBuilder): Unit = {
+    def addUnits(x: Long): String = {
+      toMillis(x * 1000000)
+    }
+    sb.append(f"                    ${x._1}%-30s${addUnits(x._2.value)}%20s")
+      .append("\n")
+  }
+
+  def formatStaticBytes(x: (DriverMetrics.Metric, AggregateValue),
+                        sb: mutable.StringBuilder): Unit = {
+    sb.append(f"                    ${x._1}%-30s${bytesToString(x._2.value)}%20s")
+      .append("\n")
+  }
+
+  def formatCount(x: (DriverMetrics.Metric, AggregateValue),
+                  sb: mutable.StringBuilder): Unit = {
+    sb.append(f"                    ${x._1}%-30s${x._2.value}%20s")
+      .append("\n")
+  }
+
+  override def print(caption: String, sb: mutable.StringBuilder): Unit = {
+    sb.append(s" DriverMetrics (${caption}) ")
+      .append("\n")
+    sb.append(f"                    NAME                          Value ")
+      .append("\n")
+
+    map.toBuffer.sortWith((a, b) => a._1.toString < b._1.toString).foreach(x => {
+      formatterMap(x._1)(x, sb)
+    })
+  }
+}
+
+object DriverMetrics extends Enumeration with ComplimentaryMetrics {
+  import org.json4s._
+
+  type Metric = Value
+
+  val driverHeapMax,
+      driverMaxHeapCommitted,
+      driverMaxHeapUsed,
+      driverCPUTime,
+      driverGCCount,
+      driverGCTime = Value
+
+  override def getObject(json: json4s.JValue): ComplimentaryMetrics = {
+    try {
+      implicit val formats = DefaultFormats
+
+      val metrics = new DriverMetrics()
+      val map = (json \ "map").extract[Map[String, JValue]]
+
+      map.keys.foreach(key => metrics.map.put(withName(key),
+        AggregateValue.getValue(map(key))))
+
+      metrics
+    } catch {
+      case _: MappingException =>
+        new DriverMetrics()
+      case e: Exception =>
+        throw e
+    }
+  }
+}
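updateMetric keeps a running maximum, so the 10-second sampler reports peak heap
figures rather than the latest sample. A small illustration, assuming the
math.max(value, newValue) reading of updateMetric above:

    import com.qubole.sparklens.pluggable.DriverMetrics

    val metrics = new DriverMetrics()
    metrics.updateMetric(DriverMetrics.driverMaxHeapUsed, 512L << 20)
    metrics.updateMetric(DriverMetrics.driverMaxHeapUsed, 256L << 20)
    // metrics.map(DriverMetrics.driverMaxHeapUsed).value is 512 MiB, not 256 MiB
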
diff --git a/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala b/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala
index f129413..c017723 100644
--- a/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala
+++ b/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala
@@ -18,9 +18,10 @@ package com.qubole.sparklens.analyzer
 
 import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
 import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
 import com.qubole.sparklens.helper.JobOverlapHelper
+import com.qubole.sparklens.pluggable.ComplimentaryMetrics
 import org.scalatest.FunSuite
 
@@ -66,7 +67,9 @@ class JobOverlapAnalyzerSuite extends FunSuite {
       jobMap,
       jobSQLExecIDMap,
       mutable.HashMap[Int, StageTimeSpan](),
-      mutable.HashMap[Int, Long]())
+      mutable.HashMap[Int, Long](),
+      mutable.HashMap[String, ComplimentaryMetrics]()
+    )
   }
 
   test("JobOverlapAnalyzerTest: Jobs running in parallel should be considered while computing " +
diff --git a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala
index 86fbe77..8005aa2 100644
--- a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala
+++ b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala
@@ -17,8 +17,9 @@ package com.qubole.sparklens.scheduler
 
 import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
 import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
+import com.qubole.sparklens.pluggable.ComplimentaryMetrics
 import org.scalatest.FunSuite
 
 import scala.collection.mutable
@@ -266,7 +267,8 @@ class PQParallelStageSchedulerSuite extends FunSuite {
       jobMap,
       jobSQLExecIDMap,
       mutable.HashMap[Int, StageTimeSpan](),
-      mutable.HashMap[Int, Long]())
+      mutable.HashMap[Int, Long](),
+      mutable.HashMap[String, ComplimentaryMetrics]())
 
     val time = CompletionEstimator.estimateAppWallClockTimeWithJobLists(ac, 1, 1, 3)
     assert(time === 3, s"Test failed")
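With the tests updated, the new field also survives the sparklens.json round trip:
AppContext.toString() emits "pluggableMetricsMap" and AppContext.getContext parses it
back through ComplimentaryMetrics.getMetricsMap. A sketch of the read side in
isolation (restorePluggableMetrics is an illustrative helper, not part of the patch):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse
    import com.qubole.sparklens.common.AppContext
    import com.qubole.sparklens.pluggable.ComplimentaryMetrics

    // Round-trips the pluggable metrics of an existing AppContext through JSON.
    def restorePluggableMetrics(appContext: AppContext) = {
      implicit val formats = DefaultFormats
      val json = parse(appContext.toString())
      ComplimentaryMetrics.getMetricsMap(
        (json \ "pluggableMetricsMap").extract[Map[String, JValue]])
    }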