Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add adaptive service #1649

Merged
merged 43 commits into from
Dec 7, 2021
Merged
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8f8cf96
feat(adasvc): add infrastructure for adaptive service
justxuewei Oct 29, 2021
4dfc01e
feat(adasvc): reference config support adaptive service
justxuewei Oct 29, 2021
2ca1018
feat(adasvc): add p2c load balance
justxuewei Oct 29, 2021
e83a8a6
feat(cluster): add capacity evaluator interface
justxuewei Oct 31, 2021
f0c64e6
feat(cluster): add capacity updater
justxuewei Nov 1, 2021
69dd9d3
feat(cluster): add capacity updater
justxuewei Nov 3, 2021
2db43f7
feat(cluster): add fields to vegas capeva
justxuewei Nov 3, 2021
588f392
feat(cluster): refactor capeva interface
justxuewei Nov 4, 2021
e0a4d1a
feat(cluster): add more fields to vegas capeva
justxuewei Nov 4, 2021
9af855a
feat(cluster): vegas evaupdater done
justxuewei Nov 5, 2021
785c74c
Merge branch '3.0' into feat/adasvc
justxuewei Nov 6, 2021
1d5fca1
fix(common): fix typo
justxuewei Nov 6, 2021
7a537ff
fix(common): fix typo
justxuewei Nov 6, 2021
6d866b5
fix(cluster): add apache license
justxuewei Nov 6, 2021
865c058
feat(cluster): define limiter & update interface
justxuewei Nov 15, 2021
9d26e68
Merge branch '3.0' into feat/adasvc
justxuewei Nov 15, 2021
c3f9dbe
feat(cluster): remove cpu stat temporarily
justxuewei Nov 16, 2021
ff4e60b
feat(cluster): update hill climbing limiter
justxuewei Nov 17, 2021
0472bf1
feat(cluster): hill climbing done
justxuewei Nov 19, 2021
7e98a27
Merge branch '3.0' into feat/adasvc
justxuewei Nov 19, 2021
70ca487
fix(cluster): fix issue where init limitation is 0
justxuewei Nov 20, 2021
c949cef
Merge branch '3.0' into feat/adasvc
justxuewei Nov 20, 2021
ece019f
feat(cluster): provder-side filter done
justxuewei Nov 20, 2021
bf43b0b
fix(cluster): fix uint64 subtraction issue
justxuewei Nov 20, 2021
d8ca7f1
fix(cluster): add adaptivesvc filter to default service filters
justxuewei Nov 22, 2021
30dcb14
style: go fmt
justxuewei Nov 22, 2021
5b48b40
fix(filter): import adaptivesvc
justxuewei Nov 22, 2021
99f6919
Merge branch '3.0' into feat/adasvc
justxuewei Nov 22, 2021
f906714
Merge branch '3.0' into feat/adasvc
justxuewei Nov 25, 2021
c7edac5
fix(imports): import adaptivesvc cluster and p2c loadbalance
justxuewei Nov 26, 2021
73b4f70
fix(config): fix unexpectedly panic
justxuewei Nov 26, 2021
1fa48ca
feat(adasvc): add debug logs
justxuewei Dec 1, 2021
0acc52e
fix(adasvc): pass attachements with string
justxuewei Dec 1, 2021
e5de62f
feat(adasvc): detail debug logs
justxuewei Dec 2, 2021
6f1d7bf
fix(adasvc): fix log info
justxuewei Dec 2, 2021
4a04d4a
feat: detail dubbo logs
justxuewei Dec 5, 2021
4ed7505
feat: remove useless logs
justxuewei Dec 5, 2021
8b12eac
fix(adasvc): fix incorrect type
justxuewei Dec 5, 2021
9ac6763
Merge branch '3.0' into feat-adasvc
justxuewei Dec 5, 2021
c23612b
style: go fmt & dubbofmt
justxuewei Dec 5, 2021
0f7459b
fix: rpc result attrs is not initialized
justxuewei Dec 5, 2021
429a336
fix(protocol): fix result panic when attrs is not initialized
justxuewei Dec 5, 2021
bfc336e
Merge branch '3.0' into feat-adasvc
justxuewei Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(cluster): update hill climbing limiter
  • Loading branch information
justxuewei committed Nov 17, 2021
commit ff4e60bdc748a99275cb645177dd7806ab4dc69e
142 changes: 132 additions & 10 deletions filter/adaptivesvc/limiter/hill_climbing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package limiter

import (
"go.uber.org/atomic"
"sync"
"time"
)

Expand All @@ -20,19 +21,52 @@ const (
HillClimbingOptionExtendPlus HillClimbingOption = 2
)

var (
initialLimitation uint64 = 50
maxLimitation uint64 = 500
radicalPeriod uint64 = 1000
stablePeriod uint64 = 32000
)

// HillClimbing is a limiter using HillClimbing algorithm
type HillClimbing struct {
seq *atomic.Uint64

inflight *atomic.Uint64
limitation *atomic.Uint64

lastUpdatedTimeMutex *sync.Mutex
// nextUpdateTime = lastUpdatedTime + updateInterval
updateInterval *atomic.Uint64
lastUpdatedTime *atomic.Time

// indicators of the current round
successCounter *atomic.Uint64
rttAvg *atomic.Float64

bestMutex *sync.Mutex
// indicators of history
bestConcurrency *atomic.Uint64
bestRTTAvg *atomic.Float64
bestLimitation *atomic.Uint64
bestSuccessRate *atomic.Uint64
}

func NewHillClimbing() Limiter {
l := &HillClimbing{
seq: new(atomic.Uint64),
inflight: new(atomic.Uint64),
limitation: new(atomic.Uint64),
seq: new(atomic.Uint64),
inflight: new(atomic.Uint64),
limitation: new(atomic.Uint64),
lastUpdatedTimeMutex: new(sync.Mutex),
updateInterval: atomic.NewUint64(radicalPeriod),
lastUpdatedTime: atomic.NewTime(time.Now()),
successCounter: new(atomic.Uint64),
rttAvg: new(atomic.Float64),
bestMutex: new(sync.Mutex),
bestConcurrency: new(atomic.Uint64),
bestRTTAvg: new(atomic.Float64),
bestLimitation: new(atomic.Uint64),
bestSuccessRate: new(atomic.Uint64),
}

return l
Expand Down Expand Up @@ -60,20 +94,18 @@ func (l *HillClimbing) Acquire() (Updater, error) {

type HillClimbingUpdater struct {
startTime time.Time
seq uint64
limiter *HillClimbing

updateInterval *atomic.Uint64
lastUpdatedTime time.Time
successCounter *atomic.Uint64
// for debug purposes
seq uint64
}

func NewHillClimbingUpdater(limiter *HillClimbing) *HillClimbingUpdater {
inflight := limiter.inflight.Add(1)
u := &HillClimbingUpdater{
startTime: time.Now(),
seq: limiter.seq.Add(1) - 1,
limiter: limiter,
seq: limiter.seq.Add(1) - 1,
}
VerboseDebugf("[NewHillClimbingUpdater] A new request arrived, seq: %d, inflight: %d, time: %s.",
u.seq, inflight, u.startTime.String())
Expand All @@ -84,27 +116,117 @@ func (u *HillClimbingUpdater) DoUpdate(rtt, inflight uint64) error {
defer func() {
u.limiter.inflight.Add(-1)
}()
VerboseDebugf("[HillClimbingUpdater.DoUpdate] A request finished, the limiter will be updated, seq: %d.", u.seq)
VerboseDebugf("[HillClimbingUpdater] A request finished, the limiter will be updated, seq: %d.", u.seq)

u.limiter.lastUpdatedTimeMutex.Lock()
// if lastUpdatedTime is updated, terminate DoUpdate immediately
lastUpdatedTime := u.limiter.lastUpdatedTime.Load()
u.limiter.lastUpdatedTimeMutex.Unlock()

option, err := u.getOption(rtt, inflight)
if err != nil {
return err
}
if u.shouldDrop(lastUpdatedTime) {
return nil
}

if err = u.adjustLimitation(option); err != nil {
return err
}
return nil
}

func (u *HillClimbingUpdater) getOption(rtt, inflight uint64) (HillClimbingOption, error) {
var option HillClimbingOption
now := time.Now()
option := HillClimbingOptionDoNothing

lastUpdatedTime := u.limiter.lastUpdatedTime.Load()
updateInterval := u.limiter.updateInterval.Load()
rttAvg := u.limiter.rttAvg.Load()
successCounter := u.limiter.successCounter.Load()
limitation := u.limiter.limitation.Load()

if now.Sub(lastUpdatedTime) > time.Duration(updateInterval) ||
rttAvg == 0 {
// Current req is at the next round or no rttAvg.

// FIXME(justxuewei): If all requests in one round
// not receive responses, rttAvg will be 0, and
// concurrency will be 0 as well, the actual
// concurrency, however, is not 0.
concurrency := float64(successCounter) * rttAvg / float64(updateInterval)

// Consider extending limitation if concurrent is
// about to reach the limitation.
if uint64(concurrency*1.5) > limitation {
if updateInterval == radicalPeriod {
option = HillClimbingOptionExtendPlus
} else {
option = HillClimbingOptionExtend
}
}

successRate := uint64(1000.0 * float64(successCounter) / float64(updateInterval))

// Wrap the code into an anonymous function due to
// use defer to ensure the bestMutex is unlocked
// once the best-indicators is updated.
isUpdated := func() bool {
u.limiter.bestMutex.Lock()
defer u.limiter.bestMutex.Unlock()
if successRate > u.limiter.bestSuccessRate.Load() {
// successRate is the best in the history, update
// all best-indicators.
u.limiter.bestSuccessRate.Store(successRate)
u.limiter.bestRTTAvg.Store(rttAvg)
u.limiter.bestConcurrency.Store(uint64(concurrency))
u.limiter.bestLimitation.Store(u.limiter.limitation.Load())
VerboseDebugf("[HillClimbingUpdater] Best-indicators are up-to-date, " +
"seq: %d, bestSuccessRate: %d, bestRTTAvg: %.4f, bestConcurrency: %d," +
" bestLimitation: %d.", u.seq, u.limiter.bestSuccessRate.Load(),
u.limiter.bestRTTAvg.Load(), u.limiter.bestConcurrency.Load(),
u.limiter.bestLimitation.Load())
return true
}
return false
}()

if !isUpdated && u.shouldShrink(successCounter, rttAvg) {

}

// reset data for the last round
u.limiter.successCounter.Store(0)
u.limiter.rttAvg.Store(float64(rtt))
} else {
// still in the current round
// TODO(justxuewei): [TBD] if needs to protect here using mutex??
u.limiter.successCounter.Add(1)
}

return option, nil
}

func (u *HillClimbingUpdater) shouldShrink(counter uint64, rttAvg float64) bool {


return false
}

// TODO(justxuewei): update lastUpdatedTime
func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error {
u.limiter.lastUpdatedTimeMutex.Lock()
defer u.limiter.lastUpdatedTimeMutex.Unlock()

return nil
}

func (u *HillClimbingUpdater) shouldDrop(lastUpdatedTime time.Time) (isDropped bool) {
if !u.limiter.lastUpdatedTime.Load().Equal(lastUpdatedTime) {
VerboseDebugf("[HillClimbingUpdater] The limitation is updated by others, drop this update, seq: %d.", u.seq)
isDropped = true
return
}
return
}