Skip to content

Commit

Permalink
[modify] refine consumer group processor panic log
Browse files Browse the repository at this point in the history
  • Loading branch information
shabicheng committed Apr 29, 2022
1 parent dafcaaa commit c8ff383
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions consumer/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package consumerLibrary
import (
"errors"
"fmt"
"github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log/level"
"runtime"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log/level"
)

func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
Expand Down Expand Up @@ -53,9 +55,11 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() string {
// If the user's consumption function reports a panic error, it will be captured and retry until sucessed.
defer func() {
if r := recover(); r != nil {
level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r)
stackBuf := make([]byte, 1<<16)
runtime.Stack(stackBuf, false)
level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf))
for {
if consumer.consumerRetryProcessTask() == true {
if consumer.consumerRetryProcessTask() {
break
} else {
time.Sleep(time.Second * 2)
Expand All @@ -74,7 +78,9 @@ func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool {
level.Info(consumer.logger).Log("msg", "Start retrying the process function")
defer func() {
if r := recover(); r != nil {
level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r)
stackBuf := make([]byte, 1<<16)
runtime.Stack(stackBuf, false)
level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf))
}
}()
consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
Expand Down

0 comments on commit c8ff383

Please sign in to comment.