Skip to content

Commit

Permalink
[Enhancement] set grace period for mv checker (#34850)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <mofei@starrocks.com>
  • Loading branch information
murphyatwork authored Nov 14, 2023
1 parent 536efe4 commit ea7ebdc
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@
package com.starrocks.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.starrocks.alter.AlterJobMgr;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvId;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -39,6 +45,8 @@ public class MVActiveChecker extends FrontendDaemon {

private static final Logger LOG = LogManager.getLogger(MVActiveChecker.class);

private static final Map<MvId, MvActiveInfo> MV_ACTIVE_INFO = Maps.newConcurrentMap();

/**
 * Creates the checker daemon under the name "MVActiveChecker".
 * The run interval handed to {@link FrontendDaemon} is
 * {@code Config.mv_active_checker_interval_seconds} multiplied by 1000
 * (presumably a seconds-to-milliseconds conversion; confirm against FrontendDaemon).
 */
public MVActiveChecker() {
    super(
            "MVActiveChecker",
            Config.mv_active_checker_interval_seconds * 1000);
}
Expand All @@ -56,10 +64,18 @@ protected void runAfterCatalogReady() {
}

@VisibleForTesting
public void runForTest() {
/**
 * Runs a single checker pass on the calling thread.
 *
 * @param clearGrace when {@code true}, all recorded grace-period entries are
 *                   discarded before the pass, so no MV is skipped because of
 *                   an earlier failed activation
 */
public void runForTest(boolean clearGrace) {
    if (clearGrace) {
        // Forget previous failures first so every inactive MV is retried now.
        clearGracePeriod();
    }
    process();
}

/**
 * Drops every entry from the shared {@code MV_ACTIVE_INFO} map, which removes
 * the recorded grace period of all materialized views.
 */
@VisibleForTesting
private void clearGracePeriod() {
    MV_ACTIVE_INFO.clear();
}

private void process() {
Collection<Database> dbs = GlobalStateMgr.getCurrentState().getIdToDb().values();
for (Database db : CollectionUtils.emptyIfNull(dbs)) {
Expand Down Expand Up @@ -88,6 +104,12 @@ public static void tryToActivate(MaterializedView mv) {
return;
}

MvActiveInfo activeInfo = MV_ACTIVE_INFO.get(mv.getMvId());
if (activeInfo != null && activeInfo.isInGracePeriod()) {
return;
}

boolean activeOk = false;
String mvFullName = new TableName(dbName.get(), mv.getName()).toString();
String sql = String.format("ALTER MATERIALIZED VIEW %s active", mvFullName);
try {
Expand All @@ -97,15 +119,68 @@ public static void tryToActivate(MaterializedView mv) {

connect.executeSql(sql);
if (mv.isActive()) {
activeOk = true;
LOG.info("[MVActiveChecker] activate MV {} successfully", mvFullName);
} else {
LOG.warn("[MVActiveChecker] activate MV {} failed", mvFullName);
}
} catch (Exception e) {
LOG.warn("[MVActiveChecker] activate MV {} failed", mvFullName, e);
throw new RuntimeException(e);
} finally {
ConnectContext.remove();
}

if (activeOk) {
MV_ACTIVE_INFO.remove(mv.getMvId());
} else {
if (activeInfo != null) {
activeInfo.next();
} else {
MV_ACTIVE_INFO.put(mv.getMvId(), MvActiveInfo.firstFailure());
}
}
}

/**
 * Tracks consecutive activation failures of one materialized view and the
 * earliest time at which the next activation attempt is allowed.
 * <p>
 * After the N-th consecutive failure the next attempt is delayed by
 * {@code min(BACKOFF_BASE ** N, MAX_BACKOFF_MINUTES)} minutes, i.e.
 * 2, 4, 8, 16, 32, 60, 60, ... minutes with the current constants.
 * <p>
 * NOTE(review): the fields are plain (non-volatile, unsynchronized); confirm that
 * a single instance is only ever touched by one thread at a time.
 */
public static class MvActiveInfo {
    // Use 2 ** N minutes as failure backoff, and cap it at 60 minutes
    public static final long MAX_BACKOFF_MINUTES = 60;
    private static final long BACKOFF_BASE = 2;
    // Smallest N with BACKOFF_BASE ** N >= MAX_BACKOFF_MINUTES. Computed with integer
    // arithmetic: a floored floating-point log would undershoot and skip the last
    // exponential step before the cap.
    private static final int MAX_BACKOFF_TIMES = computeMaxBackoffTimes();

    // Earliest time of the next activation attempt; null until next() has been called
    private LocalDateTime nextActive;
    // Number of failures recorded so far through next()
    private int failureTimes = 0;

    /**
     * Creates the info for an MV whose activation has just failed for the first time.
     *
     * @return a new instance with one failure already recorded
     */
    public static MvActiveInfo firstFailure() {
        MvActiveInfo info = new MvActiveInfo();
        info.next();
        return info;
    }

    /**
     * If in grace period, it should not activate the mv
     *
     * @return {@code true} if the current time is before the next allowed attempt;
     *         {@code false} otherwise, including when no failure has been recorded yet
     */
    public boolean isInGracePeriod() {
        if (nextActive == null) {
            // No failure recorded yet, so nothing to wait for.
            return false;
        }
        return now().isBefore(nextActive);
    }

    /**
     * @return the earliest time of the next activation attempt, or {@code null}
     *         if no failure has been recorded yet
     */
    public LocalDateTime getNextActive() {
        return nextActive;
    }

    /**
     * Records one more failure and pushes the next allowed attempt to
     * "now + backoff", where the backoff grows with the number of failures.
     */
    public void next() {
        LocalDateTime lastActive = now();
        this.failureTimes++;
        this.nextActive = lastActive.plus(failureBackoff(failureTimes));
    }

    /**
     * Computes the delay after the given number of consecutive failures.
     *
     * @param failureTimes number of consecutive failures
     * @return {@code BACKOFF_BASE ** failureTimes} minutes, capped at {@code MAX_BACKOFF_MINUTES}
     */
    private Duration failureBackoff(int failureTimes) {
        if (failureTimes >= MAX_BACKOFF_TIMES) {
            return Duration.ofMinutes(MAX_BACKOFF_MINUTES);
        }
        long minutes = 1;
        for (int i = 0; i < failureTimes; i++) {
            minutes *= BACKOFF_BASE;
        }
        return Duration.ofMinutes(Math.min(minutes, MAX_BACKOFF_MINUTES));
    }

    // Current wall-clock time in the zone returned by TimeUtils.getSystemTimeZone()
    private static LocalDateTime now() {
        return LocalDateTime.now(TimeUtils.getSystemTimeZone().toZoneId());
    }

    // Counts how many multiplications by BACKOFF_BASE are needed to reach the cap
    private static int computeMaxBackoffTimes() {
        int times = 0;
        for (long backoff = 1; backoff < MAX_BACKOFF_MINUTES; backoff *= BACKOFF_BASE) {
            times++;
        }
        return times;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.scheduler.MVActiveChecker;
import com.starrocks.server.GlobalStateMgr;
Expand All @@ -37,6 +38,8 @@
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -252,14 +255,14 @@ public void testActiveChecker() throws Exception {
starRocksAssert.dropTable(baseTableName);
Assert.assertFalse(mv.isActive());
Assert.assertEquals("base-table dropped: base_tbl_active", mv.getInactiveReason());
checker.runForTest();
checker.runForTest(true);
Assert.assertFalse(mv.isActive());
Assert.assertEquals("base-table dropped: base_tbl_active", mv.getInactiveReason());

// create the table again, and activate it
connectContext.setThreadLocalInfo();
starRocksAssert.withTable(createTableSql);
checker.runForTest();
checker.runForTest(true);
Assert.assertTrue(mv.isActive());

// activate before refresh
Expand All @@ -274,10 +277,67 @@ public void testActiveChecker() throws Exception {
// manually set to inactive
mv.setInactiveAndReason(AlterJobMgr.MANUAL_INACTIVE_MV_REASON);
Assert.assertFalse(mv.isActive());
checker.runForTest();
checker.runForTest(true);
Assert.assertFalse(mv.isActive());
Assert.assertEquals(AlterJobMgr.MANUAL_INACTIVE_MV_REASON, mv.getInactiveReason());

checker.start();
starRocksAssert.dropTable(baseTableName);
starRocksAssert.dropMaterializedView(mv.getName());
}

/**
 * Verifies that an MV whose activation failed is skipped while its grace period
 * is in effect, and is activated once the grace period is cleared.
 */
@Test
public void testActiveGracePeriod() throws Exception {
    PlanTestBase.mockDml();
    MVActiveChecker checker = GlobalStateMgr.getCurrentState().getMvActiveChecker();
    checker.setStop();

    String baseTableName = "base_tbl_active";
    String createTableSql =
            "create table " + baseTableName + " ( k1 int, k2 int) properties('replication_num'='1')";
    starRocksAssert.withTable(createTableSql);
    starRocksAssert.withMaterializedView("create materialized view mv_active " +
            " refresh manual as select * from base_tbl_active");
    MaterializedView mv = (MaterializedView) starRocksAssert.getTable(connectContext.getDatabase(), "mv_active");
    Assert.assertTrue(mv.isActive());

    // drop the base table and try to activate it
    starRocksAssert.dropTable(baseTableName);
    Assert.assertFalse(mv.isActive());
    Assert.assertEquals("base-table dropped: base_tbl_active", mv.getInactiveReason());
    checker.runForTest(false);
    for (int i = 0; i < 10; i++) {
        checker.runForTest(false);
        Assert.assertFalse(mv.isActive());
    }

    // create the table, but in grace period, could not activate it
    connectContext.setThreadLocalInfo();
    starRocksAssert.withTable(createTableSql);
    for (int i = 0; i < 10; i++) {
        checker.runForTest(false);
        Assert.assertFalse(mv.isActive());
    }

    // clear the grace period and activate it again
    checker.runForTest(true);
    Assert.assertTrue(mv.isActive());

    checker.start();
    starRocksAssert.dropTable(baseTableName);
    // Also drop the MV (same cleanup as testActiveChecker) so it does not leak into other tests
    starRocksAssert.dropMaterializedView(mv.getName());
}

/**
 * Verifies that repeated failures keep the MV in its grace period and that the
 * backoff is capped at {@code MvActiveInfo.MAX_BACKOFF_MINUTES}.
 */
@Test
public void testActiveCheckerBackoff() {
    MVActiveChecker.MvActiveInfo activeInfo = MVActiveChecker.MvActiveInfo.firstFailure();
    Assert.assertTrue(activeInfo.isInGracePeriod());

    // Taken before the failures are recorded, so the measured gap is at least the
    // final backoff; toMinutes() truncates the small surplus.
    LocalDateTime start = LocalDateTime.now(TimeUtils.getSystemTimeZone().toZoneId());
    for (int i = 0; i < 10; i++) {
        activeInfo.next();
    }
    Assert.assertTrue(activeInfo.isInGracePeriod());
    Duration d = Duration.between(start, activeInfo.getNextActive());
    // JUnit order is (expected, actual); keeping it right makes failure messages correct
    Assert.assertEquals(MVActiveChecker.MvActiveInfo.MAX_BACKOFF_MINUTES, d.toMinutes());
}
}

0 comments on commit ea7ebdc

Please sign in to comment.