Skip to content

Commit

Permalink
优化事件总线
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Aug 26, 2020
1 parent 2f17567 commit 3fa64b1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ public ClusterConnecting(String localId, String brokerId) {

disposable.add(Flux.<TopicPayload>create(sink -> this.output = sink)
.flatMap(payload -> {
byte[] body = topicPayloadCodec.encode(payload).getBytes();
payload.retain();
Payload encoded = topicPayloadCodec.encode(payload);
byte[] body = encoded.getBytes(true);
// return operations.convertAndSend("/broker/bus/" + localId + "/" + brokerId, body);
return clusterManager
.getQueue("/broker/bus/" + localId + "/" + brokerId)
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/org/jetlinks/supports/event/BrokerEventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,14 @@ public BrokerEventBus() {
public <T> Flux<T> subscribe(@NotNull Subscription subscription,
@NotNull Decoder<T> decoder) {
return subscribe(subscription)
.flatMap(payload -> Mono.justOrEmpty(payload.decode(decoder)));
.flatMap(payload -> {
try {
payload.retain();
return Mono.justOrEmpty(payload.decode(decoder));
} finally {
payload.release();
}
});
}

@Override
Expand Down Expand Up @@ -286,10 +293,13 @@ private void handleBrokerSubscription(Subscription subscription, SubscriptionInf

private void doPublish(SubscriptionInfo info, TopicPayload payload) {
try {
payload.retain();
info.sink.next(payload);
log.debug("publish [{}] to [{}] complete", payload.getTopic(), info);
} catch (Throwable error) {
log.error("publish [{}] to [{}] event error", payload.getTopic(), info, error);
} finally {
payload.release();
}
}

Expand Down Expand Up @@ -341,12 +351,15 @@ private Mono<Long> doPublishFromBroker(TopicPayload payload, Predicate<Subscript
.map(subscriptionInfo -> {
for (SubscriptionInfo info : subscriptionInfo) {
try {
payload.retain();
info.sink.next(payload);
if (log.isDebugEnabled()) {
log.debug("broker publish [{}] to [{}] complete", payload.getTopic(), info);
}
} catch (Exception e) {
log.warn("broker publish [{}] to [{}] error", payload.getTopic(), info, e);
} finally {
payload.release();
}
}
return (long) subscriptionInfo.size();
Expand Down
4 changes: 3 additions & 1 deletion src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

<logger name="org.apache" level="warn"/>
<logger name="io.lettuce" level="warn"/>
<!--   <logger name="org.jetlinks" level="trace"/>-->
<logger name="org.springframework" level="warn"/>

<!--   <logger name="org.jetlinks" level="trace"/>-->

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
Expand Down

0 comments on commit 3fa64b1

Please sign in to comment.