Description
Search before asking
- I had searched in the issues and found no similar issues.
Environment
Linux
EventMesh version
1.10.0
What happened
There is a problem in the SubStreamHandler class. gRPC streaming is bidirectional, and I have integrated the gRPC publish and subscribe functions into my project.
The following code throws an exception, and it does so every 30 seconds:
private void senderOnNext(final CloudEvent subscription) {
try {
synchronized (sender) {
sender.onNext(subscription);
}
} catch (Exception e) {
log.error("StreamObserver Error onNext", e);
}
}
I analyzed the cause: when the exception is thrown, onCompleted should be called to close the sender, but that is not done here. As a result,
gRPC treats the connection as still open and keeps throwing exceptions in a loop.
I tried the change below, and it solved the problem.
Could this be fixed soon? I am using the project, and this problem is blocking me.
@Slf4j
public class SubStreamHandler<T> extends Thread implements Serializable {
...
public SubStreamHandler(final ConsumerServiceStub consumerAsyncClient, final EventMeshGrpcClientConfig clientConfig,
final ReceiveMsgHook listener) {
this.consumerAsyncClient = consumerAsyncClient;
this.clientConfig = clientConfig;
this.listener = listener;
}
public void sendSubscription(final CloudEvent subscription) {
synchronized (this) {
if (this.sender == null) {
this.sender = consumerAsyncClient.subscribeStream(createReceiver());
}
}
senderOnNext(subscription);
}
private StreamObserver<CloudEvent> createReceiver() {
return new StreamObserver<CloudEvent>() {
@Override
public void onNext(final CloudEvent message) {
T msg = EventMeshCloudEventBuilder.buildMessageFromEventMeshCloudEvent(message, listener.getProtocolType());
if (msg instanceof Set) {
log.info("Received message from Server:{}", message);
} else {
log.info("Received message from Server.|seq={}|uniqueId={}|",
EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message));
CloudEvent streamReply = null;
try {
Optional<T> reply = listener.handle(msg);
if (reply.isPresent()) {
streamReply = buildReplyMessage(message, reply.get());
}
} catch (Exception e) {
log.error("Error in handling reply message.|seq={}|uniqueId={}|",
EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message), e);
}
if (streamReply != null) {
log.info("Sending reply message to Server.|seq={}|uniqueId={}|",
EventMeshCloudEventUtils.getSeqNum(streamReply), EventMeshCloudEventUtils.getUniqueId(streamReply));
senderOnNext(streamReply);
}
}
}
@Override
public void onError(final Throwable t) {
log.error("Received Server side error", t);
close(); // added: close the sender when the server reports an error
}
@Override
public void onCompleted() {
log.info("Finished receiving messages from server.");
close();
}
};
}
private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final T replyMessage) {
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventMeshCloudEvent(replyMessage,
clientConfig, listener.getProtocolType());
return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessage.getAttributesMap()).putAllAttributes(cloudEvent.getAttributesMap())
.putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
// Indicate that it is a subscription response
.putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(ProtocolKey.SUB_REPLY_MESSAGE).build())
.build();
}
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
log.error("SubStreamHandler Thread interrupted", e);
Thread.currentThread().interrupt();
}
}
public void close() {
if (this.sender != null) {
senderOnComplete();
}
latch.countDown();
log.info("SubStreamHandler closed.");
}
private void senderOnNext(final CloudEvent subscription) {
try {
synchronized (sender) {
sender.onNext(subscription);
}
} catch (Exception e) {
log.error("StreamObserver Error onNext", e);
close(); // added: close the sender instead of reusing a failed stream
}
}
private void senderOnComplete() {
try {
synchronized (sender) {
sender.onCompleted();
sender = null; // added: drop the reference so the next sendSubscription creates a new stream
}
} catch (Exception e) {
log.error("StreamObserver Error onComplete", e);
}
}
}
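The idea behind the change can be shown in isolation. This is only a minimal sketch, not the EventMesh implementation: `Observer` is a hypothetical stand-in for gRPC's `StreamObserver` so the example compiles without gRPC, and `GuardedSender` and the supplier are names I made up for illustration. It shows a sender that is closed and discarded after the first failed `onNext`, so the next send opens a new stream instead of reusing the broken one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for io.grpc.stub.StreamObserver so the sketch compiles without gRPC. */
interface Observer<T> {
    void onNext(T value);
    void onCompleted();
}

/** Owns the outbound stream and discards it after the first failed send. */
class GuardedSender<T> {
    private final Supplier<Observer<T>> factory; // assumption: opens a new stream on each call
    private Observer<T> sender;

    GuardedSender(Supplier<Observer<T>> factory) {
        this.factory = factory;
    }

    /** Returns true if the value was handed to the stream, false if the stream was torn down. */
    synchronized boolean send(T value) {
        if (sender == null) {
            sender = factory.get(); // lazily (re)open the stream
        }
        try {
            sender.onNext(value);
            return true;
        } catch (RuntimeException e) {
            close(); // do not keep a stream that has already failed
            return false;
        }
    }

    synchronized void close() {
        Observer<T> current = sender;
        sender = null; // clear first, so a failing onCompleted cannot leave a stale reference
        if (current != null) {
            try {
                current.onCompleted();
            } catch (RuntimeException ignored) {
                // completing a broken stream may fail as well; nothing more to do
            }
        }
    }

    synchronized boolean isOpen() {
        return sender != null;
    }
}

class GuardedSenderDemo {
    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        GuardedSender<String> sender = new GuardedSender<>(() -> new Observer<String>() {
            @Override
            public void onNext(String v) {
                if (v.equals("bad")) {
                    throw new IllegalStateException("simulated stream failure");
                }
                delivered.add(v);
            }

            @Override
            public void onCompleted() {
            }
        });
        sender.send("a");
        sender.send("bad"); // fails: the stream is closed and dropped
        sender.send("b");   // a fresh stream is opened for this send
        System.out.println(delivered); // prints [a, b]
    }
}
```

Clearing the reference before calling `onCompleted` is a deliberate choice in this sketch: if completing the broken stream also throws, the stale sender is still gone.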
How to reproduce
When an exception occurs, this method keeps failing in an endless loop.
For example, after a gRPC timeout:
private void senderOnNext(final CloudEvent subscription) {
try {
synchronized (sender) {
sender.onNext(subscription);
}
} catch (Exception e) {
log.error("StreamObserver Error onNext", e);
}
}
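The failure mode can be simulated without a server. This is a rough sketch under assumptions: `DeadStream` is a hypothetical stand-in for a request stream that has already been terminated, and the three loop iterations stand in for whatever periodically resends on the same handler. Because the catch block only logs, every later attempt hits the same dead stream and fails again.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a gRPC request stream that has already been terminated. */
class DeadStream {
    void onNext(String value) {
        throw new IllegalStateException("stream already terminated");
    }
}

/** Mirrors the original senderOnNext: records the failure but keeps the dead stream. */
class UnguardedHandler {
    private final DeadStream sender = new DeadStream();
    final AtomicInteger failures = new AtomicInteger();

    void senderOnNext(String subscription) {
        try {
            synchronized (sender) {
                sender.onNext(subscription);
            }
        } catch (Exception e) {
            failures.incrementAndGet(); // stands in for log.error(...); the sender is never closed
        }
    }
}

class ReproduceLoop {
    public static void main(String[] args) {
        UnguardedHandler handler = new UnguardedHandler();
        // Each iteration stands in for one periodic resend attempt.
        for (int tick = 0; tick < 3; tick++) {
            handler.senderOnNext("subscription");
        }
        System.out.println("failures=" + handler.failures.get()); // prints failures=3
    }
}
```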
Debug logs
no
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct