Skip to content

Commit

Permalink
Merge pull request #1675 from XiaoWeiKIN/graceful-shutdown
Browse files Browse the repository at this point in the history
feature-graceful-shutdwon
  • Loading branch information
LaurenceLiZhixin authored Jan 17, 2022
2 parents 78b6c8a + 39916bb commit 7168e8e
Show file tree
Hide file tree
Showing 29 changed files with 567 additions and 322 deletions.
6 changes: 3 additions & 3 deletions common/extension/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func SetFilter(name string, v func() filter.Filter) {
}

// GetFilter finds the filter extension with @name
func GetFilter(name string) filter.Filter {
func GetFilter(name string) (filter.Filter, bool) {
if filters[name] == nil {
panic("filter for " + name + " is not existing, make sure you have imported the package.")
return nil, false
}
return filters[name]()
return filters[name](), true
}

// SetRejectedExecutionHandler sets the RejectedExecutionHandler with @name
Expand Down
104 changes: 71 additions & 33 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,48 @@ import (
*/
const defaultShutDownTime = time.Second * 60

// GracefulShutdownInit todo GracefulShutdownInit in 3.0 should be discusesed.
func GracefulShutdownInit() {
signals := make(chan os.Signal, 1)

signal.Notify(signals, ShutdownSignals...)

func gracefulShutdownInit() {
// retrieve ShutdownConfig for gracefulShutdownFilter
if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown)
cGracefulShutdownFilter, existcGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
if !existcGracefulShutdownFilter {
return
}
sGracefulShutdownFilter, existsGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
if !existsGracefulShutdownFilter {
return
}
if filter, ok := cGracefulShutdownFilter.(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown())
}

go func() {
select {
case sig := <-signals:
logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
os.Exit(0)
})
BeforeShutdown()
// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
for _, dumpSignal := range DumpHeapShutdownSignals {
if sig == dumpSignal {
debug.WriteHeapDump(os.Stdout.Fd())
if filter, ok := sGracefulShutdownFilter.(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown())
}

if GetShutDown().InternalSignal {
signals := make(chan os.Signal, 1)
signal.Notify(signals, ShutdownSignals...)

go func() {
select {
case sig := <-signals:
logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
os.Exit(0)
})
BeforeShutdown()
// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
for _, dumpSignal := range DumpHeapShutdownSignals {
if sig == dumpSignal {
debug.WriteHeapDump(os.Stdout.Fd())
}
}
os.Exit(0)
}
os.Exit(0)
}
}()
}()
}
}

// BeforeShutdown provides processing flow before shutdown
Expand Down Expand Up @@ -115,22 +127,32 @@ func destroyAllRegistries() {
// First we destroy provider's protocols, and then we destroy the consumer protocols.
func destroyProtocols() {
logger.Info("Graceful shutdown --- Destroy protocols. ")
logger.Info("Graceful shutdown --- First destroy provider's protocols. ")

consumerProtocols := getConsumerProtocols()
if rootConfig.Protocols == nil {
return
}

consumerProtocols := getConsumerProtocols()

destroyProviderProtocols(consumerProtocols)
destroyConsumerProtocols(consumerProtocols)
}

// destroyProviderProtocols destroys the provider's protocol.
// if the protocol is consumer's protocol too, we will keep it
func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
logger.Info("Graceful shutdown --- First destroy provider's protocols. ")
for _, protocol := range rootConfig.Protocols {
// the protocol is the consumer's protocol too, we can not destroy it.
if consumerProtocols.Contains(protocol.Name) {
continue
}
extension.GetProtocol(protocol.Name).Destroy()
}
}

logger.Info("Graceful shutdown --- Second destroy consumer's protocols. ")
func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) {
logger.Info("Graceful shutdown --- Second Destroy consumer's protocols. ")
for name := range consumerProtocols.Items {
extension.GetProtocol(name.(string)).Destroy()
}
Expand All @@ -142,13 +164,28 @@ func waitAndAcceptNewRequests() {
return
}

timeout := rootConfig.Shutdown.GetStepTimeout()
time.Sleep(rootConfig.Shutdown.GetConsumerUpdateWaitTime())

timeout := rootConfig.Shutdown.GetStepTimeout()
// ignore this step
if timeout < 0 {
return
}
time.Sleep(timeout)
waitingProviderProcessedTimeout(rootConfig.Shutdown)
}

func waitingProviderProcessedTimeout(shutdownConfig *ShutdownConfig) {
timeout := shutdownConfig.GetStepTimeout()
if timeout <= 0 {
return
}
deadline := time.Now().Add(timeout)

for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount > 0 {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount)
}
}

//for provider. It will wait for processing receiving requests
Expand All @@ -159,19 +196,20 @@ func waitForSendingAndReceivingRequests() {
return
}
rootConfig.Shutdown.RejectRequest = true
waitingProcessedTimeout(rootConfig.Shutdown)
waitingConsumerProcessedTimeout(rootConfig.Shutdown)
}

func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
func waitingConsumerProcessedTimeout(shutdownConfig *ShutdownConfig) {
timeout := shutdownConfig.GetStepTimeout()
if timeout <= 0 {
return
}
deadline := time.Now().Add(timeout)

for time.Now().Before(deadline) && !shutdownConfig.RequestsFinished {
for time.Now().Before(deadline) && shutdownConfig.ConsumerActiveCount > 0 {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
logger.Infof("waiting for consumer active invocation count = %d", shutdownConfig.ConsumerActiveCount)
}
}

Expand Down
51 changes: 40 additions & 11 deletions config/graceful_shutdown_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ import (
"time"
)

import (
"github.com/creasty/defaults"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
)

const (
defaultTimeout = 60 * time.Second
defaultStepTimeout = 10 * time.Second
defaultTimeout = 60 * time.Second
defaultStepTimeout = 3 * time.Second
defaultConsumerUpdateWaitTime = 3 * time.Second
)

// ShutdownConfig is used as configuration for graceful shutdown
Expand All @@ -47,14 +52,23 @@ type ShutdownConfig struct {
* and the 99.9% requests will return response in 2s, so the StepTimeout will be bigger than(10+2) * 1000ms,
* maybe (10 + 2*3) * 1000ms is a good choice.
*/
StepTimeout string `default:"10s" yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"`
StepTimeout string `default:"3s" yaml:"step-timeout" json:"step.timeout,omitempty" property:"step.timeout"`

/*
* ConsumerUpdateWaitTime means when provider is shutting down, after the unregister, time to wait for client to
* update invokers. During this time, incoming invocation can be treated normally.
*/
ConsumerUpdateWaitTime string `default:"3s" yaml:"consumer-update-wait-time" json:"consumerUpdate.waitTIme,omitempty" property:"consumerUpdate.waitTIme"`
// when we try to shutdown the applicationConfig, we will reject the new requests. In most cases, you don't need to configure this.
RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"`
RejectRequestHandler string `yaml:"reject-handler" json:"reject-handler,omitempty" property:"reject_handler"`
// internal listen kill signal,the default is true.
InternalSignal bool `default:"true" yaml:"internal-signal" json:"internal.signal,omitempty" property:"internal.signal"`

// true -> new request will be rejected.
RejectRequest bool
// true -> all requests had been processed. In provider side it means that all requests are returned response to clients
// In consumer side, it means that all requests getting response from servers
RequestsFinished bool
// active invocation
ConsumerActiveCount int32
ProviderActiveCount int32
}

// Prefix dubbo.shutdown
Expand Down Expand Up @@ -84,6 +98,20 @@ func (config *ShutdownConfig) GetStepTimeout() time.Duration {
return result
}

func (config *ShutdownConfig) GetConsumerUpdateWaitTime() time.Duration {
result, err := time.ParseDuration(config.ConsumerUpdateWaitTime)
if err != nil {
logger.Errorf("The ConsumerUpdateTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v",
config.ConsumerActiveCount, defaultConsumerUpdateWaitTime.String(), err)
return defaultConsumerUpdateWaitTime
}
return result
}

func (config *ShutdownConfig) Init() error {
return defaults.Set(config)
}

type ShutdownConfigBuilder struct {
shutdownConfig *ShutdownConfig
}
Expand All @@ -107,16 +135,17 @@ func (scb *ShutdownConfigBuilder) SetRejectRequestHandler(rejectRequestHandler s
return scb
}

func (scb *ShutdownConfigBuilder) SetRequestsFinished(requestsFinished bool) *ShutdownConfigBuilder {
scb.shutdownConfig.RequestsFinished = requestsFinished
func (scb *ShutdownConfigBuilder) SetRejectRequest(rejectRequest bool) *ShutdownConfigBuilder {
scb.shutdownConfig.RejectRequest = rejectRequest
return scb
}

func (scb *ShutdownConfigBuilder) SetRejectRequest(rejectRequest bool) *ShutdownConfigBuilder {
scb.shutdownConfig.RejectRequest = rejectRequest
func (scb *ShutdownConfigBuilder) SetInternalSignal(internalSignal bool) *ShutdownConfigBuilder {
scb.shutdownConfig.InternalSignal = internalSignal
return scb
}

func (scb *ShutdownConfigBuilder) Build() *ShutdownConfig {
defaults.Set(scb)
return scb.shutdownConfig
}
1 change: 0 additions & 1 deletion config/graceful_shutdown_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
func TestShutdownConfigGetTimeout(t *testing.T) {
config := ShutdownConfig{}
assert.False(t, config.RejectRequest)
assert.False(t, config.RequestsFinished)

config = ShutdownConfig{
Timeout: "60s",
Expand Down
20 changes: 20 additions & 0 deletions config/root_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func GetApplicationConfig() *ApplicationConfig {
return rootConfig.Application
}

func GetShutDown() *ShutdownConfig {
if err := check(); err != nil {
return NewShutDownConfigBuilder().Build()
}
if rootConfig.Shutdown != nil {
return rootConfig.Shutdown
}
return NewShutDownConfigBuilder().Build()
}

// getRegistryIds get registry ids
func (rc *RootConfig) getRegistryIds() []string {
ids := make([]string, 0)
Expand Down Expand Up @@ -208,13 +218,17 @@ func (rc *RootConfig) Init() error {
if err := rc.Consumer.Init(rc); err != nil {
return err
}
if err := rc.Shutdown.Init(); err != nil {
return err
}
// todo if we can remove this from Init in the future?
rc.Start()
return nil
}

func (rc *RootConfig) Start() {
startOnce.Do(func() {
gracefulShutdownInit()
rc.Consumer.Load()
rc.Provider.Load()
// todo if register consumer instance or has exported services
Expand All @@ -237,6 +251,7 @@ func newEmptyRootConfig() *RootConfig {
Metric: NewMetricConfigBuilder().Build(),
Logger: NewLoggerConfigBuilder().Build(),
Custom: NewCustomConfigBuilder().Build(),
Shutdown: NewShutDownConfigBuilder().Build(),
}
return newRootConfig
}
Expand Down Expand Up @@ -329,6 +344,11 @@ func (rb *RootConfigBuilder) SetCustom(customConfig *CustomConfig) *RootConfigBu
return rb
}

func (rb *RootConfigBuilder) SetShutDown(shutDownConfig *ShutdownConfig) *RootConfigBuilder {
rb.rootConfig.Shutdown = shutDownConfig
return rb
}

func (rb *RootConfigBuilder) Build() *RootConfig {
return rb.rootConfig
}
Expand Down
2 changes: 2 additions & 0 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"container/list"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -403,6 +404,7 @@ func (svc *ServiceConfig) getUrlMap() url.Values {

// whether to export or not
urlMap.Set(constant.ExportKey, strconv.FormatBool(svc.export))
urlMap.Set(constant.PIDKey, fmt.Sprintf("%d", os.Getpid()))

for _, v := range svc.Methods {
prefix := "methods." + v.Name + "."
Expand Down
2 changes: 1 addition & 1 deletion config/testdata/consumer_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ references:

shutdown_conf:
timeout: 60s
step_timeout: 10s
step-timeout: 10s

protocol_conf:
# when you choose the Dubbo protocol, the following configuration takes effect
Expand Down
2 changes: 1 addition & 1 deletion config/testdata/consumer_config_with_configcenter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ references:

shutdown_conf:
timeout: 60s
step_timeout: 10s
step-timeout: 10s

protocol_conf:
dubbo:
Expand Down
2 changes: 1 addition & 1 deletion config/testdata/consumer_config_withoutProtocol.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ references:

shutdown_conf:
timeout: 60s
step_timeout: 10s
step-timeout: 10s

protocol_conf:
dubbo:
Expand Down
2 changes: 1 addition & 1 deletion config/testdata/provider_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protocols:

shutdown_conf:
timeout: 60s
step_timeout: 10s
step-timeout: 10s

protocol_conf:
dubbo:
Expand Down
2 changes: 1 addition & 1 deletion config/testdata/provider_config_withoutProtocol.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protocols:

shutdown_conf:
timeout: 60s
step_timeout: 10s
step-timeout: 10s

protocol_conf:
dubbo:
Expand Down
Loading

0 comments on commit 7168e8e

Please sign in to comment.