diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index 48d08973c09..3c3efcabca9 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -20,8 +20,6 @@ import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.common.utils.CollectionUtils; -import com.alibaba.nacos.common.utils.ClassUtils; -import com.alibaba.nacos.common.utils.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +40,7 @@ */ public class DefaultPublisher extends Thread implements EventPublisher { - private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); + protected static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); private volatile boolean initialized = false; @@ -50,13 +48,13 @@ public class DefaultPublisher extends Thread implements EventPublisher { private Class eventType; - private final ConcurrentHashSet subscribers = new ConcurrentHashSet(); + protected final ConcurrentHashSet subscribers = new ConcurrentHashSet(); private int queueMaxSize = -1; private BlockingQueue queue; - private volatile Long lastEventSequence = -1L; + protected volatile Long lastEventSequence = -1L; private final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater .newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence"); @@ -98,13 +96,17 @@ public void run() { void openEventHandler() { try { + + // This variable is defined to resolve the problem which message overstock in the queue. + int waitTimes = 60; // To ensure that messages are not lost, enable EventHandler when // waiting for the first Subscriber to register for (; ; ) { - if (shutdown || hasSubscriber()) { + if (shutdown || hasSubscriber() || waitTimes <= 0) { break; } ThreadUtils.sleep(1000L); + waitTimes--; } for (; ; ) { @@ -162,9 +164,13 @@ public boolean isInitialized() { return initialized; } + /** + * Receive and notifySubscriber to process the event. + * + * @param event {@link Event}. + */ void receiveEvent(Event event) { final long currentEventSequence = event.sequence(); - final String sourceName = ClassUtils.getName(event); // Notification single event listener for (Subscriber subscriber : subscribers) { @@ -175,10 +181,8 @@ void receiveEvent(Event event) { continue; } - final String targetName = ClassUtils.getName(subscriber.subscribeType()); - if (!Objects.equals(sourceName, targetName)) { - continue; - } + // Because unifying smartSubscriber and subscriber, so here need to think of compatibility. + // Remove original judge part of codes. notifySubscriber(subscriber, event); } } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultSharePublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultSharePublisher.java new file mode 100644 index 00000000000..3028423f7f7 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultSharePublisher.java @@ -0,0 +1,113 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The default share event publisher implementation for slow event. + * + * @author zongtanghu + */ +public class DefaultSharePublisher extends DefaultPublisher { + + private final Map, Set> subMappings = new ConcurrentHashMap, Set>(); + + private final Lock lock = new ReentrantLock(); + + /** + * Add listener for default share publisher. + * + * @param subscriber {@link Subscriber} + * @param subscribeType subscribe event type, such as slow event or general event. + */ + public void addSubscriber(Subscriber subscriber, Class subscribeType) { + // Actually, do a classification based on the slowEvent type. + Class subSlowEventType = (Class) subscribeType; + // For adding to parent class attributes synchronization. + subscribers.add(subscriber); + + lock.lock(); + try { + Set sets = subMappings.get(subSlowEventType); + if (sets == null) { + Set newSet = new ConcurrentHashSet(); + newSet.add(subscriber); + subMappings.put(subSlowEventType, newSet); + return; + } + sets.add(subscriber); + } finally { + lock.unlock(); + } + } + + /** + * Remove listener for default share publisher. + * + * @param subscriber {@link Subscriber} + * @param subscribeType subscribe event type, such as slow event or general event. + */ + public void removeSubscriber(Subscriber subscriber, Class subscribeType) { + // Actually, do a classification based on the slowEvent type. + Class subSlowEventType = (Class) subscribeType; + // For removing to parent class attributes synchronization. + subscribers.remove(subscriber); + + lock.lock(); + try { + Set sets = subMappings.get(subSlowEventType); + + if (sets != null && sets.contains(subscriber)) { + sets.remove(subscriber); + } + } finally { + lock.unlock(); + } + } + + @Override + public void receiveEvent(Event event) { + + final long currentEventSequence = event.sequence(); + // get subscriber set based on the slow EventType. + final Class slowEventType = (Class) event.getClass(); + + // Get for Map, the algorithm is O(1). + Set subscribers = subMappings.get(slowEventType); + + // Notification single event subscriber + for (Subscriber subscriber : subscribers) { + // Whether to ignore expiration events + if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { + LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", + event.getClass()); + continue; + } + + // Notify single subscriber for slow event. + notifySubscriber(subscriber, event); + } + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java index 5b23b877ab5..da39661ed8f 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -56,7 +56,7 @@ public class NotifyCenter { private static final NotifyCenter INSTANCE = new NotifyCenter(); - private EventPublisher sharePublisher; + private DefaultSharePublisher sharePublisher; private static Class clazz = null; @@ -100,7 +100,11 @@ public EventPublisher apply(Class cls, Integer buffer) throws N }; try { - INSTANCE.sharePublisher = publisherFactory.apply(SlowEvent.class, shareBufferSize); + + // Create and init DefaultSharePublisher instance. + INSTANCE.sharePublisher = new DefaultSharePublisher(); + INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize); + } catch (Throwable ex) { LOGGER.error("Service class newInstance has error : {}", ex); } @@ -171,13 +175,19 @@ public static void registerSubscriber(final Subscriber consumer) throws Naco // based on subclass's subscribeTypes method return list, it can register to publisher. if (consumer instanceof SmartSubscriber) { for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { - addSubscriber(consumer, subscribeType); + // For case, producer: defaultSharePublisher -> consumer: smartSubscriber. + if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { + INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType); + } else { + // For case, producer: defaultPublisher -> consumer: subscriber. + addSubscriber(consumer, subscribeType); + } } return; } if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { - INSTANCE.sharePublisher.addSubscriber(consumer); + INSTANCE.sharePublisher.addSubscriber(consumer, cls); return; } @@ -195,7 +205,10 @@ private static void addSubscriber(final Subscriber consumer, Class void deregisterSubscriber(final Subscriber consumer) { final Class cls = consumer.subscribeType(); if (consumer instanceof SmartSubscriber) { for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { - removeSubscriber(consumer, subscribeType); + if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { + INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType); + } else { + removeSubscriber(consumer, subscribeType); + } } return; } + if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { - INSTANCE.sharePublisher.removeSubscriber(consumer); + INSTANCE.sharePublisher.removeSubscriber(consumer, cls); return; } @@ -300,7 +318,10 @@ public static EventPublisher registerToPublisher(final Class ev } final String topic = ClassUtils.getCanonicalName(eventType); - MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize); + synchronized (NotifyCenter.class) { + // MapUtils.computeIfAbsent is a unsafe method. + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize); + } EventPublisher publisher = INSTANCE.publisherMap.get(topic); return publisher; } diff --git a/common/src/test/java/com/alibaba/nacos/common/notify/NotifyCenterTest.java b/common/src/test/java/com/alibaba/nacos/common/notify/NotifyCenterTest.java new file mode 100644 index 00000000000..5848c415a80 --- /dev/null +++ b/common/src/test/java/com/alibaba/nacos/common/notify/NotifyCenterTest.java @@ -0,0 +1,538 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.common.notify.listener.SmartSubscriber; +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.utils.ThreadUtils; +import org.junit.Assert; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author liaochuntao + * @author zongtanghu + */ +@FixMethodOrder(value = MethodSorters.NAME_ASCENDING) +public class NotifyCenterTest { + + private static class TestSlowEvent extends SlowEvent { + } + + private static class TestEvent extends Event { + + @Override + public long sequence() { + return System.currentTimeMillis(); + } + } + + static { + System.setProperty("nacos.core.notify.share-buffer-size", "8"); + } + + @Test + public void testEventsCanBeSubscribed() throws Exception { + + NotifyCenter.registerToSharePublisher(TestSlowEvent.class); + NotifyCenter.registerToPublisher(TestEvent.class, 8); + + final CountDownLatch latch = new CountDownLatch(2); + final AtomicInteger count = new AtomicInteger(0); + + NotifyCenter.registerSubscriber(new Subscriber() { + @Override + public void onEvent(TestSlowEvent event) { + try { + count.incrementAndGet(); + } finally { + latch.countDown(); + } + } + + @Override + public Class subscribeType() { + return TestSlowEvent.class; + } + }); + + NotifyCenter.registerSubscriber(new Subscriber() { + @Override + public void onEvent(TestEvent event) { + try { + count.incrementAndGet(); + } finally { + latch.countDown(); + } + } + + @Override + public Class subscribeType() { + return TestEvent.class; + } + }); + + Assert.assertTrue(NotifyCenter.publishEvent(new TestEvent())); + Assert.assertTrue(NotifyCenter.publishEvent(new TestSlowEvent())); + + ThreadUtils.sleep(5000L); + + + latch.await(5000L, TimeUnit.MILLISECONDS); + + Assert.assertEquals(2, count.get()); + } + + static CountDownLatch latch = new CountDownLatch(3); + + static class ExpireEvent extends Event { + + static AtomicLong sequence = new AtomicLong(3); + + private long no = sequence.getAndDecrement(); + + @Override + public long sequence() { + latch.countDown(); + return no; + } + } + + @Test + public void testCanIgnoreExpireEvent() throws Exception { + NotifyCenter.registerToPublisher(ExpireEvent.class, 16); + final AtomicInteger count = new AtomicInteger(0); + + NotifyCenter.registerSubscriber(new Subscriber() { + @Override + public void onEvent(ExpireEvent event) { + count.incrementAndGet(); + } + + @Override + public Class subscribeType() { + return ExpireEvent.class; + } + + public boolean ignoreExpireEvent() { + return true; + } + + }); + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(NotifyCenter.publishEvent(new ExpireEvent())); + } + + latch.await(10000L, TimeUnit.MILLISECONDS); + Assert.assertEquals(1, count.get()); + } + + static CountDownLatch latch2 = new CountDownLatch(3); + + static class NoExpireEvent extends Event { + + static AtomicLong sequence = new AtomicLong(3); + + private long no = sequence.getAndDecrement(); + + @Override + public long sequence() { + return no; + } + } + + @Test + public void testNoIgnoreExpireEvent() throws Exception { + NotifyCenter.registerToPublisher(NoExpireEvent.class, 16); + final AtomicInteger count = new AtomicInteger(0); + + NotifyCenter.registerSubscriber(new Subscriber() { + @Override + public void onEvent(Event event) { + count.incrementAndGet(); + latch2.countDown(); + } + + @Override + public Class subscribeType() { + return NoExpireEvent.class; + } + }); + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(NotifyCenter.publishEvent(new NoExpireEvent())); + } + + latch2.await(10000L, TimeUnit.MILLISECONDS); + Assert.assertEquals(3, count.get()); + } + + private static class SlowE1 extends SlowEvent { + + private String info = "SlowE1"; + + public String getInfo() { + return info; + } + + public void setInfo(String info) { + this.info = info; + } + + } + + private static class SlowE2 extends SlowEvent { + + private String info = "SlowE2"; + + public String getInfo() { + return info; + } + + public void setInfo(String info) { + this.info = info; + } + + } + + @Test + public void testSharePublishTwoSlowEvents() throws Exception { + NotifyCenter.registerToSharePublisher(SlowE1.class); + NotifyCenter.registerToSharePublisher(SlowE2.class); + + final CountDownLatch latch1 = new CountDownLatch(15); + final CountDownLatch latch2 = new CountDownLatch(15); + + final String[] values = new String[] {null, null}; + + NotifyCenter.registerSubscriber(new Subscriber() { + + @Override + public void onEvent(SlowE1 event) { + ThreadUtils.sleep(1000L); + values[0] = event.info; + latch1.countDown(); + } + + @Override + public Class subscribeType() { + return SlowE1.class; + } + }); + + NotifyCenter.registerSubscriber(new Subscriber() { + @Override + public void onEvent(SlowE2 event) { + values[1] = event.info; + latch2.countDown(); + } + + @Override + public Class subscribeType() { + return SlowE2.class; + } + }); + + for (int i = 0; i < 30; i++) { + NotifyCenter.publishEvent(new SlowE1()); + NotifyCenter.publishEvent(new SlowE2()); + } + + latch1.await(); + latch2.await(); + + Assert.assertEquals("SlowE1", values[0]); + Assert.assertEquals("SlowE2", values[1]); + + } + + + static class SmartEvent1 extends Event { + + @Override + public long sequence() { + return System.currentTimeMillis(); + } + } + + static class SmartEvent2 extends Event { + + @Override + public long sequence() { + return System.currentTimeMillis(); + } + } + + /** + * One SmartSubscriber can listen serveral Events. + * And then, Notify publish events by Publisher. + * + * @throws Exception + */ + @Test + public void testSeveralEventsPublishedBySinglePublisher() throws Exception { + + final AtomicInteger count1 = new AtomicInteger(0); + final AtomicInteger count2 = new AtomicInteger(0); + + final CountDownLatch latch1 = new CountDownLatch(3); + final CountDownLatch latch2 = new CountDownLatch(3); + + NotifyCenter.registerToPublisher(SmartEvent1.class, 1024); + NotifyCenter.registerToPublisher(SmartEvent2.class, 1024); + + NotifyCenter.registerSubscriber(new SmartSubscriber() { + @Override + public List> subscribeTypes() { + List> list = new ArrayList>(); + list.add(SmartEvent1.class); + list.add(SmartEvent2.class); + return list; + } + + @Override + public void onEvent(Event event) { + if (event instanceof SmartEvent1) { + count1.incrementAndGet(); + latch1.countDown(); + } + + if (event instanceof SmartEvent2) { + count2.incrementAndGet(); + latch2.countDown(); + + } + } + }); + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(NotifyCenter.publishEvent(new SmartEvent1())); + Assert.assertTrue(NotifyCenter.publishEvent(new SmartEvent2())); + } + + latch1.await(3000L, TimeUnit.MILLISECONDS); + latch2.await(3000L, TimeUnit.MILLISECONDS); + + Assert.assertEquals(3, count1.get()); + Assert.assertEquals(3, count2.get()); + + } + + private static class TestSlowEvent1 extends SlowEvent { + } + + private static class TestSlowEvent2 extends SlowEvent { + } + + /** + * Two general subscriber can listen two kinds of SlowEvents. + * And then, Notify publish events by SharePublisher. + * + * @throws Exception + */ + @Test + public void testMutipleSlowEventsListenedBySubscriber() throws Exception { + + NotifyCenter.registerToSharePublisher(TestSlowEvent1.class); + NotifyCenter.registerToSharePublisher(TestSlowEvent2.class); + + + final AtomicInteger count1 = new AtomicInteger(0); + final AtomicInteger count2 = new AtomicInteger(0); + + final CountDownLatch latch1 = new CountDownLatch(3); + final CountDownLatch latch2 = new CountDownLatch(3); + + NotifyCenter.registerSubscriber(new Subscriber() { + @Override + public void onEvent(TestSlowEvent1 event) { + count1.incrementAndGet(); + latch1.countDown(); + } + + @Override + public Class subscribeType() { + return TestSlowEvent1.class; + } + }); + + NotifyCenter.registerSubscriber(new Subscriber() { + @Override + public void onEvent(TestSlowEvent2 event) { + count2.incrementAndGet(); + latch2.countDown(); + + } + + @Override + public Class subscribeType() { + return TestSlowEvent2.class; + } + }); + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(NotifyCenter.publishEvent(new TestSlowEvent1())); + Assert.assertTrue(NotifyCenter.publishEvent(new TestSlowEvent2())); + } + + ThreadUtils.sleep(2000L); + + latch1.await(3000L, TimeUnit.MILLISECONDS); + latch2.await(3000L, TimeUnit.MILLISECONDS); + + Assert.assertEquals(3, count1.get()); + Assert.assertEquals(3, count2.get()); + + } + + private static class TestSlowEvent3 extends SlowEvent { + } + + private static class TestSlowEvent4 extends SlowEvent { + } + + /** + * One SmartSubscriber can listen serveral SlowEvents. + * And then, Notify publish events by SharePublisher. + * + * @throws Exception + */ + @Test + public void testMutipleSlowEventsListenedBySmartsubscriber() throws Exception { + + NotifyCenter.registerToSharePublisher(TestSlowEvent3.class); + NotifyCenter.registerToSharePublisher(TestSlowEvent4.class); + + + final AtomicInteger count1 = new AtomicInteger(0); + final AtomicInteger count2 = new AtomicInteger(0); + + final CountDownLatch latch1 = new CountDownLatch(3); + final CountDownLatch latch2 = new CountDownLatch(3); + + NotifyCenter.registerSubscriber(new SmartSubscriber() { + + @Override + public void onEvent(Event event) { + if (event instanceof TestSlowEvent3) { + count1.incrementAndGet(); + latch1.countDown(); + } + + if (event instanceof TestSlowEvent4) { + count2.incrementAndGet(); + latch2.countDown(); + } + } + + @Override + public List> subscribeTypes() { + List> subTypes = new ArrayList>(); + subTypes.add(TestSlowEvent3.class); + subTypes.add(TestSlowEvent4.class); + return subTypes; + } + }); + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(NotifyCenter.publishEvent(new TestSlowEvent3())); + Assert.assertTrue(NotifyCenter.publishEvent(new TestSlowEvent4())); + } + + ThreadUtils.sleep(2000L); + + latch1.await(3000L, TimeUnit.MILLISECONDS); + latch2.await(3000L, TimeUnit.MILLISECONDS); + + Assert.assertEquals(3, count1.get()); + Assert.assertEquals(3, count2.get()); + + } + + private static class TestSlowEvent5 extends SlowEvent { + } + + private static class TestEvent6 extends Event { + } + + /** + * One SmartSubscriber can listen mutiple kinds of event: SlowEvent and General Event. + * And then, Notify publish events by SharePublisher and default pusbisher. + * + * @throws Exception + */ + @Test + public void testMutipleKindsEventsCanListenBySmartsubscriber() throws Exception { + + NotifyCenter.registerToSharePublisher(TestSlowEvent5.class); + NotifyCenter.registerToPublisher(TestEvent6.class, 1024); + + + final AtomicInteger count1 = new AtomicInteger(0); + final AtomicInteger count2 = new AtomicInteger(0); + + final CountDownLatch latch1 = new CountDownLatch(3); + final CountDownLatch latch2 = new CountDownLatch(3); + + NotifyCenter.registerSubscriber(new SmartSubscriber() { + + @Override + public void onEvent(Event event) { + if (event instanceof TestSlowEvent5) { + count1.incrementAndGet(); + latch1.countDown(); + } + + if (event instanceof TestEvent6) { + count2.incrementAndGet(); + latch2.countDown(); + } + } + + @Override + public List> subscribeTypes() { + List> subTypes = new ArrayList>(); + subTypes.add(TestSlowEvent5.class); + subTypes.add(TestEvent6.class); + return subTypes; + } + }); + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(NotifyCenter.publishEvent(new TestSlowEvent5())); + Assert.assertTrue(NotifyCenter.publishEvent(new TestEvent6())); + } + + ThreadUtils.sleep(3000L); + + latch1.await(3000L, TimeUnit.MILLISECONDS); + latch2.await(3000L, TimeUnit.MILLISECONDS); + + Assert.assertEquals(3, count1.get()); + Assert.assertEquals(3, count2.get()); + + } +}