diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index 8798443bda..d9f2a1f783 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -114,7 +114,7 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { } func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) { - if r.buffer.ReadableBytes() == 0 { + if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 { return nil, nil, ErrEOM } if !r.batched { diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index dace305620..d02dedad6b 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -527,7 +527,7 @@ func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) if consumer, ok := c.consumerHandler(consumerID); ok { err := consumer.MessageReceived(response, payload) if err != nil { - c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId) + c.log.WithField("consumerID", consumerID).WithError(err).Error("handle message Id: ", response.MessageId) } } else { c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId)