Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/unreleased/fix-graceful-shutdown.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Bugfix: Fix the graceful shutdown

Fix the graceful shutdown.

https://github.com/owncloud/reva/pull/272
https://github.com/owncloud/ocis/issues/11170
76 changes: 41 additions & 35 deletions cmd/revad/internal/grace/grace.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -88,13 +89,18 @@ func NewWatcher(opts ...Option) *Watcher {
// Exit exits the current process cleaning up
// existing pid files.
func (w *Watcher) Exit(errc int) {
w.Clean()
os.Exit(errc)
}

// Clean removes the pid file.
func (w *Watcher) Clean() {
err := w.clean()
if err != nil {
w.log.Warn().Err(err).Msg("error removing pid file")
} else {
w.log.Info().Msgf("pid file %q got removed", w.pidFile)
}
os.Exit(errc)
}

func (w *Watcher) clean() error {
Expand Down Expand Up @@ -266,7 +272,7 @@ type Server interface {
// TrapSignals captures the OS signal.
func (w *Watcher) TrapSignals() {
signalCh := make(chan os.Signal, 1024)
signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT)
signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
for {
s := <-signalCh
w.log.Info().Msgf("%v signal received", s)
Expand All @@ -284,14 +290,9 @@ func (w *Watcher) TrapSignals() {
w.log.Info().Msgf("child forked with new pid %d", p.Pid)
w.childPIDs = append(w.childPIDs, p.Pid)
}

case syscall.SIGQUIT:
gracefulShutdown(w)
case syscall.SIGINT, syscall.SIGTERM:
if w.gracefulShutdownTimeout == 0 {
hardShutdown(w)
}
case syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM:
gracefulShutdown(w)
return
}
}
}
Expand All @@ -300,38 +301,43 @@ func (w *Watcher) TrapSignals() {
// exit() is problematic (i.e. racey) especiaily when orchestrating multiple
// reva services from some external runtime (like in the "ocis server" case
func gracefulShutdown(w *Watcher) {
defer w.Clean()
w.log.Info().Int("Timeout", w.gracefulShutdownTimeout).Msg("preparing for a graceful shutdown with deadline")
go func() {
count := w.gracefulShutdownTimeout
ticker := time.NewTicker(time.Second)
for ; true; <-ticker.C {
w.log.Info().Msgf("shutting down in %d seconds", count-1)
count--
if count <= 0 {
w.log.Info().Msg("deadline reached before draining active conns, hard stopping ...")
for _, s := range w.ss {
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
}
w.Exit(1)
wg := sync.WaitGroup{}

for _, s := range w.ss {
wg.Add(1)
go func() {
defer wg.Done()
w.log.Info().Msgf("fd to %s:%s gracefully closed", s.Network(), s.Address())
err := s.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
}
}()
}

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s gracefully closed ", s.Network(), s.Address())
err := s.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
w.log.Info().Msg("exit with error code 1")

w.Exit(1)
select {
case <-time.After(time.Duration(w.gracefulShutdownTimeout) * time.Second):
w.log.Info().Msg("graceful shutdown timeout reached. running hard shutdown")
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
}
return
case <-done:
w.log.Info().Msg("all servers gracefully stopped")
return
}
w.log.Info().Msg("exit with error code 0")
w.Exit(0)
}

// TODO: Ideally this would call exit() but properly return an error. The
Expand Down
178 changes: 178 additions & 0 deletions cmd/revad/runtime/drivenserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package runtime

import (
"errors"
"fmt"
"net"
"net/http"
"os"
"time"

"github.com/owncloud/reva/v2/pkg/registry"
"github.com/rs/zerolog"
)

const (
HTTP = iota
GRPC
)

// RevaDrivenServer is an interface that defines the methods for starting and stopping reva HTTP/GRPC services.
type RevaDrivenServer interface {
Start() error
Stop() error
}

// revaServer is an interface that defines the methods for starting and stopping a reva server.
type revaServer interface {
Start(ln net.Listener) error
Stop() error
GracefulStop() error
Network() string
Address() string
}

// sever represents a generic reva server that implements the RevaDrivenServer interface.
type server struct {
srv revaServer
log *zerolog.Logger
gracefulShutdownTimeout time.Duration
protocol string
}

// NewDrivenHTTPServerWithOptions runs a revad server w/o watcher with the given config file and options.
// Use it in cases where you want to run a revad server without the need for a watcher and the os signal handling as a part of another runtime.
// Returns nil if no http server is configured in the config file.
// The GracefulShutdownTimeout set to default 20 seconds and can be overridden in the core config.
// Logging a fatal error and exit with code 1 if the http server cannot be created.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "exit with code 1" is misleading because the function never does it. As it is written, it seems there is a os.Exit(-1) call hidden somewhere inside this function, but it isn't the case.

If, as a result of this function returning nil, the main app decides to exit with code 1, that's fine, but it's something this function doesn't need to care about. Whatever the caller does with the result of this function isn't this function's business.

func NewDrivenHTTPServerWithOptions(mainConf map[string]interface{}, opts ...Option) RevaDrivenServer {
if !isEnabledHTTP(mainConf) {
return nil
}
options := newOptions(opts...)
if srv := newServer(HTTP, mainConf, options); srv != nil {
return srv
}
options.Logger.Fatal().Msg("nothing to do, no http enabled_services declared in config")
return nil
}

// NewDrivenGRPCServerWithOptions runs a revad server w/o watcher with the given config file and options.
// Use it in cases where you want to run a revad server without the need for a watcher and the os signal handling as a part of another runtime.
// Returns nil if no grpc server is configured in the config file.
// The GracefulShutdownTimeout set to default 20 seconds and can be overridden in the core config.
// Logging a fatal error and exit with code 1 if the grpc server cannot be created.
func NewDrivenGRPCServerWithOptions(mainConf map[string]interface{}, opts ...Option) RevaDrivenServer {
if !isEnabledGRPC(mainConf) {
return nil
}
options := newOptions(opts...)
if srv := newServer(GRPC, mainConf, options); srv != nil {
return srv
}
options.Logger.Fatal().Msg("nothing to do, no grpc enabled_services declared in config")
return nil
}

// Start starts the reva server, listening on the configured address and network.
func (s *server) Start() error {
if s.srv == nil {
err := fmt.Errorf("reva %s server not initialized", s.protocol)
s.log.Fatal().Err(err).Send()
return err
}
ln, err := net.Listen(s.srv.Network(), s.srv.Address())
if err != nil {
s.log.Fatal().Err(err).Send()
return err
}
if err = s.srv.Start(ln); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
s.log.Error().Err(err).Msgf("reva %s server error", s.protocol)
}
return err
}
return nil
}

// Stop gracefully stops the reva server, waiting for the graceful shutdown timeout.
func (s *server) Stop() error {
if s.srv == nil {
return nil
}
done := make(chan struct{})
go func() {
s.log.Info().Msgf("gracefully stopping %s:%s reva %s server", s.srv.Network(), s.srv.Address(), s.protocol)
if err := s.srv.GracefulStop(); err != nil {
s.log.Error().Err(err).Msgf("error gracefully stopping reva %s server", s.protocol)
s.srv.Stop()
}
close(done)
}()

select {
case <-time.After(s.gracefulShutdownTimeout):
s.log.Info().Msg("graceful shutdown timeout reached. running hard shutdown")
err := s.srv.Stop()
if err != nil {
s.log.Error().Err(err).Msgf("error stopping reva %s server", s.protocol)
}
return nil
case <-done:
s.log.Info().Msgf("reva %s server gracefully stopped", s.protocol)
return nil
}
}

// newServer runs a revad server w/o watcher with the given config file and options.
func newServer(protocol int, mainConf map[string]interface{}, options Options) RevaDrivenServer {
parseSharedConfOrDie(mainConf["shared"])
coreConf := parseCoreConfOrDie(mainConf["core"])
log := options.Logger

if err := registry.Init(options.Registry); err != nil {
log.Fatal().Err(err).Msg("failed to initialize registry client")
return nil
}

host, _ := os.Hostname()
log.Info().Msgf("host info: %s", host)

// Only initialize tracing if we didn't get a tracer provider.
if options.TraceProvider == nil {
log.Debug().Msg("no pre-existing tracer given, initializing tracing")
options.TraceProvider = initTracing(coreConf)
}
initCPUCount(coreConf, log)

gracefulShutdownTimeout := 20 * time.Second
if coreConf.GracefulShutdownTimeout > 0 {
gracefulShutdownTimeout = time.Duration(coreConf.GracefulShutdownTimeout) * time.Second
}

srv := &server{
log: options.Logger,
gracefulShutdownTimeout: gracefulShutdownTimeout,
}
switch protocol {
case HTTP:
s, err := getHTTPServer(mainConf["http"], options.Logger, options.TraceProvider)
if err != nil {
options.Logger.Fatal().Err(err).Msg("error creating http server")
return nil
}
srv.srv = s
srv.protocol = "http"
return srv
case GRPC:
s, err := getGRPCServer(mainConf["grpc"], options.Logger, options.TraceProvider)
if err != nil {
options.Logger.Fatal().Err(err).Msg("error creating grpc server")
return nil
}
srv.srv = s
srv.protocol = "grpc"
return srv
}
return nil
}
5 changes: 3 additions & 2 deletions cmd/revad/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import (
"strconv"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/owncloud/reva/v2/cmd/revad/internal/grace"
"github.com/owncloud/reva/v2/pkg/logger"
"github.com/owncloud/reva/v2/pkg/registry"
"github.com/owncloud/reva/v2/pkg/rgrpc"
"github.com/owncloud/reva/v2/pkg/rhttp"
"github.com/owncloud/reva/v2/pkg/sharedconf"
rtrace "github.com/owncloud/reva/v2/pkg/trace"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/trace"
Expand All @@ -53,7 +53,8 @@ func RunWithOptions(mainConf map[string]interface{}, pidFile string, opts ...Opt
coreConf := parseCoreConfOrDie(mainConf["core"])

if err := registry.Init(options.Registry); err != nil {
panic(err)
options.Logger.Fatal().Err(err).Msg("failed to initialize registry client")
return
}

run(mainConf, coreConf, options.Logger, options.TraceProvider, pidFile)
Expand Down
8 changes: 4 additions & 4 deletions pkg/rgrpc/rgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"net"
"sort"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/mitchellh/mapstructure"
"github.com/owncloud/reva/v2/internal/grpc/interceptors/appctx"
"github.com/owncloud/reva/v2/internal/grpc/interceptors/auth"
"github.com/owncloud/reva/v2/internal/grpc/interceptors/log"
Expand All @@ -33,8 +35,6 @@ import (
"github.com/owncloud/reva/v2/internal/grpc/interceptors/useragent"
"github.com/owncloud/reva/v2/pkg/sharedconf"
rtrace "github.com/owncloud/reva/v2/pkg/trace"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog"
mtls "go-micro.dev/v4/util/tls"
Expand Down Expand Up @@ -279,15 +279,15 @@ func (s *Server) cleanupServices() {

// Stop stops the server.
func (s *Server) Stop() error {
s.cleanupServices()
s.s.Stop()
s.cleanupServices()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jvillafanez I assumed that we should stop the Listen first and then clean the services. Could you check if it makes sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I know, this Stop method will be call by a secondary thread because the main one will be running the service. Once we call s.s.Stop(), the main thread will keep going, so there is a risk that the app finishes without having run the s.cleanupServices() (or maybe it runs partially for some services until the app closes).
Maybe I'm wrong, there are too many Stop methods and it's hard to know which one we're calling here.

return nil
}

// GracefulStop gracefully stops the server.
func (s *Server) GracefulStop() error {
s.cleanupServices()
s.s.GracefulStop()
s.cleanupServices()
return nil
}

Expand Down
Loading