Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
heartbeat_result.backend_info.__set_version(get_short_version());
heartbeat_result.backend_info.__set_be_start_time(_be_epoch);
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
// If be is gracefully stop, then k_doris_exist is set to true
heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit);
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,8 @@ DEFINE_Int32(partition_topn_partition_threshold, "1024");

DEFINE_Int32(fe_expire_duration_seconds, "60");

DEFINE_Int32(grace_shutdown_wait_seconds, "120");

#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,11 @@ DECLARE_Int32(partition_topn_partition_threshold);
// as an abnormal fe, this will cause be to cancel this fe's related query.
DECLARE_Int32(fe_expire_duration_seconds);

// If use stop_be.sh --grace, then BE has to wait all running queries to stop to avoiding running query failure
// , but if the waiting time exceed the limit, then be will exit directly.
// During this period, FE will not send any queries to BE and waiting for all running queries to stop.
DECLARE_Int32(grace_shutdown_wait_seconds);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
20 changes: 20 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <utility>

#include "common/config.h"
#include "runtime/fragment_mgr.h"
#include "runtime/frontend_info.h"
#include "time.h"
#include "util/debug_util.h"
Expand Down Expand Up @@ -135,4 +136,23 @@ std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() {
return res;
}

void ExecEnv::wait_for_all_tasks_done() {
// For graceful shutdown, need to wait for all running queries to stop
int32_t wait_seconds_passed = 0;
while (true) {
int num_queries = _fragment_mgr->running_query_num();
if (num_queries < 1) {
break;
}
if (wait_seconds_passed > doris::config::grace_shutdown_wait_seconds) {
LOG(INFO) << "There are still " << num_queries << " queries running, but "
<< wait_seconds_passed << " seconds passed, has to exist now";
break;
}
LOG(INFO) << "There are still " << num_queries << " queries running, waiting...";
sleep(1);
++wait_seconds_passed;
}
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class ExecEnv {
this->_stream_load_executor = stream_load_executor;
}

void wait_for_all_tasks_done();

void update_frontends(const std::vector<TFrontendInfo>& new_infos);
std::map<TNetworkAddress, FrontendInfo> get_frontends();
std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ class FragmentMgr : public RestMonitorIface {

ThreadPool* get_thread_pool() { return _thread_pool.get(); }

int32_t running_query_num() {
std::unique_lock<std::mutex> ctx_lock(_lock);
return _query_ctx_map.size();
}

private:
void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock, bool is_pipeline,
Expand Down
3 changes: 3 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,9 @@ int main(int argc, char** argv) {
sleep(3);
}

// For graceful shutdown, need to wait for all running queries to stop
exec_env->wait_for_all_tasks_done();

return 0;
}

Expand Down
7 changes: 6 additions & 1 deletion docs/en/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1488,4 +1488,9 @@ Indicates how many tablets failed to load in the data directory. At the same tim
#### `brpc_streaming_client_batch_bytes`

* Description: The batch size for sending data by brpc streaming client
* Default value: 262144
* Default value: 262144

#### `grace_shutdown_wait_seconds`

* Description: In cloud native deployment scenario, BE will be add to cluster and remove from cluster very frequently. User's query will fail if there is a fragment is running on the shuting down BE. Users could use stop_be.sh --grace, then BE will wait all running queries to stop to avoiding running query failure, but if the waiting time exceed the limit, then be will exit directly. During this period, FE will not send any queries to BE and waiting for all running queries to stop.
* Default value: 120
7 changes: 6 additions & 1 deletion docs/zh-CN/docs/admin-manual/config/be-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1517,4 +1517,9 @@ load tablets from header failed, failed tablets size: xxx, path=xxx
#### `brpc_streaming_client_batch_bytes`

* 描述: brpc streaming 客户端发送数据时的攒批大小(字节)
* 默认值: 262144
* 默认值: 262144

#### `grace_shutdown_wait_seconds`

* 描述: 在云原生的部署模式下,为了节省资源一个BE 可能会被频繁的加入集群或者从集群中移除。 如果在这个BE 上有正在运行的Query,那么这个Query 会失败。 用户可以使用 stop_be.sh --grace 的方式来关闭一个BE 节点,此时BE 会等待当前正在这个BE 上运行的所有查询都结束才会退出。 同时,在这个时间范围内FE 也不会分发新的query 到这个机器上。 如果超过grace_shutdown_wait_seconds这个阈值,那么BE 也会直接退出,防止一些查询长期不退出导致节点没法快速下掉的情况。
* 默认值: 120
12 changes: 11 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public class Backend implements Writable {
// No need to persist, because only master FE handle heartbeat.
private int heartbeatFailureCounter = 0;

// Not need serialize this field. If fe restart the state is reset to false. Maybe fe will
// send some queries to this BE, it is not an important problem.
private AtomicBoolean isShutDown = new AtomicBoolean(false);

public Backend() {
this.host = "";
this.version = "";
Expand Down Expand Up @@ -330,7 +334,7 @@ public boolean isDecommissioned() {
}

public boolean isQueryAvailable() {
return isAlive() && !isQueryDisabled();
return isAlive() && !isQueryDisabled() && !isShutDown.get();
}

public boolean isScheduleAvailable() {
Expand Down Expand Up @@ -659,6 +663,12 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay)
this.brpcPort = hbResponse.getBrpcPort();
}

if (this.isShutDown.get() != hbResponse.isShutDown()) {
isChanged = true;
LOG.info("{} shutdown state is changed", this.toString());
this.isShutDown.set(hbResponse.isShutDown());
}

if (!this.getNodeRoleTag().value.equals(hbResponse.getNodeRole()) && Tag.validNodeRoleTag(
hbResponse.getNodeRole())) {
isChanged = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
private long beStartTime;
private String host;
private String version = "";
@SerializedName(value = "isShutDown")
private boolean isShutDown = false;

public BackendHbResponse() {
super(HeartbeatResponse.Type.BACKEND);
}

public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime,
String version, String nodeRole) {
String version, String nodeRole, boolean isShutDown) {
super(HeartbeatResponse.Type.BACKEND);
this.beId = beId;
this.status = HbStatus.OK;
Expand All @@ -59,6 +61,7 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long
this.beStartTime = beStartTime;
this.version = version;
this.nodeRole = nodeRole;
this.isShutDown = isShutDown;
}

public BackendHbResponse(long beId, String errMsg) {
Expand Down Expand Up @@ -104,6 +107,10 @@ public String getNodeRole() {
return nodeRole;
}

public boolean isShutDown() {
return isShutDown;
}

@Override
protected void readFields(DataInput in) throws IOException {
super.readFields(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,12 @@ public HeartbeatResponse call() {
if (tBackendInfo.isSetBeNodeRole()) {
nodeRole = tBackendInfo.getBeNodeRole();
}
boolean isShutDown = false;
if (tBackendInfo.isSetIsShutdown()) {
isShutDown = tBackendInfo.isIsShutdown();
}
return new BackendHbResponse(backendId, bePort, httpPort, brpcPort,
System.currentTimeMillis(), beStartTime, version, nodeRole);
System.currentTimeMillis(), beStartTime, version, nodeRole, isShutDown);
} else {
return new BackendHbResponse(backendId, backend.getHost(),
result.getStatus().getErrorMsgs().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testBackendHbResponseSerialization() throws IOException {
System.out.println(Env.getCurrentEnvJournalVersion());

BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234, 1234, 1234, 1234, 1234, "test",
Tag.VALUE_COMPUTATION);
Tag.VALUE_COMPUTATION, false);

// Write objects to file
File file1 = new File("./BackendHbResponseSerialization");
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/HeartbeatService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct TBackendInfo {
5: optional string version
6: optional i64 be_start_time
7: optional string be_node_role
8: optional bool is_shutdown
}

struct THeartbeatResult {
Expand Down