Skip to content

Commit

Permalink
decommission skip leaky tablets
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Oct 24, 2024
1 parent 15803e8 commit 6f3c533
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 14 deletions.
11 changes: 10 additions & 1 deletion fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,16 @@ public class Config extends ConfigBase {
* For performance based considerations, better not set a very high value for this.
*/
@ConfField(mutable = true, masterOnly = true)
public static int decommission_tablet_check_threshold = 5000;
public static int decommission_tablet_check_threshold = 50000;

/**
* When decommissioning a backend, all its tablets need to be migrated to other backends.
* But there may be some leaky tablets that were never deleted from TabletInvertedIndex.
* They are not in use, so decommission can skip migrating them.
* For safety, decommission waits for this period (in seconds) after finding leaky tablets before skipping them.
*/
@ConfField(mutable = true, masterOnly = true)
public static int decommission_skip_leaky_tablet_second = 3600 * 5;

/**
* Decommission a tablet need to wait all the previous txns finished.
Expand Down
130 changes: 118 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.analysis.ModifyBackendHostNameClause;
import org.apache.doris.analysis.ModifyBrokerClause;
import org.apache.doris.analysis.ModifyFrontendHostNameClause;
import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MysqlCompatibleDatabase;
Expand All @@ -39,11 +40,13 @@
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
Expand All @@ -60,6 +63,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/*
Expand All @@ -71,6 +76,9 @@
public class SystemHandler extends AlterHandler {
private static final Logger LOG = LogManager.getLogger(SystemHandler.class);

// backendId -> tabletId -> checkTime
private Map<Long, Map<Long, Long>> backendLeakyTablets = Maps.newHashMap();

public SystemHandler() {
super("system");
}
Expand All @@ -86,17 +94,24 @@ protected void runAfterCatalogReady() {
// check all decommissioned backends, if there is no available tablet on that backend, drop it.
private void runAlterJobV2() {
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
backendLeakyTablets.entrySet().removeIf(entry -> {
long beId = entry.getKey();
Backend backend = systemInfoService.getBackend(beId);
return backend == null || !backend.isDecommissioned();
});
// check if decommission is finished
for (Long beId : systemInfoService.getAllBackendIds(false)) {
Backend backend = systemInfoService.getBackend(beId);
if (backend == null || !backend.isDecommissioned()) {
continue;
}

List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
AtomicInteger totalTabletNum = new AtomicInteger(0);
List<Long> sampleTablets = Lists.newArrayList();
List<Long> sampleLeakyTablets = Lists.newArrayList();
boolean removedTablets = checkRemoveTablets(beId, 10, sampleTablets, sampleLeakyTablets, totalTabletNum);
long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && walNum == 0) {
if (Config.drop_backend_after_decommission && removedTablets && walNum == 0) {
try {
systemInfoService.dropBackend(beId);
LOG.info("no available tablet on decommission backend {}, drop it", beId);
Expand All @@ -107,8 +122,9 @@ private void runAlterJobV2() {
continue;
}

LOG.info("backend {} lefts {} replicas to decommission: {}{}", beId, backendTabletIds.size(),
backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())),
LOG.info("backend {} lefts {} replicas to decommission: normal tablets {}{}{}",
beId, totalTabletNum.get(), sampleTablets,
sampleLeakyTablets.isEmpty() ? "" : "; maybe leaky tablets " + sampleLeakyTablets,
walNum > 0 ? "; and has " + walNum + " unfinished WALs" : "");
}
}
Expand Down Expand Up @@ -194,18 +210,108 @@ public synchronized void process(String rawSql, List<AlterClause> alterClauses,
* check if the specified backends can be dropped
* 1. backend does not have any tablet.
* 2. all tablets in backend have been recycled.
*
* Collects at most sampleLimit tablet ids into each of sampleTablets and sampleLeakyTablets.
*
*/
private boolean checkTablets(Long beId, List<Long> backendTabletIds) {
private boolean checkRemoveTablets(long beId, int sampleLimit, List<Long> sampleTablets,
List<Long> sampleLeakyTablets, AtomicInteger totalTabletNum) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
totalTabletNum.set(backendTabletIds.size());
if (backendTabletIds.isEmpty()) {
return true;
}
if (backendTabletIds.size() < Config.decommission_tablet_check_threshold
&& Env.getCurrentRecycleBin().allTabletsInRecycledStatus(backendTabletIds)) {
LOG.info("tablet size is {}, all tablets on decommissioned backend {} have been recycled,"
+ " so this backend will be dropped immediately", backendTabletIds.size(), beId);
return true;
// if too many tablets, no check for efficiency
if (backendTabletIds.size() > Config.decommission_tablet_check_threshold) {
return false;
}
// dbId -> tableId -> partitionId -> tablet list
Map<Long, Map<Long, Map<Long, List<Long>>>> tabletsMap = Maps.newHashMap();
List<TabletMeta> tabletMetaList = invertedIndex.getTabletMetaList(backendTabletIds);
for (int i = 0; i < backendTabletIds.size(); i++) {
long tabletId = backendTabletIds.get(i);
TabletMeta tabletMeta = tabletMetaList.get(i);
if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
continue;
}
tabletsMap.computeIfAbsent(tabletMeta.getDbId(), k -> Maps.newHashMap())
.computeIfAbsent(tabletMeta.getTableId(), k -> Maps.newHashMap())
.computeIfAbsent(tabletMeta.getPartitionId(), k -> Lists.newArrayList())
.add(tabletId);
}
return false;
InternalCatalog catalog = Env.getCurrentInternalCatalog();
CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
long now = System.currentTimeMillis();
Map<Long, Long> leakyTablets = Maps.newHashMap();
Map<Long, Long> lastLeakyTablets = backendLeakyTablets.computeIfAbsent(beId, k -> Maps.newHashMap());
backendLeakyTablets.put(beId, leakyTablets);
Consumer<List<Long>> addPartitionLeakyTablets = tabletsOfPartition -> {
tabletsOfPartition.forEach(tabletId -> {
leakyTablets.put(tabletId, lastLeakyTablets.getOrDefault(tabletId, now));
if (sampleLeakyTablets.size() < sampleLimit) {
sampleLeakyTablets.add(tabletId);
}
});
};
Consumer<Map<Long, List<Long>>> addTableLeakyTablets = tabletsOfTable -> {
tabletsOfTable.values().forEach(addPartitionLeakyTablets);
};
Consumer<Map<Long, Map<Long, List<Long>>>> addDbLeakyTablets = tabletsOfDb -> {
tabletsOfDb.values().forEach(addTableLeakyTablets);
};
OUTER:
for (Map.Entry<Long, Map<Long, Map<Long, List<Long>>>> dbEntry : tabletsMap.entrySet()) {
long dbId = dbEntry.getKey();
Database db = catalog.getDbNullable(dbId);
if (db == null) {
if (!recycleBin.isRecycleDatabase(dbId)) {
addDbLeakyTablets.accept(dbEntry.getValue());
}
continue;
}

for (Map.Entry<Long, Map<Long, List<Long>>> tableEntry : dbEntry.getValue().entrySet()) {
long tableId = tableEntry.getKey();
Table tbl = db.getTableNullable(tableId);
if (tbl == null || !tbl.isManagedTable()) {
if (!recycleBin.isRecycleTable(dbId, tableId)) {
addTableLeakyTablets.accept(tableEntry.getValue());
}
continue;
}

OlapTable olapTable = (OlapTable) tbl;
olapTable.readLock();
try {
for (Map.Entry<Long, List<Long>> partitionEntry : tableEntry.getValue().entrySet()) {
long partitionId = partitionEntry.getKey();
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
if (!recycleBin.isRecyclePartition(dbId, tableId, partitionId)) {
addPartitionLeakyTablets.accept(partitionEntry.getValue());
}
continue;
}
// At present, leaky tablets are the ones that belong to a partition which cannot be found.
// So if a partition is in its table, do not check whether it really contains this tablet,
// just treat this tablet as not leaky.
for (long tabletId : partitionEntry.getValue()) {
sampleTablets.add(tabletId);
// for efficiency, no visit all tablets
if (sampleTablets.size() >= sampleLimit) {
break OUTER;
}
}
}
} finally {
olapTable.readUnlock();
}
}
}

long skipLeakyTs = now - Config.decommission_skip_leaky_tablet_second * 1000L;
return sampleTablets.isEmpty() && leakyTablets.values().stream().allMatch(ts -> ts < skipLeakyTs);
}

private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.SlidingWindowCounter;
import org.apache.doris.mysql.privilege.Auth;
Expand Down Expand Up @@ -138,6 +139,9 @@ public long getAllWalQueueSize(Backend backend) {
}

private long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
if (FeConstants.runningUnitTest) {
return 0;
}
PGetWalQueueSizeResponse response = null;
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
long size = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,25 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.RebalancerTestUtil;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.function.Supplier;

public class DecommissionBackendTest extends TestWithFeService {
@Override
Expand All @@ -58,7 +63,7 @@ public void beforeClass() {

@Override
protected void beforeCreatingConnectContext() throws Exception {
FeConstants.default_scheduler_interval_millisecond = 1000;
FeConstants.default_scheduler_interval_millisecond = 100;
Config.tablet_checker_interval_ms = 100;
Config.tablet_schedule_interval_ms = 100;
Config.tablet_repair_delay_factor_second = 1;
Expand Down Expand Up @@ -337,5 +342,90 @@ public void testDecommissionBackendWithMTMV() throws Exception {
// 6. add backend
addNewBackend();
Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());

dropDatabase("db4");
}

/**
 * Verifies that a decommissioning backend is dropped even if a leaky tablet is still registered on it.
 *
 * <p>A fake tablet with made-up db/table/partition ids is added to the inverted index on the backend being
 * decommissioned. While {@code Config.decommission_skip_leaky_tablet_second} is large the backend must stay;
 * after the config is lowered the backend must be dropped.
 *
 * <p>The config is a static, so its original value is restored in the {@code finally} block to avoid
 * affecting other tests running in the same JVM.
 */
@Test
public void testDecommissionBackendWithLeakyTablets() throws Exception {
    // 1. create connect context
    connectContext = createDefaultCtx();

    Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());

    // 2. create database db5
    createDatabase("db5");
    System.out.println(Env.getCurrentInternalCatalog().getDbNames());

    // 3. create table tbl1
    createTable("create table db5.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
    RebalancerTestUtil.updateReplicaPathHash();

    Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("db5");
    OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
    Assertions.assertNotNull(tbl);

    Partition partition = tbl.getPartitions().iterator().next();
    Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
            .iterator().next().getTablets().iterator().next();
    Assertions.assertNotNull(tablet);
    Backend srcBackend = Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId());
    Assertions.assertNotNull(srcBackend);

    // 4. decommission the backend that holds the first tablet's replica
    String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\"";
    AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr);
    Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt);

    Assertions.assertTrue(srcBackend.isDecommissioned());

    // 5. register a fake tablet and replica on that backend in the inverted index
    TabletInvertedIndex invertIndex = Env.getCurrentInvertedIndex();
    long fakeTabletId = 123123123L;
    TabletMeta fakeTabletMeta = new TabletMeta(1234567L, 1234568L, 1234569L, 1234570L, 0, TStorageMedium.HDD);
    Replica fakeReplica = new Replica(1234571L, srcBackend.getId(), 0, Replica.ReplicaState.NORMAL);

    // tablets on the backend that have no meta, or whose partition is not in the recycle bin
    Supplier<List<Long>> getNotInRecycleBinTablets = () -> {
        List<Long> tabletIds = Lists.newArrayList();
        for (long tabletId : invertIndex.getTabletIdsByBackendId(srcBackend.getId())) {
            TabletMeta tabletMeta = invertIndex.getTabletMeta(tabletId);
            if (tabletMeta == null || !Env.getCurrentRecycleBin().isRecyclePartition(
                    tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
                tabletIds.add(tabletId);
            }
        }
        return tabletIds;
    };

    // remember the static config so that it can be restored regardless of the test outcome
    final int originSkipLeakySecond = Config.decommission_skip_leaky_tablet_second;
    try {
        Config.decommission_skip_leaky_tablet_second = 3600;

        invertIndex.addTablet(fakeTabletId, fakeTabletMeta);
        invertIndex.addReplica(fakeTabletId, fakeReplica);
        long startTimestamp = System.currentTimeMillis();

        // wait (up to 90s) until the fake tablet is the only one left on the backend
        List<Long> expectTabletIds = Lists.newArrayList(fakeTabletId);
        while (System.currentTimeMillis() - startTimestamp < 90000
                && !expectTabletIds.equals(getNotInRecycleBinTablets.get())) {
            Thread.sleep(1000);
        }

        Assertions.assertEquals(expectTabletIds, getNotInRecycleBinTablets.get());
        // with a long skip period the backend must not be dropped yet
        Thread.sleep(5000);
        Assertions.assertEquals(expectTabletIds, getNotInRecycleBinTablets.get());
        Assertions.assertNotNull(Env.getCurrentSystemInfo().getBackend(srcBackend.getId()));
        // skip leaky tablets, decommission succ
        Config.decommission_skip_leaky_tablet_second = 1;
        Thread.sleep(4000);
        Assertions.assertNull(Env.getCurrentSystemInfo().getBackend(srcBackend.getId()));
    } finally {
        Config.decommission_skip_leaky_tablet_second = originSkipLeakySecond;
        invertIndex.deleteTablet(fakeTabletId);
    }

    Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());

    // 6. add backend
    addNewBackend();
    Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());

    dropDatabase("db5");
}

}

0 comments on commit 6f3c533

Please sign in to comment.