diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/FsBroker.java b/fe/fe-core/src/main/java/com/starrocks/catalog/FsBroker.java index f7444863b20480..54d84827e5842c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/FsBroker.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/FsBroker.java @@ -22,6 +22,7 @@ package com.starrocks.catalog; import com.google.gson.annotations.SerializedName; +import com.starrocks.common.Config; import com.starrocks.common.FeMetaVersion; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; @@ -47,6 +48,8 @@ public class FsBroker implements Writable, Comparable { @SerializedName(value = "isAlive") public boolean isAlive; + private int heartbeatRetryTimes = 0; + public FsBroker() { } @@ -71,12 +74,17 @@ public boolean handleHbResponse(BrokerHbResponse hbResponse) { } lastUpdateTime = hbResponse.getHbTime(); heartbeatErrMsg = ""; + this.heartbeatRetryTimes = 0; } else { - if (isAlive) { - isAlive = false; - isChanged = true; + if (this.heartbeatRetryTimes < Config.heartbeat_retry_times) { + this.heartbeatRetryTimes++; + } else { + if (isAlive) { + isAlive = false; + isChanged = true; + } + heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg(); } - heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg(); } return isChanged; diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ClientPool.java b/fe/fe-core/src/main/java/com/starrocks/common/ClientPool.java index 08e58bca7ca126..86144a4d743b00 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ClientPool.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ClientPool.java @@ -29,7 +29,7 @@ public class ClientPool { static GenericKeyedObjectPoolConfig heartbeatConfig = new GenericKeyedObjectPoolConfig(); - static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 1000; + static int heartbeatTimeoutMs = Config.heartbeat_timeout_second * 1000; static GenericKeyedObjectPoolConfig backendConfig = new GenericKeyedObjectPoolConfig(); static int backendTimeoutMs = 60000; // 1min diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 59b00d3ec956d6..b07fd70b89e475 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1357,4 +1357,18 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static long min_routine_load_lag_for_metrics = 10000; + + /** + * The heartbeat timeout of be/broker/fe. + * the default is 5 seconds + */ + @ConfField(mutable = true) + public static int heartbeat_timeout_second = 5; + + /** + * The heartbeat retry times of be/broker/fe. + * the default is 3 + */ + @ConfField(mutable = true) + public static int heartbeat_retry_times = 3; } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/FeConstants.java b/fe/fe-core/src/main/java/com/starrocks/common/FeConstants.java index 601c2c58dd5ec1..d35305196bab02 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/FeConstants.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/FeConstants.java @@ -37,7 +37,6 @@ public class FeConstants { public static long default_db_data_quota_bytes = Long.MAX_VALUE; public static long default_db_replica_quota_size = Long.MAX_VALUE; - public static int heartbeat_interval_second = 5; public static int checkpoint_interval_second = 60; // 1 minutes // dpp version diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/com/starrocks/qe/SimpleScheduler.java index 570045a109a3d3..84d9d4dba4e2d8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SimpleScheduler.java @@ -25,7 +25,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.starrocks.catalog.Catalog; -import com.starrocks.common.FeConstants; +import com.starrocks.common.Config; import com.starrocks.common.Reference; import com.starrocks.system.Backend; import com.starrocks.system.SystemInfoService; @@ -138,7 +138,7 @@ public static void addToBlacklist(Long backendID) { } lock.lock(); try { - int tryTime = FeConstants.heartbeat_interval_second + 1; + int tryTime = Config.heartbeat_timeout_second + 1; blacklistBackends.put(backendID, tryTime); LOG.warn("add black list " + backendID); } finally { diff --git a/fe/fe-core/src/main/java/com/starrocks/system/Backend.java b/fe/fe-core/src/main/java/com/starrocks/system/Backend.java index a4c41e21af80e6..261f32a9e78939 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/Backend.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/Backend.java @@ -29,6 +29,7 @@ import com.starrocks.catalog.Catalog; import com.starrocks.catalog.DiskInfo; import com.starrocks.catalog.DiskInfo.DiskState; +import com.starrocks.common.Config; import com.starrocks.common.FeMetaVersion; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; @@ -96,6 +97,8 @@ public enum BackendState { // this field is set by tablet report, and just for metric monitor, no need to persist. private volatile long tabletMaxCompactionScore = 0; + private int heartbeatRetryTimes = 0; + // additional backendStatus information for BE, display in JSON format private BackendStatus backendStatus = new BackendStatus(); @@ -670,14 +673,19 @@ public boolean handleHbResponse(BackendHbResponse hbResponse) { } heartbeatErrMsg = ""; + this.heartbeatRetryTimes = 0; } else { - if (isAlive.compareAndSet(true, false)) { - isChanged = true; - LOG.info("{} is dead,", this.toString()); - } + if (this.heartbeatRetryTimes < Config.heartbeat_retry_times) { + this.heartbeatRetryTimes++; + } else { + if (isAlive.compareAndSet(true, false)) { + isChanged = true; + LOG.info("{} is dead,", this.toString()); + } - heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg(); - lastMissingHeartbeatTime = System.currentTimeMillis(); + heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg(); + lastMissingHeartbeatTime = System.currentTimeMillis(); + } } return isChanged; diff --git a/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java b/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java index e6c988fe2c48d4..bef248a36545a2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java @@ -22,6 +22,7 @@ package com.starrocks.system; import com.starrocks.catalog.Catalog; +import com.starrocks.common.Config; import com.starrocks.common.FeMetaVersion; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; @@ -49,6 +50,8 @@ public class Frontend implements Writable { private boolean isAlive = false; + private int heartbeatRetryTimes = 0; + public Frontend() { } @@ -125,12 +128,17 @@ public boolean handleHbResponse(FrontendHbResponse hbResponse) { feVersion = hbResponse.getFeVersion(); heartbeatErrMsg = ""; isChanged = true; + this.heartbeatRetryTimes = 0; } else { - if (isAlive) { - isAlive = false; - isChanged = true; + if (this.heartbeatRetryTimes < Config.heartbeat_retry_times) { + this.heartbeatRetryTimes++; + } else { + if (isAlive) { + isAlive = false; + isChanged = true; + } + heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg(); } - heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg(); } return isChanged; } diff --git a/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java index bc3b1d5efa6081..c65249ea857785 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java @@ -28,7 +28,6 @@ import com.starrocks.catalog.FsBroker; import com.starrocks.common.ClientPool; import com.starrocks.common.Config; -import com.starrocks.common.FeConstants; import com.starrocks.common.ThreadPoolManager; import com.starrocks.common.Version; import com.starrocks.common.util.MasterDaemon; @@ -74,7 +73,7 @@ public class HeartbeatMgr extends MasterDaemon { private static volatile AtomicReference masterInfo = new AtomicReference<>(); public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { - super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000); + super("heartbeat mgr", Config.heartbeat_timeout_second * 1000); this.nodeMgr = nodeMgr; this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num, Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric); diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/SimpleSchedulerTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/SimpleSchedulerTest.java index de8fc1c05371d3..fff74769d533b5 100644 --- a/fe/fe-core/src/test/java/com/starrocks/qe/SimpleSchedulerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/qe/SimpleSchedulerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.starrocks.catalog.Catalog; +import com.starrocks.common.Config; import com.starrocks.common.FeConstants; import com.starrocks.common.Reference; import com.starrocks.persist.EditLog; @@ -78,7 +79,7 @@ public void setUp() { // Comment out these code temporatily. // @Test public void testGetHostWithBackendId() { - FeConstants.heartbeat_interval_second = Integer.MAX_VALUE; + Config.heartbeat_timeout_second = Integer.MAX_VALUE; TNetworkAddress address; // three locations List nullLocations = null; @@ -141,7 +142,7 @@ public void testGetHostWithBackendId() { // Comment out these code temporatily. // @Test public void testGetHostWithNoParams() { - FeConstants.heartbeat_interval_second = Integer.MAX_VALUE; + Config.heartbeat_timeout_second = Integer.MAX_VALUE; ImmutableMap nullBackends = null; ImmutableMap emptyBackends = ImmutableMap.of(); @@ -176,7 +177,7 @@ public void testGetHostWithNoParams() { // Comment out these code temporatily. // @Test public void testBlackList() { - FeConstants.heartbeat_interval_second = Integer.MAX_VALUE; + Config.heartbeat_timeout_second = Integer.MAX_VALUE; TNetworkAddress address = null; Backend backendA = new Backend(0, "addressA", 0);