From f0c582cc8b580030c31c1caa84925f0a84538ccf Mon Sep 17 00:00:00 2001
From: Liupengcheng
Date: Fri, 14 Feb 2020 13:47:50 +0800
Subject: [PATCH] Init kudu.write_duration accumulator lazily

---
 .../scala/org/apache/kudu/spark/kudu/KuduContext.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 85f1416095..e38bbc5de0 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -128,8 +128,11 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   val timestampAccumulator = new TimestampAccumulator()
   sc.register(timestampAccumulator)
 
-  val durationHistogram = new HdrHistogramAccumulator()
-  sc.register(durationHistogram, "kudu.write_duration")
+  lazy val durationHistogram = {
+    val acc = new HdrHistogramAccumulator()
+    sc.register(acc, "kudu.write_duration")
+    acc
+  }
 
   @deprecated("Use KuduContext constructor", "1.4.0")
   def this(kuduMaster: String) {
@@ -374,6 +377,8 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       rdd = repartitionRows(rdd, tableName, schema, writeOptions)
     }
 
+    // materialize durationHistogram
+    durationHistogram
     // Write the rows for each Spark partition.
     rdd.foreachPartition(iterator => {
       val pendingErrors = writePartitionRows(