
[Bug] grpc SubStreamHandler exception #4804

Closed
@9997766

Description

Search before asking

  • I had searched in the issues and found no similar issues.

Environment

Linux

EventMesh version

1.10.0

What happened

There is a problem in the SubStreamHandler class. gRPC streaming is bidirectional, and my project uses both the gRPC pub and sub functions.
The following code throws an exception every 30 seconds:

private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
    }
}

I analyzed the cause: when the exception is thrown, onCompleted should be called to close the sender, but that is not done here. As a result, gRPC treats the connection as still open, and the same exception keeps being thrown in a loop.

I tried the change below, and it solved the problem.
Could this be fixed soon? I am using the project, and this problem is blocking me.

@Slf4j
public class SubStreamHandler<T> extends Thread implements Serializable {

...
public SubStreamHandler(final ConsumerServiceStub consumerAsyncClient, final EventMeshGrpcClientConfig clientConfig,
        final ReceiveMsgHook listener) {
    this.consumerAsyncClient = consumerAsyncClient;
    this.clientConfig = clientConfig;
    this.listener = listener;
}

public void sendSubscription(final CloudEvent subscription) {
    synchronized (this) {
        if (this.sender == null) {
            this.sender = consumerAsyncClient.subscribeStream(createReceiver());
        }
    }
    senderOnNext(subscription);
}

private StreamObserver<CloudEvent> createReceiver() {
    return new StreamObserver<CloudEvent>() {

        @Override
        public void onNext(final CloudEvent message) {
            T msg = EventMeshCloudEventBuilder.buildMessageFromEventMeshCloudEvent(message, listener.getProtocolType());
            if (msg instanceof Set) {
                log.info("Received message from Server:{}", message);
            } else {
                log.info("Received message from Server.|seq={}|uniqueId={}|",
                        EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message));
                CloudEvent streamReply = null;
                try {
                    Optional<T> reply = listener.handle(msg);
                    if (reply.isPresent()) {
                        streamReply = buildReplyMessage(message, reply.get());
                    }
                } catch (Exception e) {
                    log.error("Error in handling reply message.|seq={}|uniqueId={}|",
                            EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message), e);
                }
                if (streamReply != null) {
                    log.info("Sending reply message to Server.|seq={}|uniqueId={}|",
                            EventMeshCloudEventUtils.getSeqNum(streamReply), EventMeshCloudEventUtils.getUniqueId(streamReply));
                    senderOnNext(streamReply);
                }
            }
        }

        @Override
        public void onError(final Throwable t) {
            log.error("Received Server side error", t);
            close(); // added: close the stream on a server-side error
        }

        @Override
        public void onCompleted() {
            log.info("Finished receiving messages from server.");
            close();
        }
    };
}

private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final T replyMessage) {
    final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventMeshCloudEvent(replyMessage,
            clientConfig, listener.getProtocolType());

    return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessage.getAttributesMap()).putAllAttributes(cloudEvent.getAttributesMap())
            .putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
                    CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
            // Indicate that it is a subscription response
            .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(ProtocolKey.SUB_REPLY_MESSAGE).build())
            .build();
}

@Override
public void run() {
    try {
        latch.await();
    } catch (InterruptedException e) {
        log.error("SubStreamHandler Thread interrupted", e);
        Thread.currentThread().interrupt();
    }
}

public void close() {
    if (this.sender != null) {
        senderOnComplete();
    }

    latch.countDown();

    log.info("SubStreamHandler closed.");
}

private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
        close(); // added: close the sender when onNext fails
    }
}

private void senderOnComplete() {
    try {
        synchronized (sender) {
            sender.onCompleted();
            sender = null; // added: lets sendSubscription open a new stream
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onComplete", e);
    }
}

}
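
One detail the change above leaves open: once senderOnComplete() sets sender to null, a later senderOnNext() reaches synchronized (sender) with a null reference. The resulting NullPointerException is caught and logged, but an explicit closed flag and a dedicated lock object may be easier to reason about. The following is only a minimal sketch of that idea, not the EventMesh implementation. The local StreamObserver interface is a stand-in for io.grpc.stub.StreamObserver so the example compiles without gRPC, and GuardedSender is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Local stand-in for io.grpc.stub.StreamObserver (assumption, keeps the sketch dependency-free). */
interface StreamObserver<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

/** Hypothetical wrapper: completes the outbound stream exactly once, on the first send failure. */
class GuardedSender<V> {
    private final Object lock = new Object();               // never null, unlike the sender field
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final StreamObserver<V> sender;

    GuardedSender(StreamObserver<V> sender) {
        this.sender = sender;
    }

    /** Returns true if the value was handed to the stream, false if the stream is closed or the send failed. */
    boolean send(V value) {
        if (closed.get()) {
            return false;                                   // do not touch a closed stream again
        }
        try {
            synchronized (lock) {
                sender.onNext(value);
            }
            return true;
        } catch (RuntimeException e) {
            close();                                        // first failure closes the stream
            return false;
        }
    }

    /** Idempotent: only the first caller completes the stream. */
    void close() {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        try {
            synchronized (lock) {
                sender.onCompleted();
            }
        } catch (RuntimeException e) {
            // The stream is already unusable; nothing more to do in this sketch.
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}

class GuardedSenderDemo {
    public static void main(String[] args) {
        StreamObserver<String> failing = new StreamObserver<String>() {
            @Override public void onNext(String value) { throw new IllegalStateException("simulated failure"); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { System.out.println("stream completed"); }
        };
        GuardedSender<String> guarded = new GuardedSender<>(failing);
        System.out.println("first send accepted: " + guarded.send("a"));
        System.out.println("second send accepted: " + guarded.send("b"));
    }
}
```

In this sketch the caller decides what to do after send returns false, for example opening a new stream the way sendSubscription does when sender is null.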

How to reproduce

When an exception occurs, this method only logs it and keeps the sender, which causes an endless loop of errors.
For example, after a gRPC timeout:
private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
    }
}
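
The difference between the two behaviours can be illustrated without a gRPC server. The sketch below is only a simulation under stated assumptions: BrokenStream is a hypothetical stand-in for a request stream that fails on every onNext, and the method names are made up for illustration. It counts how many failing calls each strategy makes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a broken request stream: every onNext fails. */
class BrokenStream {
    final AtomicInteger failedCalls = new AtomicInteger();
    boolean completed = false;

    void onNext(String message) {
        failedCalls.incrementAndGet();
        throw new IllegalStateException("simulated stream failure");
    }

    void onCompleted() {
        completed = true;
    }
}

class ReproduceLoop {
    /** Current behaviour: the exception is only logged, so every later send fails too. */
    static int sendWithoutClose(BrokenStream stream, int attempts) {
        for (int i = 0; i < attempts; i++) {
            try {
                stream.onNext("message-" + i);
            } catch (Exception e) {
                // logged only; the broken stream is reused on the next attempt
            }
        }
        return stream.failedCalls.get();
    }

    /** Proposed behaviour: complete and drop the stream on the first failure. */
    static int sendWithClose(BrokenStream stream, int attempts) {
        BrokenStream sender = stream;
        for (int i = 0; i < attempts; i++) {
            if (sender == null) {
                break; // a real client would open a new stream at this point
            }
            try {
                sender.onNext("message-" + i);
            } catch (Exception e) {
                sender.onCompleted();
                sender = null;
            }
        }
        return stream.failedCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("failures without close: " + sendWithoutClose(new BrokenStream(), 5));
        System.out.println("failures with close: " + sendWithClose(new BrokenStream(), 5));
    }
}
```

With five attempts, the first strategy fails five times and the second fails once.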

Debug logs

no

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
