Skip to content

Commit ad76d9d

Browse files
authored
branch-4.0: [opt](scheduler) Improve Graceful Shutdown Behavior for BE and FE, and Optimize Query Retry During BE Shutdown #56601 #58019 (#58526)
bp #56601 #58019
1 parent 9b25055 commit ad76d9d

File tree

24 files changed

+304
-88
lines changed

24 files changed

+304
-88
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,6 +1312,7 @@ DEFINE_String(user_files_secure_path, "${DORIS_HOME}");
13121312
DEFINE_Int32(fe_expire_duration_seconds, "60");
13131313

13141314
DEFINE_Int32(grace_shutdown_wait_seconds, "120");
1315+
DEFINE_Int32(grace_shutdown_post_delay_seconds, "30");
13151316

13161317
DEFINE_Int16(bitmap_serialize_version, "1");
13171318

be/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,6 +1364,12 @@ DECLARE_Int32(fe_expire_duration_seconds);
13641364
// , but if the waiting time exceed the limit, then be will exit directly.
13651365
// During this period, FE will not send any queries to BE and waiting for all running queries to stop.
13661366
DECLARE_Int32(grace_shutdown_wait_seconds);
1367+
// When using the graceful stop feature, after the main process waits for
1368+
// all currently running tasks to finish, it will continue to wait for
1369+
// an additional period to ensure that queries still running on other nodes have also completed.
1370+
// Since a BE node cannot detect the task execution status on other BE nodes,
1371+
// you may need to increase this threshold to allow for a longer waiting time.
1372+
DECLARE_Int32(grace_shutdown_post_delay_seconds);
13671373

13681374
// BitmapValue serialize version.
13691375
DECLARE_Int16(bitmap_serialize_version);

be/src/http/action/health_action.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,42 @@
2424
#include "http/http_headers.h"
2525
#include "http/http_request.h"
2626
#include "http/http_status.h"
27+
#include "runtime/exec_env.h"
2728

2829
namespace doris {
2930

3031
const static std::string HEADER_JSON = "application/json";
3132

3233
void HealthAction::handle(HttpRequest* req) {
34+
std::string status;
35+
std::string msg;
36+
HttpStatus st;
37+
// always return HttpStatus::OK
38+
// because in k8s, we don't want the pod to be removed
39+
// from service during shutdown
40+
if (!doris::k_is_server_ready) {
41+
status = "Server is not available";
42+
msg = "Server is not ready";
43+
st = HttpStatus::OK;
44+
} else if (doris::k_doris_exit) {
45+
status = "Server is not available";
46+
msg = "Server is shutting down";
47+
st = HttpStatus::OK;
48+
} else {
49+
status = "OK";
50+
msg = "OK";
51+
st = HttpStatus::OK;
52+
}
53+
3354
std::stringstream ss;
3455
ss << "{";
35-
ss << "\"status\": \"OK\",";
36-
ss << "\"msg\": \"To Be Added\"";
56+
ss << "\"status\": \"" << status << "\",";
57+
ss << "\"msg\": \"" << msg << "\"";
3758
ss << "}";
3859
std::string result = ss.str();
3960

4061
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
41-
HttpChannel::send_reply(req, HttpStatus::OK, result);
62+
HttpChannel::send_reply(req, st, result);
4263
}
4364

4465
} // end namespace doris

be/src/runtime/exec_env.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,13 @@ void ExecEnv::wait_for_all_tasks_done() {
177177
sleep(1);
178178
++wait_seconds_passed;
179179
}
180+
// This is a conservative strategy.
181+
// Because a query might still have fragments running on other BE nodes.
182+
// In other words, the query hasn't truly terminated.
183+
// If the current BE is shut down at this point,
184+
// the FE will detect the downtime of a related BE and cancel the entire query,
185+
// defeating the purpose of a graceful stop.
186+
sleep(config::grace_shutdown_post_delay_seconds);
180187
}
181188

182189
bool ExecEnv::check_auth_token(const std::string& auth_token) {

be/src/runtime/exec_env.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,10 @@ class IndexPolicyMgr;
132132
struct SyncRowsetStats;
133133
class DeleteBitmapAggCache;
134134

135+
// set to true when BE is shutting down
135136
inline bool k_doris_exit = false;
137+
// set to true after BE start ready
138+
inline bool k_is_server_ready = false;
136139

137140
// Execution environment for queries/plan fragments.
138141
// Contains all required global structures, and handles to

be/src/service/doris_main.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,12 +602,15 @@ int main(int argc, char** argv) {
602602

603603
exec_env->storage_engine().notify_listeners();
604604

605+
doris::k_is_server_ready = true;
606+
605607
while (!doris::k_doris_exit) {
606608
#if defined(LEAK_SANITIZER)
607609
__lsan_do_leak_check();
608610
#endif
609611
sleep(3);
610612
}
613+
doris::k_is_server_ready = false;
611614
LOG(INFO) << "Doris main exiting.";
612615
#if defined(LLVM_PROFILE)
613616
__llvm_profile_write_file();

be/test/http/http_client_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
392392
st = client.execute(&response);
393393
EXPECT_TRUE(st.ok());
394394
std::cout << "response = " << response << "\n";
395-
EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
395+
EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
396396
}
397397

398398
{
@@ -423,7 +423,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
423423
st = client.execute(&response);
424424
EXPECT_TRUE(st.ok());
425425
std::cout << "response = " << response << "\n";
426-
EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
426+
EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
427427
}
428428

429429
{

fe/fe-core/src/main/java/org/apache/doris/DorisFE.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.doris.journal.bdbje.BDBTool;
3737
import org.apache.doris.journal.bdbje.BDBToolOptions;
3838
import org.apache.doris.persist.meta.MetaReader;
39+
import org.apache.doris.qe.Coordinator;
40+
import org.apache.doris.qe.QeProcessorImpl;
3941
import org.apache.doris.qe.QeService;
4042
import org.apache.doris.qe.SimpleScheduler;
4143
import org.apache.doris.service.ExecuteEnv;
@@ -63,7 +65,9 @@
6365
import java.nio.channels.OverlappingFileLockException;
6466
import java.nio.file.StandardOpenOption;
6567
import java.time.LocalDate;
68+
import java.util.List;
6669
import java.util.concurrent.TimeUnit;
70+
import java.util.concurrent.atomic.AtomicBoolean;
6771

6872
public class DorisFE {
6973
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
@@ -82,6 +86,12 @@ public class DorisFE {
8286
private static FileChannel processLockFileChannel;
8387
private static FileLock processFileLock;
8488

89+
// set to true when all servers are ready.
90+
private static final AtomicBoolean serverReady = new AtomicBoolean(false);
91+
92+
// HTTP server instance, used for graceful shutdown
93+
private static HttpServer httpServer;
94+
8595
public static void main(String[] args) {
8696
// Every doris version should have a final meta version, it should not change
8797
// between small releases. Add a check here to avoid mistake.
@@ -144,7 +154,19 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
144154
}
145155

146156
Log4jConfig.initLogging(dorisHomeDir + "/conf/");
147-
Runtime.getRuntime().addShutdownHook(new Thread(LogManager::shutdown));
157+
// Add shutdown hook for graceful exit
158+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
159+
LOG.info("Received shutdown signal, starting graceful shutdown...");
160+
serverReady.set(false);
161+
gracefulShutdown();
162+
163+
// Shutdown HTTP server after main process graceful shutdown is complete
164+
if (httpServer != null) {
165+
httpServer.shutdown();
166+
}
167+
168+
LogManager.shutdown();
169+
}));
148170

149171
// set dns cache ttl
150172
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
@@ -202,7 +224,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
202224
feServer.start();
203225

204226
if (options.enableHttpServer) {
205-
HttpServer httpServer = new HttpServer();
227+
httpServer = new HttpServer();
206228
httpServer.setPort(Config.http_port);
207229
httpServer.setHttpsPort(Config.https_port);
208230
httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
@@ -231,11 +253,14 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
231253

232254
ThreadPoolManager.registerAllThreadPoolMetric();
233255
startMonitor();
256+
257+
serverReady.set(true);
258+
// JVM will exit when shutdown hook is completed
234259
while (true) {
235260
Thread.sleep(2000);
236261
}
237262
} catch (Throwable e) {
238-
// Some exception may thrown before LOG is inited.
263+
// Some exception may throw before LOG is inited.
239264
// So need to print to stdout
240265
e.printStackTrace();
241266
LOG.error("", e);
@@ -584,4 +609,25 @@ public static class StartupOptions {
584609
public boolean enableHttpServer = true;
585610
public boolean enableQeService = true;
586611
}
612+
613+
public static boolean isServerReady() {
614+
return serverReady.get();
615+
}
616+
617+
private static void gracefulShutdown() {
618+
// wait for all queries to finish
619+
try {
620+
long now = System.currentTimeMillis();
621+
List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
622+
while (!allCoordinators.isEmpty() && System.currentTimeMillis() - now < 300 * 1000L) {
623+
Thread.sleep(1000);
624+
allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
625+
LOG.info("waiting {} queries to finish before shutdown", allCoordinators.size());
626+
}
627+
} catch (Throwable t) {
628+
LOG.error("", t);
629+
}
630+
631+
LOG.info("graceful shutdown finished");
632+
}
587633
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ private boolean isColocated() {
102102
public long getColocatedBeId(String clusterId) throws ComputeGroupException {
103103
CloudSystemInfoService infoService = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
104104
List<Backend> bes = infoService.getBackendsByClusterId(clusterId).stream()
105-
.filter(be -> !be.isQueryDisabled()).collect(Collectors.toList());
105+
.filter(be -> be.isQueryAvailable()).collect(Collectors.toList());
106106
String clusterName = infoService.getClusterNameByClusterId(clusterId);
107107
if (bes.isEmpty()) {
108108
LOG.warn("failed to get available be, cluster: {}-{}", clusterName, clusterId);
@@ -420,11 +420,7 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu
420420
List<Backend> availableBes = new ArrayList<>();
421421
List<Backend> decommissionAvailBes = new ArrayList<>();
422422
for (Backend be : clusterBes) {
423-
long lastUpdateMs = be.getLastUpdateMs();
424-
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
425-
// be core or restart must in heartbeat_interval_second
426-
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
427-
&& !be.isSmoothUpgradeSrc()) {
423+
if (be.isQueryAvailable() && !be.isSmoothUpgradeSrc()) {
428424
if (be.isDecommissioned()) {
429425
decommissionAvailBes.add(be);
430426
} else {

fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,11 @@ public class FeConstants {
3636

3737
public static int checkpoint_interval_second = 60; // 1 minutes
3838

39-
// dpp version
40-
public static String dpp_version = "3_2_0";
41-
4239
// bloom filter false positive probability
4340
public static double default_bloom_filter_fpp = 0.05;
4441

4542
// set to true to skip some step when running FE unit test
4643
public static boolean runningUnitTest = false;
47-
// use to set some mocked values for FE unit test
48-
public static Object unitTestConstant = null;
4944

5045
// set to false to disable internal schema db
5146
public static boolean enableInternalSchemaDb = true;
@@ -68,29 +63,8 @@ public class FeConstants {
6863
// use for copy into test
6964
public static boolean disablePreHeat = false;
7065

71-
public static final String FS_PREFIX_S3 = "s3";
72-
public static final String FS_PREFIX_S3A = "s3a";
73-
public static final String FS_PREFIX_S3N = "s3n";
74-
public static final String FS_PREFIX_OSS = "oss";
75-
public static final String FS_PREFIX_GCS = "gs";
76-
public static final String FS_PREFIX_BOS = "bos";
77-
public static final String FS_PREFIX_COS = "cos";
78-
public static final String FS_PREFIX_COSN = "cosn";
79-
public static final String FS_PREFIX_LAKEFS = "lakefs";
80-
public static final String FS_PREFIX_OBS = "obs";
81-
public static final String FS_PREFIX_OFS = "ofs";
82-
public static final String FS_PREFIX_GFS = "gfs";
83-
public static final String FS_PREFIX_JFS = "jfs";
84-
public static final String FS_PREFIX_HDFS = "hdfs";
85-
public static final String FS_PREFIX_VIEWFS = "viewfs";
86-
public static final String FS_PREFIX_FILE = "file";
87-
8866
public static final String INTERNAL_DB_NAME = "__internal_schema";
8967
public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = "cloud_cache_hotspot";
9068

9169
public static String METADATA_FAILURE_RECOVERY_KEY = "metadata_failure_recovery";
92-
93-
public static String CLOUD_RETRY_E230 = "E-230";
94-
95-
public static String BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault";
9670
}

0 commit comments

Comments
 (0)