diff --git a/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala b/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala index baf8791..be36e06 100644 --- a/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala +++ b/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala @@ -22,6 +22,7 @@ object SparkSubmitPlugin extends AutoPlugin { lazy val sparkSubmitMaster = settingKey[(Seq[String], Seq[String]) => String]("(SparkArgs, AppArgs) => Default Spark Master") lazy val sparkSubmitPropertiesFile = settingKey[Option[String]]("The default configuration file used by Spark") lazy val sparkSubmitClasspath = taskKey[Seq[File]]("Classpath used in SparkSubmit. For example, this can include the HADOOP_CONF_DIR for yarn deployment.") + def sparkSubmit = defaultSparkSubmitKey class SparkSubmitSetting(name: String) { lazy val sparkSubmit = InputKey[Unit](name, @@ -48,6 +49,13 @@ object SparkSubmitPlugin extends AutoPlugin { lazy val defaultSettings = Seq( + fork in sparkSubmit := (fork in defaultSparkSubmitKey).value, + javaHome in sparkSubmit := (javaHome in defaultSparkSubmitKey).value, + connectInput in sparkSubmit := (connectInput in defaultSparkSubmitKey).value, + outputStrategy in sparkSubmit := (outputStrategy in defaultSparkSubmitKey).value, + javaOptions in sparkSubmit := (javaOptions in defaultSparkSubmitKey).value, + baseDirectory in sparkSubmit := (baseDirectory in defaultSparkSubmitKey).value, + envVars in sparkSubmit := (envVars in defaultSparkSubmitKey).value, sparkSubmit := { val jar = (sparkSubmitJar in sparkSubmit).value @@ -80,8 +88,23 @@ object SparkSubmitPlugin extends AutoPlugin { } } + val runner = + if((fork in sparkSubmit).value) { + val forkOptions = ForkOptions( + bootJars = Nil, + javaHome = (javaHome in sparkSubmit).value, + connectInput = (connectInput in sparkSubmit).value, + outputStrategy = (outputStrategy in sparkSubmit).value, + runJVMOptions = (javaOptions in sparkSubmit).value, + workingDirectory = Some((baseDirectory in sparkSubmit).value), + envVars = (envVars in sparkSubmit).value + ) + new sbt.ForkRun(forkOptions) + } else { + new sbt.Run(scalaInstance.value, trapExit.value, taskTemporaryDirectory.value) + } - runner.value.run( + runner.run( "org.apache.spark.deploy.SparkSubmit", sparkSubmitClasspath.value, options, @@ -121,7 +144,8 @@ object SparkSubmitPlugin extends AutoPlugin { sparkSubmitClasspath := data((fullClasspath in Compile).value) ) - def defaultSparkSubmitSetting: SparkSubmitSetting = SparkSubmitSetting("sparkSubmit") + lazy val defaultSparkSubmitSetting: SparkSubmitSetting = SparkSubmitSetting("sparkSubmit") + def defaultSparkSubmitKey = defaultSparkSubmitSetting.sparkSubmit override def trigger = allRequirements } @@ -145,4 +169,4 @@ object SparkSubmitYARN extends AutoPlugin { data((fullClasspath in Compile).value) } ) -} \ No newline at end of file +}