Skip to content

Commit

Permalink
[alibaba#3117]Just Sink the Notify implementation into common module …
Browse files Browse the repository at this point in the history
…and optimize some parts (alibaba#3118)

* [alibaba#3117]Sink the Notify implementation into common module and optimize some parts.

* [alibaba#3117]fix typo and reformat code styles.

* [alibaba#3117]fix typo and reformat code styles.

* [alibaba#3118]fix typo and formate.

* [alibaba#3118]Unify Subsciber and SmartSubscriber, and fix some typo and reformat.

* [alibaba#3118]fix some typo and reformat.
  • Loading branch information
zongtanghu authored Jun 21, 2020
1 parent bb7cfbc commit 1dc29f2
Show file tree
Hide file tree
Showing 11 changed files with 1,026 additions and 0 deletions.
35 changes: 35 additions & 0 deletions common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation that marks a method as not thread safe.
*
* @author zongtanghu
*/
@Documented
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.SOURCE)
public @interface NotThreadSafe {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ClassUtils;
import com.alibaba.nacos.common.utils.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import static com.alibaba.nacos.common.notify.NotifyCenter.ringBufferSize;

/**
* The default event publisher implementation.
*
* <p>Internally, use {@link ArrayBlockingQueue <Event/>} as a message staging queue.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author zongtanghu
*/
public class DefaultPublisher extends Thread implements EventPublisher {

private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);

private volatile boolean initialized = false;

private volatile boolean shutdown = false;

private Class<? extends Event> eventType;

private final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();

private int queueMaxSize = -1;

private BlockingQueue<Event> queue;

private volatile Long lastEventSequence = -1L;

private final AtomicReferenceFieldUpdater<DefaultPublisher, Long> updater = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");

@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<Event>(bufferSize);
start();
}

public ConcurrentHashSet<Subscriber> getSubscribers() {
return subscribers;
}

@Override
public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}

public long currentEventSize() {
return queue.size();
}

@Override
public void run() {
openEventHandler();
}

void openEventHandler() {
try {
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
for (; ; ) {
if (shutdown || hasSubscriber()) {
break;
}
ThreadUtils.sleep(1000L);
}

for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : {}", ex);
}
}

private boolean hasSubscriber() {
return CollectionUtils.isNotEmpty(subscribers);
}

@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}

@Override
public void removeSubscriber(Subscriber subscriber) {
subscribers.remove(subscriber);
}

@Override
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}

void checkIsStart() {
if (!initialized) {
throw new IllegalStateException("Publisher does not start");
}
}

@Override
public void shutdown() {
this.shutdown = true;
this.queue.clear();
}

public boolean isInitialized() {
return initialized;
}

void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
final String sourceName = ClassUtils.getName(event);

// Notification single event listener
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}

final String targetName = ClassUtils.getName(subscriber.subscribeType());
if (!Objects.equals(sourceName, targetName)) {
continue;
}
notifySubscriber(subscriber, event);
}
}

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

final Runnable job = new Runnable() {
@Override
public void run() {
subscriber.onEvent(event);
}
};

final Executor executor = subscriber.executor();

if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception : {}", e);
}
}
}
}
45 changes: 45 additions & 0 deletions common/src/main/java/com/alibaba/nacos/common/notify/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

/**
* An abstract class for event.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings({"PMD.AbstractClassShouldStartWithAbstractNamingRule"})
public abstract class Event implements Serializable {

private static final AtomicLong SEQUENCE = new AtomicLong(0);

private final long sequence = SEQUENCE.getAndIncrement();

/**
* Event sequence number, which can be used to handle the sequence of events.
*
* @return sequence num, It's best to make sure it's monotone.
*/
public long sequence() {
return sequence;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.listener.Subscriber;

/**
* Event publisher.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author zongtanghu
*/
public interface EventPublisher extends Closeable {

/**
* Initializes the event publisher.
*
* @param type {@link Event >}
* @param bufferSize Message staging queue size
*/
void init(Class<? extends Event> type, int bufferSize);

/**
* The number of currently staged events.
*
* @return event size
*/
long currentEventSize();

/**
* Add listener.
*
* @param subscriber {@link Subscriber}
*/
void addSubscriber(Subscriber subscriber);

/**
* Remove listener.
*
* @param subscriber {@link Subscriber}
*/
void removeSubscriber(Subscriber subscriber);

/**
* publish event.
*
* @param event {@link Event}
* @return publish event is success
*/
boolean publish(Event event);

/**
* Notify listener.
*
* @param subscriber {@link Subscriber}
* @param event {@link Event}
*/
void notifySubscriber(Subscriber subscriber, Event event);
}
Loading

0 comments on commit 1dc29f2

Please sign in to comment.