Skip to content

Commit

Permalink
fix: fix adaptive service issues (#1718)
Browse files Browse the repository at this point in the history
* fix: fix adaptive service issues

* fix: add license

* fix: fix logging message
  • Loading branch information
justxuewei authored Jan 18, 2022
1 parent 7168e8e commit 06da7ce
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 11 deletions.
15 changes: 14 additions & 1 deletion cluster/cluster/adaptivesvc/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
clsutils "dubbo.apache.org/dubbo-go/v3/cluster/utils"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
Expand Down Expand Up @@ -67,8 +68,20 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation
// invoke
result := invoker.Invoke(ctx, invocation)

// if the adaptive service encounters an error, DO NOT
// update the metrics.
if clsutils.IsAdaptiveServiceFailed(result.Error()) {
return result
}

// update metrics
remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, "").(string)
remainingIface := result.Attachment(constant.AdaptiveServiceRemainingKey, "")
remainingStr, ok := remainingIface.(string)
if !ok {
logger.Errorf("[adasvc cluster] The %s field type of value %v should be string.",
constant.AdaptiveServiceRemainingKey, remainingIface)
return result
}
remaining, err := strconv.Atoi(remainingStr)
if err != nil {
logger.Warnf("the remaining is unexpected, we need a int type, but we got %s, err: %v.", remainingStr, err)
Expand Down
46 changes: 46 additions & 0 deletions cluster/utils/adaptivesvc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils

import (
"fmt"
"strings"
)

import (
"dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc"
adasvcfilter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
)

var ReachLimitationErrorString = fmt.Sprintf("%s: %s",
adaptivesvc.ErrAdaptiveSvcInterrupted.Error(),
adasvcfilter.ErrReachLimitation.Error())

func DoesAdaptiveServiceReachLimitation(err error) bool {
if err == nil {
return false
}
return err.Error() == ReachLimitationErrorString
}

func IsAdaptiveServiceFailed(err error) bool {
if err == nil {
return false
}
return strings.HasPrefix(err.Error(), adaptivesvc.ErrAdaptiveSvcInterrupted.Error())
}
41 changes: 31 additions & 10 deletions filter/adaptivesvc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package adaptivesvc
import (
"context"
"fmt"
"strings"
"sync"
)

Expand All @@ -40,8 +41,9 @@ var (
adaptiveServiceProviderFilterOnce sync.Once
instance filter.Filter

ErrUpdaterNotFound = fmt.Errorf("updater not found")
ErrUnexpectedUpdaterType = fmt.Errorf("unexpected updater type")
ErrAdaptiveSvcInterrupted = fmt.Errorf("adaptive service interrupted")
ErrUpdaterNotFound = fmt.Errorf("updater not found")
ErrUnexpectedUpdaterType = fmt.Errorf("unexpected updater type")
)

func init() {
Expand Down Expand Up @@ -70,47 +72,55 @@ func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker prot
// a new limiter
if l, err = limiterMapperSingleton.newAndSetMethodLimiter(invoker.GetURL(),
invocation.MethodName(), limiter.HillClimbingLimiter); err != nil {
return &protocol.RPCResult{Err: err}
return &protocol.RPCResult{Err: wrapErrAdaptiveSvcInterrupted(err)}
}
} else {
// unexpected errors
return &protocol.RPCResult{Err: err}
return &protocol.RPCResult{Err: wrapErrAdaptiveSvcInterrupted(err)}
}
}

updater, err := l.Acquire()
if err != nil {
return &protocol.RPCResult{Err: err}
return &protocol.RPCResult{Err: wrapErrAdaptiveSvcInterrupted(err)}
}

invocation.Attributes()[constant.AdaptiveServiceUpdaterKey] = updater

invocation.SetAttribute(constant.AdaptiveServiceUpdaterKey, updater)
return invoker.Invoke(ctx, invocation)
}

func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result protocol.Result, invoker protocol.Invoker,
invocation protocol.Invocation) protocol.Result {

if isErrAdaptiveSvcInterrupted(result.Error()) {
// If the Invoke method of the adaptiveServiceProviderFilter returns an error,
// the OnResponse of the adaptiveServiceProviderFilter should not be performed.
return result
}

// get updater from the attributes
updaterIface, _ := invocation.GetAttribute(constant.AdaptiveServiceUpdaterKey)
if updaterIface == nil {
logger.Errorf("[adasvc filter] The updater is not found on the attributes: %#v",
invocation.Attributes())
return &protocol.RPCResult{Err: ErrUpdaterNotFound}
}
updater, ok := updaterIface.(limiter.Updater)
if !ok {
logger.Errorf("[adasvc filter] The type of the updater is not unexpected, we got %#v", updaterIface)
return &protocol.RPCResult{Err: ErrUnexpectedUpdaterType}
}

err := updater.DoUpdate()
if err != nil {
// DoUpdate was failed, but the invocation is not failed.
// Printing the error to logs is better than returning a
// result with an error.
logger.Errorf("[adasvc filter] The DoUpdate method was failed, err: %s.", err)
return &protocol.RPCResult{Err: err}
}

// get limiter for the mapper
l, err := limiterMapperSingleton.getMethodLimiter(invoker.GetURL(), invocation.MethodName())
if err != nil {
logger.Errorf("[adasvc filter] The method limiter for \"%s\" is not found.", invocation.MethodName())
return &protocol.RPCResult{Err: err}
}

Expand All @@ -123,3 +133,14 @@ func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result pro

return result
}

func wrapErrAdaptiveSvcInterrupted(customizedErr interface{}) error {
return fmt.Errorf("%w: %v", ErrAdaptiveSvcInterrupted, customizedErr)
}

func isErrAdaptiveSvcInterrupted(err error) bool {
if err == nil {
return false
}
return strings.HasPrefix(err.Error(), ErrAdaptiveSvcInterrupted.Error())
}

0 comments on commit 06da7ce

Please sign in to comment.