Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@ case class NativeShuffleExchangeExec(
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) ++
mutable.LinkedHashMap(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set(
"stage_id",
"mem_spill_count",
"mem_spill_size",
"mem_spill_iotime",
"disk_spill_size",
"disk_spill_iotime",
"shuffle_write_total_time",
"shuffle_read_total_time"))
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"mem_spill_count",
"mem_spill_size",
"mem_spill_iotime",
"disk_spill_size",
"disk_spill_iotime",
"shuffle_write_total_time",
"shuffle_read_total_time"))
.toSeq: _*)).toMap

lazy val readMetrics: Map[String, SQLMetric] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,38 +167,50 @@ object NativeHelper extends Logging {
})
}

def getDefaultNativeMetrics(sc: SparkContext): Map[String, SQLMetric] = {
def metric(name: String) = SQLMetrics.createMetric(sc, name)
def nanoTimingMetric(name: String) = SQLMetrics.createNanoTimingMetric(sc, name)
def sizeMetric(name: String) = SQLMetrics.createSizeMetric(sc, name)

var metrics = TreeMap(
"stage_id" -> metric("stageId"),
"output_rows" -> metric("Native.output_rows"),
"output_batches" -> metric("Native.output_batches"),
"elapsed_compute" -> nanoTimingMetric("Native.elapsed_compute"),
"build_hash_map_time" -> nanoTimingMetric("Native.build_hash_map_time"),
"probed_side_hash_time" -> nanoTimingMetric("Native.probed_side_hash_time"),
"probed_side_search_time" -> nanoTimingMetric("Native.probed_side_search_time"),
"probed_side_compare_time" -> nanoTimingMetric("Native.probed_side_compare_time"),
"build_output_time" -> nanoTimingMetric("Native.build_output_time"),
"fallback_sort_merge_join_time" -> nanoTimingMetric("Native.fallback_sort_merge_join_time"),
"mem_spill_count" -> metric("Native.mem_spill_count"),
"mem_spill_size" -> sizeMetric("Native.mem_spill_size"),
"mem_spill_iotime" -> nanoTimingMetric("Native.mem_spill_iotime"),
"disk_spill_size" -> sizeMetric("Native.disk_spill_size"),
"disk_spill_iotime" -> nanoTimingMetric("Native.disk_spill_iotime"),
"shuffle_write_total_time" -> nanoTimingMetric("Native.shuffle_write_total_time"),
"shuffle_read_total_time" -> nanoTimingMetric("Native.shuffle_read_total_time"))

if (AuronAdaptor.getInstance.getAuronConfiguration.getBoolean(
SparkAuronConfiguration.INPUT_BATCH_STATISTICS_ENABLE)) {
metrics ++= TreeMap(
"input_batch_count" -> metric("Native.input_batches"),
"input_row_count" -> metric("Native.input_rows"),
"input_batch_mem_size" -> sizeMetric("Native.input_mem_bytes"))
/**
 * Registry of every known native metric key, mapped to a factory that creates
 * the corresponding SQLMetric on a given SparkContext.
 *
 * Holding creators (functions) rather than metrics means nothing is registered
 * eagerly: `getDefaultNativeMetrics` instantiates only the keys a caller asks
 * for. The string passed to each SQLMetrics factory is the display name shown
 * in the Spark UI; the map key is the stable identifier used by the native side.
 */
private val defaultNativeMetricCreators: Map[String, SparkContext => SQLMetric] = Map(
"stage_id" -> (sc => SQLMetrics.createMetric(sc, "stageId")),
"output_rows" -> (sc => SQLMetrics.createMetric(sc, "Native.output_rows")),
"output_batches" -> (sc => SQLMetrics.createMetric(sc, "Native.output_batches")),
"elapsed_compute" -> (sc => SQLMetrics.createNanoTimingMetric(sc, "Native.elapsed_compute")),
"build_hash_map_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.build_hash_map_time")),
"probed_side_hash_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.probed_side_hash_time")),
"probed_side_search_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.probed_side_search_time")),
"probed_side_compare_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.probed_side_compare_time")),
"build_output_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.build_output_time")),
"fallback_sort_merge_join_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.fallback_sort_merge_join_time")),
"mem_spill_count" -> (sc => SQLMetrics.createMetric(sc, "Native.mem_spill_count")),
"mem_spill_size" -> (sc => SQLMetrics.createSizeMetric(sc, "Native.mem_spill_size")),
"mem_spill_iotime" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.mem_spill_iotime")),
"disk_spill_size" -> (sc => SQLMetrics.createSizeMetric(sc, "Native.disk_spill_size")),
"disk_spill_iotime" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.disk_spill_iotime")),
"shuffle_write_total_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.shuffle_write_total_time")),
"shuffle_read_total_time" -> (sc =>
SQLMetrics.createNanoTimingMetric(sc, "Native.shuffle_read_total_time")),
// Input-batch statistics: these three are only materialized when
// SparkAuronConfiguration.INPUT_BATCH_STATISTICS_ENABLE is on (the flag is
// checked at creation time, not here).
"input_batch_count" -> (sc => SQLMetrics.createMetric(sc, "Native.input_batches")),
"input_row_count" -> (sc => SQLMetrics.createMetric(sc, "Native.input_rows")),
"input_batch_mem_size" -> (sc => SQLMetrics.createSizeMetric(sc, "Native.input_mem_bytes")))

/**
 * Creates the requested subset of the default native metrics.
 *
 * @param sc the SparkContext each SQLMetric is registered against
 * @param keys metric keys to create; keys not present in
 *             `defaultNativeMetricCreators` are silently ignored
 * @return a sorted (TreeMap-backed) map from metric key to its freshly
 *         created SQLMetric. The input-batch statistics keys
 *         (input_batch_count / input_row_count / input_batch_mem_size) are
 *         dropped unless INPUT_BATCH_STATISTICS_ENABLE is set in the Auron
 *         configuration.
 */
def getDefaultNativeMetrics(sc: SparkContext, keys: Set[String]): Map[String, SQLMetric] = {
  // Input-batch statistics are opt-in: strip their keys when the flag is off
  // so the metrics are never created or shown.
  val enabledKeys =
    if (AuronAdaptor.getInstance.getAuronConfiguration.getBoolean(
        SparkAuronConfiguration.INPUT_BATCH_STATISTICS_ENABLE)) {
      keys
    } else {
      keys -- Set("input_batch_count", "input_row_count", "input_batch_mem_size")
    }

  // Instantiate only the requested metrics; TreeMap keeps the keys sorted
  // for stable, deterministic display ordering.
  TreeMap[String, SQLMetric]() ++ enabledKeys.iterator.flatMap { key =>
    defaultNativeMetricCreators.get(key).map(f => key -> f(sc))
  }
}

private def getDefaultNativeFileMetrics(sc: SparkContext): Map[String, SQLMetric] = {
Expand Down Expand Up @@ -233,7 +245,8 @@ object NativeHelper extends Logging {
}

/**
 * Metrics for native file scans: the default stage/output/compute metrics
 * combined with the file-scan-specific metrics.
 *
 * Both operands are already maps, so they are merged directly with `++`
 * instead of round-tripping through toSeq and a varargs TreeMap constructor;
 * the left operand is TreeMap-backed, so the merged result stays sorted by
 * metric key.
 */
def getNativeFileScanMetrics(sc: SparkContext): Map[String, SQLMetric] =
  getDefaultNativeMetrics(sc, Set("stage_id", "output_rows", "elapsed_compute")) ++
    getDefaultNativeFileMetrics(sc)
Comment on lines 247 to +251
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getNativeFileScanMetrics still materializes getDefaultNativeMetrics(...).toSeq and reconstructs a TreeMap via varargs. Since getDefaultNativeMetrics now returns a map directly, you can avoid the intermediate sequence/varargs by combining maps with ++ (e.g., getDefaultNativeMetrics(...) ++ getDefaultNativeFileMetrics(sc)), preserving sorted output while being a bit lighter.

Copilot uses AI. Check for mistakes.
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ abstract class ConvertToNativeBase(override val child: SparkPlan)

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
.getDefaultNativeMetrics(sparkContext, Set("stage_id", "output_rows", "elapsed_compute"))
.toSeq :+
("size", SQLMetrics.createSizeMetric(sparkContext, "Native.batch_bytes_size")): _*)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,20 @@ abstract class NativeAggBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set(
"stage_id",
"output_rows",
"elapsed_compute",
"mem_spill_count",
"mem_spill_size",
"mem_spill_iotime",
"disk_spill_size",
"disk_spill_iotime",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
"elapsed_compute",
"mem_spill_count",
"mem_spill_size",
"mem_spill_iotime",
"disk_spill_size",
"disk_spill_iotime",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.toSeq: _*) ++
Map(
"hashing_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Native.hashing_time")) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,29 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi
def getRunId: UUID
override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
"output_batches",
"elapsed_compute",
"build_hash_map_time",
"probed_side_hash_time",
"probed_side_search_time",
"probed_side_compare_time",
"build_output_time",
"fallback_sort_merge_join_time",
"mem_spill_count",
"mem_spill_size",
"mem_spill_iotime",
"disk_spill_size",
"disk_spill_iotime",
"shuffle_write_total_time",
"shuffle_read_total_time",
"input_batch_count",
"input_row_count",
"input_batch_mem_size"))
.toSeq :+
("dataSize", SQLMetrics.createSizeMetric(sparkContext, "data size")) :+
("numOutputRows", SQLMetrics.createMetric(sparkContext, "number of output rows")) :+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,20 @@ abstract class NativeBroadcastJoinBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set(
"stage_id",
"output_rows",
"elapsed_compute",
"probed_side_hash_time",
"probed_side_search_time",
"probed_side_compare_time",
"build_output_time",
"fallback_sort_merge_join_time",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
"elapsed_compute",
"probed_side_hash_time",
"probed_side_search_time",
"probed_side_compare_time",
"build_output_time",
"fallback_sort_merge_join_time",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.toSeq: _*)

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ abstract class NativeExpandBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ abstract class NativeFilterBase(condition: Expression, override val child: Spark

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ abstract class NativeGenerateBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ abstract class NativeGlobalLimitBase(limit: Int, offset: Int, override val child

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set("stage_id", "output_rows"))
.getDefaultNativeMetrics(sparkContext, Set("stage_id", "output_rows"))
.toSeq: _*)

override def output: Seq[Attribute] = child.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ abstract class NativeLocalLimitBase(limit: Int, override val child: SparkPlan)

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set("stage_id", "output_rows"))
.getDefaultNativeMetrics(sparkContext, Set("stage_id", "output_rows"))
.toSeq: _*)

override def output: Seq[Attribute] = child.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ abstract class NativeParquetInsertIntoHiveTableBase(
BasicWriteJobStatsTracker.metrics ++
Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
.getDefaultNativeMetrics(sparkContext, Set("stage_id", "output_rows", "elapsed_compute"))
.toSeq
:+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time"))
:+ ("bytes_written",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ abstract class NativeProjectBase(projectList: Seq[NamedExpression], override val

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,21 @@ abstract class NativeShuffledHashJoinBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set(
"stage_id",
"output_rows",
"elapsed_compute",
"build_hash_map_time",
"probed_side_hash_time",
"probed_side_search_time",
"probed_side_compare_time",
"build_output_time",
"fallback_sort_merge_join_time",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
"elapsed_compute",
"build_hash_map_time",
"probed_side_hash_time",
"probed_side_search_time",
"probed_side_compare_time",
"build_output_time",
"fallback_sort_merge_join_time",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.toSeq: _*)

private def nativeSchema = Util.getNativeSchema(output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,20 @@ abstract class NativeSortBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set(
"stage_id",
"output_rows",
"elapsed_compute",
"mem_spill_count",
"mem_spill_size",
"mem_spill_iotime",
"disk_spill_size",
"disk_spill_iotime",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
"elapsed_compute",
"mem_spill_count",
"mem_spill_size",
"mem_spill_iotime",
"disk_spill_size",
"disk_spill_iotime",
"input_batch_count",
"input_batch_mem_size",
"input_row_count"))
.toSeq: _*)

override def output: Seq[Attribute] = child.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ abstract class NativeSortMergeJoinBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(
.getDefaultNativeMetrics(
sparkContext,
Set(
"stage_id",
"output_rows",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ abstract class NativeTakeOrderedBase(

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
.getDefaultNativeMetrics(sparkContext, Set("stage_id", "output_rows", "elapsed_compute"))
.toSeq: _*)

override def output: Seq[Attribute] = child.output
Expand Down
Loading
Loading