Skip to content

Commit

Permalink
feat: add new Getter for CheckPointTracker and add fetch freq control
Browse files Browse the repository at this point in the history
  • Loading branch information
wxybear committed May 31, 2023
1 parent d767a4d commit 5021290
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 19 deletions.
20 changes: 17 additions & 3 deletions consumer/checkpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ import (
"github.com/go-kit/kit/log/level"
)

// CheckPointTracker exposes the checkpoint and cursor state tracked for a shard.
// Generally, you only need SaveCheckPoint; if you use the other methods, make sure
// you understand what each of them returns.
type CheckPointTracker interface {
// GetCheckPoint returns the latest saved checkpoint.
GetCheckPoint() string
// SaveCheckPoint saves the next cursor as the checkpoint.
// NOTE(review): the meaning of force is not visible in this view; confirm against the implementation.
SaveCheckPoint(force bool) error
// GetCurrentCursor returns the cursor of the currently fetched data.
GetCurrentCursor() string
// SaveCheckPoint saves the checkpoint.
// NOTE(review): SaveCheckPoint is listed twice in this view; this looks like the
// before/after lines of the diff shown together, not two real declarations.
SaveCheckPoint(force bool) error
// GetNextCursor returns the cursor of the next data to fetch (this is also the
// next checkpoint to be saved).
GetNextCursor() string
// GetShardId returns the id of the shard being tracked.
GetShardId() int
}

type DefaultCheckPointTracker struct {
Expand Down Expand Up @@ -60,14 +66,22 @@ func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string {
return tracker.currentCursor
}

func (tracker *DefaultCheckPointTracker) setCurrentCheckPoint(cursor string) {
// setCurrentCursor stores cursor as the tracker's current cursor.
func (tracker *DefaultCheckPointTracker) setCurrentCursor(cursor string) {
tracker.currentCursor = cursor
}

// GetNextCursor returns the tracker's stored next cursor.
func (tracker *DefaultCheckPointTracker) GetNextCursor() string {
return tracker.nextCursor
}

// setNextCursor stores cursor as the tracker's next cursor.
func (tracker *DefaultCheckPointTracker) setNextCursor(cursor string) {
tracker.nextCursor = cursor
}

// GetShardId returns the shard id held by the tracker.
func (tracker *DefaultCheckPointTracker) GetShardId() int {
return tracker.shardId
}

func (tracker *DefaultCheckPointTracker) flushCheckPoint() error {
if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint {
return nil
Expand Down
35 changes: 25 additions & 10 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,42 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err
return cursor, err
}

// pullLogs fetches log groups for shardId starting at cursor, making up to three
// attempts. Each failed attempt is logged as a warning and followed by a sleep
// before the next attempt.
// NOTE(review): this view shows the before and after lines of the change together
// (two signatures, two fetch calls, two return statements); the variant with
// rawSize fetches raw bytes via GetLogsBytes, records their length in rawSize and
// decodes them with sls.LogsBytesDecode.
func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, err error) {
func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) {
var logBytes []byte
for retry := 0; retry < 3; retry++ {
gl, nextCursor, err = consumer.client.PullLogs(consumer.option.Project, consumer.option.Logstore, shardId, cursor, "", consumer.option.MaxFetchLogGroupCount)
logBytes, nextCursor, err = consumer.client.GetLogsBytes(consumer.option.Project, consumer.option.Logstore, shardId,
cursor, "",
consumer.option.MaxFetchLogGroupCount)
if err == nil {
// rawSize is the length of the fetched bytes, taken before decoding.
rawSize = len(logBytes)
gl, err = sls.LogsBytesDecode(logBytes)
if err == nil {
// Fetch and decode both succeeded; stop retrying.
break
}
}
if err != nil {
slsError, ok := err.(sls.Error)
if ok {
level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error",
"shard", shardId,
"error", slsError,
"tryTimes", retry+1,
"cursor", cursor,
)
// HTTP 403 waits 5 seconds before the next attempt; other sls errors wait 200ms.
if slsError.HTTPCode == 403 {
level.Warn(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError)
time.Sleep(5 * time.Second)
} else {
level.Warn(consumer.logger).Log("msg", "shard Get checkpoint gets errors, starts to try again", "shard", shardId, "error", slsError)
time.Sleep(200 * time.Millisecond)
}
} else {
// The error is not an sls.Error value; log it as unknown.
level.Warn(consumer.logger).Log("msg", "unknown error when pull log", "shardId", shardId, "cursor", cursor, "error", err)
level.Warn(consumer.logger).Log("msg", "unknown error when pull log",
"shardId", shardId,
"cursor", cursor,
"error", err,
"tryTimes", retry+1)
}
} else {
return gl, nextCursor, nil
time.Sleep(200 * time.Millisecond)
}
}
// If all three attempts fail, the last results and error are returned as they are.
// Per the original note, the caller will then call this function again on its next
// fetch, which is equivalent to an endless retry loop.
return gl, nextCursor, err
return
}
9 changes: 5 additions & 4 deletions consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ShardConsumerWorker struct {
nextFetchCursor string
lastFetchGroupCount int
lastFetchTime time.Time
lastFetchRawSize int
consumerStatus string
processor Processor
shardId int
Expand Down Expand Up @@ -144,16 +145,16 @@ func (consumer *ShardConsumerWorker) updateStatus(success bool) {
}

// shouldFetch reports whether the worker should fetch again now. The decision is
// based on how much the previous fetch returned and how long ago it ran: the
// larger the previous fetch, the shorter the required wait since lastFetchTime.
// NOTE(review): this view shows the before and after conditions of the change
// together; the variants that also test lastFetchRawSize add byte-size thresholds
// (1 MiB, 2 MiB, 4 MiB) next to the log group count thresholds.
func (consumer *ShardConsumerWorker) shouldFetch() bool {
if consumer.lastFetchGroupCount >= 1000 {
// The previous fetch hit the group-count limit or 4 MiB of raw data: fetch immediately.
if consumer.lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || consumer.lastFetchRawSize >= 4 * 1024 * 1024 {
return true
}
duration := time.Since(consumer.lastFetchTime)
if consumer.lastFetchGroupCount < 100 {
if consumer.lastFetchGroupCount < 100 && consumer.lastFetchRawSize < 1024 * 1024{
// Small previous fetch: wait more than 500 milliseconds between fetches.
return duration > 500*time.Millisecond
} else if consumer.lastFetchGroupCount < 500 {
} else if consumer.lastFetchGroupCount < 500 && consumer.lastFetchRawSize < 2 * 1024 * 1024 {
// Medium previous fetch: wait more than 200 milliseconds.
return duration > 200*time.Millisecond
} else { // 500 - 1000
} else {
// Larger previous fetch that did not hit the immediate-fetch limits: wait more than 50 milliseconds.
return duration > 50*time.Millisecond
}
}
Expand Down
5 changes: 3 additions & 2 deletions consumer/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error {
// update last fetch time, for control fetch frequency
consumer.lastFetchTime = time.Now()

logGroup, nextCursor, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
logGroup, nextCursor, rawSize, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
if err != nil {
return err
}
// set cursors user to decide whether to save according to the execution of `process`
consumer.consumerCheckPointTracker.setCurrentCheckPoint(consumer.nextFetchCursor)
consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor)
consumer.lastFetchLogGroupList = logGroup
consumer.nextFetchCursor = nextCursor
consumer.lastFetchRawSize = rawSize
consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor)
level.Debug(consumer.logger).Log(
Expand Down

0 comments on commit 5021290

Please sign in to comment.