[FRAGENT-2816][Spark] Adds additional executor-level memory metrics (#17168)

* Add nested Spark executor metrics

* Update metadata.csv

* Update tests with Spark 3.0+

* Lint + changelog

* Update test to support both 2.4 and 3.0

* Add core e2e additional metrics

* Add flaky e2e metrics

* Update metadata.csv - alphabetical order

* Note the nesting assumption: Spark currently exposes only one level of nested dictionaries in its metrics, so this may need to be revisited if Spark ever adds another level

Co-authored-by: Ilia Kurenkov <ilia.kurenkov@datadoghq.com>

* Improves readability

* Improves changelog

* Drop bullet list: all added metrics are listed in the Spark docs

---------

Co-authored-by: Ilia Kurenkov <ilia.kurenkov@datadoghq.com>
tbavelier and iliakur authored Mar 18, 2024
1 parent 6d52875 commit e0d72de
Showing 7 changed files with 383 additions and 7 deletions.
1 change: 1 addition & 0 deletions spark/changelog.d/17168.added
@@ -0,0 +1 @@
Adds additional Executor-level memory metrics (https://spark.apache.org/docs/latest/monitoring.html#executor-metrics)
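
For context, these executor-level metrics come from Spark's monitoring REST API (linked above): `memoryMetrics` and `peakMemoryMetrics` are nested objects in each executor entry, and `peakMemoryMetrics` is only reported by Spark 3.0+. A minimal sketch for inspecting that payload, assuming a driver UI reachable at http://localhost:4040 (adjust host and port for your cluster); it is not part of this change:

import requests

BASE_URL = 'http://localhost:4040/api/v1'  # assumed driver UI address

# Pick the first listed application and dump the nested executor memory metrics.
app_id = requests.get('{}/applications'.format(BASE_URL), timeout=10).json()[0]['id']
for executor in requests.get('{}/applications/{}/executors'.format(BASE_URL, app_id), timeout=10).json():
    print(executor['id'], executor.get('memoryMetrics', {}))
    # Only present on Spark 3.0+ (SPARK-23429); a missing key simply prints {} here.
    print(executor['id'], executor.get('peakMemoryMetrics', {}))
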
27 changes: 27 additions & 0 deletions spark/datadog_checks/spark/constants.py
@@ -107,6 +107,33 @@
'totalShuffleRead': ('spark.{}.total_shuffle_read', COUNT),
'totalShuffleWrite': ('spark.{}.total_shuffle_write', COUNT),
'maxMemory': ('spark.{}.max_memory', COUNT),
# memoryMetrics is a dictionary of metrics
'memoryMetrics.usedOnHeapStorageMemory': ('spark.{}.mem.used_on_heap_storage', COUNT),
'memoryMetrics.usedOffHeapStorageMemory': ('spark.{}.mem.used_off_heap_storage', COUNT),
'memoryMetrics.totalOnHeapStorageMemory': ('spark.{}.mem.total_on_heap_storage', COUNT),
'memoryMetrics.totalOffHeapStorageMemory': ('spark.{}.mem.total_off_heap_storage', COUNT),
# peakMemoryMetrics is a dictionary of metrics (available only in Spark 3.0+: https://issues.apache.org/jira/browse/SPARK-23429)
'peakMemoryMetrics.JVMHeapMemory': ('spark.{}.peak_mem.jvm_heap_memory', COUNT),
'peakMemoryMetrics.JVMOffHeapMemory': ('spark.{}.peak_mem.jvm_off_heap_memory', COUNT),
'peakMemoryMetrics.OnHeapExecutionMemory': ('spark.{}.peak_mem.on_heap_execution', COUNT),
'peakMemoryMetrics.OffHeapExecutionMemory': ('spark.{}.peak_mem.off_heap_execution', COUNT),
'peakMemoryMetrics.OnHeapStorageMemory': ('spark.{}.peak_mem.on_heap_storage', COUNT),
'peakMemoryMetrics.OffHeapStorageMemory': ('spark.{}.peak_mem.off_heap_storage', COUNT),
'peakMemoryMetrics.OnHeapUnifiedMemory': ('spark.{}.peak_mem.on_heap_unified', COUNT),
'peakMemoryMetrics.OffHeapUnifiedMemory': ('spark.{}.peak_mem.off_heap_unified', COUNT),
'peakMemoryMetrics.DirectPoolMemory': ('spark.{}.peak_mem.direct_pool', COUNT),
'peakMemoryMetrics.MappedPoolMemory': ('spark.{}.peak_mem.mapped_pool', COUNT),
'peakMemoryMetrics.MinorGCCount': ('spark.{}.peak_mem.minor_gc_count', COUNT),
'peakMemoryMetrics.MinorGCTime': ('spark.{}.peak_mem.minor_gc_time', COUNT),
'peakMemoryMetrics.MajorGCCount': ('spark.{}.peak_mem.major_gc_count', COUNT),
'peakMemoryMetrics.MajorGCTime': ('spark.{}.peak_mem.major_gc_time', COUNT),
# Within peakMemoryMetrics, enabled if spark.executor.processTreeMetrics.enabled is true.
'peakMemoryMetrics.ProcessTreeJVMVMemory': ('spark.{}.peak_mem.process_tree_jvm', COUNT),
'peakMemoryMetrics.ProcessTreeJVMRSSMemory': ('spark.{}.peak_mem.process_tree_jvm_rss', COUNT),
'peakMemoryMetrics.ProcessTreePythonVMemory': ('spark.{}.peak_mem.process_tree_python', COUNT),
'peakMemoryMetrics.ProcessTreePythonRSSMemory': ('spark.{}.peak_mem.process_tree_python_rss', COUNT),
'peakMemoryMetrics.ProcessTreeOtherVMemory': ('spark.{}.peak_mem.process_tree_other', COUNT),
'peakMemoryMetrics.ProcessTreeOtherRSSMemory': ('spark.{}.peak_mem.process_tree_other_rss', COUNT),
}

SPARK_DRIVER_METRICS = {
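
Judging from the spark.driver.*, spark.executor.*, and spark.executor.id.* rows added to metadata.csv below, each 'spark.{}.…' template above is expanded once per reporting level. A hedged sketch of that expansion, using a hypothetical expand_templates helper and a trimmed copy of the dictionary (the integration's own derivation code is not shown in this diff):

COUNT = 'count'

# Trimmed copy of the template dictionary above, for illustration only.
EXECUTOR_TEMPLATE_METRICS = {
    'maxMemory': ('spark.{}.max_memory', COUNT),
    'memoryMetrics.usedOnHeapStorageMemory': ('spark.{}.mem.used_on_heap_storage', COUNT),
    'peakMemoryMetrics.JVMHeapMemory': ('spark.{}.peak_mem.jvm_heap_memory', COUNT),
}


def expand_templates(templates, level):
    """Fill the '{}' placeholder with 'driver', 'executor', or 'executor.id'."""
    return {status: (name.format(level), metric_type) for status, (name, metric_type) in templates.items()}


# Yields spark.driver.peak_mem.jvm_heap_memory, spark.executor.peak_mem.jvm_heap_memory,
# spark.executor.id.peak_mem.jvm_heap_memory, and so on.
for level in ('driver', 'executor', 'executor.id'):
    print(expand_templates(EXECUTOR_TEMPLATE_METRICS, level))
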
10 changes: 9 additions & 1 deletion spark/datadog_checks/spark/spark.py
@@ -577,7 +577,15 @@ def _set_metrics_from_json(self, tags, metrics_json, metrics):
return

for status, (metric_name, metric_type) in iteritems(metrics):
metric_status = metrics_json.get(status)
# Metrics defined with a dot `.` are exposed in a nested dictionary.
# {"foo": {"bar": "baz", "qux": "quux"}}
# foo.bar -> baz
# foo.qux -> quux
if '.' in status:
parent_key, child_key = status.split('.')
metric_status = metrics_json.get(parent_key, {}).get(child_key)
else:
metric_status = metrics_json.get(status)

if metric_status is not None:
self._set_metric(metric_name, metric_type, metric_status, tags)
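
A standalone sketch of the dotted-key lookup added above, using a hypothetical executor payload; as the commit notes, it assumes Spark never nests these metrics more than one level deep:

def lookup(metrics_json, status):
    # Mirrors the branch added to _set_metrics_from_json: split a dotted key once
    # and read the child value from the nested dictionary, else read it directly.
    if '.' in status:
        parent_key, child_key = status.split('.')
        return metrics_json.get(parent_key, {}).get(child_key)
    return metrics_json.get(status)


executor_json = {
    'maxMemory': 512,
    'memoryMetrics': {'usedOnHeapStorageMemory': 128, 'usedOffHeapStorageMemory': 0},
}

assert lookup(executor_json, 'maxMemory') == 512
assert lookup(executor_json, 'memoryMetrics.usedOnHeapStorageMemory') == 128
# peakMemoryMetrics is absent on Spark < 3.0, so the metric is simply skipped.
assert lookup(executor_json, 'peakMemoryMetrics.JVMHeapMemory') is None
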
76 changes: 74 additions & 2 deletions spark/metadata.csv
@@ -4,13 +4,38 @@ spark.driver.completed_tasks,count,,task,,Number of completed tasks in the drive
spark.driver.disk_used,count,,byte,,Amount of disk used in the driver,0,spark,dvr dsk usd,
spark.driver.failed_tasks,count,,task,,Number of failed tasks in the driver,0,spark,dvr fld tsk,
spark.driver.max_memory,count,,byte,,Maximum memory used in the driver,0,spark,dvr max mem,
spark.driver.mem.total_off_heap_storage,count,,byte,,Total available off heap memory for storage,0,spark,dvr total off heap str mem,
spark.driver.mem.total_on_heap_storage,count,,byte,,Total available on heap memory for storage,0,spark,dvr total on heap str mem,
spark.driver.mem.used_off_heap_storage,count,,byte,,Used off heap memory currently for storage,0,spark,dvr used off heap str mem,
spark.driver.mem.used_on_heap_storage,count,,byte,,Used on heap memory currently for storage,0,spark,dvr used on heap str mem,
spark.driver.memory_used,count,,byte,,Amount of memory used in the driver,0,spark,dvr mem usd,
spark.driver.peak_mem.direct_pool,count,,byte,,Peak memory that the JVM is using for direct buffer pool,0,spark,dvr peak mem dir pool,
spark.driver.peak_mem.jvm_heap_memory,count,,byte,,Peak memory usage of the heap that is used for object allocation,0,spark,dvr peak mem jvm heap,
spark.driver.peak_mem.jvm_off_heap_memory,count,,byte,,Peak memory usage of non-heap memory that is used by the Java virtual machine,0,spark,dvr peak mem jvm off heap,
spark.driver.peak_mem.major_gc_count,count,,garbage collection,,Total major GC count,0,spark,dvr peak mem maj gc c,
spark.driver.peak_mem.major_gc_time,count,,millisecond,,Elapsed total major GC time,0,spark,dvr peak mem maj gc t,
spark.driver.peak_mem.mapped_pool,count,,byte,,Peak memory that the JVM is using for mapped buffer pool,0,spark,dvr peak mem map pool,
spark.driver.peak_mem.minor_gc_count,count,,garbage collection,,Total minor GC count,0,spark,dvr peak mem min gc c,
spark.driver.peak_mem.minor_gc_time,count,,millisecond,,Elapsed total minor GC time,0,spark,dvr peak mem min gc t,
spark.driver.peak_mem.off_heap_execution,count,,byte,,Peak off heap execution memory in use,0,spark,dvr peak mem off heap exct,
spark.driver.peak_mem.off_heap_storage,count,,byte,,Peak off heap storage memory in use,0,spark,dvr peak mem off heap str,
spark.driver.peak_mem.off_heap_unified,count,,byte,,Peak off heap memory (execution and storage),0,spark,dvr peak mem off heap unf,
spark.driver.peak_mem.on_heap_execution,count,,byte,,Peak on heap execution memory in use,0,spark,dvr peak mem on heap exct,
spark.driver.peak_mem.on_heap_storage,count,,byte,,Peak on heap storage memory in use,0,spark,dvr peak mem on heap str,
spark.driver.peak_mem.on_heap_unified,count,,byte,,Peak on heap memory (execution and storage),0,spark,dvr peak mem on heap unf,
spark.driver.peak_mem.process_tree_jvm_rss,count,,byte,,Resident Set Size: number of pages the process has in real memory,0,spark,dvr peak mem tree jvm rss,
spark.driver.peak_mem.process_tree_jvm,count,,byte,,Virtual memory size,0,spark,dvr peak mem tree jvm,
spark.driver.peak_mem.process_tree_other_rss,count,,byte,,Resident Set Size for other kind of process,0,spark,dvr peak mem tree oth rss,
spark.driver.peak_mem.process_tree_other,count,,byte,,Virtual memory size for other kind of process,0,spark,dvr peak mem tree oth,
spark.driver.peak_mem.process_tree_python_rss,count,,byte,,Resident Set Size for Python,0,spark,dvr peak mem tree py rss,
spark.driver.peak_mem.process_tree_python,count,,byte,,Virtual memory size for Python,0,spark,dvr peak mem tree py,
spark.driver.rdd_blocks,count,,block,,Number of RDD blocks in the driver,0,spark,dvr rdd blk,
spark.driver.total_duration,count,,millisecond,,Time spent in the driver,0,spark,dvr tot dur,
spark.driver.total_input_bytes,count,,byte,,Number of input bytes in the driver,0,spark,dvr tot in byt,
spark.driver.total_shuffle_read,count,,byte,,Number of bytes read during a shuffle in the driver,0,spark,dvr tot shfl rd,
spark.driver.total_shuffle_write,count,,byte,,Number of shuffled bytes in the driver,0,spark,dvr tot shfl wrt,
spark.driver.total_tasks,count,,task,,Number of total tasks in the driver,0,spark,dvr tot task,
spark.executor_memory,count,,byte,,Maximum memory available for caching RDD blocks in the application's executors,0,spark,exe max mem,
spark.executor.active_tasks,count,,task,,Number of active tasks in the application's executors,0,spark,exe act tsk,
spark.executor.completed_tasks,count,,task,,Number of completed tasks in the application's executors,0,spark,exe comp tsk,
spark.executor.count,count,,task,,Number of executors,0,spark,exe count,
@@ -21,22 +21,46 @@ spark.executor.id.completed_tasks,count,,task,,Number of completed tasks in this
spark.executor.id.disk_used,count,,byte,,Amount of disk space used by persisted RDDs in this executor,0,spark,exe id dsk usd,
spark.executor.id.failed_tasks,count,,task,,Number of failed tasks in this executor,0,spark,exe id fld tsk,
spark.executor.id.max_memory,count,,byte,,Total amount of memory available for storage for this executor,0,spark,exe id mem max,
spark.executor.id.mem.total_off_heap_storage,count,,byte,,Total available off heap memory for storage,0,spark,exe id total off heap str mem,
spark.executor.id.mem.total_on_heap_storage,count,,byte,,Total available on heap memory for storage,0,spark,exe id total on heap str mem,
spark.executor.id.mem.used_off_heap_storage,count,,byte,,Used off heap memory currently for storage,0,spark,exe id used off heap str mem,
spark.executor.id.mem.used_on_heap_storage,count,,byte,,Used on heap memory currently for storage,0,spark,exe id used on heap str mem,
spark.executor.id.memory_used,count,,byte,,Amount of memory used for cached RDDs in this executor.,0,spark,exe id mem usd,
spark.executor.id.peak_mem.direct_pool,count,,byte,,Peak memory that the JVM is using for direct buffer pool,0,spark,exe id peak mem dir pool,
spark.executor.id.peak_mem.jvm_heap_memory,count,,byte,,Peak memory usage of the heap that is used for object allocation,0,spark,exe id peak mem jvm heap,
spark.executor.id.peak_mem.jvm_off_heap_memory,count,,byte,,Peak memory usage of non-heap memory that is used by the Java virtual machine,0,spark,exe id peak mem jvm off heap,
spark.executor.id.peak_mem.major_gc_count,count,,garbage collection,,Total major GC count,0,spark,exe id peak mem maj gc c,
spark.executor.id.peak_mem.major_gc_time,count,,millisecond,,Elapsed total major GC time,0,spark,exe id peak mem maj gc t,
spark.executor.id.peak_mem.mapped_pool,count,,byte,,Peak memory that the JVM is using for mapped buffer pool,0,spark,exe id peak mem map pool,
spark.executor.id.peak_mem.minor_gc_count,count,,garbage collection,,Total minor GC count,0,spark,exe id peak mem min gc c,
spark.executor.id.peak_mem.minor_gc_time,count,,millisecond,,Elapsed total minor GC time,0,spark,exe id peak mem min gc t,
spark.executor.id.peak_mem.off_heap_execution,count,,byte,,Peak off heap execution memory in use,0,spark,exe id peak mem off heap exct,
spark.executor.id.peak_mem.off_heap_storage,count,,byte,,Peak off heap storage memory in use,0,spark,exe id peak mem off heap str,
spark.executor.id.peak_mem.off_heap_unified,count,,byte,,Peak off heap memory (execution and storage),0,spark,exe id peak mem off heap unf,
spark.executor.id.peak_mem.on_heap_execution,count,,byte,,Peak on heap execution memory in use,0,spark,exe id peak mem on heap exct,
spark.executor.id.peak_mem.on_heap_storage,count,,byte,,Peak on heap storage memory in use,0,spark,exe id peak mem on heap str,
spark.executor.id.peak_mem.on_heap_unified,count,,byte,,Peak on heap memory (execution and storage),0,spark,exe id peak mem on heap unf,
spark.executor.id.peak_mem.process_tree_jvm_rss,count,,byte,,Resident Set Size: number of pages the process has in real memory,0,spark,exe id peak mem tree jvm rss,
spark.executor.id.peak_mem.process_tree_jvm,count,,byte,,Virtual memory size,0,spark,exe id peak mem tree jvm,
spark.executor.id.peak_mem.process_tree_other_rss,count,,byte,,Resident Set Size for other kind of process,0,spark,exe id peak mem tree oth rss,
spark.executor.id.peak_mem.process_tree_other,count,,byte,,Virtual memory size for other kind of process,0,spark,exe id peak mem tree oth,
spark.executor.id.peak_mem.process_tree_python_rss,count,,byte,,Resident Set Size for Python,0,spark,exe id peak mem tree py rss,
spark.executor.id.peak_mem.process_tree_python,count,,byte,,Virtual memory size for Python,0,spark,exe id peak mem tree py,
spark.executor.id.rdd_blocks,count,,block,,Number of persisted RDD blocks in this executor,0,spark,exe rdd blk,
spark.executor.id.total_duration,count,,millisecond,,Time spent by the executor executing tasks,0,spark,exe id tot dur,
spark.executor.id.total_input_bytes,count,,byte,,Total number of input bytes in the executor,0,spark,exe id in byt,
spark.executor.id.total_shuffle_read,count,,byte,,Total number of bytes read during a shuffle in the executor,0,spark,exe id tot shfl rd,
spark.executor.id.total_shuffle_write,count,,byte,,Total number of shuffled bytes in the executor,0,spark,exe id tot shfl wrt,
spark.executor.id.total_tasks,count,,task,,Total number of tasks in this executor,0,spark,exe id tot tsk,
spark.executor.max_memory,count,,byte,,Max memory across all executors working for a particular application,0,spark,exe mem max,
spark.executor.mem.total_off_heap_storage,count,,byte,,Total available off heap memory for storage,0,spark,exe total off heap str mem,
spark.executor.mem.total_on_heap_storage,count,,byte,,Total available on heap memory for storage,0,spark,exe total on heap str mem,
spark.executor.mem.used_off_heap_storage,count,,byte,,Used off heap memory currently for storage,0,spark,exe used off heap str mem,
spark.executor.mem.used_on_heap_storage,count,,byte,,Used on heap memory currently for storage,0,spark,exe used on heap str mem,
spark.executor.memory_used,count,,byte,,Amount of memory used for cached RDDs in the application's executors,0,spark,exe mem usd,
spark.executor.peak_mem.direct_pool,count,,byte,,Peak memory that the JVM is using for direct buffer pool,0,spark,exe peak mem dir pool,
spark.executor.peak_mem.jvm_heap_memory,count,,byte,,Peak memory usage of the heap that is used for object allocation,0,spark,exe peak mem jvm heap,
spark.executor.peak_mem.jvm_off_heap_memory,count,,byte,,Peak memory usage of non-heap memory that is used by the Java virtual machine,0,spark,exe peak mem jvm off heap,
spark.executor.peak_mem.major_gc_count,count,,garbage collection,,Total major GC count,0,spark,exe peak mem maj gc c,
spark.executor.peak_mem.major_gc_time,count,,millisecond,,Elapsed total major GC time,0,spark,exe peak mem maj gc t,
spark.executor.peak_mem.mapped_pool,count,,byte,,Peak memory that the JVM is using for mapped buffer pool,0,spark,exe peak mem map pool,
spark.executor.peak_mem.minor_gc_count,count,,garbage collection,,Total minor GC count,0,spark,exe peak mem min gc c,
spark.executor.peak_mem.minor_gc_time,count,,millisecond,,Elapsed total minor GC time,0,spark,exe peak mem min gc t,
spark.executor.peak_mem.off_heap_execution,count,,byte,,Peak off heap execution memory in use,0,spark,exe peak mem off heap exct,
spark.executor.peak_mem.off_heap_storage,count,,byte,,Peak off heap storage memory in use,0,spark,exe peak mem off heap str,
spark.executor.peak_mem.off_heap_unified,count,,byte,,Peak off heap memory (execution and storage),0,spark,exe peak mem off heap unf,
spark.executor.peak_mem.on_heap_execution,count,,byte,,Peak on heap execution memory in use,0,spark,exe peak mem on heap exct,
spark.executor.peak_mem.on_heap_storage,count,,byte,,Peak on heap storage memory in use,0,spark,exe peak mem on heap str,
spark.executor.peak_mem.on_heap_unified,count,,byte,,Peak on heap memory (execution and storage),0,spark,exe peak mem on heap unf,
spark.executor.peak_mem.process_tree_jvm_rss,count,,byte,,Resident Set Size: number of pages the process has in real memory,0,spark,exe peak mem tree jvm rss,
spark.executor.peak_mem.process_tree_jvm,count,,byte,,Virtual memory size,0,spark,exe peak mem tree jvm,
spark.executor.peak_mem.process_tree_other_rss,count,,byte,,Resident Set Size for other kind of process,0,spark,exe peak mem tree oth rss,
spark.executor.peak_mem.process_tree_other,count,,byte,,Virtual memory size for other kind of process,0,spark,exe peak mem tree oth,
spark.executor.peak_mem.process_tree_python_rss,count,,byte,,Resident Set Size for Python,0,spark,exe peak mem tree py rss,
spark.executor.peak_mem.process_tree_python,count,,byte,,Virtual memory size for Python,0,spark,exe peak mem tree py,
spark.executor.rdd_blocks,count,,block,,Number of persisted RDD blocks in the application's executors,0,spark,exe rdd blk,
spark.executor.total_duration,count,,millisecond,,Time spent by the application's executors executing tasks,0,spark,exe tot dur,
spark.executor.total_input_bytes,count,,byte,,Total number of input bytes in the application's executors,0,spark,exe in byt,
spark.executor.total_shuffle_read,count,,byte,,Total number of bytes read during a shuffle in the application's executors,0,spark,exe tot shfl rd,
spark.executor.total_shuffle_write,count,,byte,,Total number of shuffled bytes in the application's executors,0,spark,exe tot shfl wrt,
spark.executor.total_tasks,count,,task,,Total number of tasks in the application's executors,0,spark,exe tot tsk,
spark.executor_memory,count,,byte,,Maximum memory available for caching RDD blocks in the application's executors,0,spark,exe max mem,
spark.job.count,count,,task,,Number of jobs,0,spark,jb count,
spark.job.num_active_stages,count,,stage,,Number of active stages in the application,0,spark,jb act stg,
spark.job.num_active_tasks,count,,task,,Number of active tasks in the application,0,spark,jb nm act tsk,
@@ -75,7 +147,7 @@ spark.streaming.statistics.batch_duration,gauge,,millisecond,,Application's stre
spark.streaming.statistics.num_active_batches,gauge,,job,,Number of active streaming batches,0,spark,num active batches,
spark.streaming.statistics.num_active_receivers,gauge,,object,,Number of active streaming receivers,0,spark,num active receivers,
spark.streaming.statistics.num_inactive_receivers,gauge,,object,,Number of inactive streaming receivers,0,spark,num inactive receivers,
spark.streaming.statistics.num_processed_records,count,,record,,Number of processed streaming records ,0,spark,num processed records,
spark.streaming.statistics.num_processed_records,count,,record,,Number of processed streaming records,0,spark,num processed records,
spark.streaming.statistics.num_received_records,count,,record,,Number of received streaming records,0,spark,num received records,
spark.streaming.statistics.num_receivers,gauge,,object,,Number of streaming application's receivers,0,spark,num receivers,
spark.streaming.statistics.num_retained_completed_batches,count,,job,,Number of retained completed application's streaming batches,0,spark,num retained completed batches,