Discarding Corrupt Message due to apparent decompression failure #1331

Open
@frankjkelly

Expected behavior

I would expect more log output to help debug what is going wrong with decompression.

Actual behavior

We get this log message:

{"log":{"consumerID":95,"level":"ERROR",
"msg":"Discarding corrupted message","msgID":{"entryId":1792,"ledgerId":758404,"partition":-1},"name":"","subscription":"reader-czjug","time":"2025-02-12T10:46:13.616639169Z",
"topic":"persistent://XXXX/wav/084b74e6-f4c6-4ff8-9bff-d35370e6a77b",
"validationError":1},"stream":"stdout","timestamp":1739357173616}

which looks like it comes from here:

func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
	validationError pb.CommandAck_ValidationError) {
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to discardCorruptedMessage " +
			"by closing or closed consumer")
		return
	}
	pc.log.WithFields(log.Fields{
		"msgID":           msgID,
		"validationError": validationError,
	}).Error("Discarding corrupted message")

validationError: 1 appears to be a decompression error:

CommandAck_DecompressionError CommandAck_ValidationError = 1
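
To make the numeric code in the log easier to read, here is a minimal, self-contained sketch that maps a validation error number to a name. The `validationError` type and its constants are hypothetical local stand-ins; the names and values are assumed to mirror the `ValidationError` enum in Pulsar's protocol definition and should be checked against the generated `pb` package before relying on them.

```go
package main

import "fmt"

// validationError is a hypothetical local stand-in for
// pb.CommandAck_ValidationError. The values below are assumed to match
// the Pulsar wire protocol enum; verify them against the generated code.
type validationError int32

const (
	uncompressedSizeCorruption validationError = 0
	decompressionError         validationError = 1
	checksumMismatch           validationError = 2
	batchDeSerializeError      validationError = 3
	decryptionError            validationError = 4
)

// String returns a readable name so log lines show a name, not a bare number.
func (v validationError) String() string {
	switch v {
	case uncompressedSizeCorruption:
		return "UncompressedSizeCorruption"
	case decompressionError:
		return "DecompressionError"
	case checksumMismatch:
		return "ChecksumMismatch"
	case batchDeSerializeError:
		return "BatchDeSerializeError"
	case decryptionError:
		return "DecryptionError"
	default:
		return fmt.Sprintf("Unknown(%d)", int32(v))
	}
}

func main() {
	// The log line in this issue carries "validationError":1.
	fmt.Println(validationError(1))
}
```

Logging the name alongside the number would make a line like the one above self-explanatory without a lookup in the source.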

which means it is coming from here, I guess?

uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
if err != nil {
	pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
	return err
}

and given that there are no other log lines, the error probably originates here:

uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
if err != nil {
	return nil, err
}

Steps to reproduce

Sorry, I don't have reproduction steps, but maybe we could add some logging here?

uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
if err != nil {
	return nil, err
}

System configuration

Pulsar version: 2.11.3 and 3.3.2 (we upgraded the brokers and still see the same issue)
Pulsar Golang client: 0.14.0
