Skip to content

Commit

Permalink
[alibaba#3141]Move the related notify test cases code to common modul…
Browse files Browse the repository at this point in the history
…e, fix NPE and improve peformance. (alibaba#3145)

* [alibaba#3141]fix typo and move unit test cases codes from core to common module.

* [alibaba#3141]implement DefaultSharePublisher to separate sharePublish from default publisher and fix typo.

* [alibaba#3141]fix typo and NPE issue.

* [alibaba#3141]improve performance and fix typo.

* [alibaba#3141]add removeSubscriber logical codes.

* [alibaba#3141]rename the unit test method's names.

* [alibaba#3141]fix typo.

* [alibaba#3141]fix thread unsafe and atomic issue.
  • Loading branch information
zongtanghu authored Jun 24, 2020
1 parent f5e1025 commit ac0eabf
Show file tree
Hide file tree
Showing 4 changed files with 695 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ClassUtils;
import com.alibaba.nacos.common.utils.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,21 +40,21 @@
*/
public class DefaultPublisher extends Thread implements EventPublisher {

private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);

private volatile boolean initialized = false;

private volatile boolean shutdown = false;

private Class<? extends Event> eventType;

private final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();

private int queueMaxSize = -1;

private BlockingQueue<Event> queue;

private volatile Long lastEventSequence = -1L;
protected volatile Long lastEventSequence = -1L;

private final AtomicReferenceFieldUpdater<DefaultPublisher, Long> updater = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
Expand Down Expand Up @@ -98,13 +96,17 @@ public void run() {

void openEventHandler() {
try {

// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
for (; ; ) {
if (shutdown || hasSubscriber()) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
ThreadUtils.sleep(1000L);
waitTimes--;
}

for (; ; ) {
Expand Down Expand Up @@ -162,9 +164,13 @@ public boolean isInitialized() {
return initialized;
}

/**
* Receive and notifySubscriber to process the event.
*
* @param event {@link Event}.
*/
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
final String sourceName = ClassUtils.getName(event);

// Notification single event listener
for (Subscriber subscriber : subscribers) {
Expand All @@ -175,10 +181,8 @@ void receiveEvent(Event event) {
continue;
}

final String targetName = ClassUtils.getName(subscriber.subscribeType());
if (!Objects.equals(sourceName, targetName)) {
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* The default share event publisher implementation for slow event.
*
* @author zongtanghu
*/
public class DefaultSharePublisher extends DefaultPublisher {

private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<Class<? extends SlowEvent>, Set<Subscriber>>();

private final Lock lock = new ReentrantLock();

/**
* Add listener for default share publisher.
*
* @param subscriber {@link Subscriber}
* @param subscribeType subscribe event type, such as slow event or general event.
*/
public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// Actually, do a classification based on the slowEvent type.
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// For adding to parent class attributes synchronization.
subscribers.add(subscriber);

lock.lock();
try {
Set<Subscriber> sets = subMappings.get(subSlowEventType);
if (sets == null) {
Set<Subscriber> newSet = new ConcurrentHashSet<Subscriber>();
newSet.add(subscriber);
subMappings.put(subSlowEventType, newSet);
return;
}
sets.add(subscriber);
} finally {
lock.unlock();
}
}

/**
* Remove listener for default share publisher.
*
* @param subscriber {@link Subscriber}
* @param subscribeType subscribe event type, such as slow event or general event.
*/
public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// Actually, do a classification based on the slowEvent type.
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// For removing to parent class attributes synchronization.
subscribers.remove(subscriber);

lock.lock();
try {
Set<Subscriber> sets = subMappings.get(subSlowEventType);

if (sets != null && sets.contains(subscriber)) {
sets.remove(subscriber);
}
} finally {
lock.unlock();
}
}

@Override
public void receiveEvent(Event event) {

final long currentEventSequence = event.sequence();
// get subscriber set based on the slow EventType.
final Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();

// Get for Map, the algorithm is O(1).
Set<Subscriber> subscribers = subMappings.get(slowEventType);

// Notification single event subscriber
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}

// Notify single subscriber for slow event.
notifySubscriber(subscriber, event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class NotifyCenter {

private static final NotifyCenter INSTANCE = new NotifyCenter();

private EventPublisher sharePublisher;
private DefaultSharePublisher sharePublisher;

private static Class<? extends EventPublisher> clazz = null;

Expand Down Expand Up @@ -100,7 +100,11 @@ public EventPublisher apply(Class<? extends Event> cls, Integer buffer) throws N
};

try {
INSTANCE.sharePublisher = publisherFactory.apply(SlowEvent.class, shareBufferSize);

// Create and init DefaultSharePublisher instance.
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);

} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : {}", ex);
}
Expand Down Expand Up @@ -171,13 +175,19 @@ public static <T> void registerSubscriber(final Subscriber consumer) throws Naco
// based on subclass's subscribeTypes method return list, it can register to publisher.
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
addSubscriber(consumer, subscribeType);
// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// For case, producer: defaultPublisher -> consumer: subscriber.
addSubscriber(consumer, subscribeType);
}
}
return;
}

if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) {
INSTANCE.sharePublisher.addSubscriber(consumer);
INSTANCE.sharePublisher.addSubscriber(consumer, cls);
return;
}

Expand All @@ -195,7 +205,10 @@ private static void addSubscriber(final Subscriber consumer, Class<? extends Eve
throws NacosException {

final String topic = ClassUtils.getCanonicalName(subscribeType);
MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
publisher.addSubscriber(consumer);
}
Expand All @@ -209,12 +222,17 @@ public static <T> void deregisterSubscriber(final Subscriber consumer) {
final Class<? extends Event> cls = consumer.subscribeType();
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
removeSubscriber(consumer, subscribeType);
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
} else {
removeSubscriber(consumer, subscribeType);
}
}
return;
}

if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) {
INSTANCE.sharePublisher.removeSubscriber(consumer);
INSTANCE.sharePublisher.removeSubscriber(consumer, cls);
return;
}

Expand Down Expand Up @@ -300,7 +318,10 @@ public static EventPublisher registerToPublisher(final Class<? extends Event> ev
}

final String topic = ClassUtils.getCanonicalName(eventType);
MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
return publisher;
}
Expand Down
Loading

0 comments on commit ac0eabf

Please sign in to comment.