Skip to content

Commit

Permalink
Add SPI for upgrade checker. (#6217)
Browse files Browse the repository at this point in the history
* Add SPI for upgrade checker.

* For checkstyle
  • Loading branch information
KomachiSion authored Jul 1, 2021
1 parent 4cd6c58 commit eb2644f
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.core.v2.upgrade;

import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;

/**
* Default upgrade checker for self node.
*
* @author xiweng.yy
*/
public class DefaultSelfUpgradeChecker implements SelfUpgradeChecker {

private static final String DEFAULT = "default";

@Override
public String checkType() {
return DEFAULT;
}

@Override
public boolean isReadyToUpgrade(ServiceManager serviceManager, DoubleWriteDelayTaskEngine taskEngine) {
System.out.println("test");
return checkServiceAndInstanceNumber(serviceManager) && checkDoubleWriteStatus(taskEngine);
}

private boolean checkServiceAndInstanceNumber(ServiceManager serviceManager) {
boolean result = serviceManager.getServiceCount() == MetricsMonitor.getDomCountMonitor().get();
result &= serviceManager.getInstanceCount() == MetricsMonitor.getIpCountMonitor().get();
return result;
}

private boolean checkDoubleWriteStatus(DoubleWriteDelayTaskEngine taskEngine) {
return taskEngine.isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.core.v2.upgrade;

import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;

/**
* Upgrade checker for self-node to judge whether current node is ready to upgrade.
*
* @author xiweng.yy
*/
public interface SelfUpgradeChecker {

/**
* Get the check type of this self upgrade checker.
*
* @return type
*/
String checkType();

/**
* Judge whether current node is ready to upgrade.
*
* @param serviceManager service manager for v1 mode.
* @param taskEngine double write task engine
* @return {@code true} if current node is ready to upgrade, otherwise {@code false}
*/
boolean isReadyToUpgrade(ServiceManager serviceManager, DoubleWriteDelayTaskEngine taskEngine);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.core.v2.upgrade;

import com.alibaba.nacos.common.spi.NacosServiceLoader;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* SPI holder for self upgrade checker.
*
* @author xiweng.yy
*/
public class SelfUpgradeCheckerSpiHolder {

private static final SelfUpgradeCheckerSpiHolder INSTANCE = new SelfUpgradeCheckerSpiHolder();

private static final DefaultSelfUpgradeChecker DEFAULT_SELF_UPGRADE_CHECKER = new DefaultSelfUpgradeChecker();

private final Map<String, SelfUpgradeChecker> selfUpgradeCheckerMap;

private SelfUpgradeCheckerSpiHolder() {
Collection<SelfUpgradeChecker> checkers = NacosServiceLoader.load(SelfUpgradeChecker.class);
selfUpgradeCheckerMap = new HashMap<>(checkers.size());
for (SelfUpgradeChecker each : checkers) {
selfUpgradeCheckerMap.put(each.checkType(), each);
}
}

/**
* Find target type self checker.
*
* @param type target type
* @return target {@link SelfUpgradeChecker} if exist, otherwise {@link DefaultSelfUpgradeChecker}
*/
public static SelfUpgradeChecker findSelfChecker(String type) {
return INSTANCE.selfUpgradeCheckerMap.getOrDefault(type, DEFAULT_SELF_UPGRADE_CHECKER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.AsyncServicesCheckTask;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.util.VersionUtil;
Expand Down Expand Up @@ -82,6 +81,8 @@ public class UpgradeJudgement extends Subscriber<MembersChangeEvent> {

private ScheduledExecutorService upgradeChecker;

private SelfUpgradeChecker selfUpgradeChecker;

private static final int MAJOR_VERSION = 2;

private static final int MINOR_VERSION = 4;
Expand Down Expand Up @@ -111,6 +112,7 @@ public UpgradeJudgement(RaftPeerSet raftPeerSet, RaftCore raftCore, ClusterVersi
}

private void initUpgradeChecker() {
selfUpgradeChecker = SelfUpgradeCheckerSpiHolder.findSelfChecker(EnvUtil.getProperty("upgrading.checker.type", "default"));
upgradeChecker = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory("upgrading.checker"));
upgradeChecker.scheduleAtFixedRate(() -> {
if (isUseGrpcFeatures()) {
Expand Down Expand Up @@ -191,7 +193,7 @@ private void checkAndDowngrade(boolean jraftFeature) {

private boolean checkForUpgrade() {
if (!useGrpcFeatures.get()) {
boolean selfCheckResult = checkServiceAndInstanceNumber() && checkDoubleWriteStatus();
boolean selfCheckResult = selfUpgradeChecker.isReadyToUpgrade(serviceManager, doubleWriteDelayTaskEngine);
Member self = memberManager.getSelf();
self.setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, selfCheckResult);
memberManager.updateMember(self);
Expand All @@ -208,16 +210,6 @@ private boolean checkForUpgrade() {
return result;
}

private boolean checkServiceAndInstanceNumber() {
boolean result = serviceManager.getServiceCount() == MetricsMonitor.getDomCountMonitor().get();
result &= serviceManager.getInstanceCount() == MetricsMonitor.getIpCountMonitor().get();
return result;
}

private boolean checkDoubleWriteStatus() {
return doubleWriteDelayTaskEngine.isEmpty();
}

private void doUpgrade() {
Loggers.SRV_LOG.info("Upgrade to 2.0.X");
useGrpcFeatures.compareAndSet(false, true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.core.v2.upgrade;

import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;

public class MockSelfUpgradeChecker implements SelfUpgradeChecker {

@Override
public String checkType() {
return "mock";
}

@Override
public boolean isReadyToUpgrade(ServiceManager serviceManager, DoubleWriteDelayTaskEngine taskEngine) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.mock.env.MockEnvironment;

import java.util.Collection;
import java.util.HashSet;
Expand All @@ -53,7 +56,7 @@ public class UpgradeJudgementTest {
private final long sleepForCheck = 800L;

@Mock
private ConfigurableEnvironment environment;
private ConfigurableApplicationContext context;

@Mock
private RaftPeerSet raftPeerSet;
Expand All @@ -76,20 +79,26 @@ public class UpgradeJudgementTest {
@Mock
private UpgradeStates upgradeStates;

@Mock
private ServiceStorage serviceStorage;

private UpgradeJudgement upgradeJudgement;

@Before
public void setUp() throws Exception {
EnvUtil.setEnvironment(environment);
EnvUtil.setEnvironment(new MockEnvironment());
EnvUtil.setIsStandalone(false);
when(context.getBean(ServiceManager.class)).thenReturn(serviceManager);
when(context.getBean(ServiceStorage.class)).thenReturn(serviceStorage);
ApplicationUtils.injectContext(context);
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates,
doubleWriteDelayTaskEngine);
upgradeStates, doubleWriteDelayTaskEngine);
}

@After
public void tearDown() {
upgradeJudgement.shutdown();
EnvUtil.setEnvironment(null);
}

@Test
Expand Down Expand Up @@ -168,12 +177,12 @@ public void testUpgradeCheckSelfFail() throws Exception {
public void testAlreadyUpgradedAndCheckSelfFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
Iterator<Member> iterator = members.iterator();
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(false);
iterator.next();
while (iterator.hasNext()) {
iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
}
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
Expand Down Expand Up @@ -203,15 +212,15 @@ public void testUpgradeCheckOthersFail() throws Exception {
public void testAlreadyUpgradedAndCheckOthersFail() throws Exception {
Collection<Member> members = mockMember("2.0.0", "2.0.0", "2.0.0");
members.iterator().next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true);
when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true);
upgradeJudgement.shutdown();
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertTrue(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}
Expand All @@ -229,14 +238,14 @@ public void testDowngradeOneFor14XNode() throws Exception {
assertFalse(upgradeJudgement.isUseGrpcFeatures());
assertTrue(upgradeJudgement.isUseJraftFeatures());
}

@Test
public void testAlreadyUpgradedAndDowngradeOneFor14XNode() throws Exception {
Collection<Member> members = mockMember("1.4.0", "2.0.0", "2.0.0");
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("1.4.0", "2.0.0", "2.0.0")).build());
verify(raftPeerSet, never()).init();
verify(raftCore, never()).init();
verify(versionJudgement, never()).reset();
Expand All @@ -261,11 +270,11 @@ public void testDowngradeTwoNode() throws Exception {

@Test
public void testAlreadyUpgradedAndDowngradeTwoNode() throws Exception {
Collection<Member> members = mockMember("", "", "2.0.0");
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("", "", "2.0.0")).build());
verify(raftPeerSet, atMostOnce()).init();
verify(raftCore, atMostOnce()).init();
verify(versionJudgement, atMostOnce()).reset();
Expand All @@ -290,11 +299,11 @@ public void testDowngradeOneNode() throws Exception {

@Test
public void testAlreadyUpgradedAndDowngradeOneNode() throws Exception {
Collection<Member> members = mockMember("1.3.2", "2.0.0", "2.0.0");
when(upgradeStates.isUpgraded()).thenReturn(true);
upgradeJudgement.shutdown();
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build());
upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("1.3.2", "2.0.0", "2.0.0")).build());
verify(raftPeerSet, atMostOnce()).init();
verify(raftCore, atMostOnce()).init();
verify(versionJudgement, atMostOnce()).reset();
Expand All @@ -303,6 +312,19 @@ public void testAlreadyUpgradedAndDowngradeOneNode() throws Exception {
assertFalse(upgradeJudgement.isUseJraftFeatures());
}

@Test
public void testUpgradedBySpecifiedSelfUpgradeChecker() throws InterruptedException {
upgradeJudgement.shutdown();
MockEnvironment mockEnvironment = new MockEnvironment();
mockEnvironment.setProperty("upgrading.checker.type", "mock");
EnvUtil.setEnvironment(mockEnvironment);
mockMember("1.3.2", "2.0.0", "2.0.0");
upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager,
upgradeStates, doubleWriteDelayTaskEngine);
TimeUnit.MILLISECONDS.sleep(sleepForCheck);
assertTrue((Boolean) memberManager.getSelf().getExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE));
}

private Collection<Member> mockMember(String... versions) {
Collection<Member> result = new HashSet<>();
for (int i = 0; i < versions.length; i++) {
Expand Down
Loading

0 comments on commit eb2644f

Please sign in to comment.