Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3117]Just Sink the Notify implementation into common module and optimize some parts #3118

Merged
merged 6 commits into from
Jun 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation that marks a method as not thread safe.
*
* @author zongtanghu
*/
@Documented
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.SOURCE)
public @interface NotThreadSafe {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you want to figure out some method may be not thread safe. But I want to know whether is project do like this? I think write some discription in Javadoc is enough.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some open source project use this annotation to present unsafe method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ClassUtils;
import com.alibaba.nacos.common.utils.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import static com.alibaba.nacos.common.notify.NotifyCenter.ringBufferSize;

/**
* The default event publisher implementation.
*
* <p>Internally, use {@link ArrayBlockingQueue <Event/>} as a message staging queue.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author zongtanghu
*/
public class DefaultPublisher extends Thread implements EventPublisher {

private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);

private volatile boolean initialized = false;

private volatile boolean shutdown = false;

private Class<? extends Event> eventType;

private final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();

private int queueMaxSize = -1;

private BlockingQueue<Event> queue;

private volatile Long lastEventSequence = -1L;

private final AtomicReferenceFieldUpdater<DefaultPublisher, Long> updater = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");

@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<Event>(bufferSize);
start();
}

public ConcurrentHashSet<Subscriber> getSubscribers() {
return subscribers;
}

@Override
public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}

public long currentEventSize() {
return queue.size();
}

@Override
public void run() {
openEventHandler();
}

void openEventHandler() {
try {
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
for (; ; ) {
if (shutdown || hasSubscriber()) {
break;
}
ThreadUtils.sleep(1000L);
}

for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : {}", ex);
}
}

private boolean hasSubscriber() {
return CollectionUtils.isNotEmpty(subscribers);
}

@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}

@Override
public void removeSubscriber(Subscriber subscriber) {
subscribers.remove(subscriber);
}

@Override
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}

void checkIsStart() {
if (!initialized) {
throw new IllegalStateException("Publisher does not start");
}
}

@Override
public void shutdown() {
this.shutdown = true;
this.queue.clear();
}

public boolean isInitialized() {
return initialized;
}

void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
final String sourceName = ClassUtils.getName(event);

// Notification single event listener
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}

final String targetName = ClassUtils.getName(subscriber.subscribeType());
if (!Objects.equals(sourceName, targetName)) {
continue;
}
notifySubscriber(subscriber, event);
}
}

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

final Runnable job = new Runnable() {
@Override
public void run() {
subscriber.onEvent(event);
}
};

final Executor executor = subscriber.executor();

if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception : {}", e);
}
}
}
}
45 changes: 45 additions & 0 deletions common/src/main/java/com/alibaba/nacos/common/notify/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

/**
* An abstract class for event.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings({"PMD.AbstractClassShouldStartWithAbstractNamingRule"})
public abstract class Event implements Serializable {

private static final AtomicLong SEQUENCE = new AtomicLong(0);

private final long sequence = SEQUENCE.getAndIncrement();

/**
* Event sequence number, which can be used to handle the sequence of events.
*
* @return sequence num, It's best to make sure it's monotone.
*/
public long sequence() {
return sequence;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.listener.Subscriber;

/**
* Event publisher.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
* @author zongtanghu
*/
public interface EventPublisher extends Closeable {

/**
* Initializes the event publisher.
*
* @param type {@link Event >}
* @param bufferSize Message staging queue size
*/
void init(Class<? extends Event> type, int bufferSize);

/**
* The number of currently staged events.
*
* @return event size
*/
long currentEventSize();

/**
* Add listener.
*
* @param subscriber {@link Subscriber}
*/
void addSubscriber(Subscriber subscriber);

/**
* Remove listener.
*
* @param subscriber {@link Subscriber}
*/
void removeSubscriber(Subscriber subscriber);

/**
* publish event.
*
* @param event {@link Event}
* @return publish event is success
*/
boolean publish(Event event);

/**
* Notify listener.
*
* @param subscriber {@link Subscriber}
* @param event {@link Event}
*/
void notifySubscriber(Subscriber subscriber, Event event);
}
Loading