diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/DefaultSelfUpgradeChecker.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/DefaultSelfUpgradeChecker.java new file mode 100644 index 00000000000..c5c0edb8ebf --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/DefaultSelfUpgradeChecker.java @@ -0,0 +1,52 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade; + +import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; +import com.alibaba.nacos.naming.monitor.MetricsMonitor; + +/** + * Default upgrade checker for self node. + * + * @author xiweng.yy + */ +public class DefaultSelfUpgradeChecker implements SelfUpgradeChecker { + + private static final String DEFAULT = "default"; + + @Override + public String checkType() { + return DEFAULT; + } + + @Override + public boolean isReadyToUpgrade(ServiceManager serviceManager, DoubleWriteDelayTaskEngine taskEngine) { + System.out.println("test"); + return checkServiceAndInstanceNumber(serviceManager) && checkDoubleWriteStatus(taskEngine); + } + + private boolean checkServiceAndInstanceNumber(ServiceManager serviceManager) { + boolean result = serviceManager.getServiceCount() == MetricsMonitor.getDomCountMonitor().get(); + result &= serviceManager.getInstanceCount() == MetricsMonitor.getIpCountMonitor().get(); + return result; + } + + private boolean checkDoubleWriteStatus(DoubleWriteDelayTaskEngine taskEngine) { + return taskEngine.isEmpty(); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/SelfUpgradeChecker.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/SelfUpgradeChecker.java new file mode 100644 index 00000000000..e32c1ed0901 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/SelfUpgradeChecker.java @@ -0,0 +1,44 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade; + +import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; + +/** + * Upgrade checker for self-node to judge whether current node is ready to upgrade. + * + * @author xiweng.yy + */ +public interface SelfUpgradeChecker { + + /** + * Get the check type of this self upgrade checker. + * + * @return type + */ + String checkType(); + + /** + * Judge whether current node is ready to upgrade. + * + * @param serviceManager service manager for v1 mode. + * @param taskEngine double write task engine + * @return {@code true} if current node is ready to upgrade, otherwise {@code false} + */ + boolean isReadyToUpgrade(ServiceManager serviceManager, DoubleWriteDelayTaskEngine taskEngine); +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/SelfUpgradeCheckerSpiHolder.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/SelfUpgradeCheckerSpiHolder.java new file mode 100644 index 00000000000..460fef49190 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/SelfUpgradeCheckerSpiHolder.java @@ -0,0 +1,55 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade; + +import com.alibaba.nacos.common.spi.NacosServiceLoader; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * SPI holder for self upgrade checker. + * + * @author xiweng.yy + */ +public class SelfUpgradeCheckerSpiHolder { + + private static final SelfUpgradeCheckerSpiHolder INSTANCE = new SelfUpgradeCheckerSpiHolder(); + + private static final DefaultSelfUpgradeChecker DEFAULT_SELF_UPGRADE_CHECKER = new DefaultSelfUpgradeChecker(); + + private final Map selfUpgradeCheckerMap; + + private SelfUpgradeCheckerSpiHolder() { + Collection checkers = NacosServiceLoader.load(SelfUpgradeChecker.class); + selfUpgradeCheckerMap = new HashMap<>(checkers.size()); + for (SelfUpgradeChecker each : checkers) { + selfUpgradeCheckerMap.put(each.checkType(), each); + } + } + + /** + * Find target type self checker. + * + * @param type target type + * @return target {@link SelfUpgradeChecker} if exist, otherwise {@link DefaultSelfUpgradeChecker} + */ + public static SelfUpgradeChecker findSelfChecker(String type) { + return INSTANCE.selfUpgradeCheckerMap.getOrDefault(type, DEFAULT_SELF_UPGRADE_CHECKER); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java index e174fd041ec..7d7f0f12bf9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgement.java @@ -37,7 +37,6 @@ import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.execute.AsyncServicesCheckTask; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher; -import com.alibaba.nacos.naming.monitor.MetricsMonitor; import com.alibaba.nacos.sys.env.EnvUtil; import org.codehaus.jackson.Version; import org.codehaus.jackson.util.VersionUtil; @@ -82,6 +81,8 @@ public class UpgradeJudgement extends Subscriber { private ScheduledExecutorService upgradeChecker; + private SelfUpgradeChecker selfUpgradeChecker; + private static final int MAJOR_VERSION = 2; private static final int MINOR_VERSION = 4; @@ -111,6 +112,7 @@ public UpgradeJudgement(RaftPeerSet raftPeerSet, RaftCore raftCore, ClusterVersi } private void initUpgradeChecker() { + selfUpgradeChecker = SelfUpgradeCheckerSpiHolder.findSelfChecker(EnvUtil.getProperty("upgrading.checker.type", "default")); upgradeChecker = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory("upgrading.checker")); upgradeChecker.scheduleAtFixedRate(() -> { if (isUseGrpcFeatures()) { @@ -191,7 +193,7 @@ private void checkAndDowngrade(boolean jraftFeature) { private boolean checkForUpgrade() { if (!useGrpcFeatures.get()) { - boolean selfCheckResult = checkServiceAndInstanceNumber() && checkDoubleWriteStatus(); + boolean selfCheckResult = selfUpgradeChecker.isReadyToUpgrade(serviceManager, doubleWriteDelayTaskEngine); Member self = memberManager.getSelf(); self.setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, selfCheckResult); memberManager.updateMember(self); @@ -208,16 +210,6 @@ private boolean checkForUpgrade() { return result; } - private boolean checkServiceAndInstanceNumber() { - boolean result = serviceManager.getServiceCount() == MetricsMonitor.getDomCountMonitor().get(); - result &= serviceManager.getInstanceCount() == MetricsMonitor.getIpCountMonitor().get(); - return result; - } - - private boolean checkDoubleWriteStatus() { - return doubleWriteDelayTaskEngine.isEmpty(); - } - private void doUpgrade() { Loggers.SRV_LOG.info("Upgrade to 2.0.X"); useGrpcFeatures.compareAndSet(false, true); diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/MockSelfUpgradeChecker.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/MockSelfUpgradeChecker.java new file mode 100644 index 00000000000..bb4da3f3f44 --- /dev/null +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/MockSelfUpgradeChecker.java @@ -0,0 +1,33 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.upgrade; + +import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; + +public class MockSelfUpgradeChecker implements SelfUpgradeChecker { + + @Override + public String checkType() { + return "mock"; + } + + @Override + public boolean isReadyToUpgrade(ServiceManager serviceManager, DoubleWriteDelayTaskEngine taskEngine) { + return true; + } +} diff --git a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java index f1e6572d0c5..147aacecf64 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/core/v2/upgrade/UpgradeJudgementTest.java @@ -25,15 +25,18 @@ import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore; import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet; import com.alibaba.nacos.naming.core.ServiceManager; +import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; import com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine; import com.alibaba.nacos.sys.env.EnvUtil; +import com.alibaba.nacos.sys.utils.ApplicationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.mock.env.MockEnvironment; import java.util.Collection; import java.util.HashSet; @@ -53,7 +56,7 @@ public class UpgradeJudgementTest { private final long sleepForCheck = 800L; @Mock - private ConfigurableEnvironment environment; + private ConfigurableApplicationContext context; @Mock private RaftPeerSet raftPeerSet; @@ -76,20 +79,26 @@ public class UpgradeJudgementTest { @Mock private UpgradeStates upgradeStates; + @Mock + private ServiceStorage serviceStorage; + private UpgradeJudgement upgradeJudgement; @Before public void setUp() throws Exception { - EnvUtil.setEnvironment(environment); + EnvUtil.setEnvironment(new MockEnvironment()); EnvUtil.setIsStandalone(false); + when(context.getBean(ServiceManager.class)).thenReturn(serviceManager); + when(context.getBean(ServiceStorage.class)).thenReturn(serviceStorage); + ApplicationUtils.injectContext(context); upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, - upgradeStates, - doubleWriteDelayTaskEngine); + upgradeStates, doubleWriteDelayTaskEngine); } @After public void tearDown() { upgradeJudgement.shutdown(); + EnvUtil.setEnvironment(null); } @Test @@ -168,12 +177,12 @@ public void testUpgradeCheckSelfFail() throws Exception { public void testAlreadyUpgradedAndCheckSelfFail() throws Exception { Collection members = mockMember("2.0.0", "2.0.0", "2.0.0"); Iterator iterator = members.iterator(); - when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(false); iterator.next(); while (iterator.hasNext()) { iterator.next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true); } when(upgradeStates.isUpgraded()).thenReturn(true); + upgradeJudgement.shutdown(); upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, upgradeStates, doubleWriteDelayTaskEngine); upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build()); @@ -203,15 +212,15 @@ public void testUpgradeCheckOthersFail() throws Exception { public void testAlreadyUpgradedAndCheckOthersFail() throws Exception { Collection members = mockMember("2.0.0", "2.0.0", "2.0.0"); members.iterator().next().setExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE, true); - when(doubleWriteDelayTaskEngine.isEmpty()).thenReturn(true); + upgradeJudgement.shutdown(); when(upgradeStates.isUpgraded()).thenReturn(true); upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, upgradeStates, doubleWriteDelayTaskEngine); upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build()); + TimeUnit.MILLISECONDS.sleep(sleepForCheck); verify(raftPeerSet, never()).init(); verify(raftCore, never()).init(); verify(versionJudgement, never()).reset(); - TimeUnit.MILLISECONDS.sleep(sleepForCheck); assertTrue(upgradeJudgement.isUseGrpcFeatures()); assertTrue(upgradeJudgement.isUseJraftFeatures()); } @@ -229,14 +238,14 @@ public void testDowngradeOneFor14XNode() throws Exception { assertFalse(upgradeJudgement.isUseGrpcFeatures()); assertTrue(upgradeJudgement.isUseJraftFeatures()); } - + @Test public void testAlreadyUpgradedAndDowngradeOneFor14XNode() throws Exception { - Collection members = mockMember("1.4.0", "2.0.0", "2.0.0"); when(upgradeStates.isUpgraded()).thenReturn(true); + upgradeJudgement.shutdown(); upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, upgradeStates, doubleWriteDelayTaskEngine); - upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build()); + upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("1.4.0", "2.0.0", "2.0.0")).build()); verify(raftPeerSet, never()).init(); verify(raftCore, never()).init(); verify(versionJudgement, never()).reset(); @@ -261,11 +270,11 @@ public void testDowngradeTwoNode() throws Exception { @Test public void testAlreadyUpgradedAndDowngradeTwoNode() throws Exception { - Collection members = mockMember("", "", "2.0.0"); when(upgradeStates.isUpgraded()).thenReturn(true); + upgradeJudgement.shutdown(); upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, upgradeStates, doubleWriteDelayTaskEngine); - upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build()); + upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("", "", "2.0.0")).build()); verify(raftPeerSet, atMostOnce()).init(); verify(raftCore, atMostOnce()).init(); verify(versionJudgement, atMostOnce()).reset(); @@ -290,11 +299,11 @@ public void testDowngradeOneNode() throws Exception { @Test public void testAlreadyUpgradedAndDowngradeOneNode() throws Exception { - Collection members = mockMember("1.3.2", "2.0.0", "2.0.0"); when(upgradeStates.isUpgraded()).thenReturn(true); + upgradeJudgement.shutdown(); upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, upgradeStates, doubleWriteDelayTaskEngine); - upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(members).build()); + upgradeJudgement.onEvent(MembersChangeEvent.builder().triggers(mockMember("1.3.2", "2.0.0", "2.0.0")).build()); verify(raftPeerSet, atMostOnce()).init(); verify(raftCore, atMostOnce()).init(); verify(versionJudgement, atMostOnce()).reset(); @@ -303,6 +312,19 @@ public void testAlreadyUpgradedAndDowngradeOneNode() throws Exception { assertFalse(upgradeJudgement.isUseJraftFeatures()); } + @Test + public void testUpgradedBySpecifiedSelfUpgradeChecker() throws InterruptedException { + upgradeJudgement.shutdown(); + MockEnvironment mockEnvironment = new MockEnvironment(); + mockEnvironment.setProperty("upgrading.checker.type", "mock"); + EnvUtil.setEnvironment(mockEnvironment); + mockMember("1.3.2", "2.0.0", "2.0.0"); + upgradeJudgement = new UpgradeJudgement(raftPeerSet, raftCore, versionJudgement, memberManager, serviceManager, + upgradeStates, doubleWriteDelayTaskEngine); + TimeUnit.MILLISECONDS.sleep(sleepForCheck); + assertTrue((Boolean) memberManager.getSelf().getExtendVal(MemberMetaDataConstants.READY_TO_UPGRADE)); + } + private Collection mockMember(String... versions) { Collection result = new HashSet<>(); for (int i = 0; i < versions.length; i++) { diff --git a/naming/src/test/resources/META-INF/services/com.alibaba.nacos.naming.core.v2.upgrade.SelfUpgradeChecker b/naming/src/test/resources/META-INF/services/com.alibaba.nacos.naming.core.v2.upgrade.SelfUpgradeChecker new file mode 100644 index 00000000000..468f5031b82 --- /dev/null +++ b/naming/src/test/resources/META-INF/services/com.alibaba.nacos.naming.core.v2.upgrade.SelfUpgradeChecker @@ -0,0 +1,17 @@ +# +# Copyright 1999-2020 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.alibaba.nacos.naming.core.v2.upgrade.MockSelfUpgradeChecker