[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #20940
Conversation
…xecutors REST API

Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage memory), and expose them via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used across the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

Add an ExecutorMetrics class with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will be sent by executors to the driver as part of Heartbeat. A heartbeat will be added for the driver as well, to collect these metrics for the driver.

Modify the EventLoggingListener to log ExecutorMetricsUpdate events if there is a new peak value for any of the memory metrics for an executor and stage. Only the ExecutorMetrics will be logged, and not the TaskMetrics, to minimize additional logging.

Modify the AppStatusListener to record the peak values for each memory metric.

Add the new memory metrics to the executors REST API.
Yes, thanks.
Can one of the admins verify this patch?
ok to test
Test build #88725 has finished for PR 20940 at commit
…ons. ## What changes were proposed in this pull request? Set default Spark session in the TestSparkSession and TestHiveSparkSession constructors. ## How was this patch tested? new unit tests Author: Jose Torres <torres.joseph.f+github@gmail.com> Closes apache#20926 from jose-torres/test3.
… to starting with 'org.slf4j' ## What changes were proposed in this pull request? isSharedClass returns whether some classes can/should be shared or not. It checks whether the class names contain certain keywords or start with certain prefixes. Following this logic, unintended behavior can occur when a custom package has `slf4j` inside the package or class name. The original intention seems to have been to match classes under `org.slf4j`, so it would be better to change the comparison logic to `name.startsWith("org.slf4j")`. ## How was this patch tested? This patch should pass all of the current tests and keep all of the current behaviors. In my case, I'm using ProtobufDeserializer to get a table schema from Hive tables, so some Protobuf packages and names have `slf4j` inside. Without this patch, they cannot be resolved because of a ClassCastException from different classloaders. Author: Jongyoul Lee <jongyoul@gmail.com> Closes apache#20860 from jongyoul/SPARK-23743.
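For illustration only, a tiny Scala sketch contrasting the keyword check described above with the proposed prefix check; the class names are made up:
```
object SharedClassCheck extends App {
  // Hypothetical user class whose fully-qualified name contains "slf4j".
  val userClass = "com.example.protobuf.slf4j.ProtobufDeserializer"
  println(userClass.contains("slf4j"))        // true  -> keyword check wrongly marks it as shared
  println(userClass.startsWith("org.slf4j"))  // false -> prefix check leaves it to the user classloader
  // The real logging classes are still shared under the prefix check.
  println("org.slf4j.LoggerFactory".startsWith("org.slf4j"))  // true
}
```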
…arquet ## What changes were proposed in this pull request? This PR adds support for pushing down filters for DateType in Parquet. ## How was this patch tested? Added UT and tested locally. Author: yucai <yyu1@ebay.com> Closes apache#20851 from yucai/SPARK-23727.
## What changes were proposed in this pull request? Roll forward c68ec4e (apache#20688). There are two minor test changes required: * An error which used to be TreeNodeException[ArithmeticException] is no longer wrapped and is now just ArithmeticException. * The test framework simply does not set the active Spark session. (Or rather, it doesn't do so early enough - I think it only happens when a query is analyzed.) I've added the required logic to SQLTestUtils. ## How was this patch tested? existing tests Author: Jose Torres <torres.joseph.f+github@gmail.com> Author: jerryshao <sshao@hortonworks.com> Closes apache#20922 from jose-torres/ratefix.
… apply to entire plan ## What changes were proposed in this pull request? This PR is to improve the test coverage of the original PR apache#20687 ## How was this patch tested? N/A Author: gatorsmile <gatorsmile@gmail.com> Closes apache#20911 from gatorsmile/addTests.
## What changes were proposed in this pull request?
`spark.shuffle.service.port` may be obtained from https://github.com/apache/spark/blob/9745ec3a61c99be59ef6a9d5eebd445e8af65b7a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L459. Therefore, the client configuration `spark.shuffle.service.port` does not work unless the configuration is `spark.hadoop.spark.shuffle.service.port`.
- This configuration does not work:
```
bin/spark-sql --master yarn --conf spark.shuffle.service.port=7338
```
- This configuration works:
```
bin/spark-sql --master yarn --conf spark.hadoop.spark.shuffle.service.port=7338
```
This PR fixes this issue.
## How was this patch tested?
It's difficult to carry out unit testing, but I've tested it manually. Author: Yuming Wang <yumwang@ebay.com> Closes apache#20785 from wangyum/SPARK-23640.
…partitioned into specific number of partitions ## What changes were proposed in this pull request? Currently, the requiredChildDistribution does not specify the partitions. This can cause the weird corner cases where the child's distribution is `SinglePartition` which satisfies the required distribution of `ClusterDistribution(no-num-partition-requirement)`, thus eliminating the shuffle needed to repartition input data into the required number of partitions (i.e. same as state stores). That can lead to "file not found" errors on the state store delta files as the micro-batch-with-no-shuffle will not run certain tasks and therefore not generate the expected state store delta files. This PR adds the required constraint on the number of partitions. ## How was this patch tested? Modified test harness to always check that ANY stateful operator should have a constraint on the number of partitions. As part of that, the existing opt-in checks on child output partitioning were removed, as they are redundant. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes apache#20941 from tdas/SPARK-23827.
## What changes were proposed in this pull request? Address apache#20449 (comment): if `resultIter` is already an `InterruptibleIterator`, don't double wrap it. ## How was this patch tested? Existing tests. Author: Xingbo Jiang <xingbo.jiang@databricks.com> Closes apache#20920 from jiangxb1987/SPARK-23040.
Test build #88800 has finished for PR 20940 at commit
…torizerModel ## What changes were proposed in this pull request? Adding test for default params for `CountVectorizerModel` constructed from vocabulary. This required that the param `maxDF` be added, which was done in SPARK-23615. ## How was this patch tested? Added an explicit test for CountVectorizerModel in DefaultValuesTests. Author: Bryan Cutler <cutlerb@gmail.com> Closes apache#20942 from BryanCutler/pyspark-CountVectorizerModel-default-param-test-SPARK-15009.
## What changes were proposed in this pull request? Kubernetes driver and executor pods should request `memory + memoryOverhead` as their resources instead of just `memory`, see https://issues.apache.org/jira/browse/SPARK-23825 ## How was this patch tested? Existing unit tests were adapted. Author: David Vogelbacher <dvogelbacher@palantir.com> Closes apache#20943 from dvogelbacher/spark-23825.
…utor cores ## What changes were proposed in this pull request? As mentioned in SPARK-23285, this PR introduces a new configuration property `spark.kubernetes.executor.cores` for specifying the physical CPU cores requested for each executor pod. This is to avoid changing the semantics of `spark.executor.cores` and `spark.task.cpus` and their role in task scheduling, task parallelism, dynamic resource allocation, etc. The new configuration property only determines the physical CPU cores available to an executor. An executor can still run multiple tasks simultaneously by using appropriate values for `spark.executor.cores` and `spark.task.cpus`. ## How was this patch tested? Unit tests. felixcheung srowen jiangxb1987 jerryshao mccheah foxish Author: Yinan Li <ynli@google.com> Author: Yinan Li <liyinan926@gmail.com> Closes apache#20553 from liyinan926/master.
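A hedged sketch of how the new property might sit alongside the existing scheduling knobs; the values below are placeholder assumptions, not recommendations:
```
import org.apache.spark.SparkConf

// Request half a physical CPU per executor pod while still scheduling
// two task slots per executor; the numbers are only for illustration.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.cores", "0.5")
  .set("spark.executor.cores", "2")
  .set("spark.task.cpus", "1")
```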
## What changes were proposed in this pull request? This PR implemented the following cleanups related to `UnsafeWriter` class: - Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter` - Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter` - Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()` ## How was this patch tested? Tested by existing UTs Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes apache#20850 from kiszk/SPARK-23713.
…Server test. It was possible that the disconnect() was called on the handle before the server had received the handshake messages, so no connection was yet attached to the handle. The fix waits until we're sure the handle has been mapped to a client connection. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes apache#20950 from vanzin/SPARK-23834.
## What changes were proposed in this pull request? Introduce a `handleInvalid` parameter in `VectorAssembler` that can take the options `"keep"`, `"skip"`, and `"error"`. "error" throws an error on seeing a row containing a `null`, "skip" filters out all such rows, and "keep" adds the relevant number of NaNs; "keep" inspects an example row to figure out how many NaNs should be added, and throws an error when no such number can be found. ## How was this patch tested? Unit tests are added to check the behavior of `assemble` on specific rows, and the transformer is called on `DataFrame`s of different configurations to test different corner cases. Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Author: Bago Amirbekian <bago@databricks.com> Author: Yogesh Garg <1059168+yogeshg@users.noreply.github.com> Closes apache#20829 from yogeshg/rformula_handleinvalid.
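A minimal usage sketch of the new parameter, assuming a local SparkSession and made-up column names and data:
```
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("assembler-demo").getOrCreate()
import spark.implicits._

// Second row has a null in column x2.
val df = Seq((1.0, Some(2.0)), (3.0, None)).toDF("x1", "x2")

val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")
  .setHandleInvalid("skip")   // "error" (default), "skip", or "keep"

assembler.transform(df).show()  // with "skip", the row containing the null is dropped
```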
These tests can fail with a timeout if the remote repos are not responding, or slow. The tests don't need anything from those repos, so use an empty ivy config file to avoid setting up the defaults. The tests are passing reliably for me locally now, and failing more often than not today without this change since http://dl.bintray.com/spark-packages/maven doesn't seem to be loading from my machine. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes apache#20916 from vanzin/SPARK-19964.
## What changes were proposed in this pull request? Easy fix in the markdown. ## How was this patch tested? Jekyll build, tested manually. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: lemonjing <932191671@qq.com> Closes apache#20897 from Lemonjing/master.
## What changes were proposed in this pull request? Address apache#20924 (comment), show block manager id when remove RDD/Broadcast fails. ## How was this patch tested? N/A Author: Xingbo Jiang <xingbo.jiang@databricks.com> Closes apache#20960 from jiangxb1987/bmid.
## What changes were proposed in this pull request? Migrate foreach sink to DataSourceV2. Since the previous attempt at this PR apache#20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works. ## How was this patch tested? existing tests Author: Jose Torres <torres.joseph.f+github@gmail.com> Closes apache#20951 from jose-torres/foreach.
## What changes were proposed in this pull request? Add interpreted execution for `MapObjects` expression. ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes apache#20771 from viirya/SPARK-23587.
## What changes were proposed in this pull request? Currently, the active spark session is set inconsistently (e.g., in createDataFrame, prior to query execution). Many places in spark also incorrectly query active session when they should be calling activeSession.getOrElse(defaultSession) and so might get None even if a Spark session exists. The semantics here can be cleaned up if we also set the active session when the default session is set. Related: https://github.com/apache/spark/pull/20926/files ## How was this patch tested? Unit test, existing test. Note that if apache#20926 merges first we should also update the tests there. Author: Eric Liang <ekl@databricks.com> Closes apache#20927 from ericl/active-session-cleanup.
…esolved state ## What changes were proposed in this pull request? Add a cast to the nulls introduced by PropagateEmptyRelation so that, in cases where they're part of a coalesce, they will not break its type checking rules. ## How was this patch tested? Added unit test Author: Robert Kruszewski <robertk@palantir.com> Closes apache#20914 from robert3005/rk/propagate-empty-fix.
## What changes were proposed in this pull request? In TestHive, the base spark session does this in getOrCreate(), we emulate that behavior for tests. ## How was this patch tested? N/A Author: gatorsmile <gatorsmile@gmail.com> Closes apache#20969 from gatorsmile/setDefault.
## What changes were proposed in this pull request?
This PR adds a new optimizer rule `UpdateNullabilityInAttributeReferences` to update the nullability that `Filter` changes when having `IsNotNull`. In master, optimized plans do not respect the nullability when `Filter` has `IsNotNull`. This wrongly generates unnecessary code. For example:
```
scala> val df = Seq((Some(1), Some(2))).toDF("a", "b")
scala> val bIsNotNull = df.where($"b" =!= 2).select($"b")
scala> val targetQuery = bIsNotNull.distinct
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = true

scala> targetQuery.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*HashAggregate(keys=[b#19], functions=[], output=[b#19])
+- Exchange hashpartitioning(b#19, 200)
   +- *HashAggregate(keys=[b#19], functions=[], output=[b#19])
      +- *Project [_2#16 AS b#19]
         +- *Filter isnotnull(_2#16)
            +- LocalTableScan [_1#15, _2#16]

Generated code:
...
/* 124 */   protected void processNext() throws java.io.IOException {
...
/* 132 */     // output the result
/* 133 */
/* 134 */     while (agg_mapIter.next()) {
/* 135 */       wholestagecodegen_numOutputRows.add(1);
/* 136 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 137 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 138 */
/* 139 */       boolean agg_isNull4 = agg_aggKey.isNullAt(0);
/* 140 */       int agg_value4 = agg_isNull4 ? -1 : (agg_aggKey.getInt(0));
/* 141 */       agg_rowWriter1.zeroOutNullBytes();
/* 142 */       // We don't need this NULL check because NULL is filtered out in `$"b" =!= 2`
/* 143 */       if (agg_isNull4) {
/* 144 */         agg_rowWriter1.setNullAt(0);
/* 145 */       } else {
/* 146 */         agg_rowWriter1.write(0, agg_value4);
/* 147 */       }
/* 148 */       append(agg_result1);
/* 149 */
/* 150 */       if (shouldStop()) return;
/* 151 */     }
/* 152 */
/* 153 */     agg_mapIter.close();
/* 154 */     if (agg_sorter == null) {
/* 155 */       agg_hashMap.free();
/* 156 */     }
/* 157 */   }
/* 158 */
/* 159 */ }
```
At line 143, we don't need this NULL check because NULL is filtered out by `$"b" =!= 2`. This PR removes that NULL check:
```
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = false

scala> targetQuery.debugCodegen
...
Generated code:
...
/* 144 */   protected void processNext() throws java.io.IOException {
...
/* 152 */     // output the result
/* 153 */
/* 154 */     while (agg_mapIter.next()) {
/* 155 */       wholestagecodegen_numOutputRows.add(1);
/* 156 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 157 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 158 */
/* 159 */       int agg_value4 = agg_aggKey.getInt(0);
/* 160 */       agg_rowWriter1.write(0, agg_value4);
/* 161 */       append(agg_result1);
/* 162 */
/* 163 */       if (shouldStop()) return;
/* 164 */     }
/* 165 */
/* 166 */     agg_mapIter.close();
/* 167 */     if (agg_sorter == null) {
/* 168 */       agg_hashMap.free();
/* 169 */     }
/* 170 */   }
```
## How was this patch tested?
Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes apache#18576 from maropu/SPARK-21351.
Jenkins, retest this please
@edwinalu sorry it looks like there are conflicts, we would need to rebase
Test build #89697 has finished for PR 20940 at commit
…ndling of Project and Filter over partitioned relation ## What changes were proposed in this pull request? A followup of apache#20988 `PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation. ## How was this patch tested? existing test Author: Wenchen Fan <wenchen@databricks.com> Closes apache#21111 from cloud-fan/refactor.
…'s children ## What changes were proposed in this pull request? The existing query constraints framework has 2 steps: 1. propagate constraints bottom up. 2. use constraints to infer additional filters for better data pruning. For step 2, it mostly helps with Join, because we can connect the constraints from children to the join condition and infer powerful filters to prune the data of the join sides. For example, if the left side has the constraint `a = 1` and the join condition is `left.a = right.a`, then we can infer `right.a = 1` on the right side and prune it a lot. However, the current logic of inferring filters from constraints for Join is pretty weak: it infers the filters from Join's constraints, and some joins, like left semi/anti, exclude output from the right side, so the right side constraints will be lost there. This PR proposes to check the left and right constraints individually, expand the constraints with the join condition, and add filters to the children of the join directly, instead of adding them to the join condition. This reverts apache#20670 and covers apache#20717 and apache#20816. This is inspired by the original PRs and the tests are all from these PRs. Thanks to the authors mgaido91 maryannxue KaiXinXiaoLei ! ## How was this patch tested? new tests Author: Wenchen Fan <wenchen@databricks.com> Closes apache#21083 from cloud-fan/join.
…xecution ## What changes were proposed in this pull request? This PR supports interpreted mode for `ExternalMapToCatalyst`. ## How was this patch tested? Added tests in `ObjectExpressionsSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes apache#20980 from maropu/SPARK-23589.
…y = 0 ## What changes were proposed in this pull request?
It is reported by Spark users that the deviance calculation for Poisson regression does not handle y = 0, so the correct model summary cannot be obtained. The user has confirmed that the issue is in
```
override def deviance(y: Double, mu: Double, weight: Double): Double = {
  2.0 * weight * (y * math.log(y / mu) - (y - mu))
}
```
when y = 0. The user also mentioned there are many other places he believes we should check for the same thing. However, no other changes are needed, including for the Gamma distribution.
## How was this patch tested?
Added a comparison with R's deviance calculation to the existing unit test. Author: Teng Peng <josephtengpeng@gmail.com> Closes apache#21125 from tengpeng/Spark24024GLM.
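For intuition, a standalone sketch (not the actual patched Spark code) of how the y = 0 case can be guarded: the term y * log(y / mu) tends to 0 as y approaches 0, so it can be treated as 0 when y == 0:
```
// Illustrative only; in Spark the deviance override lives in the Poisson family
// of GeneralizedLinearRegression.
def ylogy(y: Double, mu: Double): Double =
  if (y == 0.0) 0.0 else y * math.log(y / mu)

def poissonDeviance(y: Double, mu: Double, weight: Double): Double =
  2.0 * weight * (ylogy(y, mu) - (y - mu))

// poissonDeviance(0.0, 1.5, 1.0) now returns a finite value instead of NaN.
```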
[https://issues.apache.org/jira/browse/SPARK-21168](https://issues.apache.org/jira/browse/SPARK-21168) There are a number of other places where a client ID should be set, and I think we should use consumer.clientId in the clientId method, because the fetch request will be used by the same consumer behind it. Author: liuzhaokun <liu.zhaokun@zte.com.cn> Closes apache#19887 from liu-zhaokun/master1205.
…treaming aggregation task ## What changes were proposed in this pull request?
A structured streaming query with a streaming aggregation can throw the following error in rare cases.
```
java.lang.IllegalStateException: Cannot commit after already committed or aborted
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:643)
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:135)
  at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2$$anonfun$hasNext$2.apply$mcV$sp(statefulOperators.scala:359)
  at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:102)
  at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:251)
  at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2.hasNext(statefulOperators.scala:359)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:188)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:114)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:42)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:336)
```
This can happen when the following conditions are accidentally hit:
- Streaming aggregation with an aggregation function that is a subclass of [`TypedImperativeAggregation`](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473) (for example, `collect_set`, `collect_list`, `percentile`, etc.).
- Query running in `update` mode.
- After the shuffle, a partition has exactly 128 records.

This causes StateStore.commit to be called twice. See the [JIRA](https://issues.apache.org/jira/browse/SPARK-23004) for a more detailed explanation. The solution is to use `NextIterator` or `CompletionIterator`, each of which has a flag to prevent the "onCompletion" task from being called more than once. In this PR, I chose to implement it using `NextIterator`.
## How was this patch tested?
Added a unit test that I have confirmed will fail without the fix. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes apache#21124 from tdas/SPARK-23004.
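A self-contained sketch of the once-only completion idea described above; this is not Spark's `NextIterator`, and the class and callback names are made up:
```
// Wraps an iterator so that a completion callback (e.g. a state store commit)
// runs exactly once, no matter how many times hasNext is called after exhaustion.
class CompleteOnce[A](underlying: Iterator[A], onCompletion: () => Unit) extends Iterator[A] {
  private var completed = false

  private def maybeComplete(): Unit = {
    if (!completed && !underlying.hasNext) {
      completed = true
      onCompletion()
    }
  }

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) maybeComplete()
    more
  }

  override def next(): A = {
    val value = underlying.next()
    maybeComplete() // complete as soon as the last element is consumed
    value
  }
}
```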
## What changes were proposed in this pull request? Adding PMML export to Spark ML's KMeans Model. ## How was this patch tested? New unit test for Spark ML PMML export based on the old Spark MLlib unit test. Author: Holden Karau <holden@pigscanfly.ca> Closes apache#20907 from holdenk/SPARK-11237-Add-PMML-Export-for-KMeans.
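A hedged usage sketch, assuming the export is exposed through the generic `write.format("pmml")` path that this change is described as adding; the data and output path are placeholders:
```
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("pmml-export").getOrCreate()
val data = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(0.0, 0.0)),
  Tuple1(Vectors.dense(10.0, 10.0))
)).toDF("features")

val model = new KMeans().setK(2).setSeed(1L).fit(data)
// If the "pmml" format is not available in your build, this line fails at runtime.
model.write.format("pmml").save("/tmp/kmeans-pmml")
```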
TaskSetManager.hasAttemptOnHost had a misleading comment. The comment said that it only checked for running tasks, but really it checked for any tasks that might have run in the past as well. This updates the comment to line up with the implementation. Author: wuyi <ngone_5451@163.com> Closes apache#20998 from Ngone51/SPARK-23888.
Test build #89745 has finished for PR 20940 at commit
Test build #89746 has finished for PR 20940 at commit
Jenkins, retest this please
Test build #89758 has finished for PR 20940 at commit
Jenkins, retest this please
Test build #89789 has finished for PR 20940 at commit
This generally looks good to me. I have some concerns about piggybacking memory info on heartbeats, since this is basically uniform sampling of memory info, and some use cases may need heartbeat rate tuning to get more accurate info. Unfortunately I don't have any solution to make the sampling more accurate, so I think this is good for now.
@rezsafi, thanks for reviewing. It would be better if the heartbeat wasn't tied to the sampling frequency. Most likely users would want to sample more frequently, although this would also mean more communication traffic between executors and the driver. I'm out of town right now, but am planning to reschedule the design discussion when I am back, hopefully in a couple of weeks, and will invite you.
Can one of the admins verify this patch? |
@edwinalu the diff is really weird in GitHub now -- can you merge in the latest master? Or if GitHub is just confused, maybe close this and open a new PR. Thanks
thanks, can you close this pull request now? |
Add new executor level memory metrics (JVM used heap/off-heap memory, on/off heap execution memory, on/off heap storage memory, direct pool memory, mapped pool memory), and expose them via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used across the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.
What changes were proposed in this pull request?
An ExecutorMetrics class is added, with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics is sent by executors to the driver as part of Heartbeat. A heartbeat is added for the driver as well, to collect these metrics for the driver.
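As a rough illustration of the shape of such a metrics holder — this is a sketch, not the exact class added by this PR, and it covers only the five fields named in this paragraph:
```
// Executor-level memory metrics, all values in bytes.
case class ExecutorMetricsSketch(
    jvmUsedMemory: Long,
    onHeapExecutionMemory: Long,
    offHeapExecutionMemory: Long,
    onHeapStorageMemory: Long,
    offHeapStorageMemory: Long) {

  // True if any metric in `other` exceeds the corresponding value here.
  def anyNewPeak(other: ExecutorMetricsSketch): Boolean =
    other.jvmUsedMemory > jvmUsedMemory ||
    other.onHeapExecutionMemory > onHeapExecutionMemory ||
    other.offHeapExecutionMemory > offHeapExecutionMemory ||
    other.onHeapStorageMemory > onHeapStorageMemory ||
    other.offHeapStorageMemory > offHeapStorageMemory
}
```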
The EventLoggingListener tracks peak values for each metric for each stage and executor. On StageCompleted events, it will log a StageExecutorMetrics event, with the peak values for the executor metrics, for each executor which updated its metrics during the stage. Logging is done per executor per stage, to minimize the amount of additional logging. Configuration parameter spark.eventLog.logStageExecutorMetrics.enabled controls whether or not the new metrics are logged.
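A minimal sketch of the per-stage, per-executor peak tracking idea, reusing the ExecutorMetricsSketch from the sketch above; the names and structure are assumptions, not the listener's actual code:
```
import scala.collection.mutable

// Keyed by (stageId, executorId); keeps the element-wise maximum seen so far
// and reports whether the latest update produced any new peak worth logging.
class PeakTracker {
  private val peaks = mutable.Map.empty[(Int, String), ExecutorMetricsSketch]

  def update(stageId: Int, executorId: String, m: ExecutorMetricsSketch): Boolean = {
    val key = (stageId, executorId)
    peaks.get(key) match {
      case Some(current) if !current.anyNewPeak(m) =>
        false // nothing to log for this stage/executor
      case Some(current) =>
        peaks(key) = ExecutorMetricsSketch(
          math.max(current.jvmUsedMemory, m.jvmUsedMemory),
          math.max(current.onHeapExecutionMemory, m.onHeapExecutionMemory),
          math.max(current.offHeapExecutionMemory, m.offHeapExecutionMemory),
          math.max(current.onHeapStorageMemory, m.onHeapStorageMemory),
          math.max(current.offHeapStorageMemory, m.offHeapStorageMemory))
        true
      case None =>
        peaks(key) = m
        true
    }
  }
}
```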
The AppStatusListener records the peak values for each memory metric.
The new memory metrics are added to the executors REST API.
How was this patch tested?
New unit tests have been added. This was also tested on our cluster.