Skip to content

Commit

Permalink
[fix](cloud) Fix cloud decommission and check wal (apache#47187)
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng authored Jan 29, 2025
1 parent fe17d16 commit 60910a4
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,22 @@ private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> expec
} catch (UserException e) {
LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e);
}
be.setDecommissioned(true);
be.setDecommissioning(true);
}
}

if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED) {
// When the synchronization status of the node is "NODE_STATUS_DECOMMISSIONED",
// it indicates that the conditions for decommissioning have
// already been checked in CloudTabletRebalancer.java,
// such as the tablets having been successfully migrated and no remnants of WAL on the backend (BE).
if (!be.isDecommissioned()) {
LOG.warn("impossible status, somewhere has bug, backend: {} status: {}", be, status);
}
be.setDecommissioned(true);
// edit log
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,46 +483,53 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
LOG.info("backend {} not found", beId);
continue;
}
if ((backend.isDecommissioned() && tabletNum == 0 && !backend.isActive())
|| (backend.isDecommissioned() && beList.size() == 1)) {
LOG.info("check decommission be {} state {} tabletNum {} isActive {} beList {}",
backend.getId(), backend.isDecommissioned(), tabletNum, backend.isActive(), beList);
if (!beToDecommissionedTime.containsKey(beId)) {
LOG.info("prepare to notify meta service be {} decommissioned", backend.getId());
Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);

Cloud.ClusterPB.Builder clusterBuilder =
Cloud.ClusterPB.newBuilder();
clusterBuilder.setClusterName(backend.getCloudClusterName());
clusterBuilder.setClusterId(backend.getCloudClusterId());
clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);

Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
nodeBuilder.setIp(backend.getHost());
nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);

clusterBuilder.addNodes(nodeBuilder);
builder.setCluster(clusterBuilder);

Cloud.AlterClusterResponse response;
try {
response = MetaServiceProxy.getInstance().alterCluster(builder.build());
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("notify decommission response: {}", response);
}
LOG.info("notify decommission response: {} ", response);
} catch (RpcException e) {
LOG.info("failed to notify decommission", e);
return;
}
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
if (!backend.isDecommissioning()) {
continue;
}
// here check wal
long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
LOG.info("check decommissioning be {} state {} tabletNum {} isActive {} beList {}, wal num {}",
backend.getId(), backend.isDecommissioning(), tabletNum, backend.isActive(), beList, walNum);
if ((tabletNum != 0 || backend.isActive() || walNum != 0) && beList.size() != 1) {
continue;
}
if (beToDecommissionedTime.containsKey(beId)) {
continue;
}
LOG.info("prepare to notify meta service be {} decommissioned", backend.getAddress());
Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);

Cloud.ClusterPB.Builder clusterBuilder =
Cloud.ClusterPB.newBuilder();
clusterBuilder.setClusterName(backend.getCloudClusterName());
clusterBuilder.setClusterId(backend.getCloudClusterId());
clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);

Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
nodeBuilder.setIp(backend.getHost());
nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);

clusterBuilder.addNodes(nodeBuilder);
builder.setCluster(clusterBuilder);

Cloud.AlterClusterResponse response;
try {
response = MetaServiceProxy.getInstance().alterCluster(builder.build());
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("notify decommission response: {}", response);
continue;
}
LOG.info("notify decommission response: {} ", response);
} catch (RpcException e) {
LOG.warn("failed to notify decommission", e);
continue;
}
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
}
}
}
Expand Down Expand Up @@ -884,7 +891,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
LOG.info("backend {} not found", be);
continue;
}
if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioned()
if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioning()
&& !backend.isSmoothUpgradeSrc()) {
destBe = be;
minTabletsNum = tabletNum;
Expand All @@ -898,7 +905,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
LOG.info("backend {} not found", be);
continue;
}
if (backend.isDecommissioned() && tabletNum > 0) {
if (backend.isDecommissioning() && tabletNum > 0) {
srcBe = be;
srcDecommissioned = true;
break;
Expand Down Expand Up @@ -967,7 +974,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
for (Long be : bes) {
long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size();
Backend backend = cloudSystemInfoService.getBackend(be);
if (backend != null && !backend.isDecommissioned()) {
if (backend != null && !backend.isDecommissioning()) {
beNum++;
}
totalTabletsNum += tabletNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.SlidingWindowCounter;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
Expand Down Expand Up @@ -128,6 +129,11 @@ private boolean isPreviousWalFinished(long tableId, List<Long> aliveBeIds) {
}

public long getAllWalQueueSize(Backend backend) {
long getAllWalQueueSizeDP = DebugPointUtil.getDebugParamOrDefault("FE.GET_ALL_WAL_QUEUE_SIZE", -1L);
if (getAllWalQueueSizeDP > 0) {
LOG.info("backend id:" + backend.getHost() + ",use dp all wal size:" + getAllWalQueueSizeDP);
return getAllWalQueueSizeDP;
}
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(-1)
.build();
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class Backend implements Writable {
@SerializedName("isDecommissioned")
private AtomicBoolean isDecommissioned;

// Flag toggled through setDecommissioning() and read through isDecommissioning().
// It carries no @SerializedName, unlike isDecommissioned above.
// NOTE(review): presumably that keeps this flag in-memory only - confirm against the Gson setup.
private AtomicBoolean isDecommissioning = new AtomicBoolean(false);

// rootPath -> DiskInfo
@SerializedName("disksRef")
private volatile ImmutableMap<String, DiskInfo> disksRef;
Expand Down Expand Up @@ -404,6 +406,14 @@ public boolean setDecommissioned(boolean isDecommissioned) {
return false;
}

/**
 * Switches this backend's decommissioning flag to the requested value.
 *
 * <p>The flag is updated with a single compare-and-set from the opposite value, so the call
 * only takes effect (and only logs) when the flag actually changes.
 *
 * @param isDecommissioning the value the flag should hold after the call
 * @return {@code true} if this call flipped the flag, {@code false} if the flag already held
 *         the requested value
 */
public boolean setDecommissioning(boolean isDecommissioning) {
    boolean flipped = this.isDecommissioning.compareAndSet(!isDecommissioning, isDecommissioning);
    if (!flipped) {
        // Already in the requested state; nothing to record.
        return false;
    }
    LOG.warn("{} set decommissioning: {}", this.toString(), isDecommissioning);
    return true;
}

/**
 * Replaces the host recorded for this backend.
 *
 * @param host the new host value to store
 */
public void setHost(String host) {
this.host = host;
}
Expand Down Expand Up @@ -490,6 +500,10 @@ public boolean isDecommissioned() {
return this.isDecommissioned.get();
}

/**
 * Reads the current value of the decommissioning flag.
 *
 * @return {@code true} if the flag is currently set, {@code false} otherwise
 */
public boolean isDecommissioning() {
return this.isDecommissioning.get();
}

/**
 * Tells whether this backend can currently serve queries.
 *
 * <p>The checks run in order and stop at the first failure: the backend must be alive,
 * must not have queries disabled, and must not be flagged as shut down.
 *
 * @return {@code true} only when all three conditions hold
 */
public boolean isQueryAvailable() {
    if (!isAlive()) {
        return false;
    }
    if (isQueryDisabled()) {
        return false;
    }
    return !isShutDown.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2462,15 +2462,19 @@ class Suite implements GroovyInterceptable {
}
}

def get_cluster = { be_unique_id ->
def get_cluster = { be_unique_id , MetaService ms=null->
def jsonOutput = new JsonOutput()
def map = [instance_id: "${instance_id}", cloud_unique_id: "${be_unique_id}" ]
def js = jsonOutput.toJson(map)
log.info("get cluster req: ${js} ".toString())

def add_cluster_api = { request_body, check_func ->
httpTest {
endpoint context.config.metaServiceHttpAddress
if (ms) {
endpoint ms.host+':'+ms.httpPort
} else {
endpoint context.config.metaServiceHttpAddress
}
uri "/MetaService/http/get_cluster?token=${token}"
body request_body
check check_func
Expand Down Expand Up @@ -2643,7 +2647,7 @@ class Suite implements GroovyInterceptable {
}
}

def d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
def d_node = { be_unique_id, ip, port, cluster_name, cluster_id, MetaService ms=null ->
def jsonOutput = new JsonOutput()
def clusterInfo = [
type: "COMPUTE",
Expand All @@ -2663,7 +2667,11 @@ class Suite implements GroovyInterceptable {

def d_cluster_api = { request_body, check_func ->
httpTest {
endpoint context.config.metaServiceHttpAddress
if (ms) {
endpoint ms.host+':'+ms.httpPort
} else {
endpoint context.config.metaServiceHttpAddress
}
uri "/MetaService/http/decommission_node?token=${token}"
body request_body
check check_func
Expand Down
Loading

0 comments on commit 60910a4

Please sign in to comment.