diff --git a/common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java b/common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java new file mode 100644 index 00000000000..8076d2afa6f --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java @@ -0,0 +1,35 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation that marks a method as not thread safe. + * + * @author zongtanghu + */ +@Documented +@Target({ElementType.TYPE, ElementType.METHOD}) +@Retention(RetentionPolicy.SOURCE) +public @interface NotThreadSafe { + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java new file mode 100644 index 00000000000..48d08973c09 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -0,0 +1,210 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; +import com.alibaba.nacos.common.utils.ThreadUtils; +import com.alibaba.nacos.common.utils.CollectionUtils; +import com.alibaba.nacos.common.utils.ClassUtils; +import com.alibaba.nacos.common.utils.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.alibaba.nacos.common.notify.NotifyCenter.ringBufferSize; + +/** + * The default event publisher implementation. + * + *

Internally, use {@link ArrayBlockingQueue } as a message staging queue. + * + * @author liaochuntao + * @author zongtanghu + */ +public class DefaultPublisher extends Thread implements EventPublisher { + + private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); + + private volatile boolean initialized = false; + + private volatile boolean shutdown = false; + + private Class eventType; + + private final ConcurrentHashSet subscribers = new ConcurrentHashSet(); + + private int queueMaxSize = -1; + + private BlockingQueue queue; + + private volatile Long lastEventSequence = -1L; + + private final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater + .newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence"); + + @Override + public void init(Class type, int bufferSize) { + setDaemon(true); + setName("nacos.publisher-" + type.getName()); + this.eventType = type; + this.queueMaxSize = bufferSize; + this.queue = new ArrayBlockingQueue(bufferSize); + start(); + } + + public ConcurrentHashSet getSubscribers() { + return subscribers; + } + + @Override + public synchronized void start() { + if (!initialized) { + // start just called once + super.start(); + if (queueMaxSize == -1) { + queueMaxSize = ringBufferSize; + } + initialized = true; + } + } + + public long currentEventSize() { + return queue.size(); + } + + @Override + public void run() { + openEventHandler(); + } + + void openEventHandler() { + try { + // To ensure that messages are not lost, enable EventHandler when + // waiting for the first Subscriber to register + for (; ; ) { + if (shutdown || hasSubscriber()) { + break; + } + ThreadUtils.sleep(1000L); + } + + for (; ; ) { + if (shutdown) { + break; + } + final Event event = queue.take(); + receiveEvent(event); + updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); + } + } catch (Throwable ex) { + LOGGER.error("Event listener exception : {}", ex); + } + } + + private boolean hasSubscriber() { + return CollectionUtils.isNotEmpty(subscribers); + } + + @Override + public void addSubscriber(Subscriber subscriber) { + subscribers.add(subscriber); + } + + @Override + public void removeSubscriber(Subscriber subscriber) { + subscribers.remove(subscriber); + } + + @Override + public boolean publish(Event event) { + checkIsStart(); + boolean success = this.queue.offer(event); + if (!success) { + LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); + receiveEvent(event); + return true; + } + return true; + } + + void checkIsStart() { + if (!initialized) { + throw new IllegalStateException("Publisher does not start"); + } + } + + @Override + public void shutdown() { + this.shutdown = true; + this.queue.clear(); + } + + public boolean isInitialized() { + return initialized; + } + + void receiveEvent(Event event) { + final long currentEventSequence = event.sequence(); + final String sourceName = ClassUtils.getName(event); + + // Notification single event listener + for (Subscriber subscriber : subscribers) { + // Whether to ignore expiration events + if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { + LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", + event.getClass()); + continue; + } + + final String targetName = ClassUtils.getName(subscriber.subscribeType()); + if (!Objects.equals(sourceName, targetName)) { + continue; + } + notifySubscriber(subscriber, event); + } + } + + @Override + public void notifySubscriber(final Subscriber subscriber, final Event event) { + + LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); + + final Runnable job = new Runnable() { + @Override + public void run() { + subscriber.onEvent(event); + } + }; + + final Executor executor = subscriber.executor(); + + if (executor != null) { + executor.execute(job); + } else { + try { + job.run(); + } catch (Throwable e) { + LOGGER.error("Event callback exception : {}", e); + } + } + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java new file mode 100644 index 00000000000..a8562884213 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java @@ -0,0 +1,45 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +/** + * An abstract class for event. + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings({"PMD.AbstractClassShouldStartWithAbstractNamingRule"}) +public abstract class Event implements Serializable { + + private static final AtomicLong SEQUENCE = new AtomicLong(0); + + private final long sequence = SEQUENCE.getAndIncrement(); + + /** + * Event sequence number, which can be used to handle the sequence of events. + * + * @return sequence num, It's best to make sure it's monotone. + */ + public long sequence() { + return sequence; + } + +} + diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java new file mode 100644 index 00000000000..21f44465d86 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java @@ -0,0 +1,74 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.common.lifecycle.Closeable; +import com.alibaba.nacos.common.notify.listener.Subscriber; + +/** + * Event publisher. + * + * @author liaochuntao + * @author zongtanghu + */ +public interface EventPublisher extends Closeable { + + /** + * Initializes the event publisher. + * + * @param type {@link Event >} + * @param bufferSize Message staging queue size + */ + void init(Class type, int bufferSize); + + /** + * The number of currently staged events. + * + * @return event size + */ + long currentEventSize(); + + /** + * Add listener. + * + * @param subscriber {@link Subscriber} + */ + void addSubscriber(Subscriber subscriber); + + /** + * Remove listener. + * + * @param subscriber {@link Subscriber} + */ + void removeSubscriber(Subscriber subscriber); + + /** + * publish event. + * + * @param event {@link Event} + * @return publish event is success + */ + boolean publish(Event event); + + /** + * Notify listener. + * + * @param subscriber {@link Subscriber} + * @param event {@link Event} + */ + void notifySubscriber(Subscriber subscriber, Event event); +} \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java new file mode 100644 index 00000000000..5b23b877ab5 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -0,0 +1,323 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.JustForTest; +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.notify.listener.SmartSubscriber; +import com.alibaba.nacos.common.utils.BiFunction; +import com.alibaba.nacos.common.utils.ClassUtils; +import com.alibaba.nacos.common.utils.MapUtils; +import com.alibaba.nacos.common.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.ServiceLoader; +import java.util.NoSuchElementException; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; + +/** + * Unified Event Notify Center. + * + * @author liaochuntao + * @author zongtanghu + */ +public class NotifyCenter { + + private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); + + public static int ringBufferSize = 16384; + + public static int shareBufferSize = 1024; + + private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + + private static BiFunction, Integer, EventPublisher> publisherFactory = null; + + private static final NotifyCenter INSTANCE = new NotifyCenter(); + + private EventPublisher sharePublisher; + + private static Class clazz = null; + + /** + * Publisher management container. + */ + private final Map publisherMap = new ConcurrentHashMap(16); + + static { + // Internal ArrayBlockingQueue buffer size. For applications with high write throughput, + // this value needs to be increased appropriately. default value is 16384 + String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size"; + ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384); + + // The size of the public publisher's message staging queue buffer + String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size"; + shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024); + + final ServiceLoader loader = ServiceLoader.load(EventPublisher.class); + Iterator iterator = loader.iterator(); + + if (iterator.hasNext()) { + clazz = iterator.next().getClass(); + } else { + clazz = DefaultPublisher.class; + } + + publisherFactory = new BiFunction, Integer, EventPublisher>() { + + @Override + public EventPublisher apply(Class cls, Integer buffer) throws NacosException { + try { + EventPublisher publisher = clazz.newInstance(); + publisher.init(cls, buffer); + return publisher; + } catch (Throwable ex) { + LOGGER.error("Service class newInstance has error : {}", ex); + throw new NacosException(SERVER_ERROR, ex); + } + } + }; + + try { + INSTANCE.sharePublisher = publisherFactory.apply(SlowEvent.class, shareBufferSize); + } catch (Throwable ex) { + LOGGER.error("Service class newInstance has error : {}", ex); + } + + ThreadUtils.addShutdownHook(new Runnable() { + @Override + public void run() { + shutdown(); + } + }); + } + + @JustForTest + public static Map getPublisherMap() { + return INSTANCE.publisherMap; + } + + @JustForTest + public static EventPublisher getPublisher(Class topic) { + if (ClassUtils.isAssignableFrom(SlowEvent.class, topic)) { + return INSTANCE.sharePublisher; + } + return INSTANCE.publisherMap.get(topic.getCanonicalName()); + } + + @JustForTest + public static EventPublisher getSharePublisher() { + return INSTANCE.sharePublisher; + } + + /** + * Shutdown the serveral publisher instance which notifycenter has. + */ + public static void shutdown() { + if (!CLOSED.compareAndSet(false, true)) { + return; + } + LOGGER.warn("[NotifyCenter] Start destroying Publisher"); + + for (Map.Entry entry : INSTANCE.publisherMap.entrySet()) { + try { + EventPublisher eventPublisher = entry.getValue(); + eventPublisher.shutdown(); + } catch (Throwable e) { + LOGGER.error("[EventPublisher] shutdown has error : {}", e); + } + } + + try { + INSTANCE.sharePublisher.shutdown(); + } catch (Throwable e) { + LOGGER.error("[SharePublisher] shutdown has error : {}", e); + } + + LOGGER.warn("[NotifyCenter] Destruction of the end"); + } + + /** + * Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will + * preempt a placeholder Publisher first. + * + * @param consumer subscriber + * @param event type + */ + public static void registerSubscriber(final Subscriber consumer) throws NacosException { + final Class cls = consumer.subscribeType(); + // If you want to listen to multiple events, you do it separately, + // based on subclass's subscribeTypes method return list, it can register to publisher. + if (consumer instanceof SmartSubscriber) { + for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { + addSubscriber(consumer, subscribeType); + } + return; + } + + if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { + INSTANCE.sharePublisher.addSubscriber(consumer); + return; + } + + addSubscriber(consumer, consumer.subscribeType()); + } + + /** + * Add a subscriber to pusblisher. + * + * @param consumer subscriber instance. + * @param subscribeType subscribeType. + * @throws NacosException BiFunction mappingFunction may throw a NacosException. + */ + private static void addSubscriber(final Subscriber consumer, Class subscribeType) + throws NacosException { + + final String topic = ClassUtils.getCanonicalName(subscribeType); + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize); + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + publisher.addSubscriber(consumer); + } + + /** + * Deregister subscriber. + * + * @param consumer subscriber instance. + */ + public static void deregisterSubscriber(final Subscriber consumer) { + final Class cls = consumer.subscribeType(); + if (consumer instanceof SmartSubscriber) { + for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { + removeSubscriber(consumer, subscribeType); + } + return; + } + if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { + INSTANCE.sharePublisher.removeSubscriber(consumer); + return; + } + + if (removeSubscriber(consumer, consumer.subscribeType())) { + return; + } + throw new NoSuchElementException("The subcriber has no event publisher"); + } + + /** + * Remove subscriber. + * + * @param consumer subscriber instance. + * @param subscribeType subscribeType. + * @return whether remove subscriber successfully or not. + */ + private static boolean removeSubscriber(final Subscriber consumer, Class subscribeType) { + + final String topic = ClassUtils.getCanonicalName(subscribeType); + if (INSTANCE.publisherMap.containsKey(topic)) { + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + publisher.removeSubscriber(consumer); + return true; + } + + return false; + } + + /** + * Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is + * actually published. + * + * @param event class Instances of the event. + */ + public static boolean publishEvent(final Event event) { + try { + return publishEvent(event.getClass(), event); + } catch (Throwable ex) { + LOGGER.error("There was an exception to the message publishing : {}", ex); + return false; + } + } + + /** + * Request publisher publish event Publishers load lazily, calling publisher. + * + * @param eventType class Instances type of the event type. + * @param event event instance. + */ + private static boolean publishEvent(final Class eventType, final Event event) { + final String topic = ClassUtils.getCanonicalName(eventType); + if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { + return INSTANCE.sharePublisher.publish(event); + } + + if (INSTANCE.publisherMap.containsKey(topic)) { + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + return publisher.publish(event); + } + throw new NoSuchElementException("There are no [" + topic + "] publishers for this event, please register"); + } + + /** + * Register to share-publisher. + * + * @param eventType class Instances type of the event type. + * @return share publisher instance. + */ + public static EventPublisher registerToSharePublisher(final Class eventType) { + return INSTANCE.sharePublisher; + } + + /** + * Register publisher. + * + * @param eventType class Instances type of the event type. + * @param queueMaxSize the publisher's queue max size. + */ + public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) + throws NacosException { + if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { + return INSTANCE.sharePublisher; + } + + final String topic = ClassUtils.getCanonicalName(eventType); + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize); + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + return publisher; + } + + /** + * Deregister publisher. + * + * @param eventType class Instances type of the event type. + */ + public static void deregisterPublisher(final Class eventType) { + final String topic = ClassUtils.getCanonicalName(eventType); + EventPublisher publisher = INSTANCE.publisherMap.remove(topic); + try { + publisher.shutdown(); + } catch (Throwable ex) { + LOGGER.error("There was an exception when publisher shutdown : {}", ex); + } + } + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java new file mode 100644 index 00000000000..fef83daa2a0 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +/** + * This event share one event-queue. + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") +public abstract class SlowEvent extends Event { + + @Override + public long sequence() { + return 0; + } +} \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java new file mode 100644 index 00000000000..8e1da9f8b74 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java @@ -0,0 +1,48 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify.listener; + +import com.alibaba.nacos.common.notify.Event; + +import java.util.List; + +/** + * Subscribers to multiple events can be listened to. + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") +public abstract class SmartSubscriber extends Subscriber { + + /** + * Returns which event type are smartsubscriber interested in. + * + * @return The interestd event types. + */ + public abstract List> subscribeTypes(); + + @Override + public final Class subscribeType() { + return null; + } + + @Override + public final boolean ignoreExpireEvent() { + return false; + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java new file mode 100644 index 00000000000..65a41d59eee --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java @@ -0,0 +1,63 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify.listener; + +import com.alibaba.nacos.common.notify.Event; + +import java.util.concurrent.Executor; + +/** + * An abstract subscriber class for subscriber interface. + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") +public abstract class Subscriber { + + /** + * Event callback. + * + * @param event {@link Event} + */ + public abstract void onEvent(T event); + + /** + * Type of this subscriber's subscription. + * + * @return Class which extends {@link Event} + */ + public abstract Class subscribeType(); + + /** + * It is up to the listener to determine whether the callback is asynchronous or synchronous. + * + * @return {@link Executor} + */ + public Executor executor() { + return null; + } + + /** + * Whether to ignore expired events. + * + * @return default value is {@link Boolean#FALSE} + */ + public boolean ignoreExpireEvent() { + return false; + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java new file mode 100644 index 00000000000..20accf0851b --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.utils; + +import com.alibaba.nacos.api.exception.NacosException; + +/** + * Represents a function that accepts two arguments and produces a result. + * The following utility functions are extracted from org.apache.commons.lang3. + * + *

This is a functional interface + * whose functional method is {@link #apply(Object, Object)}. + * + * @author zongtanghu + * + */ +public interface BiFunction { + + /** + * Applies this function to the two given arguments. + * + * @param t the first function argument + * @param u the second function argument + * @return the function result + * @throws NacosException function throws NacosException + */ + R apply(T t, U u) throws NacosException; +} diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java new file mode 100644 index 00000000000..5f4250971ee --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java @@ -0,0 +1,123 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.utils; + +import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; + +import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; + +/** + * Utils for Class. + * + * @author liaochuntao + */ +public final class ClassUtils { + + /** + * Finds and returns class by className. + * + * @param className String value for className. + * @return class Instances of the class represent classes and interfaces. + */ + public static Class findClassByName(String className) { + try { + return Class.forName(className); + } catch (Exception e) { + throw new NacosRuntimeException(SERVER_ERROR, "this class name not found"); + } + } + + /** + * Determines if the class or interface represented by this object is either the same as, or is a superclass or + * superinterface of, the class or interface represented by the specified parameter. + * + * @param clazz Instances of the class represent classes and interfaces. + * @param cls Instances of the class represent classes and interfaces. + * @return the value indicating whether objects of the type can be assigned to objects of this class. + */ + public static boolean isAssignableFrom(Class clazz, Class cls) { + Objects.requireNonNull(cls, "cls"); + return clazz.isAssignableFrom(cls); + } + + /** + * Gets and returns the class name. + * + * @param cls Instances of the class represent classes and interfaces. + * @return the name of the class or interface represented by this object. + */ + public static String getName(Class cls) { + Objects.requireNonNull(cls, "cls"); + return cls.getName(); + } + + /** + * Gets and returns className. + * + * @param obj Object instance. + * @return className. + */ + public static String getName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getName(); + } + + /** + * Gets and returns the canonical name of the underlying class. + * + * @param cls Instances of the class represent classes and interfaces. + * @return The canonical name of the underlying class. + */ + public static String getCanonicalName(Class cls) { + Objects.requireNonNull(cls, "cls"); + return cls.getCanonicalName(); + } + + /** + * Gets and returns the canonical name of the underlying class. + * + * @param obj Object instance. + * @return The canonical name of the underlying class. + */ + public static String getCanonicalName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getCanonicalName(); + } + + /** + * Gets and returns the simple name of the underlying class. + * + * @param cls Instances of the class represent classes and interfaces. + * @return the simple name of the underlying class. + */ + public static String getSimplaName(Class cls) { + Objects.requireNonNull(cls, "cls"); + return cls.getSimpleName(); + } + + /** + * Gets and returns the simple name of the underlying class as given in the source code. + * + * @param obj Object instance. + * @return the simple name of the underlying class. + */ + public static String getSimplaName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getSimpleName(); + } + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java index 2a33a28bb88..eaa1d1ae687 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java @@ -16,6 +16,9 @@ package com.alibaba.nacos.common.utils; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.NotThreadSafe; + import java.util.Collection; import java.util.Dictionary; import java.util.Map; @@ -124,4 +127,32 @@ public static void putIfValNoEmpty(Map target, Object key, Object value) { } } + /** + * ComputeIfAbsent lazy load. + * + * @param target target Map data. + * @param key map key. + * @param mappingFunction funtion which is need to be executed. + * @param param1 function's parameter value1. + * @param param2 function's parameter value1. + * @return + */ + @NotThreadSafe + public static Object computeIfAbsent(Map target, Object key, BiFunction mappingFunction, Object param1, + Object param2) throws NacosException { + + Objects.requireNonNull(key, "key"); + Objects.requireNonNull(key, "mappingFunction"); + Objects.requireNonNull(key, "param1"); + Objects.requireNonNull(key, "param2"); + + Object val = target.get(key); + if (val == null) { + Object ret = mappingFunction.apply(param1, param2); + target.put(key, ret); + return ret; + } + return val; + } + }