Skip to content

Commit fe2bd7c

Browse files
authored
[ISSUE #4804] Fix SubStreamHandler exception loop by closeOnError (#4807)
* Handle exception loop by closeOnError * Lombok optimization * some format optimization * Avoid closing multiple times * Remove redundant set null * Revert "Avoid closing multiple times" This reverts commit 774397f. * Use synchronized latch to keep senderOnComplete called once * Use boolean to prevent latch called by somebody else * Remove the unique callee/caller close() of onCompleted()
1 parent a003c03 commit fe2bd7c

File tree

2 files changed

+14
-36
lines changed

2 files changed

+14
-36
lines changed

eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ public EventMeshGrpcConsumer(final EventMeshGrpcClientConfig clientConfig) {
8787
}
8888

8989
public void init() {
90-
this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext()
91-
.build();
90+
this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext().build();
9291
this.consumerClient = ConsumerServiceGrpc.newBlockingStub(channel);
9392
this.consumerAsyncClient = ConsumerServiceGrpc.newStub(channel);
9493
this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(channel);
@@ -125,8 +124,8 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {
125124

126125
addSubscription(subscriptionItems, SDK_STREAM_URL, GrpcType.STREAM);
127126

128-
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null,
129-
subscriptionItems);
127+
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
128+
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems);
130129
synchronized (this) {
131130
if (subStreamHandler == null) {
132131
subStreamHandler = new SubStreamHandler<>(consumerAsyncClient, clientConfig, listener);
@@ -137,8 +136,8 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {
137136
}
138137

139138
private Response subscribeWebhook(List<SubscriptionItem> subscriptionItems, String url) {
140-
final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
141-
url, subscriptionItems);
139+
final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
140+
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems);
142141
try {
143142
CloudEvent response = consumerClient.subscribe(subscription);
144143
log.info("Received response:{}", response);
@@ -169,8 +168,8 @@ public Response unsubscribe(final List<SubscriptionItem> subscriptionItems, fina
169168

170169
removeSubscription(subscriptionItems);
171170

172-
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url,
173-
subscriptionItems);
171+
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(
172+
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems);
174173
try {
175174
final CloudEvent response = consumerClient.unsubscribe(cloudEvent);
176175
log.info("Received response:{}", response);
@@ -191,8 +190,8 @@ public Response unsubscribe(final List<SubscriptionItem> subscriptionItems) {
191190

192191
removeSubscription(subscriptionItems);
193192

194-
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null,
195-
subscriptionItems);
193+
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(
194+
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems);
196195

197196
try {
198197
final CloudEvent response = consumerClient.unsubscribe(cloudEvent);
@@ -277,14 +276,12 @@ private void resubscribe() {
277276

278277
subscriptionGroup.forEach((url, items) -> {
279278
if (isStreamSub.get()) {
280-
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
281-
url,
282-
items);
279+
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
280+
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, items);
283281
subStreamHandler.sendSubscription(subscription);
284282
} else {
285283
subscribeWebhook(items, url);
286284
}
287-
288285
});
289286
}
290287

@@ -303,6 +300,7 @@ public void close() {
303300
}
304301
}
305302

303+
@Data
306304
private static class SubscriptionInfo {
307305

308306
private transient SubscriptionItem subscriptionItem;
@@ -314,25 +312,5 @@ private static class SubscriptionInfo {
314312
this.url = url;
315313
this.grpcType = grpcType;
316314
}
317-
318-
public GrpcType getGrpcType() {
319-
return grpcType;
320-
}
321-
322-
public SubscriptionItem getSubscriptionItem() {
323-
return subscriptionItem;
324-
}
325-
326-
public void setSubscriptionItem(final SubscriptionItem subscriptionItem) {
327-
this.subscriptionItem = subscriptionItem;
328-
}
329-
330-
public String getUrl() {
331-
return url;
332-
}
333-
334-
public void setUrl(final String url) {
335-
this.url = url;
336-
}
337315
}
338316
}

eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,12 @@ public void onNext(final CloudEvent message) {
9696
@Override
9797
public void onError(final Throwable t) {
9898
log.error("Received Server side error", t);
99+
close();
99100
}
100101

101102
@Override
102103
public void onCompleted() {
103104
log.info("Finished receiving messages from server.");
104-
close();
105105
}
106106
};
107107
}
@@ -134,7 +134,6 @@ public void close() {
134134
}
135135

136136
latch.countDown();
137-
138137
log.info("SubStreamHandler closed.");
139138
}
140139

@@ -145,6 +144,7 @@ private void senderOnNext(final CloudEvent subscription) {
145144
}
146145
} catch (Exception e) {
147146
log.error("StreamObserver Error onNext", e);
147+
close();
148148
}
149149
}
150150

0 commit comments

Comments
 (0)