Skip to content

Commit

Permalink
Merge 7b31e86 into 22c3087
Browse files Browse the repository at this point in the history
  • Loading branch information
whalecold authored Aug 24, 2023
2 parents 22c3087 + 7b31e86 commit ac6a51d
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 3 deletions.
3 changes: 2 additions & 1 deletion pkg/limiter/connection_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ func NewConcurrencyLimiter(lim int) ConcurrencyLimiter {
// Acquired is executed in `OnActive` which is called when a new connection is accepted, so even if the limit is reached
// the count is still need increase, but return false will lead the connection is closed then Release also be executed.
func (ml *connectionLimiter) Acquire(ctx context.Context) bool {
limit := atomic.LoadInt32(&ml.lim)
x := atomic.AddInt32(&ml.curr, 1)
return x <= atomic.LoadInt32(&ml.lim)
return x <= limit || limit <= 0
}

// Release decrease the connection counter.
Expand Down
17 changes: 17 additions & 0 deletions pkg/limiter/connection_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,21 @@ func TestConnectionLimiter(t *testing.T) {
}
}
test.Assert(t, newConnCount == 10)

lim.(Updatable).UpdateLimit(0)
var failedConnCount int32

for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
if !lim.Acquire(ctx) {
atomic.AddInt32(&failedConnCount, 1)
}
}
}()
}
wg.Wait()
test.Assert(t, failedConnCount == 0)
}
3 changes: 3 additions & 0 deletions pkg/limiter/qps_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (l *qpsLimiter) UpdateQPSLimit(interval time.Duration, limit int) {

// Acquire one token.
func (l *qpsLimiter) Acquire(ctx context.Context) bool {
if atomic.LoadInt32(&l.limit) <= 0 {
return true

Check warning on line 74 in pkg/limiter/qps_limiter.go

View check run for this annotation

Codecov / codecov/patch

pkg/limiter/qps_limiter.go#L74

Added line #L74 was not covered by tests
}
if atomic.LoadInt32(&l.tokens) <= 0 {
return false
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/limiter/qps_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,34 @@ func TestQPSLimiter(t *testing.T) {
delta = math.Abs(float64(actualQPS - int32(qps)))
// the diff range need be < 5%, the range is larger because the interval config is larger
test.Assert(t, delta < float64(qps)*0.05, delta, actualQPS)

// case5: UpdateLimit zero
qps = 0
count = 0
limiter.(*qpsLimiter).UpdateLimit(qps)
time.Sleep(interval)
max, _, itv = limiter.Status(ctx)
test.Assert(t, max == qps)
test.Assert(t, limiter.(*qpsLimiter).once == int32(float64(qps)/(time.Second.Seconds()/interval.Seconds())), limiter.(*qpsLimiter).once)
test.Assert(t, itv == interval)
wg.Add(concurrent)
var failedCount int32
stopFlag = 0
for i := 0; i < concurrent; i++ {
go func() {
for atomic.LoadInt32(&stopFlag) == 0 {
if !limiter.Acquire(ctx) {
atomic.AddInt32(&failedCount, 1)
}
}
wg.Done()
}()
}
time.AfterFunc(time.Second*2, func() {
atomic.StoreInt32(&stopFlag, 1)
})
wg.Wait()
test.Assert(t, failedCount == 0)
}

func TestCalcOnce(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,15 +377,17 @@ func (s *server) buildLimiterWithOpt() (handler remote.InboundHandler) {
if limits == nil && connLimit == nil && qpsLimit == nil {
return
}

if connLimit == nil {
if limits != nil && limits.MaxConnections > 0 {
if limits != nil {
connLimit = limiter.NewConnectionLimiter(limits.MaxConnections)
} else {
connLimit = &limiter.DummyConcurrencyLimiter{}
}
}

if qpsLimit == nil {
if limits != nil && limits.MaxQPS > 0 {
if limits != nil {
interval := time.Millisecond * 100 // FIXME: should not care this implementation-specific parameter
qpsLimit = limiter.NewQPSLimiter(interval, limits.MaxQPS)
} else {
Expand All @@ -394,10 +396,12 @@ func (s *server) buildLimiterWithOpt() (handler remote.InboundHandler) {
} else {
s.opt.Limit.QPSLimitPostDecode = true
}

if limits != nil && limits.UpdateControl != nil {
updater := limiter.NewLimiterWrapper(connLimit, qpsLimit)
limits.UpdateControl(updater)
}

handler = bound.NewServerLimiterHandler(connLimit, qpsLimit, s.opt.Limit.LimitReporter, s.opt.Limit.QPSLimitPostDecode)
// TODO: gRPC limiter
return
Expand Down

0 comments on commit ac6a51d

Please sign in to comment.