Skip to content

Commit ee5c005

Browse files
kabochyayuyang08
authored and committed
refactor ReplicaStatsManager static methods (pinterest#129)
1 parent d0fcf4b commit ee5c005

18 files changed

+172
-148
lines changed

drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafka.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class DoctorKafka {
2323

2424
private DoctorKafkaConfig drkafkaConf;
2525

26-
public BrokerStatsProcessor brokerStatsProcessor = null;
26+
private BrokerStatsProcessor brokerStatsProcessor = null;
2727

2828
private DoctorKafkaActionReporter actionReporter = null;
2929

@@ -35,8 +35,11 @@ public class DoctorKafka {
3535

3636
private DoctorKafkaHeartbeat heartbeat = null;
3737

38-
public DoctorKafka(DoctorKafkaConfig drkafkaConf) {
39-
this.drkafkaConf = drkafkaConf;
38+
private ReplicaStatsManager replicaStatsManager = null;
39+
40+
public DoctorKafka(ReplicaStatsManager replicaStatsManager) {
41+
this.replicaStatsManager = replicaStatsManager;
42+
this.drkafkaConf = replicaStatsManager.getConfig();
4043
this.clusterZkUrls = drkafkaConf.getClusterZkUrls();
4144
this.zookeeperClient = new ZookeeperClient(drkafkaConf.getDoctorKafkaZkurl());
4245
}
@@ -50,26 +53,26 @@ public void start() {
5053
SecurityProtocol actionReportSecurityProtocol = drkafkaConf.getActionReportProducerSecurityProtocol();
5154

5255
LOG.info("Start rebuilding the replica stats by reading the past 24 hours brokerstats");
53-
ReplicaStatsManager.readPastReplicaStats(brokerstatsZkurl, statsSecurityProtocol,
56+
replicaStatsManager.readPastReplicaStats(brokerstatsZkurl, statsSecurityProtocol,
5457
drkafkaConf.getBrokerStatsTopic(), drkafkaConf.getBrokerStatsBacktrackWindowsInSeconds());
5558
LOG.info("Finish rebuilding the replica stats");
5659

5760
brokerStatsProcessor = new BrokerStatsProcessor(brokerstatsZkurl, statsSecurityProtocol, statsTopic,
58-
drkafkaConf.getBrokerStatsConsumerSslConfigs());
61+
drkafkaConf.getBrokerStatsConsumerSslConfigs(), replicaStatsManager);
5962
brokerStatsProcessor.start();
6063

6164
actionReporter = new DoctorKafkaActionReporter(actionReportZkurl, actionReportSecurityProtocol, actionReportTopic,
6265
drkafkaConf.getActionReportProducerSslConfigs());
6366
for (String clusterZkUrl : clusterZkUrls) {
6467
DoctorKafkaClusterConfig clusterConf = drkafkaConf.getClusterConfigByZkUrl(clusterZkUrl);
65-
KafkaCluster kafkaCluster = ReplicaStatsManager.clusters.get(clusterZkUrl);
68+
KafkaCluster kafkaCluster = replicaStatsManager.getClusters().get(clusterZkUrl);
6669

6770
if (kafkaCluster == null) {
6871
LOG.error("No brokerstats info for cluster {}", clusterZkUrl);
6972
continue;
7073
}
7174
KafkaClusterManager clusterManager = new KafkaClusterManager(
72-
clusterZkUrl, kafkaCluster, clusterConf, drkafkaConf, actionReporter, zookeeperClient);
75+
clusterZkUrl, kafkaCluster, clusterConf, drkafkaConf, actionReporter, zookeeperClient, replicaStatsManager);
7376
clusterManagers.put(clusterConf.getClusterName(), clusterManager);
7477
clusterManager.start();
7578
LOG.info("Starting cluster manager for " + clusterZkUrl);

drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class DoctorKafkaMain extends Application<DoctorKafkaAppConfig> {
4242

4343
public static DoctorKafka doctorKafka = null;
4444
private static DoctorKafkaWatcher operatorWatcher = null;
45+
public static ReplicaStatsManager replicaStatsManager = null;
4546

4647
@Override
4748
public void initialize(Bootstrap<DoctorKafkaAppConfig> bootstrap) {
@@ -54,16 +55,16 @@ public void run(DoctorKafkaAppConfig configuration, Environment environment) thr
5455

5556
LOG.info("Configuration path : {}", configuration.getConfig());
5657

57-
ReplicaStatsManager.config = new DoctorKafkaConfig(configuration.getConfig());
58+
replicaStatsManager = new ReplicaStatsManager(new DoctorKafkaConfig(configuration.getConfig()));
5859

59-
if (!ReplicaStatsManager.config.getRestartDisabled()){
60-
operatorWatcher = new DoctorKafkaWatcher(ReplicaStatsManager.config.getRestartIntervalInSeconds());
60+
if (!replicaStatsManager.getConfig().getRestartDisabled()){
61+
operatorWatcher = new DoctorKafkaWatcher(replicaStatsManager.getConfig().getRestartIntervalInSeconds());
6162
operatorWatcher.start();
6263
}
6364

64-
configureServerRuntime(configuration, ReplicaStatsManager.config);
65+
configureServerRuntime(configuration, replicaStatsManager.getConfig());
6566

66-
doctorKafka = new DoctorKafka(ReplicaStatsManager.config);
67+
doctorKafka = new DoctorKafka(replicaStatsManager);
6768

6869
registerAPIs(environment, doctorKafka);
6970
registerServlets(environment);
@@ -119,8 +120,8 @@ private void registerAPIs(Environment environment, DoctorKafka doctorKafka) {
119120
}
120121

121122
private void startMetricsService() {
122-
int ostrichPort = ReplicaStatsManager.config.getOstrichPort();
123-
String tsdHostPort = ReplicaStatsManager.config.getTsdHostPort();
123+
int ostrichPort = replicaStatsManager.getConfig().getOstrichPort();
124+
String tsdHostPort = replicaStatsManager.getConfig().getTsdHostPort();
124125
if (tsdHostPort == null && ostrichPort == 0) {
125126
LOG.info("OpenTSDB and Ostrich options missing, not starting Ostrich service");
126127
} else if (ostrichPort == 0) {

drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ public class KafkaBroker implements Comparable<KafkaBroker> {
3939
private long reservedBytesOut;
4040
private Set<TopicPartition> toBeAddedReplicas;
4141

42-
public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, int brokerId) {
42+
private ReplicaStatsManager replicaStatsManager;
43+
44+
public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager, int brokerId) {
4345
assert clusterConfig != null;
4446
this.zkUrl = clusterConfig.getZkUrl();
4547
this.brokerId = brokerId;
@@ -53,6 +55,7 @@ public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, int brokerId) {
5355
this.reservedBytesOut = 0L;
5456
this.bytesInPerSecLimit = clusterConfig.getNetworkInLimitInBytes();
5557
this.bytesOutPerSecLimit = clusterConfig.getNetworkOutLimitInBytes();
58+
this.replicaStatsManager = replicaStatsManager;
5659
}
5760

5861
public JsonElement toJson() {
@@ -71,10 +74,10 @@ public JsonElement toJson() {
7174
public long getMaxBytesIn() {
7275
long result = 0L;
7376
for (TopicPartition topicPartition : leaderReplicas) {
74-
result += ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
77+
result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
7578
}
7679
for (TopicPartition topicPartition : followerReplicas) {
77-
result += ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
80+
result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
7881
}
7982
return result;
8083
}
@@ -83,7 +86,7 @@ public long getMaxBytesIn() {
8386
public long getMaxBytesOut() {
8487
long result = 0L;
8588
for (TopicPartition topicPartition : leaderReplicas) {
86-
result += ReplicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
89+
result += replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
8790
}
8891
return result;
8992
}

drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ public class KafkaCluster {
4646
public ConcurrentMap<Integer, KafkaBroker> brokers;
4747
private ConcurrentMap<Integer, LinkedList<BrokerStats>> brokerStatsMap;
4848
public ConcurrentMap<String, Set<TopicPartition>> topicPartitions = new ConcurrentHashMap<>();
49+
private ReplicaStatsManager replicaStatsManager;
4950

50-
public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig) {
51+
public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager) {
5152
this.zkUrl = zookeeper;
53+
this.replicaStatsManager = replicaStatsManager;
5254
this.brokers = new ConcurrentHashMap<>();
5355
this.clusterConfig = clusterConfig;
5456
this.brokerStatsMap = new ConcurrentHashMap<>();
@@ -84,7 +86,7 @@ public void recordBrokerStats(BrokerStats brokerStats) {
8486

8587
if (!brokerStats.getHasFailure()) {
8688
// only record brokerstat when there is no failure on that broker.
87-
KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, i));
89+
KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, replicaStatsManager, i));
8890
broker.update(brokerStats);
8991
}
9092

@@ -239,8 +241,8 @@ public PriorityQueue<KafkaBroker> getBrokerQueue() {
239241
public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker> brokerQueue,
240242
OutOfSyncReplica oosReplica) {
241243
TopicPartition topicPartition = oosReplica.topicPartition;
242-
double inBoundReq = ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
243-
double outBoundReq = ReplicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
244+
double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
245+
double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
244246
int preferredBroker = oosReplica.replicaBrokers.get(0);
245247

246248
boolean success = true;
@@ -277,7 +279,6 @@ public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker
277279
return success ? result : null;
278280
}
279281

280-
281282
public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
282283
double tpBytesIn, double tpBytesOut) {
283284
PriorityQueue<KafkaBroker> brokerQueue =
@@ -309,7 +310,7 @@ public long getMaxBytesIn() {
309310
for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
310311
Set<TopicPartition> topicPartitions = entry.getValue();
311312
for (TopicPartition tp : topicPartitions) {
312-
result += ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
313+
result += replicaStatsManager.getMaxBytesIn(zkUrl, tp);
313314
}
314315
}
315316
return result;
@@ -321,7 +322,7 @@ public long getMaxBytesOut() {
321322
for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
322323
Set<TopicPartition> topicPartitions = entry.getValue();
323324
for (TopicPartition tp : topicPartitions) {
324-
result += ReplicaStatsManager.getMaxBytesOut(zkUrl, tp);
325+
result += replicaStatsManager.getMaxBytesOut(zkUrl, tp);
325326
}
326327
}
327328
return result;

drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public class KafkaClusterManager implements Runnable {
7878
private DoctorKafkaConfig drkafkaConfig = null;
7979
private DoctorKafkaClusterConfig clusterConfig;
8080
private DoctorKafkaActionReporter actionReporter = null;
81+
private ReplicaStatsManager replicaStatsManager;
8182
private boolean stopped = true;
8283
private Thread thread = null;
8384

@@ -101,7 +102,8 @@ public KafkaClusterManager(String zkUrl, KafkaCluster kafkaCluster,
101102
DoctorKafkaClusterConfig clusterConfig,
102103
DoctorKafkaConfig drkafkaConfig,
103104
DoctorKafkaActionReporter actionReporter,
104-
ZookeeperClient zookeeperClient) {
105+
ZookeeperClient zookeeperClient,
106+
ReplicaStatsManager replicaStatsManager) {
105107
assert clusterConfig != null;
106108
this.zkUrl = zkUrl;
107109
this.zkUtils = KafkaUtils.getZkUtils(zkUrl);
@@ -117,6 +119,7 @@ public KafkaClusterManager(String zkUrl, KafkaCluster kafkaCluster,
117119
if (clusterConfig.enabledDeadbrokerReplacement()) {
118120
this.brokerReplacer = new BrokerReplacer(drkafkaConfig.getBrokerReplacementCommand());
119121
}
122+
this.replicaStatsManager = replicaStatsManager;
120123
}
121124

122125
public KafkaCluster getCluster() {
@@ -234,8 +237,8 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker,
234237

235238
for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
236239
TopicPartition tp = entry.getKey();
237-
double tpBytesIn = ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
238-
double tpBytesOut = ReplicaStatsManager.getMaxBytesOut(zkUrl, tp);
240+
double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
241+
double tpBytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
239242
double brokerTraffic = (bytesIn - toBeReducedBytesIn - tpBytesIn) +
240243
(bytesOut - toBeReducedBytesOut - tpBytesOut);
241244

@@ -312,7 +315,7 @@ private void generateFollowerReassignmentPlan(KafkaBroker broker) {
312315

313316
for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
314317
TopicPartition tp = entry.getKey();
315-
double tpBytesIn = ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
318+
double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
316319
if (brokerBytesIn - toBeReducedBytesIn - tpBytesIn < bytesInLimit) {
317320
// if moving a topic partition out will have the broker be under-utilized, do not
318321
// move it out.
@@ -487,8 +490,8 @@ private Map<TopicPartition, Double> sortTopicPartitionsByTraffic(List<TopicParti
487490
Map<TopicPartition, Double> tpTraffic = new HashMap<>();
488491
for (TopicPartition tp : tps) {
489492
try {
490-
double bytesIn = ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
491-
double bytesOut = ReplicaStatsManager.getMaxBytesOut(zkUrl, tp);
493+
double bytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
494+
double bytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
492495
tpTraffic.put(tp, bytesIn + bytesOut);
493496
} catch (Exception e) {
494497
LOG.info("Exception in sorting topic partition {}", tp, e);

drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/BrokerStatsProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ public class BrokerStatsProcessor implements Runnable {
3232
private String topic;
3333
private SecurityProtocol securityProtocol;
3434
private Map<String, String> consumerConfigs;
35+
private ReplicaStatsManager replicaStatsManager;
3536

3637
public BrokerStatsProcessor(String zkUrl, SecurityProtocol securityProtocol,
37-
String topic, Map<String, String> consumerConfigs) {
38+
String topic, Map<String, String> consumerConfigs, ReplicaStatsManager replicaStatsManager) {
3839
this.zkUrl = zkUrl;
3940
this.topic = topic;
4041
this.securityProtocol = securityProtocol;
4142
this.consumerConfigs = consumerConfigs;
43+
this.replicaStatsManager = replicaStatsManager;
4244
}
4345

4446

@@ -77,7 +79,7 @@ public void run() {
7779
continue;
7880
}
7981

80-
ReplicaStatsManager.update(brokerStats);
82+
replicaStatsManager.update(brokerStats);
8183
OpenTsdbMetricConverter.incr(DoctorKafkaMetrics.BROKERSTATS_MESSAGES, 1,
8284
"zkUrl= " + brokerStats.getZkUrl());
8385
} catch (Exception e) {

drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/PastReplicaStatsProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,16 @@ public class PastReplicaStatsProcessor implements Runnable {
3232
private long startOffset;
3333
private long endOffset;
3434
private Thread thread;
35+
private ReplicaStatsManager replicaStatsManager;
3536

3637
public PastReplicaStatsProcessor(String zkUrl, SecurityProtocol securityProtocol, TopicPartition topicPartition,
37-
long startOffset, long endOffset) {
38+
long startOffset, long endOffset, ReplicaStatsManager replicaStatsManager) {
3839
this.zkUrl = zkUrl;
3940
this.securityProtocol = securityProtocol;
4041
this.topicPartition = topicPartition;
4142
this.startOffset = startOffset;
4243
this.endOffset = endOffset;
44+
this.replicaStatsManager = replicaStatsManager;
4345
}
4446

4547
public void start() {
@@ -82,7 +84,7 @@ public void run() {
8284
OpenTsdbMetricConverter.incr(DoctorKafkaMetrics.MESSAGE_DESERIALIZE_ERROR, 1);
8385
continue;
8486
}
85-
ReplicaStatsManager.update(brokerStats);
87+
replicaStatsManager.update(brokerStats);
8688
}
8789
}
8890
} catch (Exception e) {

0 commit comments

Comments (0)