Skip to content

Commit

Permalink
Merge pull request #800 from beiwei30/connectivity
Browse files Browse the repository at this point in the history
enhance client's connectivity
  • Loading branch information
AlexStocks authored Oct 27, 2020
2 parents fd3d824 + 75ca044 commit 268b392
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cluster/router/healthcheck/default_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type DefaultHealthChecker struct {
// IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request
// and the current active request
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
if !invoker.IsAvailable() {
return false
}

urlStatus := protocol.GetURLStatus(invoker.GetUrl())
if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestCountLimit() {
logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key())
Expand Down
4 changes: 4 additions & 0 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
return di.timeout
}

func (di *DubboInvoker) IsAvailable() bool {
return di.client.IsAvailable()
}

// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
Expand Down
7 changes: 7 additions & 0 deletions remoting/exchange_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Client interface {
Close()
// send request to server.
Request(request *Request, timeout time.Duration, response *PendingResponse) error
// check if the client is still available
IsAvailable() bool
}

// This is abstraction level. it is like facade.
Expand Down Expand Up @@ -182,6 +184,11 @@ func (client *ExchangeClient) Close() {
client.client.Close()
}

// IsAvailable to check if the underlying network client is available yet.
func (client *ExchangeClient) IsAvailable() bool {
return client.client.IsAvailable()
}

// handle the response from server
func (client *ExchangeClient) Handler(response *Response) {

Expand Down
8 changes: 8 additions & 0 deletions remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ func (c *Client) Request(request *remoting.Request, timeout time.Duration, respo
return perrors.WithStack(err)
}

// isAvailable returns true if the connection is available, or it can be re-established.
func (c *Client) IsAvailable() bool {
client, _, err := c.selectSession(c.addr)
return err == nil &&
// defensive check
client != nil
}

func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) {
rpcClient, err := c.pool.getGettyRpcClient(addr)
if err != nil {
Expand Down

0 comments on commit 268b392

Please sign in to comment.