Skip to content

Commit 86b25c4

Browse files
SongYadongsrowen
authored andcommitted
[SPARK-26967][CORE] Put MetricsSystem instance names together for clearer management
## What changes were proposed in this pull request? `MetricsSystem` instance creations have a scattered distribution in the project code. So do their names. It may cause some inconvenience for browsing and management. This PR tries to put them together. In this way, we can have a uniform location for adding or removing them, and have a overall view of `MetircsSystem `instances in current project. It's also helpful for maintaining user documents by avoiding missing something. ## How was this patch tested? Existing unit tests. Closes #23869 from SongYadong/metrics_system_inst_manage. Lead-authored-by: SongYadong <song.yadong1@zte.com.cn> Co-authored-by: walter2001 <ydsong2007@163.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
1 parent 02bbe97 commit 86b25c4

File tree

8 files changed

+56
-24
lines changed

8 files changed

+56
-24
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.spark.broadcast.BroadcastManager
3434
import org.apache.spark.internal.{config, Logging}
3535
import org.apache.spark.internal.config._
3636
import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
37-
import org.apache.spark.metrics.MetricsSystem
37+
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
3838
import org.apache.spark.network.netty.NettyBlockTransferService
3939
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
4040
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -346,13 +346,14 @@ object SparkEnv extends Logging {
346346
// Don't start metrics system right now for Driver.
347347
// We need to wait for the task scheduler to give us an app ID.
348348
// Then we can start the metrics system.
349-
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
349+
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
350350
} else {
351351
// We need to set the executor ID before the MetricsSystem is created because sources and
352352
// sinks specified in the metrics configuration file will want to incorporate this executor's
353353
// ID into the metrics they report.
354354
conf.set(EXECUTOR_ID, executorId)
355-
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
355+
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
356+
securityManager)
356357
ms.start()
357358
ms
358359
}

core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
2323

2424
import org.apache.spark.{SecurityManager, SparkConf}
2525
import org.apache.spark.internal.{config, Logging}
26-
import org.apache.spark.metrics.MetricsSystem
26+
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
2727
import org.apache.spark.network.TransportContext
2828
import org.apache.spark.network.crypto.AuthServerBootstrap
2929
import org.apache.spark.network.netty.SparkTransportConf
@@ -43,7 +43,8 @@ private[deploy]
4343
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
4444
extends Logging {
4545
protected val masterMetricsSystem =
46-
MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
46+
MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE,
47+
sparkConf, securityManager)
4748

4849
private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
4950
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.spark.internal.config._
3737
import org.apache.spark.internal.config.Deploy._
3838
import org.apache.spark.internal.config.UI._
3939
import org.apache.spark.internal.config.Worker._
40-
import org.apache.spark.metrics.MetricsSystem
40+
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
4141
import org.apache.spark.rpc._
4242
import org.apache.spark.serializer.{JavaSerializer, Serializer}
4343
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -86,9 +86,10 @@ private[deploy] class Master(
8686

8787
Utils.checkHost(address.host)
8888

89-
private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
90-
private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
91-
securityMgr)
89+
private val masterMetricsSystem =
90+
MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf, securityMgr)
91+
private val applicationMetricsSystem =
92+
MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf, securityMgr)
9293
private val masterSource = new MasterSource(this)
9394

9495
// After onStart, webUi will be set

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.internal.{config, Logging}
4040
import org.apache.spark.internal.config.Tests.IS_TESTING
4141
import org.apache.spark.internal.config.UI._
4242
import org.apache.spark.internal.config.Worker._
43-
import org.apache.spark.metrics.MetricsSystem
43+
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
4444
import org.apache.spark.rpc._
4545
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
4646

@@ -159,7 +159,8 @@ private[deploy] class Worker(
159159

160160
private var connectionAttemptCount = 0
161161

162-
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
162+
private val metricsSystem =
163+
MetricsSystem.createMetricsSystem(MetricsSystemInstances.WORKER, conf, securityMgr)
163164
private val workerSource = new WorkerSource(this)
164165

165166
val reverseProxy = conf.get(UI_REVERSE_PROXY)

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,3 +235,29 @@ private[spark] object MetricsSystem {
235235
new MetricsSystem(instance, conf, securityMgr)
236236
}
237237
}
238+
239+
private[spark] object MetricsSystemInstances {
240+
// The Spark standalone master process
241+
val MASTER = "master"
242+
243+
// A component within the master which reports on various applications
244+
val APPLICATIONS = "applications"
245+
246+
// A Spark standalone worker process
247+
val WORKER = "worker"
248+
249+
// A Spark executor
250+
val EXECUTOR = "executor"
251+
252+
// The Spark driver process (the process in which your SparkContext is created)
253+
val DRIVER = "driver"
254+
255+
// The Spark shuffle service
256+
val SHUFFLE_SERVICE = "shuffleService"
257+
258+
// The Spark ApplicationMaster when running on YARN
259+
val APPLICATION_MASTER = "applicationMaster"
260+
261+
// The Spark cluster scheduler when running on Mesos
262+
val MESOS_CLUSTER = "mesos_cluster"
263+
}

core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
7676
conf.set("spark.app.id", appId)
7777
conf.set("spark.executor.id", executorId)
7878

79-
val instanceName = "driver"
79+
val instanceName = MetricsSystemInstances.DRIVER
8080
val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
8181

8282
val metricName = driverMetricsSystem.buildRegistryName(source)
@@ -92,7 +92,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
9292
val executorId = "driver"
9393
conf.set("spark.executor.id", executorId)
9494

95-
val instanceName = "driver"
95+
val instanceName = MetricsSystemInstances.DRIVER
9696
val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
9797

9898
val metricName = driverMetricsSystem.buildRegistryName(source)
@@ -108,7 +108,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
108108
val appId = "testId"
109109
conf.set("spark.app.id", appId)
110110

111-
val instanceName = "driver"
111+
val instanceName = MetricsSystemInstances.DRIVER
112112
val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
113113

114114
val metricName = driverMetricsSystem.buildRegistryName(source)
@@ -126,7 +126,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
126126
conf.set("spark.app.id", appId)
127127
conf.set("spark.executor.id", executorId)
128128

129-
val instanceName = "executor"
129+
val instanceName = MetricsSystemInstances.EXECUTOR
130130
val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
131131

132132
val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -142,7 +142,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
142142
val executorId = "1"
143143
conf.set("spark.executor.id", executorId)
144144

145-
val instanceName = "executor"
145+
val instanceName = MetricsSystemInstances.EXECUTOR
146146
val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
147147

148148
val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -158,7 +158,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
158158
val appId = "testId"
159159
conf.set("spark.app.id", appId)
160160

161-
val instanceName = "executor"
161+
val instanceName = MetricsSystemInstances.EXECUTOR
162162
val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
163163

164164
val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -200,7 +200,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
200200
conf.set("spark.executor.id", executorId)
201201
conf.set(METRICS_NAMESPACE, "${spark.app.name}")
202202

203-
val instanceName = "executor"
203+
val instanceName = MetricsSystemInstances.EXECUTOR
204204
val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
205205

206206
val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -218,7 +218,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
218218
conf.set("spark.executor.id", executorId)
219219
conf.set(METRICS_NAMESPACE, namespaceToResolve)
220220

221-
val instanceName = "executor"
221+
val instanceName = MetricsSystemInstances.EXECUTOR
222222
val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
223223

224224
val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -238,7 +238,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
238238
conf.set("spark.app.name", appId)
239239
conf.set(METRICS_NAMESPACE, "${spark.app.name}")
240240

241-
val instanceName = "executor"
241+
val instanceName = MetricsSystemInstances.EXECUTOR
242242
val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
243243

244244
val metricName = executorMetricsSystem.buildRegistryName(source)

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
3333
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
3434
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
3535
import org.apache.spark.internal.config._
36-
import org.apache.spark.metrics.MetricsSystem
36+
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
3737
import org.apache.spark.util.Utils
3838

3939
/**
@@ -124,7 +124,8 @@ private[spark] class MesosClusterScheduler(
124124
extends Scheduler with MesosSchedulerUtils {
125125
var frameworkUrl: String = _
126126
private val metricsSystem =
127-
MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
127+
MetricsSystem.createMetricsSystem(MetricsSystemInstances.MESOS_CLUSTER, conf,
128+
new SecurityManager(conf))
128129
private val master = conf.get("spark.master")
129130
private val appName = conf.get("spark.app.name")
130131
private val queuedCapacity = conf.get(config.MAX_DRIVERS)

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import org.apache.spark.deploy.yarn.config._
4646
import org.apache.spark.internal.Logging
4747
import org.apache.spark.internal.config._
4848
import org.apache.spark.internal.config.UI._
49-
import org.apache.spark.metrics.MetricsSystem
49+
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
5050
import org.apache.spark.rpc._
5151
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
5252
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -471,7 +471,8 @@ private[spark] class ApplicationMaster(
471471
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
472472

473473
allocator.allocateResources()
474-
val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
474+
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
475+
sparkConf, securityMgr)
475476
val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
476477
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
477478
// do not register static sources in this case as per SPARK-25277

0 commit comments

Comments
 (0)