Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/component/controller/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (a *APIServer) writeKonnectivityConfig() error {
// Stop stops the supervisor held by the APIServer, if one is set.
//
// It returns the error reported by the supervisor's Stop, or nil when no
// supervisor is set.
func (a *APIServer) Stop() error {
	if a.supervisor == nil {
		return nil
	}
	// Stop exactly once: the supervisor clears its state on the first call, so
	// calling it again would only yield a spurious "not started" error.
	return a.supervisor.Stop()
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/component/controller/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ func (a *Manager) Reconcile(_ context.Context, clusterConfig *v1beta1.ClusterCon
// Stop in case there's process running already and we need to change the config
if a.supervisor != nil {
logger.Info("reconcile has nothing to do")
a.supervisor.Stop()
if err := a.supervisor.Stop(); err != nil {
logger.WithError(err).Error("Failed to stop executable")
}
a.supervisor = nil
}

Expand All @@ -156,7 +158,7 @@ func (a *Manager) Reconcile(_ context.Context, clusterConfig *v1beta1.ClusterCon
// Stop stops the supervisor held by the Manager, if one is set.
//
// It returns the error reported by the supervisor's Stop, or nil when no
// supervisor is set.
func (a *Manager) Stop() error {
	if a.supervisor == nil {
		return nil
	}
	// Stop exactly once: the supervisor clears its state on the first call, so
	// calling it again would only yield a spurious "not started" error.
	return a.supervisor.Stop()
}
4 changes: 3 additions & 1 deletion pkg/component/controller/cplb/cplb_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ func (k *Keepalived) Stop() error {
}

k.log.Info("Stopping keepalived")
k.supervisor.Stop()
if err := k.supervisor.Stop(); err != nil {
k.log.WithError(err).Error("Failed to stop executable")
}

if len(k.Config.VirtualServers) > 0 {
k.log.Info("Deleting dummy interface")
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/controller/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (e *Etcd) Start(ctx context.Context) error {
// Stop stops the supervisor held by Etcd, if one is set.
//
// It returns the error reported by the supervisor's Stop, or nil when no
// supervisor is set.
func (e *Etcd) Stop() error {
	if e.supervisor == nil {
		return nil
	}
	// Stop exactly once: the supervisor clears its state on the first call, so
	// calling it again would only yield a spurious "not started" error.
	return e.supervisor.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/controller/k0scontrolapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *K0SControlAPI) Start(_ context.Context) error {
// Stop stops the supervisor held by the K0SControlAPI, if one is set.
//
// It returns the error reported by the supervisor's Stop, or nil when no
// supervisor is set.
func (m *K0SControlAPI) Stop() error {
	if m.supervisor == nil {
		return nil
	}
	// Stop exactly once: the supervisor clears its state on the first call, so
	// calling it again would only yield a spurious "not started" error.
	return m.supervisor.Stop()
}
2 changes: 1 addition & 1 deletion pkg/component/controller/kine.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (k *Kine) Start(ctx context.Context) error {
// Stop stops the supervisor held by Kine, if one is set.
//
// It returns the error reported by the supervisor's Stop, or nil when no
// supervisor is set.
func (k *Kine) Stop() error {
	if k.supervisor == nil {
		return nil
	}
	// Stop exactly once: the supervisor clears its state on the first call, so
	// calling it again would only yield a spurious "not started" error.
	return k.supervisor.Stop()
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/component/controller/konnectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (k *Konnectivity) runServer(count uint) error {
if k.supervisor != nil {
k.EmitWithPayload("restarting konnectivity server due to server count change",
map[string]any{"serverCount": count})
k.supervisor.Stop()
return k.supervisor.Stop()
}

k.supervisor = &supervisor.Supervisor{
Expand Down Expand Up @@ -215,8 +215,7 @@ func (k *Konnectivity) Stop() error {
return nil
}
logrus.Debug("about to stop konnectivity supervisor")
k.supervisor.Stop()
return nil
return k.supervisor.Stop()
}

func (k *Konnectivity) health(ctx context.Context, path string) error {
Expand Down
6 changes: 4 additions & 2 deletions pkg/component/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (a *Scheduler) Start(_ context.Context) error {
// Stop stops the supervisor held by the Scheduler, if one is set.
//
// It returns the error reported by the supervisor's Stop, or nil when no
// supervisor is set.
func (a *Scheduler) Stop() error {
	if a.supervisor == nil {
		return nil
	}
	// Stop exactly once: the supervisor clears its state on the first call, so
	// calling it again would only yield a spurious "not started" error.
	return a.supervisor.Stop()
}
Expand Down Expand Up @@ -97,7 +97,9 @@ func (a *Scheduler) Reconcile(_ context.Context, clusterConfig *v1beta1.ClusterC
// Stop in case there's process running already and we need to change the config
if a.supervisor != nil {
logrus.WithField("component", kubeSchedulerComponentName).Info("reconcile has nothing to do")
a.supervisor.Stop()
if err := a.supervisor.Stop(); err != nil {
logrus.WithField("component", kubeSchedulerComponentName).WithError(err).Error("Failed to stop executable")
}
a.supervisor = nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/component/worker/containerd/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (c *Component) Stop() error {
return c.windowsStop()
}
if c.supervisor != nil {
c.supervisor.Stop()
return c.supervisor.Stop()
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/worker/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (k *Kubelet) Start(ctx context.Context) error {
// Stop stops the supervisor held by the Kubelet, if one is set.
//
// It returns the error reported by the supervisor's Stop, or nil when no
// supervisor is set.
func (k *Kubelet) Stop() error {
	if k.supervisor == nil {
		return nil
	}
	// Stop exactly once: the supervisor clears its state on the first call, so
	// calling it again would only yield a spurious "not started" error.
	return k.supervisor.Stop()
}
Expand Down
52 changes: 26 additions & 26 deletions pkg/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ type Supervisor struct {
CleanBeforeFn func() error

cmd *exec.Cmd
done chan bool
log logrus.FieldLogger
mutex sync.Mutex
startStopMutex sync.Mutex
cancel context.CancelFunc
stop func()
}

const k0sManaged = "_K0S_MANAGED=yes"
Expand All @@ -68,7 +67,7 @@ func (s *Supervisor) processWaitQuit(ctx context.Context) bool {
select {
case <-ctx.Done():
for {
s.log.Debug("Requesting graceful termination")
s.log.Debugf("Requesting graceful termination (%v)", context.Cause(ctx))
if err := requestGracefulTermination(s.cmd.Process); err != nil {
if errors.Is(err, os.ErrProcessDone) {
s.log.Info("Failed to request graceful termination: process has already terminated")
Expand Down Expand Up @@ -100,9 +99,8 @@ func (s *Supervisor) Supervise() error {
s.startStopMutex.Lock()
defer s.startStopMutex.Unlock()
// check if it is already started
if s.cancel != nil {
s.log.Warn("Already started")
return nil
if s.stop != nil {
return errors.New("already started")
}
s.log = logrus.WithField("component", s.Name)
s.PidFile = filepath.Join(s.RunDir, s.Name) + ".pid"
Expand All @@ -122,15 +120,11 @@ func (s *Supervisor) Supervise() error {
s.log.WithError(err).Warn("Old process cannot be terminated")
}

var ctx context.Context
ctx, s.cancel = context.WithCancel(context.Background())
started := make(chan error)
s.done = make(chan bool)
ctx, cancel := context.WithCancelCause(context.Background())
started, done := make(chan error, 1), make(chan bool)

go func() {
defer func() {
close(s.done)
}()
defer close(done)

s.log.Info("Starting to supervise")
restarts := 0
Expand Down Expand Up @@ -196,32 +190,38 @@ func (s *Supervisor) Supervise() error {

select {
case <-ctx.Done():
s.log.Debug("respawn canceled")
s.log.Debugf("respawn canceled (%v)", context.Cause(ctx))
return
case <-time.After(s.TimeoutRespawn):
s.log.Debug("respawning")
}
}
}()
return <-started

if err := <-started; err != nil {
cancel(err)
<-done
return err
}

s.stop = func() {
cancel(errors.New("containerd component is stopping"))
<-done
}
return nil
}

// Stop stops the supervised
func (s *Supervisor) Stop() {
func (s *Supervisor) Stop() error {
s.startStopMutex.Lock()
defer s.startStopMutex.Unlock()
if s.cancel == nil || s.log == nil {
s.log.Warn("Not started")
return
if s.stop == nil {
return errors.New("not started")
}
s.log.Debug("Sending stop message")

s.cancel()
s.cancel = nil
s.log.Debug("Waiting for stopping is done")
if s.done != nil {
<-s.done
}
s.stop()
s.stop = nil
return nil
}

// Checks if the process referenced in the PID file is a k0s-managed process.
Expand Down
25 changes: 15 additions & 10 deletions pkg/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ func TestSupervisorStart(t *testing.T) {
err := s.proc.Supervise()
if s.expectedErrMsg != "" {
assert.ErrorContains(t, err, s.expectedErrMsg)
assert.ErrorContains(t, s.proc.Stop(), "not started")
} else {
assert.NoError(t, err, "Failed to start")
assert.NoError(t, s.proc.Stop())
}
s.proc.Stop()
})
}
}
Expand Down Expand Up @@ -148,7 +149,7 @@ func TestRespawn(t *testing.T) {
TimeoutRespawn: 1 * time.Millisecond,
}
require.NoError(t, s.Supervise())
t.Cleanup(s.Stop)
t.Cleanup(func() { assert.NoError(t, s.Stop()) })

// wait til process starts up
require.NoError(t, pingPong.AwaitPing())
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestStopWhileRespawn(t *testing.T) {
}

// stop while waiting for respawn
s.Stop()
assert.NoError(t, s.Stop())
}

func TestMultiThread(t *testing.T) {
Expand All @@ -228,13 +229,13 @@ func TestMultiThread(t *testing.T) {

var wg sync.WaitGroup
assert.NoError(t, s.Supervise(), "Failed to start")
t.Cleanup(s.Stop)
t.Cleanup(func() { assert.NoError(t, s.Stop()) })

for range 255 {
wg.Add(1)
go func() {
defer wg.Done()
s.Stop()
_ = s.Stop()
_ = s.Supervise()
}()
}
Expand Down Expand Up @@ -270,13 +271,17 @@ func TestCleanupPIDFile_Gracefully(t *testing.T) {

// Start to supervise the new process.
require.NoError(t, s.Supervise())
t.Cleanup(s.Stop)
t.Cleanup(func() {
// Stop is called and checked in the regular test flow. This is just to
// ensure it will be called in any case, so don't check the error here.
_ = s.Stop()
})

// Expect the previous process to be gracefully terminated.
assert.NoError(t, prevCmd.Wait())

// Stop the supervisor and check if the PID file is gone.
s.Stop()
assert.NoError(t, s.Stop())
assert.NoFileExists(t, pidFilePath)
}

Expand Down Expand Up @@ -314,7 +319,7 @@ func TestCleanupPIDFile_LingeringProcess(t *testing.T) {
// previous process won't terminate.
err := s.Supervise()
if !assert.Error(t, err) {
s.Stop()
assert.NoError(t, s.Stop())
} else {
assert.ErrorContains(t, err, "while waiting for termination of PID")
assert.ErrorContains(t, err, pidFilePath)
Expand Down Expand Up @@ -352,7 +357,7 @@ func TestCleanupPIDFile_WrongProcess(t *testing.T) {

// Start to supervise the new process.
require.NoError(t, s.Supervise())
t.Cleanup(s.Stop)
t.Cleanup(func() { assert.NoError(t, s.Stop()) })

// Expect the PID file to be replaced with the new PID.
if pid, err := os.ReadFile(pidFilePath); assert.NoError(t, err, "Failed to read PID file") {
Expand Down Expand Up @@ -382,7 +387,7 @@ func TestCleanupPIDFile_NonexistingProcess(t *testing.T) {

// Start to supervise the new process.
require.NoError(t, s.Supervise())
t.Cleanup(s.Stop)
t.Cleanup(func() { assert.NoError(t, s.Stop()) })

// Expect the PID file to be replaced with the new PID.
if pid, err := os.ReadFile(pidFilePath); assert.NoError(t, err, "Failed to read PID file") {
Expand Down
Loading