Skip to content

Commit

Permalink
Support heartbeat timeout & retry_times configuration (#4175)
Browse files Browse the repository at this point in the history
During the high overload, the heartbeat timeout will occur frequently and the load will be fail . This pull request will add retrying strategy to handling the heartbeat timeout.
The configuration is heartbeat_retry_times and the default value is 3, heartbeat_timeout_second default value is 5
  • Loading branch information
meegoo authored Mar 18, 2022
1 parent 2d9fcb4 commit 0881cb2
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 23 deletions.
16 changes: 12 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/catalog/FsBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.starrocks.catalog;

import com.google.gson.annotations.SerializedName;
import com.starrocks.common.Config;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
Expand All @@ -47,6 +48,8 @@ public class FsBroker implements Writable, Comparable<FsBroker> {
@SerializedName(value = "isAlive")
public boolean isAlive;

private int heartbeatRetryTimes = 0;

public FsBroker() {
}

Expand All @@ -71,12 +74,17 @@ public boolean handleHbResponse(BrokerHbResponse hbResponse) {
}
lastUpdateTime = hbResponse.getHbTime();
heartbeatErrMsg = "";
this.heartbeatRetryTimes = 0;
} else {
if (isAlive) {
isAlive = false;
isChanged = true;
if (this.heartbeatRetryTimes < Config.heartbeat_retry_times) {
this.heartbeatRetryTimes++;
} else {
if (isAlive) {
isAlive = false;
isChanged = true;
}
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
}
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
}

return isChanged;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public class ClientPool {
static GenericKeyedObjectPoolConfig heartbeatConfig = new GenericKeyedObjectPoolConfig();
static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 1000;
static int heartbeatTimeoutMs = Config.heartbeat_timeout_second * 1000;

static GenericKeyedObjectPoolConfig backendConfig = new GenericKeyedObjectPoolConfig();
static int backendTimeoutMs = 60000; // 1min
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1357,4 +1357,18 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static long min_routine_load_lag_for_metrics = 10000;

/**
* The heartbeat timeout of be/broker/fe.
* the default is 5 seconds
*/
@ConfField(mutable = true)
public static int heartbeat_timeout_second = 5;

/**
* The heartbeat retry times of be/broker/fe.
* the default is 3
*/
@ConfField(mutable = true)
public static int heartbeat_retry_times = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class FeConstants {
public static long default_db_data_quota_bytes = Long.MAX_VALUE;
public static long default_db_replica_quota_size = Long.MAX_VALUE;

public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes

// dpp version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Catalog;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Config;
import com.starrocks.common.Reference;
import com.starrocks.system.Backend;
import com.starrocks.system.SystemInfoService;
Expand Down Expand Up @@ -138,7 +138,7 @@ public static void addToBlacklist(Long backendID) {
}
lock.lock();
try {
int tryTime = FeConstants.heartbeat_interval_second + 1;
int tryTime = Config.heartbeat_timeout_second + 1;
blacklistBackends.put(backendID, tryTime);
LOG.warn("add black list " + backendID);
} finally {
Expand Down
20 changes: 14 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.DiskInfo;
import com.starrocks.catalog.DiskInfo.DiskState;
import com.starrocks.common.Config;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
Expand Down Expand Up @@ -96,6 +97,8 @@ public enum BackendState {
// this field is set by tablet report, and just for metric monitor, no need to persist.
private volatile long tabletMaxCompactionScore = 0;

private int heartbeatRetryTimes = 0;

// additional backendStatus information for BE, display in JSON format
private BackendStatus backendStatus = new BackendStatus();

Expand Down Expand Up @@ -670,14 +673,19 @@ public boolean handleHbResponse(BackendHbResponse hbResponse) {
}

heartbeatErrMsg = "";
this.heartbeatRetryTimes = 0;
} else {
if (isAlive.compareAndSet(true, false)) {
isChanged = true;
LOG.info("{} is dead,", this.toString());
}
if (this.heartbeatRetryTimes < Config.heartbeat_retry_times) {
this.heartbeatRetryTimes++;
} else {
if (isAlive.compareAndSet(true, false)) {
isChanged = true;
LOG.info("{} is dead,", this.toString());
}

heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
lastMissingHeartbeatTime = System.currentTimeMillis();
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
lastMissingHeartbeatTime = System.currentTimeMillis();
}
}

return isChanged;
Expand Down
16 changes: 12 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/system/Frontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.starrocks.system;

import com.starrocks.catalog.Catalog;
import com.starrocks.common.Config;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class Frontend implements Writable {

private boolean isAlive = false;

private int heartbeatRetryTimes = 0;

public Frontend() {
}

Expand Down Expand Up @@ -125,12 +128,17 @@ public boolean handleHbResponse(FrontendHbResponse hbResponse) {
feVersion = hbResponse.getFeVersion();
heartbeatErrMsg = "";
isChanged = true;
this.heartbeatRetryTimes = 0;
} else {
if (isAlive) {
isAlive = false;
isChanged = true;
if (this.heartbeatRetryTimes < Config.heartbeat_retry_times) {
this.heartbeatRetryTimes++;
} else {
if (isAlive) {
isAlive = false;
isChanged = true;
}
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
}
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
}
return isChanged;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.starrocks.catalog.FsBroker;
import com.starrocks.common.ClientPool;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.Version;
import com.starrocks.common.util.MasterDaemon;
Expand Down Expand Up @@ -74,7 +73,7 @@ public class HeartbeatMgr extends MasterDaemon {
private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<>();

public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
super("heartbeat mgr", Config.heartbeat_timeout_second * 1000);
this.nodeMgr = nodeMgr;
this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Catalog;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Reference;
import com.starrocks.persist.EditLog;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void setUp() {
// Comment out these code temporatily.
// @Test
public void testGetHostWithBackendId() {
FeConstants.heartbeat_interval_second = Integer.MAX_VALUE;
Config.heartbeat_timeout_second = Integer.MAX_VALUE;
TNetworkAddress address;
// three locations
List<TScanRangeLocation> nullLocations = null;
Expand Down Expand Up @@ -141,7 +142,7 @@ public void testGetHostWithBackendId() {
// Comment out these code temporatily.
// @Test
public void testGetHostWithNoParams() {
FeConstants.heartbeat_interval_second = Integer.MAX_VALUE;
Config.heartbeat_timeout_second = Integer.MAX_VALUE;
ImmutableMap<Long, Backend> nullBackends = null;
ImmutableMap<Long, Backend> emptyBackends = ImmutableMap.of();

Expand Down Expand Up @@ -176,7 +177,7 @@ public void testGetHostWithNoParams() {
// Comment out these code temporatily.
// @Test
public void testBlackList() {
FeConstants.heartbeat_interval_second = Integer.MAX_VALUE;
Config.heartbeat_timeout_second = Integer.MAX_VALUE;
TNetworkAddress address = null;

Backend backendA = new Backend(0, "addressA", 0);
Expand Down

0 comments on commit 0881cb2

Please sign in to comment.