From 5021290f80014068ef11fd153af8b16c060afa29 Mon Sep 17 00:00:00 2001 From: FFish Date: Tue, 30 May 2023 12:56:21 +0000 Subject: [PATCH] feat: add new Getter for CheckPointTracker and add fetch freq control --- consumer/checkpoint_tracker.go | 20 ++++++++++++++++--- consumer/consumer_client.go | 35 ++++++++++++++++++++++++---------- consumer/shard_worker.go | 9 +++++---- consumer/tasks.go | 5 +++-- 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/consumer/checkpoint_tracker.go b/consumer/checkpoint_tracker.go index 77f3df3d..46fd181b 100644 --- a/consumer/checkpoint_tracker.go +++ b/consumer/checkpoint_tracker.go @@ -9,13 +9,19 @@ import ( "github.com/go-kit/kit/log/level" ) +// CheckPointTracker +// Generally, you just need SaveCheckPoint, if you use more funcs, make sure you understand these type CheckPointTracker interface { // GetCheckPoint get lastest saved check point GetCheckPoint() string + // SaveCheckPoint, save next cursor to checkpoint + SaveCheckPoint(force bool) error // GetCurrentCursor get current fetched data cursor GetCurrentCursor() string - // SaveCheckPoint, save checkpoint - SaveCheckPoint(force bool) error + // GetNextCursor get next fetched data cursor(this is also the next checkpoint to be saved) + GetNextCursor() string + // GetShardId, return the id of shard tracked + GetShardId() int } type DefaultCheckPointTracker struct { @@ -60,14 +66,22 @@ func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string { return tracker.currentCursor } -func (tracker *DefaultCheckPointTracker) setCurrentCheckPoint(cursor string) { +func (tracker *DefaultCheckPointTracker) setCurrentCursor(cursor string) { tracker.currentCursor = cursor } +func (tracker *DefaultCheckPointTracker) GetNextCursor() string { + return tracker.nextCursor +} + func (tracker *DefaultCheckPointTracker) setNextCursor(cursor string) { tracker.nextCursor = cursor } +func (tracker *DefaultCheckPointTracker) GetShardId() int { + return tracker.shardId +} + func (tracker *DefaultCheckPointTracker) flushCheckPoint() error { if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint { return nil diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 0f098670..4e788a37 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -125,27 +125,42 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err return cursor, err } -func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, err error) { +func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) { + var logBytes []byte for retry := 0; retry < 3; retry++ { - gl, nextCursor, err = consumer.client.PullLogs(consumer.option.Project, consumer.option.Logstore, shardId, cursor, "", consumer.option.MaxFetchLogGroupCount) + logBytes, nextCursor, err = consumer.client.GetLogsBytes(consumer.option.Project, consumer.option.Logstore, shardId, + cursor, "", + consumer.option.MaxFetchLogGroupCount) + if err == nil { + rawSize = len(logBytes) + gl, err = sls.LogsBytesDecode(logBytes) + if err == nil { + break + } + } if err != nil { slsError, ok := err.(sls.Error) if ok { + level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error", + "shard", shardId, + "error", slsError, + "tryTimes", retry+1, + "cursor", cursor, + ) if slsError.HTTPCode == 403 { - level.Warn(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError) time.Sleep(5 * time.Second) - } else { - level.Warn(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError) - time.Sleep(200 * time.Millisecond) } } else { - level.Warn(consumer.logger).Log("msg", "unknown error when pull log", "shardId", shardId, "cursor", cursor, "error", err) + level.Warn(consumer.logger).Log("msg", "unknown error when pull log", + "shardId", shardId, + "cursor", cursor, + "error", err, + "tryTimes", retry+1) } - } else { - return gl, nextCursor, nil + time.Sleep(200 * time.Millisecond) } } // If you can't retry the log three times, it will return to empty list and start pulling the log cursor, // so that next time you will come in and pull the function again, which is equivalent to a dead cycle. - return gl, nextCursor, err + return } diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index 39a6fa6b..0548b159 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -17,6 +17,7 @@ type ShardConsumerWorker struct { nextFetchCursor string lastFetchGroupCount int lastFetchTime time.Time + lastFetchRawSize int consumerStatus string processor Processor shardId int @@ -144,16 +145,16 @@ func (consumer *ShardConsumerWorker) updateStatus(success bool) { } func (consumer *ShardConsumerWorker) shouldFetch() bool { - if consumer.lastFetchGroupCount >= 1000 { + if consumer.lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || consumer.lastFetchRawSize >= 4 * 1024 * 1024 { return true } duration := time.Since(consumer.lastFetchTime) - if consumer.lastFetchGroupCount < 100 { + if consumer.lastFetchGroupCount < 100 && consumer.lastFetchRawSize < 1024 * 1024{ // The time used here is in milliseconds. return duration > 500*time.Millisecond - } else if consumer.lastFetchGroupCount < 500 { + } else if consumer.lastFetchGroupCount < 500 && consumer.lastFetchRawSize < 2 * 1024 * 1024 { return duration > 200*time.Millisecond - } else { // 500 - 1000 + } else { return duration > 50*time.Millisecond } } diff --git a/consumer/tasks.go b/consumer/tasks.go index c2f5512f..367db164 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -49,14 +49,15 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error { // update last fetch time, for control fetch frequency consumer.lastFetchTime = time.Now() - logGroup, nextCursor, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) + logGroup, nextCursor, rawSize, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) if err != nil { return err } // set cursors user to decide whether to save according to the execution of `process` - consumer.consumerCheckPointTracker.setCurrentCheckPoint(consumer.nextFetchCursor) + consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor) consumer.lastFetchLogGroupList = logGroup consumer.nextFetchCursor = nextCursor + consumer.lastFetchRawSize = rawSize consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList) consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor) level.Debug(consumer.logger).Log(