forked from galaxydi/go-loghub
-
Notifications
You must be signed in to change notification settings - Fork 113
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: update golang consumer group
- Loading branch information
Showing
13 changed files
with
478 additions
and
353 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,61 +1,106 @@ | ||
package consumerLibrary | ||
|
||
import ( | ||
"strings" | ||
"time" | ||
|
||
sls "github.com/aliyun/aliyun-log-go-sdk" | ||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
"time" | ||
) | ||
|
||
type ConsumerCheckPointTracker struct { | ||
client *ConsumerClient | ||
defaultFlushCheckPointIntervalSec int64 | ||
tempCheckPoint string | ||
lastPersistentCheckPoint string | ||
trackerShardId int | ||
lastCheckTime int64 | ||
logger log.Logger | ||
} | ||
|
||
func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, logger log.Logger) *ConsumerCheckPointTracker { | ||
checkpointTracker := &ConsumerCheckPointTracker{ | ||
defaultFlushCheckPointIntervalSec: 60, | ||
client: consumerClient, | ||
trackerShardId: shardId, | ||
logger: logger, | ||
} | ||
return checkpointTracker | ||
type CheckPointTracker interface { | ||
// GetCheckPoint get lastest saved check point | ||
GetCheckPoint() string | ||
// GetCurrentCursor get current fetched data cursor | ||
GetCurrentCursor() string | ||
// SaveCheckPoint, save checkpoint | ||
SaveCheckPoint(force bool) error | ||
} | ||
|
||
func (checkPointTracker *ConsumerCheckPointTracker) setMemoryCheckPoint(cursor string) { | ||
checkPointTracker.tempCheckPoint = cursor | ||
type DefaultCheckPointTracker struct { | ||
client *ConsumerClient | ||
heartBeat *ConsumerHeartBeat | ||
nextCursor string // cursor for already pulled data | ||
currentCursor string // cursor for data processed, but may not be saved to server | ||
pendingCheckPoint string // pending cursor to saved | ||
savedCheckPoint string // already saved | ||
shardId int | ||
logger log.Logger | ||
} | ||
|
||
func (checkPointTracker *ConsumerCheckPointTracker) setPersistentCheckPoint(cursor string) { | ||
checkPointTracker.lastPersistentCheckPoint = cursor | ||
func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, consumerHeatBeat *ConsumerHeartBeat, logger log.Logger) *DefaultCheckPointTracker { | ||
checkpointTracker := &DefaultCheckPointTracker{ | ||
client: consumerClient, | ||
heartBeat: consumerHeatBeat, | ||
shardId: shardId, | ||
logger: logger, | ||
} | ||
return checkpointTracker | ||
} | ||
|
||
func (checkPointTracker *ConsumerCheckPointTracker) flushCheckPoint() error { | ||
if checkPointTracker.tempCheckPoint != "" && checkPointTracker.tempCheckPoint != checkPointTracker.lastPersistentCheckPoint { | ||
if err := checkPointTracker.client.updateCheckPoint(checkPointTracker.trackerShardId, checkPointTracker.tempCheckPoint, true); err != nil { | ||
return err | ||
} | ||
func (tracker *DefaultCheckPointTracker) initCheckPoint(cursor string) { | ||
tracker.savedCheckPoint = cursor | ||
} | ||
|
||
checkPointTracker.lastPersistentCheckPoint = checkPointTracker.tempCheckPoint | ||
func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error { | ||
tracker.pendingCheckPoint = tracker.nextCursor | ||
if force { | ||
return tracker.flushCheckPoint() | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (checkPointTracker *ConsumerCheckPointTracker) flushCheck() { | ||
currentTime := time.Now().Unix() | ||
if currentTime > checkPointTracker.lastCheckTime+checkPointTracker.defaultFlushCheckPointIntervalSec { | ||
if err := checkPointTracker.flushCheckPoint(); err != nil { | ||
level.Warn(checkPointTracker.logger).Log("msg", "update checkpoint get error", "error", err) | ||
} else { | ||
checkPointTracker.lastCheckTime = currentTime | ||
func (tracker *DefaultCheckPointTracker) GetCheckPoint() string { | ||
return tracker.savedCheckPoint | ||
} | ||
|
||
func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string { | ||
return tracker.currentCursor | ||
} | ||
|
||
func (tracker *DefaultCheckPointTracker) setCurrentCheckPoint(cursor string) { | ||
tracker.currentCursor = cursor | ||
} | ||
|
||
func (tracker *DefaultCheckPointTracker) setNextCursor(cursor string) { | ||
tracker.nextCursor = cursor | ||
} | ||
|
||
func (tracker *DefaultCheckPointTracker) flushCheckPoint() error { | ||
if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint { | ||
return nil | ||
} | ||
for i := 0; ; i++ { | ||
err := tracker.client.updateCheckPoint(tracker.shardId, tracker.pendingCheckPoint, true) | ||
if err == nil { | ||
break | ||
} | ||
slsErr, ok := err.(*sls.Error) | ||
if ok { | ||
if strings.EqualFold(slsErr.Code, "ConsumerNotExsit") || strings.EqualFold(slsErr.Code, "ConsumerNotMatch") { | ||
tracker.heartBeat.removeHeartShard(tracker.shardId) | ||
level.Warn(tracker.logger).Log("msg", "consumer has been removed or shard has been reassigned", "shard", tracker.shardId, "err", slsErr) | ||
break | ||
} else if strings.EqualFold(slsErr.Code, "ShardNotExsit") { | ||
tracker.heartBeat.removeHeartShard(tracker.shardId) | ||
level.Warn(tracker.logger).Log("msg", "shard does not exist", "shard", tracker.shardId) | ||
break | ||
} | ||
} | ||
if i >= 2 { | ||
level.Error(tracker.logger).Log( | ||
"msg", "failed to save checkpoint", | ||
"consumer", tracker.client.option.ConsumerName, | ||
"shard", tracker.shardId, | ||
"checkpoint", tracker.pendingCheckPoint, | ||
) | ||
return err | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
} | ||
|
||
func (checkPointTracker *ConsumerCheckPointTracker) getCheckPoint() string { | ||
return checkPointTracker.tempCheckPoint | ||
tracker.savedCheckPoint = tracker.pendingCheckPoint | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.