Skip to content

Commit

Permalink
Merge pull request #927 from wangqifan/bugfix/currentMeta_update
Browse files Browse the repository at this point in the history
update current when keeper master update
  • Loading branch information
LanternLee authored Jan 20, 2025
2 parents a1edbe9 + a71a325 commit 28a0ac0
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ protected void makeKeepersOk(Long clusterDbId, Long shardDbId, Pair<String, Inte
currentMetaManager.getClusterRouteByDcId(currentMetaManager.getClusterMeta(clusterDbId).getActiveDc(), clusterDbId),
keyedObjectPool, 1000, 1, scheduled, executors);
try {
currentMetaManager.setKeeperMaster(clusterDbId, shardDbId, newMaster.getKey(), newMaster.getValue());
// 必须先改 meta, 再修改 keepr, 不然可能被 KeeperStateAlignChecker reset 回去。
job.execute().get(waitTimeoutSeconds/2, TimeUnit.SECONDS);
logger.debug("[doRun][set]cluster_{}, shard_{}, {}", clusterDbId, shardDbId, newMaster);
executionLog.info("[makeKeepersOk]success");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error("[makeKeepersOk]" + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.StringUtil;

import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -33,6 +34,7 @@ public AbstractKeeperMasterChooser(Long clusterDbId, Long shardDbId, DcMetaCache

@Override
protected void work() {
String dcName = dcMetaCache.getPrimaryDc(clusterDbId, shardDbId);
Pair<String, Integer> keeperMaster = chooseKeeperMaster();
logger.debug("[doRun]cluster_{}, shard_{}, {}", clusterDbId, shardDbId, keeperMaster);
Pair<String, Integer> currentMaster = currentMetaManager.getKeeperMaster(clusterDbId, shardDbId);
Expand All @@ -41,7 +43,7 @@ protected void work() {
return;
}
logger.debug("[doRun][set]cluster_{}, shard_{}, {}", clusterDbId, shardDbId, keeperMaster);
currentMetaManager.setKeeperMaster(clusterDbId, shardDbId, keeperMaster.getKey(), keeperMaster.getValue());
currentMetaManager.setKeeperMaster(clusterDbId, shardDbId, keeperMaster.getKey(), keeperMaster.getValue(), dcName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public interface CurrentMetaManager extends Observable {

void setKeeperMaster(Long clusterDbId, Long shardDbId, String addr);

void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port, String expectedPrimaryDc);

void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port);

void setApplierMasterAndNotify(Long clusterDbId, Long shardDbId, String ip, int port, String sids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.IpUtils;
import com.ctrip.xpipe.utils.ObjectUtils;
import com.ctrip.xpipe.utils.StringUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -534,8 +535,18 @@ public String getSrcSids(Long clusterDbId, Long shardDbId) {

@Override
public void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port) {


setKeeperMaster(clusterDbId, shardDbId, ip, port, null);
}

@Override
public void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port, String expectedPrimaryDc) {
String dcName = dcMetaCache.getPrimaryDc(clusterDbId, shardDbId);
if(expectedPrimaryDc != null && !StringUtil.trimEquals(dcName, expectedPrimaryDc)) {
// 如果 expectedDc 为null, 不进行校验。发生了dr切换,禁止修改。
// 如果任务基于 PrimaryDc 来修改 keeper meta 就要校验检测过程是否 dc 切换。
logger.info("[setKeeperMaster][rejected] primaryDc:{}, expectedPrimaryDc:{}", dcName, expectedPrimaryDc);
return;
}
Pair<String, Integer> keeperMaster = new Pair<String, Integer>(ip, port);
if(currentMeta.setKeeperMaster(clusterDbId, shardDbId, keeperMaster)){
logger.info("[setKeeperMaster]cluster_{},shard_{},{}:{}", clusterDbId, shardDbId, ip, port);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package com.ctrip.xpipe.redis.meta.server.keeper.keepermaster.impl;

import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.entity.ShardMeta;
import com.ctrip.xpipe.redis.meta.server.cluster.impl.DefaultSlotManager;
import com.ctrip.xpipe.redis.meta.server.meta.impl.DefaultCurrentMetaManager;
import com.ctrip.xpipe.tuple.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import java.util.Arrays;
import java.util.HashSet;

import static org.mockito.Mockito.when;

Expand All @@ -13,8 +23,14 @@
*/
public class DefaultDcKeeperMasterChooserTest extends AbstractDcKeeperMasterChooserTest{

private DefaultDcKeeperMasterChooser defaultDcKeeperMasterChooser;

private DefaultDcKeeperMasterChooser defaultDcKeeperMasterChooser;

@InjectMocks
private DefaultCurrentMetaManager metaManager;

@Mock
private DefaultSlotManager slotManager;

@Before
public void beforeDefaultDcKeeperMasterChooserTest() throws Exception{

Expand Down Expand Up @@ -47,5 +63,66 @@ public void testMasterChooserAlgorithm(){

Assert.assertTrue(defaultDcKeeperMasterChooser.getKeeperMasterChooserAlgorithm() instanceof HeteroDownStreamDcKeeperMasterChooserAlgorithm);
}

@Test
public void testWork() throws Exception {

ClusterMeta clusterMeta = new ClusterMeta(clusterId)
.setDbId(clusterDbId)
.setType("ONE_WAY");
clusterMeta.addShard(new ShardMeta(shardId)
.setDbId(shardDbId));
when(dcMetaCache.getClusters()).thenReturn(new HashSet<>(Arrays.asList(clusterMeta)));
when(dcMetaCache.getClusterMeta(clusterDbId)).thenReturn(clusterMeta);
metaManager.setDcMetaCache(dcMetaCache);
when(slotManager.getSlotIdByKey(clusterDbId)).thenReturn(1);
metaManager.setSlotManager(slotManager);
metaManager.addSlot(1);

defaultDcKeeperMasterChooser = new DefaultDcKeeperMasterChooser(clusterDbId, shardDbId,
multiDcService, dcMetaCache, metaManager, scheduled,
getXpipeNettyClientKeyedObjectPool());

Pair<String, Integer> oldMaster = new Pair<>("127.0.0.1", 8080);
try {
metaManager.setKeeperMaster(clusterDbId, shardDbId, oldMaster.getKey(), oldMaster.getValue());
} catch (NullPointerException e) {
// stateHandlers 为 null
}

Pair<String, Integer> newMaster = metaManager.getKeeperMaster(clusterDbId, shardDbId);
when(dcMetaCache.getPrimaryDc(clusterDbId, shardDbId))
.thenReturn("ptjq")
.thenReturn("ptjq")
.thenReturn("ptoy");
when(dcMetaCache.isCurrentShardParentCluster(clusterDbId, shardDbId)).thenReturn(true);
when(dcMetaCache.isCurrentDcBackUp(clusterDbId)).thenReturn(true);

when(multiDcService.getActiveKeeper("ptjq", clusterDbId, shardDbId))
.thenReturn(new KeeperMeta().setIp("10.1.12.2").setPort(8081));

defaultDcKeeperMasterChooser.work();

newMaster = metaManager.getKeeperMaster(clusterDbId, shardDbId);

Assert.assertEquals(oldMaster, newMaster);

when(dcMetaCache.getPrimaryDc(clusterDbId, shardDbId))
.thenReturn("ptjq")
.thenReturn("ptjq")
.thenReturn("ptjq");

try {
defaultDcKeeperMasterChooser.work();
} catch (NullPointerException exception) {
// stateHandlers 为 null
}

newMaster = metaManager.getKeeperMaster(clusterDbId, shardDbId);

Assert.assertNotEquals(oldMaster, newMaster);


}

}

0 comments on commit 28a0ac0

Please sign in to comment.