1 change: 1 addition & 0 deletions docs/content.en/docs/release-notes/_index.md
@@ -17,6 +17,7 @@ Information about release notes of INFINI Framework is provided here.
- Add utility to securely marshal JSON (#85)

### Bug fix
- Fixed consumer segment handling when no producer has written to the queue (#89)

### Improvements
- Structure http error response (#86)
11 changes: 10 additions & 1 deletion modules/queue/disk_queue/consumer.go
@@ -142,6 +142,10 @@ READ_MSG:
}
}

// guard against a nil reader before reading the next message size
if d.reader == nil {
return messages, false, errors.New("reader is nil")
}
// read message size
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
if err != nil {
@@ -344,7 +348,7 @@ READ_MSG:
}
newData, err := zstd.ZSTDDecompress(nil, readBuf)
if err != nil {
log.Error(err)
log.Errorf("decompress message error: %v %v,%v %v", d.fileName, d.segment, d.readPos, err)
ctx.UpdateNextOffset(d.segment, nextReadPos)
return messages, false, err
}
@@ -467,6 +471,11 @@ func (d *Consumer) ResetOffset(segment, readPos int64) error {
log.Debugf("reset offset: %v,%v, file: %v, queue:%v", segment, readPos, d.fileName, d.queue)
}

// no producer has written data to this queue yet; nothing to reset
if d.diskQueue.writeSegmentNum == 0 && d.diskQueue.writePos == 0 {
return nil
}

if segment > d.diskQueue.writeSegmentNum {
log.Errorf("reading segment [%v] is greater than writing segment [%v]", segment, d.diskQueue.writeSegmentNum)
return io.EOF
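Both the ResetOffset guard above and the AcquireConsumer guard further down key off the same condition: a queue whose write cursor has never moved. A minimal, self-contained sketch of that check, using hypothetical names (diskState, isEmpty) rather than the framework's actual DiskBasedQueue type:

```go
package main

import "fmt"

// diskState mirrors only the two write-cursor fields the guard inspects;
// the real DiskBasedQueue carries many more fields.
type diskState struct {
	writeSegmentNum int64
	writePos        int64
}

// isEmpty reports whether a producer has ever written to this queue.
// Both cursors at zero means there is no segment data to read, so an
// offset reset can safely be treated as a no-op instead of an error.
func (d *diskState) isEmpty() bool {
	return d.writeSegmentNum == 0 && d.writePos == 0
}

func main() {
	fresh := diskState{}
	used := diskState{writeSegmentNum: 0, writePos: 128}
	fmt.Println(fresh.isEmpty()) // true: skip the reset, nothing to consume yet
	fmt.Println(used.isEmpty())  // false: fall through to the normal segment checks
}
```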
2 changes: 2 additions & 0 deletions modules/queue/disk_queue/diskqueue.go
@@ -504,6 +504,7 @@ func (d *DiskBasedQueue) readOne() ([]byte, error) {
}
newData, err := zstd.ZSTDDecompress(nil, readBuf)
if err != nil {
log.Errorf("diskqueue(%s) failed to decompress %v,%v - %s", d.name, d.readSegmentFileNum, d.readPos)
return nil, err
}
return newData, nil
@@ -552,6 +553,7 @@ func (d *DiskBasedQueue) writeOne(data []byte) WriteResponse {
}
newData, err := zstd.ZSTDCompress(nil, data, d.cfg.Compress.Message.Level)
if err != nil {
log.Errorf("diskqueue(%s) failed to compress %v,%v - %s", d.name, d.readSegmentFileNum, d.readPos)
res.Error = err
return res
}
4 changes: 4 additions & 0 deletions modules/queue/disk_queue/module.go
@@ -374,6 +374,10 @@ func (module *DiskQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *q
}
if ok {
q1 := q.(*DiskBasedQueue)
if q1.writeSegmentNum == 0 && q1.writePos == 0 {
// empty queue, no need to create a consumer
return nil, errors.New("empty queue")
}
return q1.AcquireConsumer(qconfig, consumer, offset)
}
panic(errors.Errorf("queue [%v] not found", qconfig.Name))
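The change above makes AcquireConsumer fail fast with errors.New("empty queue") instead of handing out a consumer for a queue nothing has written to; the consumer plugin below then matches on that message text. A purely illustrative alternative wiring (not part of this change) would share a sentinel error and compare with errors.Is:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrEmptyQueue is a hypothetical exported sentinel; the actual change
// returns errors.New("empty queue") and the caller compares the string.
var ErrEmptyQueue = errors.New("empty queue")

// acquireConsumer stands in for DiskQueue.AcquireConsumer with only the
// empty-queue branch kept.
func acquireConsumer(writeSegmentNum, writePos int64) error {
	if writeSegmentNum == 0 && writePos == 0 {
		return ErrEmptyQueue
	}
	return nil
}

func main() {
	if err := acquireConsumer(0, 0); errors.Is(err, ErrEmptyQueue) {
		fmt.Println("queue is empty; no consumer created")
	}
}
```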
6 changes: 4 additions & 2 deletions plugins/queue/consumer/consumer.go
Expand Up @@ -485,8 +485,10 @@ func (processor *QueueConsumerProcessor) NewSlicedWorker(ctx *pipeline.Context,
case string:
v = r.(string)
}
log.Errorf("worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v", workerID, qConfig.ID, sliceID, initOffset, offset, v)
ctx.Failed(fmt.Errorf("panic in slice worker: %+v", r))
if v != "empty queue" {
log.Errorf("worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v", workerID, qConfig.ID, sliceID, initOffset, offset, v)
ctx.Failed(fmt.Errorf("panic in slice worker: %+v", r))
}
if parentContext != nil {
parentContext.RecordError(fmt.Errorf("panic in slice worker: %+v", r))
}
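The recover handler above only suppresses logging when the recovered value stringifies to "empty queue", the exact text produced by the AcquireConsumer guard. A self-contained sketch of that recover-and-filter pattern, with all names illustrative rather than taken from the framework:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// acquire stands in for a queue-acquisition call that may fail when no
// producer has written yet; the real code returns errors.New("empty queue").
func acquire(empty bool) error {
	if empty {
		return errors.New("empty queue")
	}
	return nil
}

// runWorker panics on acquisition failure, as the sliced worker appears to,
// and then filters the panic value on recovery so an empty queue is not
// reported as a worker failure.
func runWorker(empty bool) {
	defer func() {
		if r := recover(); r != nil {
			var v string
			switch r := r.(type) {
			case error:
				v = r.Error()
			case string:
				v = r
			}
			if v != "empty queue" {
				log.Printf("panic in worker: %v", r) // only unexpected panics are logged
			}
		}
	}()
	if err := acquire(empty); err != nil {
		panic(err)
	}
	fmt.Println("worker processed messages")
}

func main() {
	runWorker(true)  // empty queue: the panic is swallowed silently
	runWorker(false) // normal path
}
```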