Skip to content

Commit

Permalink
allow empty payload for nonbatch message (apache#236)
Browse files Browse the repository at this point in the history
print out error message from MessageReceived
  • Loading branch information
zzzming authored May 6, 2020
1 parent 6edc8f4 commit e7f1673
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
}

func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
if r.buffer.ReadableBytes() == 0 {
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
return nil, nil, ErrEOM
}
if !r.batched {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer)
if consumer, ok := c.consumerHandler(consumerID); ok {
err := consumer.MessageReceived(response, payload)
if err != nil {
c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId)
c.log.WithField("consumerID", consumerID).WithError(err).Error("handle message Id: ", response.MessageId)
}
} else {
c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId)
Expand Down

0 comments on commit e7f1673

Please sign in to comment.