From 3a4e21b60ea4e2008b3244e88715513b87523800 Mon Sep 17 00:00:00 2001 From: Hu Zongtang Date: Mon, 13 Jul 2020 17:30:50 +0800 Subject: [PATCH] [ISSUE#3179]Remove original NotifyCenter codes in the core module. (#3310) --- .../nacos/core/notify/DefaultPublisher.java | 211 ------------- .../com/alibaba/nacos/core/notify/Event.java | 37 --- .../nacos/core/notify/EventPublisher.java | 86 ------ .../nacos/core/notify/NotifyCenter.java | 280 ------------------ .../alibaba/nacos/core/notify/SlowEvent.java | 26 -- .../core/notify/listener/SmartSubscribe.java | 46 --- .../nacos/core/notify/listener/Subscribe.java | 62 ---- 7 files changed, 748 deletions(-) delete mode 100644 core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/notify/Event.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/notify/EventPublisher.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/notify/SlowEvent.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/notify/listener/SmartSubscribe.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/notify/listener/Subscribe.java diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java b/core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java deleted file mode 100644 index 66e3bea294b..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/notify/DefaultPublisher.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.notify; - -import com.alibaba.nacos.common.utils.ConcurrentHashSet; -import com.alibaba.nacos.common.utils.ThreadUtils; -import com.alibaba.nacos.core.notify.listener.SmartSubscribe; -import com.alibaba.nacos.core.notify.listener.Subscribe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Objects; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import static com.alibaba.nacos.core.notify.NotifyCenter.RING_BUFFER_SIZE; - -/** - * The default event publisher implementation. Internally, use {@link ArrayBlockingQueue} as a message staging queue - * - * @author liaochuntao - */ -public class DefaultPublisher extends Thread implements EventPublisher { - - private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); - - private final ConcurrentHashSet subscribes = new ConcurrentHashSet<>(); - - private final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater - .newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence"); - - private volatile boolean initialized = false; - - private volatile boolean canOpen = false; - - private volatile boolean shutdown = false; - - private Class eventType; - - private int queueMaxSize = -1; - - private BlockingQueue queue; - - private volatile Long lastEventSequence = -1L; - - @Override - public void init(Class type, int bufferSize) { - setDaemon(true); - setName("nacos.publisher-" + type.getName()); - this.eventType = type; - this.queueMaxSize = bufferSize; - this.queue = new ArrayBlockingQueue<>(bufferSize); - start(); - } - - public ConcurrentHashSet getSubscribes() { - return subscribes; - } - - @Override - public synchronized void start() { - super.start(); - if (!initialized) { - if (queueMaxSize == -1) { - queueMaxSize = RING_BUFFER_SIZE; - } - initialized = true; - } - } - - @Override - public long currentEventSize() { - return queue.size(); - } - - @Override - public void run() { - openEventHandler(); - } - - void openEventHandler() { - try { - int waitTimes = 60; - // To ensure that messages are not lost, enable EventHandler when - // waiting for the first Subscriber to register - for (; ; ) { - if (shutdown || canOpen || waitTimes <= 0) { - break; - } - ThreadUtils.sleep(1_000L); - waitTimes--; - } - - for (; ; ) { - if (shutdown) { - break; - } - final Event event = queue.take(); - receiveEvent(event); - updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); - } - } catch (Throwable ex) { - LOGGER.error("Event listener exception : {}", ex); - } - } - - @Override - public void addSubscribe(Subscribe subscribe) { - subscribes.add(subscribe); - canOpen = true; - } - - @Override - public void unSubscribe(Subscribe subscribe) { - subscribes.remove(subscribe); - } - - @Override - public boolean publish(Event event) { - checkIsStart(); - boolean success = this.queue.offer(event); - if (!success) { - LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); - receiveEvent(event); - return true; - } - return true; - } - - void checkIsStart() { - if (!initialized) { - throw new IllegalStateException("Publisher does not start"); - } - } - - @Override - public void shutdown() { - this.shutdown = true; - this.queue.clear(); - } - - public boolean isInitialized() { - return initialized; - } - - void receiveEvent(Event event) { - final long currentEventSequence = event.sequence(); - final String sourceName = event.getClass().getName(); - - // Notification single event listener - for (Subscribe subscribe : subscribes) { - // Whether to ignore expiration events - if (subscribe.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { - LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", - event.getClass()); - continue; - } - - final String targetName = subscribe.subscribeType().getName(); - if (!Objects.equals(sourceName, targetName)) { - continue; - } - - notifySubscriber(subscribe, event); - } - - // Notification multi-event event listener - for (SmartSubscribe subscribe : SMART_SUBSCRIBES) { - // If you are a multi-event listener, you need to make additional logical judgments - if (!subscribe.canNotify(event)) { - LOGGER.debug("[NotifyCenter] the {} is unacceptable to this multi-event subscriber", event.getClass()); - continue; - } - notifySubscriber(subscribe, event); - } - } - - @Override - public void notifySubscriber(final Subscribe subscribe, final Event event) { - - LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscribe); - - final Runnable job = () -> subscribe.onEvent(event); - final Executor executor = subscribe.executor(); - if (Objects.nonNull(executor)) { - executor.execute(job); - } else { - try { - job.run(); - } catch (Throwable e) { - LOGGER.error("Event callback exception : {}", e); - } - } - } -} diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/Event.java b/core/src/main/java/com/alibaba/nacos/core/notify/Event.java deleted file mode 100644 index 53ada36346c..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/notify/Event.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.notify; - -import java.io.Serializable; - -/** - * event. - * - * @author liaochuntao - */ -public interface Event extends Serializable { - - /** - * Event sequence number, which can be used to handle the sequence of events. - * - * @return sequence num, It's best to make sure it's monotone - */ - default long sequence() { - return System.currentTimeMillis(); - } - -} diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/EventPublisher.java b/core/src/main/java/com/alibaba/nacos/core/notify/EventPublisher.java deleted file mode 100644 index 7b181c1c085..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/notify/EventPublisher.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.notify; - -import com.alibaba.nacos.common.utils.ConcurrentHashSet; -import com.alibaba.nacos.core.notify.listener.SmartSubscribe; -import com.alibaba.nacos.core.notify.listener.Subscribe; - -import java.util.Set; - -/** - * Event publisher. - * - * @author liaochuntao - */ -public interface EventPublisher { - - /** - * Multi-event listener collection list. - */ - Set SMART_SUBSCRIBES = new ConcurrentHashSet<>(); - - /** - * Initializes the event publisher. - * - * @param type {@link Class} - * @param bufferSize Message staging queue size - */ - void init(Class type, int bufferSize); - - /** - * The number of currently staged events. - * - * @return event size - */ - long currentEventSize(); - - /** - * Add listener. - * - * @param subscribe {@link Subscribe} - */ - void addSubscribe(Subscribe subscribe); - - /** - * Remove listener. - * - * @param subscribe {@link Subscribe} - */ - void unSubscribe(Subscribe subscribe); - - /** - * publish event. - * - * @param event {@link Event} - * @return publish event is success - */ - boolean publish(Event event); - - /** - * Notify listener. - * - * @param subscribe {@link Subscribe} - * @param event {@link Event} - */ - void notifySubscriber(Subscribe subscribe, Event event); - - /** - * shutdown this publisher. - */ - void shutdown(); -} diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java b/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java deleted file mode 100644 index 2ca5a2c246d..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/notify/NotifyCenter.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.notify; - -import com.alibaba.nacos.common.JustForTest; -import com.alibaba.nacos.common.utils.ConcurrentHashSet; -import com.alibaba.nacos.common.utils.ThreadUtils; -import com.alibaba.nacos.core.notify.listener.SmartSubscribe; -import com.alibaba.nacos.core.notify.listener.Subscribe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.ServiceLoader; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; - -/** - * notify center. - * - * @author liaochuntao - */ -@SuppressWarnings("all") -public class NotifyCenter { - - private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); - - private static final AtomicBoolean CLOSED = new AtomicBoolean(false); - - private static final NotifyCenter INSTANCE = new NotifyCenter(); - - private static final AtomicBoolean closed = new AtomicBoolean(false); - - public static int RING_BUFFER_SIZE = 16384; - - public static int SHATE_BUFFER_SIZE = 1024; - - private static BiFunction, Integer, EventPublisher> BUILD_FACTORY = null; - - static { - // Internal ArrayBlockingQueue buffer size. For applications with high write throughput, - // this value needs to be increased appropriately. default value is 16384 - String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size"; - RING_BUFFER_SIZE = Integer.getInteger(ringBufferSizeProperty, 16384); - - // The size of the public publisher's message staging queue buffer - String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size"; - SHATE_BUFFER_SIZE = Integer.getInteger(shareBufferSizeProperty, 1024); - - ServiceLoader loader = ServiceLoader.load(EventPublisher.class); - Iterator iterator = loader.iterator(); - - if (iterator.hasNext()) { - BUILD_FACTORY = (cls, buffer) -> { - loader.reload(); - EventPublisher publisher = ServiceLoader.load(EventPublisher.class).iterator().next(); - publisher.init(cls, buffer); - return publisher; - }; - } else { - BUILD_FACTORY = (cls, buffer) -> { - EventPublisher publisher = new DefaultPublisher(); - publisher.init(cls, buffer); - return publisher; - }; - } - - INSTANCE.sharePublisher = BUILD_FACTORY.apply(SlowEvent.class, SHATE_BUFFER_SIZE); - ThreadUtils.addShutdownHook(new Thread(() -> { - shutdown(); - })); - - } - - /** - * Publisher management container - */ - private final Map publisherMap = new ConcurrentHashMap<>(16); - - /** - * Multi-event listening list - */ - private final Set smartSubscribes = new ConcurrentHashSet<>(); - - private EventPublisher sharePublisher; - - @JustForTest - public static Map getPublisherMap() { - return INSTANCE.publisherMap; - } - - @JustForTest - public static EventPublisher getPublisher(Class topic) { - if (SlowEvent.class.isAssignableFrom(topic)) { - return INSTANCE.sharePublisher; - } - return INSTANCE.publisherMap.get(topic.getCanonicalName()); - } - - @JustForTest - public static Set getSmartSubscribes() { - return EventPublisher.SMART_SUBSCRIBES; - } - - @JustForTest - public static EventPublisher getSharePublisher() { - return INSTANCE.sharePublisher; - } - - public static void shutdown() { - if (!closed.compareAndSet(false, true)) { - return; - } - LOGGER.warn("[NotifyCenter] Start destroying Publisher"); - try { - INSTANCE.publisherMap.forEach(new BiConsumer() { - @Override - public void accept(String s, EventPublisher publisher) { - publisher.shutdown(); - } - }); - - INSTANCE.sharePublisher.shutdown(); - } catch (Throwable e) { - LOGGER.error("NotifyCenter shutdown has error : {}", e); - } - LOGGER.warn("[NotifyCenter] Destruction of the end"); - } - - /** - * Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will - * preempt a placeholder Publisher first. not call {@link Publisher#start()} - * - * @param eventType Types of events that Subscriber cares about - * @param consumer subscriber - * @param event type - */ - public static void registerSubscribe(final Subscribe consumer) { - final Class cls = consumer.subscribeType(); - // If you want to listen to multiple events, you do it separately, - // without automatically registering the appropriate publisher - if (consumer instanceof SmartSubscribe) { - EventPublisher.SMART_SUBSCRIBES.add((SmartSubscribe) consumer); - return; - } - // If the event does not require additional queue resources, - // go to share-publisher to reduce resource waste - if (SlowEvent.class.isAssignableFrom(cls)) { - INSTANCE.sharePublisher.addSubscribe(consumer); - return; - } - final String topic = consumer.subscribeType().getCanonicalName(); - INSTANCE.publisherMap.computeIfAbsent(topic, s -> BUILD_FACTORY.apply(cls, RING_BUFFER_SIZE)); - EventPublisher publisher = INSTANCE.publisherMap.get(topic); - publisher.addSubscribe(consumer); - } - - /** - * deregister subscriber - * - * @param consumer subscriber - * @param - */ - public static void deregisterSubscribe(final Subscribe consumer) { - final Class cls = consumer.subscribeType(); - if (consumer instanceof SmartSubscribe) { - EventPublisher.SMART_SUBSCRIBES.remove((SmartSubscribe) consumer); - return; - } - if (SlowEvent.class.isAssignableFrom(cls)) { - INSTANCE.sharePublisher.unSubscribe(consumer); - return; - } - final String topic = consumer.subscribeType().getCanonicalName(); - if (INSTANCE.publisherMap.containsKey(topic)) { - EventPublisher publisher = INSTANCE.publisherMap.get(topic); - publisher.unSubscribe(consumer); - return; - } - throw new NoSuchElementException("The subcriber has no event publisher"); - } - - /** - * request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is - * actually published - * - * @param event - */ - public static boolean publishEvent(final Event event) { - try { - return publishEvent(event.getClass(), event); - } catch (Throwable ex) { - LOGGER.error("There was an exception to the message publishing : {}", ex); - return false; - } - } - - /** - * request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is - * actually published - * - * @param eventType - * @param event - */ - private static boolean publishEvent(final Class eventType, final Event event) { - final String topic = eventType.getCanonicalName(); - if (SlowEvent.class.isAssignableFrom(eventType)) { - return INSTANCE.sharePublisher.publish(event); - } - - if (INSTANCE.publisherMap.containsKey(topic)) { - EventPublisher publisher = INSTANCE.publisherMap.get(topic); - return publisher.publish(event); - } - throw new NoSuchElementException("There are no [" + topic + "] publishers for this event, please register"); - } - - /** - * register to share-publisher - * - * @param supplier - * @param eventType - * @return - */ - public static EventPublisher registerToSharePublisher(final Class eventType) { - return INSTANCE.sharePublisher; - } - - /** - * register publisher - * - * @param supplier - * @param eventType - * @param queueMaxSize - * @return - */ - public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) { - - if (SlowEvent.class.isAssignableFrom(eventType)) { - return INSTANCE.sharePublisher; - } - - final String topic = eventType.getCanonicalName(); - INSTANCE.publisherMap.computeIfAbsent(topic, s -> BUILD_FACTORY.apply(eventType, queueMaxSize)); - EventPublisher publisher = INSTANCE.publisherMap.get(topic); - return publisher; - } - - /** - * deregister publisher - * - * @param eventType - * @return - */ - public static void deregisterPublisher(final Class eventType) { - final String topic = eventType.getCanonicalName(); - EventPublisher publisher = INSTANCE.publisherMap.remove(topic); - publisher.shutdown(); - } - -} diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/SlowEvent.java b/core/src/main/java/com/alibaba/nacos/core/notify/SlowEvent.java deleted file mode 100644 index f105b68b842..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/notify/SlowEvent.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.notify; - -/** - * this event share one event-queue. - * - * @author liaochuntao - */ -public interface SlowEvent extends Event { - -} diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/listener/SmartSubscribe.java b/core/src/main/java/com/alibaba/nacos/core/notify/listener/SmartSubscribe.java deleted file mode 100644 index 074cfa557cc..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/notify/listener/SmartSubscribe.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.notify.listener; - -import com.alibaba.nacos.core.notify.Event; - -/** - * Subscribers to multiple events can be listened to. - * - * @author liaochuntao - */ -@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") -public abstract class SmartSubscribe implements Subscribe { - - /** - * Determines if the processing message is acceptable. - * - * @param event {@link Event} - * @return Determines if the processing message is acceptable - */ - public abstract boolean canNotify(Event event); - - @Override - public final Class subscribeType() { - return null; - } - - @Override - public final boolean ignoreExpireEvent() { - return false; - } -} diff --git a/core/src/main/java/com/alibaba/nacos/core/notify/listener/Subscribe.java b/core/src/main/java/com/alibaba/nacos/core/notify/listener/Subscribe.java deleted file mode 100644 index 0f28f5f397b..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/notify/listener/Subscribe.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.notify.listener; - -import com.alibaba.nacos.core.notify.Event; - -import java.util.concurrent.Executor; - -/** - * subscriber. - * - * @author liaochuntao - */ -public interface Subscribe { - - /** - * Event callback. - * - * @param event {@link Event} - */ - void onEvent(T event); - - /** - * Type of this subscriber's subscription. - * - * @return Class which extends {@link Event} - */ - Class subscribeType(); - - /** - * It is up to the listener to determine whether the callback is asynchronous or synchronous. - * - * @return {@link Executor} - */ - default Executor executor() { - return null; - } - - /** - * Whether to ignore expired events. - * - * @return default value is {@link Boolean#FALSE} - */ - default boolean ignoreExpireEvent() { - return false; - } - -}