From c8ff3833264d6de1140c8539d221a14b2a899f0d Mon Sep 17 00:00:00 2001 From: "davidzhang.zc" Date: Fri, 29 Apr 2022 10:56:01 +0800 Subject: [PATCH] [modify] refine consumer group processor panic log --- consumer/tasks.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/consumer/tasks.go b/consumer/tasks.go index f5435409..eb6429d5 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -3,9 +3,11 @@ package consumerLibrary import ( "errors" "fmt" - "github.com/aliyun/aliyun-log-go-sdk" - "github.com/go-kit/kit/log/level" + "runtime" "time" + + sls "github.com/aliyun/aliyun-log-go-sdk" + "github.com/go-kit/kit/log/level" ) func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) { @@ -53,9 +55,11 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() string { // If the user's consumption function reports a panic error, it will be captured and retry until sucessed. defer func() { if r := recover(); r != nil { - level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r) + stackBuf := make([]byte, 1<<16) + runtime.Stack(stackBuf, false) + level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf)) for { - if consumer.consumerRetryProcessTask() == true { + if consumer.consumerRetryProcessTask() { break } else { time.Sleep(time.Second * 2) @@ -74,7 +78,9 @@ func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool { level.Info(consumer.logger).Log("msg", "Start retrying the process function") defer func() { if r := recover(); r != nil { - level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r) + stackBuf := make([]byte, 1<<16) + runtime.Stack(stackBuf, false) + level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf)) } }() consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)