Skip to content

Commit

Permalink
[ISSUE #5127] fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JiangShuJu committed Nov 9, 2024
1 parent 2ba54c7 commit 8783728
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.lmax.disruptor.dsl.ProducerType;

import lombok.Getter;
import lombok.Setter;


public class Channel implements LifeCycle {
Expand All @@ -39,11 +40,16 @@ public class Channel implements LifeCycle {
@Getter
private DisruptorProvider provider;
private final Integer size;
private final EventHandler<MessageEntity> eventHandler;
@Setter
private EventHandler<MessageEntity> eventHandler;
private volatile boolean started = false;
private final TopicMetadata topic;
private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_";

public Channel(TopicMetadata topic) {
this(DEFAULT_SIZE, topic, null);
}

public Channel(TopicMetadata topic, EventHandler<MessageEntity> eventHandler) {
this(DEFAULT_SIZE, topic, eventHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,20 @@ public static StandaloneBroker getInstance() {
public MessageEntity putMessage(String topicName, CloudEvent message) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (!messageContainer.containsKey(topicMetadata)) {
createTopic(topicName);
throw new RuntimeException("The topic is not created");
}
Channel channel = messageContainer.get(topicMetadata);
if (channel.isClosed()) {
throw new RuntimeException("The topic is not subscribed");
}
MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message);
channel.getProvider().onData(messageEntity);
return messageEntity;
}

public Channel createTopic(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> {
Subscribe subscribe = subscribeContainer.get(topicMetadata);
if (subscribe == null) {
throw new IllegalStateException("the topic not exist subscribe ");
}
Channel channel = new Channel(topicMetadata, subscribe);
channel.start();
return channel;
});
return messageContainer.computeIfAbsent(topicMetadata, k -> new Channel(topicMetadata));
}

/**
Expand Down Expand Up @@ -139,10 +134,17 @@ public void deleteTopicIfExist(String topicName) {

public void subscribed(String topicName, Subscribe subscribe) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (getMessageContainer().containsKey(topicMetadata)) {
if (subscribeContainer.containsKey(topicMetadata)) {
log.warn("the topic already subscribed");
return;
}
Channel channel = getMessageContainer().get(topicMetadata);
if (channel == null) {
log.warn("the topic is not created");
return;
}
channel.setEventHandler(subscribe);
channel.start();
subscribeContainer.put(topicMetadata, subscribe);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ public static MessageEntity createMessageEntity(TopicMetadata topicMetadata, Clo
}

public static Subscribe createSubscribe(StandaloneBroker standaloneBroker) {
standaloneBroker.createTopic(TEST_TOPIC);
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
});
}

public static Subscribe createSubscribe(StandaloneBroker standaloneBroker, List<CloudEvent> cloudEvents) {
standaloneBroker.createTopic(TEST_TOPIC);
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
cloudEvents.add(cloudEvent);
});
Expand Down

0 comments on commit 8783728

Please sign in to comment.