Skip to content

Commit

Permalink
add healthcheck router
Browse files Browse the repository at this point in the history
  • Loading branch information
郑泽超 committed Mar 4, 2020
1 parent 989c001 commit 63a7dc2
Show file tree
Hide file tree
Showing 10 changed files with 474 additions and 1 deletion.
86 changes: 86 additions & 0 deletions cluster/router/condition/default_health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package condition

import (
"math"
)

import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

// Configuration keys and defaults for the default health checker.
const (
// HEALTH_CHECKER is the URL parameter naming which registered health
// checker implementation the health-check router should use.
HEALTH_CHECKER = "health.checker"
// DEFAULT_HEALTH_CHECKER is the name under which NewDefaultHealthChecker is
// registered and the fallback when HEALTH_CHECKER is not set on the URL.
DEFAULT_HEALTH_CHECKER = "default"
// OUTSTANDING_REQUEST_COUNT_LIMIT_KEY is the URL parameter holding the
// maximum number of active (in-flight) requests tolerated per invoker.
OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit"
// SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY is the URL parameter holding the
// successive-failure count at which the circuit breaker starts to trip.
SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold"
// DEFAULT_SUCCESSIVE_FAILED_THRESHOLD is presumably the default for
// SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY — confirm against the constructor.
DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5
// CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY is the URL parameter holding the
// circuit-breaker timeout factor.
CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor"
// DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF caps how far the successive
// failure count may exceed the threshold before the excess is used as the
// exponent of the sleep-window back-off.
DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5
// DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR is the default timeout factor; the
// sleep window is a power of two multiplied by a factor of this size.
DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000
// MAX_CIRCUIT_TRIPPED_TIMEOUT is the upper bound of the sleep window
// (presumably milliseconds, since the resulting deadline is compared with
// protocol.CurrentTimeMillis()).
MAX_CIRCUIT_TRIPPED_TIMEOUT = 30000
)

// init registers NewDefaultHealthChecker with the extension registry under
// the DEFAULT_HEALTH_CHECKER name so it can be looked up by name later.
func init() {
extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}

// DefaultHealthChecker is the default health checker implementation. It
// judges an invoker from the request statistics recorded for its URL: the
// number of active requests and the number of successive failed requests.
type DefaultHealthChecker struct {
// OutStandingRequestConutLimit is the maximum number of active requests an
// invoker may have; above this the invoker is reported unhealthy.
OutStandingRequestConutLimit int32
// RequestSuccessiveFailureThreshold is the circuit-breaker threshold: once
// the successive failure count reaches it, a sleep window is applied.
RequestSuccessiveFailureThreshold int32
// CircuitTrippedTimeoutFactor is populated from the
// "circuit.tripped.timeout.factor" URL parameter.
CircuitTrippedTimeoutFactor int32
}

// IsHealthy reports whether invoker should be considered healthy.
//
// An invoker is healthy when its circuit breaker is not tripped and its
// number of active requests does not exceed OutStandingRequestConutLimit.
// An unhealthy verdict is logged at debug level.
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
	status := protocol.GetURLStatus(invoker.GetUrl())
	// The breaker check runs first; the active-request check is only
	// evaluated when the breaker is not tripped.
	healthy := !c.isCircuitBreakerTripped(status) && status.GetActive() <= c.OutStandingRequestConutLimit
	if healthy {
		return true
	}
	logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key())
	return false
}
// isCircuitBreakerTripped reports whether the circuit breaker for the given
// status is currently open, i.e. a breaker deadline exists and still lies in
// the future relative to protocol.CurrentTimeMillis().
func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool {
	deadline := c.getCircuitBreakerTimeout(status)
	// A non-positive deadline means no sleep window applies at all.
	return deadline > 0 && deadline > protocol.CurrentTimeMillis()
}

// getCircuitBreakerTimeout returns the timestamp until which the circuit
// breaker stays tripped: the time of the last failed request plus the current
// sleep window. It returns 0 when no sleep window applies.
func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 {
	if window := c.getCircuitBreakerSleepWindowTime(status); window > 0 {
		return status.GetLastRequestFailedTimestamp() + window
	}
	return 0
}

// getCircuitBreakerSleepWindowTime returns how long the circuit breaker
// should stay tripped for the given status.
//
// The window grows exponentially with the number of successive failures
// beyond RequestSuccessiveFailureThreshold:
//
//	window = 2^min(failures-threshold, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF) * factor
//
// capped at MAX_CIRCUIT_TRIPPED_TIMEOUT. It returns 0 when the failure count
// is still below the threshold.
//
// The factor is the checker's configured CircuitTrippedTimeoutFactor. A
// non-positive factor (for example a zero-valued DefaultHealthChecker) falls
// back to DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR so the breaker cannot be
// silently disabled.
func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 {
	diff := status.GetSuccessiveRequestFailureCount() - c.RequestSuccessiveFailureThreshold
	if diff < 0 {
		return 0
	}
	if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
		diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
	}

	factor := int64(c.CircuitTrippedTimeoutFactor)
	if factor <= 0 {
		factor = DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR
	}

	// diff is within [0, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF] here, so
	// the unsigned conversion is safe; computing in int64 keeps a large
	// configured factor from overflowing before the cap is applied.
	sleepWindow := (int64(1) << uint32(diff)) * factor
	if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT {
		sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT
	}
	return sleepWindow
}

// NewDefaultHealthChecker builds a DefaultHealthChecker configured from the
// parameters of url:
//
//   - OUTSTANDING_REQUEST_COUNT_LIMIT_KEY: active-request limit, defaulting
//     to math.MaxInt32 (effectively unlimited).
//   - SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY: circuit-breaker threshold,
//     defaulting to DEFAULT_SUCCESSIVE_FAILED_THRESHOLD.
//   - CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY: timeout factor, defaulting to
//     DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR.
//
// Parameter values are converted to int32.
func NewDefaultHealthChecker(url *common.URL) router.HealthChecker {
	return &DefaultHealthChecker{
		OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)),
		// Use the dedicated threshold default rather than the max-diff
		// constant, which only happens to share the same value.
		RequestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)),
		CircuitTrippedTimeoutFactor:       int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)),
	}
}
135 changes: 135 additions & 0 deletions cluster/router/condition/default_health_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package condition

import (
"math"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)

// TestDefaultHealthChecker_IsHealthy walks one URL through the checker's
// verdicts: healthy by default, unhealthy when too many requests are
// outstanding, unhealthy when the circuit breaker trips, and healthy again
// after a successful request.
func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
	url, err := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	assert.NoError(t, err)
	hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	invoker := NewMockInvoker(url, 1)
	assert.True(t, hc.IsHealthy(invoker))

	url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
	url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100")
	// fake the outgoing request
	for i := 0; i < 11; i++ {
		request(url, "test", 0, true, false)
	}
	hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	// the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy
	assert.False(t, hc.IsHealthy(invoker))

	// successive failed count is more than SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
	for i := 0; i < 11; i++ {
		request(url, "test", 0, false, false)
	}
	url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
	url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
	hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	assert.False(t, hc.IsHealthy(invoker))

	// reset successive failed count and go to healthy
	request(url, "test", 0, false, true)
	assert.True(t, hc.IsHealthy(invoker))
}

// TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime checks the sleep
// window at its cap, below the threshold, and just past the threshold.
func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
	primary, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	checker := NewDefaultHealthChecker(&primary).(*DefaultHealthChecker)

	// Far more failures than the default threshold: the window hits its cap.
	for n := 100; n > 0; n-- {
		request(primary, "test", 1, false, false)
	}
	window := checker.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(primary))
	assert.True(t, window == MAX_CIRCUIT_TRIPPED_TIMEOUT)

	// With the threshold raised to 1000 the same failures no longer trip.
	primary.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000")
	strict := NewDefaultHealthChecker(&primary).(*DefaultHealthChecker)
	window = strict.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(primary))
	assert.True(t, window == 0)

	// A second URL starts without a sleep window.
	secondary, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
	window = checker.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(secondary))
	assert.True(t, window == 0)

	// Six failures are just past the default threshold: a window exists but
	// is still below the cap.
	for n := 0; n < 6; n++ {
		request(secondary, "test", 1, false, false)
	}
	window = checker.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(secondary))
	assert.True(t, window > 0 && window < MAX_CIRCUIT_TRIPPED_TIMEOUT)
}

// TestDefaultHealthChecker_getCircuitBreakerTimeout checks that the breaker
// timeout is 0 for the first URL and lies in the future once a second URL has
// accumulated six successive failures.
func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) {
	primary, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	checker := NewDefaultHealthChecker(&primary).(*DefaultHealthChecker)
	deadline := checker.getCircuitBreakerTimeout(protocol.GetURLStatus(primary))
	assert.True(t, deadline == 0)

	secondary, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
	for n := 0; n < 6; n++ {
		request(secondary, "test", 1, false, false)
	}
	deadline = checker.getCircuitBreakerTimeout(protocol.GetURLStatus(secondary))
	// The deadline must be later than the current time.
	assert.True(t, deadline > protocol.CurrentTimeMillis())
}

// TestDefaultHealthChecker_isCircuitBreakerTripped checks that the breaker
// starts closed and is open after 100 successive failed requests.
func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) {
	target, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	checker := NewDefaultHealthChecker(&target).(*DefaultHealthChecker)

	initial := protocol.GetURLStatus(target)
	assert.False(t, checker.isCircuitBreakerTripped(initial))

	// Pile up failed requests to push the failure count past the threshold.
	for n := 100; n > 0; n-- {
		request(target, "test", 1, false, false)
	}
	assert.True(t, checker.isCircuitBreakerTripped(protocol.GetURLStatus(target)))
}

// TestNewDefaultHealthChecker checks that the constructor applies its
// defaults when the URL carries no parameters and honours explicit
// parameters when they are set.
func TestNewDefaultHealthChecker(t *testing.T) {
	url, err := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	assert.NoError(t, err)
	defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	assert.NotNil(t, defaultHc)
	// assert.Equal takes (expected, actual); keeping that order makes failure
	// messages report the right side as "expected".
	assert.Equal(t, int32(math.MaxInt32), defaultHc.OutStandingRequestConutLimit)
	assert.Equal(t, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF), defaultHc.RequestSuccessiveFailureThreshold)

	url1, err := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	assert.NoError(t, err)
	url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
	url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
	nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker)
	assert.NotNil(t, nondefaultHc)
	assert.Equal(t, int32(10), nondefaultHc.OutStandingRequestConutLimit)
	assert.Equal(t, int32(10), nondefaultHc.RequestSuccessiveFailureThreshold)
}

// request simulates one RPC against url/method for the tests. It always
// records the start of the request. When active is true the request is left
// outstanding; otherwise it is completed with the given elapsed time and
// success flag.
func request(url common.URL, method string, elapsed int64, active, succeeded bool) {
	protocol.BeginCount(url, method)
	if active {
		return
	}
	protocol.EndCount(url, method, elapsed, succeeded)
}
11 changes: 11 additions & 0 deletions cluster/router/condition/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,14 @@ func newAppRouterFactory() router.RouterFactory {
// NewRouter creates a router for url by delegating to NewAppRouter.
func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) {
return NewAppRouter(url)
}

// HealthCheckRouteFactory is the router factory that creates health-check
// routers. It carries no state.
type HealthCheckRouteFactory struct {
}

// newHealthCheckRouteFactory returns a new HealthCheckRouteFactory as a
// router.RouterFactory.
func newHealthCheckRouteFactory() router.RouterFactory {
return &HealthCheckRouteFactory{}
}

// NewRouter creates a health-check router for url by delegating to
// NewHealthCheckRouter.
func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) {
return NewHealthCheckRouter(url)
}
5 changes: 5 additions & 0 deletions cluster/router/condition/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,8 @@ func TestNewAppRouterFactory(t *testing.T) {
factory := newAppRouterFactory()
assert.NotNil(t, factory)
}

// TestHealthCheckRouteFactory checks that newHealthCheckRouteFactory returns
// a non-nil factory.
func TestHealthCheckRouteFactory(t *testing.T) {
factory := newHealthCheckRouteFactory()
assert.NotNil(t, factory)
}
61 changes: 61 additions & 0 deletions cluster/router/condition/health_check_route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package condition

import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

const (
// HEALTH_ROUTE_ENABLED_KEY is the URL parameter that switches health-check
// routing on; NewHealthCheckRouter treats it as false when absent.
HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled"
)

// HealthCheckRouter is a router that filters out invokers its health checker
// reports as unhealthy.
type HealthCheckRouter struct {
// url is the URL the router was created from.
url *common.URL
// enabled mirrors the HEALTH_ROUTE_ENABLED_KEY URL parameter; when false,
// Route returns its input unchanged.
enabled bool
// checker decides whether an invoker is healthy; it is only set by
// NewHealthCheckRouter when the router is enabled.
checker router.HealthChecker
}

// NewHealthCheckRouter builds a HealthCheckRouter from url.
//
// The router is enabled only when the HEALTH_ROUTE_ENABLED_KEY parameter is
// true (it defaults to false). When enabled, the health checker named by the
// HEALTH_CHECKER parameter (default DEFAULT_HEALTH_CHECKER) is looked up in
// the extension registry. When disabled, no checker is created. The returned
// error is always nil.
func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
	hcRouter := &HealthCheckRouter{
		url:     url,
		enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false),
	}
	if !hcRouter.enabled {
		return hcRouter, nil
	}
	name := url.GetParam(HEALTH_CHECKER, DEFAULT_HEALTH_CHECKER)
	hcRouter.checker = extension.GetHealthChecker(name, url)
	return hcRouter, nil
}

// Route returns the subset of invokers that the router's health checker
// considers healthy.
//
// The input is returned unchanged when the router is disabled or when there
// are no invokers to filter. If every invoker is unhealthy, the router logs a
// warning and downgrades to returning all invokers, so callers are never left
// without a candidate purely because of health filtering.
//
// url is only used to name the service in the warning. invocation is not
// inspected.
func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
	// Nothing to filter: skip the work and avoid a misleading
	// "all invokers are unhealthy" warning for an empty list.
	if !r.enabled || len(invokers) == 0 {
		return invokers
	}

	healthyInvokers := make([]protocol.Invoker, 0, len(invokers))
	for _, invoker := range invokers {
		if r.checker.IsHealthy(invoker) {
			healthyInvokers = append(healthyInvokers, invoker)
		}
	}

	if len(healthyInvokers) == 0 {
		logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
		return invokers
	}
	return healthyInvokers
}

// Priority returns the priority of this router, which is always 0.
func (r *HealthCheckRouter) Priority() int64 {
return 0
}

// URL returns a copy of the URL this router was created from. It dereferences
// the stored pointer, so the router must have been built with a non-nil URL.
func (r *HealthCheckRouter) URL() common.URL {
return *r.url
}

// HealthyChecker returns the health checker used by this router. It is nil
// when the router was created by NewHealthCheckRouter with routing disabled.
func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker {
return r.checker
}
Loading

0 comments on commit 63a7dc2

Please sign in to comment.