Skip to content

Commit

Permalink
improve for load cluster from credis
Browse files Browse the repository at this point in the history
  • Loading branch information
qifanwang committed Jan 23, 2025
1 parent e070256 commit 38cd0cb
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class CheckConfigBean extends AbstractConfigBean {

public static final String KEY_REDIS_CONF_CHECK_INTERVAL = "redis.conf.check.interval";

public static final String KEY_CREDIS_CLUSTER_REFRESH_INTERVAL = "credis.cluster.refresh.interval";

public static final String KEY_SENTINEL_QUORUM = "console.sentinel.quorum";

public static final String KEY_SENTINEL_CHECK_INTERVAL = "console.health.sentinel.interval";
Expand Down Expand Up @@ -185,6 +187,10 @@ public int getRedisConfCheckIntervalMilli() {
);
}

public int getCRedisClusterRefreshIntervalMilli() {
return getIntProperty(KEY_CREDIS_CLUSTER_REFRESH_INTERVAL, 60000);
}

public QuorumConfig getDefaultSentinelQuorumConfig() {

String config = getProperty(KEY_SENTINEL_QUORUM, "{}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.ctrip.xpipe.redis.checker.alert.AlertConfig;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DcClusterDelayMarkDown;
import com.ctrip.xpipe.redis.console.config.model.BeaconOrgRoute;
import com.ctrip.xpipe.redis.console.util.HickwallMetricInfo;
import com.ctrip.xpipe.redis.core.config.CoreConfig;
Expand All @@ -21,19 +20,19 @@
public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig {

String getServerMode();

String getDatasource();

int getConsoleNotifyRetryTimes();

int getConsoleNotifyRetryInterval();

Map<String,String> getMetaservers();

int getConsoleNotifyThreads();

Set<String> getConsoleUserAccessWhiteList();

int getRedisReplicationHealthCheckInterval();

int getClusterHealthCheckInterval();
Expand Down Expand Up @@ -126,10 +125,10 @@ public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig {

int getCheckerAckTimeoutMilli();

long getMigrationTimeoutMilli();
long getMigrationTimeoutMilli();

long getServletMethodTimeoutMilli();

boolean isRedisConfigCheckMonitorOpen();

String getRedisConfigCheckRules();
Expand Down Expand Up @@ -158,4 +157,6 @@ public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig {

String getHttpAcceptEncoding();

int getCRedisClusterCacheRefreshIntervalMilli();

}
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ public String getHttpAcceptEncoding() {
return dataCenterConfigBean.getHttpAcceptEncoding();
}

@Override
public int getCRedisClusterCacheRefreshIntervalMilli() {
return checkConfigBean.getCRedisClusterRefreshIntervalMilli();
}

@Override
public boolean getShouldDoAfterNettyClientConnected() {
return dataCenterConfigBean.getDoAfterNettyClientConnected();
Expand Down Expand Up @@ -623,10 +628,10 @@ public void onChange(String key, String oldValue, String newValue) {

protected String getProperty(String key) {
for(AbstractConfigBean configBean : configBeans) {
String val = configBean.getProperty(key);
if(val != null) {
return val;
}
String val = configBean.getProperty(key);
if(val != null) {
return val;
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,9 @@ private CheckCluster fromXPipe(XpipeMeta xpipeMeta, String checkCluster) {
return result;
}

@Override
protected long getIntervalMilli() {
return consoleConfig.getCRedisClusterCacheRefreshIntervalMilli();
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.ctrip.xpipe.redis.console.resources;

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.api.lifecycle.TopElement;
import com.ctrip.xpipe.api.migration.OuterClientService;
import com.ctrip.xpipe.lifecycle.AbstractLifecycle;
import com.ctrip.xpipe.redis.checker.OuterClientCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.cluster.ConsoleLeaderAware;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClientException;

Expand All @@ -26,7 +27,7 @@
* only cache current dc active dc one-way clusters
*/
@Component
public class DefaultOuterClientCache extends AbstractLifecycle implements OuterClientCache, TopElement {
public class DefaultOuterClientCache implements ConsoleLeaderAware, OuterClientCache {

private OuterClientService outerClientService;

Expand All @@ -36,18 +37,22 @@ public class DefaultOuterClientCache extends AbstractLifecycle implements OuterC

private TimeBoundCache<Map<String, ClusterInfo>> currentDcClustersCache;

private ScheduledExecutorService scheduled;
private ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
XpipeThreadFactory.create("OuterClientCacheRefreshScheduled"));;

private DynamicDelayPeriodTask refreshTask;

private DynamicDelayPeriodTask refreshCurrentDcTask;

protected Logger logger = LoggerFactory.getLogger(getClass());


public DefaultOuterClientCache(ConsoleConfig config) {
this.outerClientService = OuterClientService.DEFAULT;
this.config = config;
this.clustersCache = new TimeBoundCache<>(() -> 10000 + config.getRedisConfCheckIntervalMilli(),
this.clustersCache = new TimeBoundCache<>(() -> 10000 + getIntervalMilli(),
() -> loadClusters(FoundationService.DEFAULT.getDataCenter()));
this.currentDcClustersCache = new TimeBoundCache<>(() -> 10000 + config.getRedisConfCheckIntervalMilli(),
this.currentDcClustersCache = new TimeBoundCache<>(() -> 10000 + getIntervalMilli(),
() -> loadCurrentDcClusters(FoundationService.DEFAULT.getDataCenter()));
}

Expand Down Expand Up @@ -152,38 +157,70 @@ public InstanceInfo redisMetaToInstanceModel(RedisMeta redisMeta) {
return instance;
}

@Override
protected void doInitialize() throws Exception {
super.doInitialize();
this.scheduled = Executors.newScheduledThreadPool(1,
XpipeThreadFactory.create("OuterClientCacheRefreshScheduled"));
this.refreshTask = new DynamicDelayPeriodTask("OuterClientCacheRefresh", clustersCache::refresh,
config::getRedisConfCheckIntervalMilli, scheduled);
this.refreshCurrentDcTask = new DynamicDelayPeriodTask("OuterClientCurrentDcCacheRefresh", currentDcClustersCache::refresh,
config::getRedisConfCheckIntervalMilli, scheduled);
private synchronized void stopLoadData() throws Exception {

logger.info("[refreshCurrentDcTask][stop]");
if(refreshCurrentDcTask != null){
refreshCurrentDcTask.stop();
}
refreshCurrentDcTask = null;
logger.info("[refreshTask][stop]");
if(refreshTask != null){
refreshTask.stop();
}
refreshTask = null;

}

@Override
protected void doStart() throws Exception {
super.doStart();
this.refreshTask.start();
private synchronized void startLoadData() throws Exception {

if(refreshCurrentDcTask == null){
this.refreshCurrentDcTask = new DynamicDelayPeriodTask("OuterClientCurrentDcCacheRefresh", currentDcClustersCache::refresh,
this::getIntervalMilli, scheduled);
}

this.refreshCurrentDcTask.start();
logger.info("[refreshCurrentDcTask][start]");

if(refreshTask == null){
this.refreshTask = new DynamicDelayPeriodTask("OuterClientCacheRefresh", clustersCache::refresh,
this::getIntervalMilli, scheduled);
}

this.refreshTask.start();

logger.info("[refreshTask][start]");
}

private void doStart() {
logger.info("[doStart]");
try {
startLoadData();
} catch (Throwable th) {
logger.error("[doStart]", th);
}
}

private void doStop() {
try {
stopLoadData();
} catch (Throwable th) {
logger.error("[doStop]", th);
}
}

@Override
protected void doStop() throws Exception {
super.doStop();
this.refreshTask.stop();
this.refreshCurrentDcTask.stop();
public void isleader() {
doStop();
doStart();
}

@Override
protected void doDispose() throws Exception {
super.doDispose();
this.scheduled.shutdown();
this.scheduled = null;
this.refreshTask = null;
this.refreshCurrentDcTask = null;
public void notLeader() {
doStop();
}

private int getIntervalMilli() {
return config.getCRedisClusterCacheRefreshIntervalMilli();
}
}

0 comments on commit 38cd0cb

Please sign in to comment.