Skip to content

Commit

Permalink
feat: Add retry counter on failed reconciliation (#255)
Browse files Browse the repository at this point in the history
* feat: Add retry counter on failed reconciliation

If reconciliation fails, increase the Retry counter, set the NotBefore
field to a future time, and schedule a retry.

A goroutine handles the forced retry because the system only tries to
reconcile when an event tells it to, or on the fixed periodic Resync
(which is too slow for this purpose).

Because we never tracked whether a MicroVM was able to boot, we just
let the reconciler check whether the process was missing and react to
the result. When the MicroVM was not able to boot, we still reported
success on the MicroVM start step, which is wrong and makes it
impossible to track a failed state. As a solution, a step now has a
Verify function that is called after Do. If verification fails, the step
is marked as failed. That way we can start the MicroVM, wait a bit, and
check whether it is still running; if it is not, the start failed.

* Stop hard failing resync on boot

If one fails, we can still listen for new requests and reconcile VMs; if
they keep failing, the retry logic will handle it.

Co-authored-by: Claudia <claudiaberesford@gmail.com>
  • Loading branch information
yitsushi and Callisto13 authored Nov 19, 2021
1 parent 9e0abc6 commit 6ec5c7d
Show file tree
Hide file tree
Showing 35 changed files with 388 additions and 56 deletions.
80 changes: 68 additions & 12 deletions core/application/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ package application
import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"

"github.com/weaveworks/flintlock/api/events"
"github.com/weaveworks/flintlock/core/models"
"github.com/weaveworks/flintlock/core/plans"
"github.com/weaveworks/flintlock/core/ports"
portsctx "github.com/weaveworks/flintlock/core/ports/context"
"github.com/weaveworks/flintlock/pkg/defaults"
"github.com/weaveworks/flintlock/pkg/log"
"github.com/weaveworks/flintlock/pkg/planner"
)

// backoffBaseInSeconds is the base delay, in seconds, that is multiplied by
// the current retry count to compute how long to wait before the next
// reconciliation attempt.
const backoffBaseInSeconds = 20

func (a *app) ReconcileMicroVM(ctx context.Context, id, namespace string) error {
logger := log.GetLogger(ctx).WithField("action", "reconcile")

Expand Down Expand Up @@ -52,52 +57,100 @@ func (a *app) ResyncMicroVMs(ctx context.Context, namespace string) error {
return nil
}

func (a *app) plan(spec *models.MicroVM, logger *logrus.Entry) (planner.Plan, error) {
func (a *app) plan(spec *models.MicroVM, logger *logrus.Entry) planner.Plan {
l := logger.WithField("stage", "plan")
l.Info("Generate plan")

if spec.Status.Retry > a.cfg.MaximumRetry {
return nil, reachedMaximumRetryError{vmid: spec.ID, retries: spec.Status.Retry}
}

// Delete only if the spec was marked as deleted.
if spec.Spec.DeletedAt != 0 {
input := &plans.DeletePlanInput{
StateDirectory: a.cfg.RootStateDir,
VM: spec,
}

return plans.MicroVMDeletePlan(input), nil
return plans.MicroVMDeletePlan(input)
}

input := &plans.CreateOrUpdatePlanInput{
StateDirectory: a.cfg.RootStateDir,
VM: spec,
}

return plans.MicroVMCreateOrUpdatePlan(input), nil
return plans.MicroVMCreateOrUpdatePlan(input)
}

// reschedule records a failed reconciliation attempt on spec and arranges a
// later retry.
//
// It increments spec.Status.Retry, sets spec.Status.NotBefore to the current
// time plus a linearly growing delay (Retry * backoffBaseInSeconds seconds),
// and saves the spec through the repository. It then starts a goroutine that
// sleeps for that delay and publishes a MicroVMSpecUpdated event for the
// MicroVM on defaults.TopicMicroVMEvents.
//
// The returned error is non-nil only when saving the spec fails; in that case
// no retry goroutine is started. A failure to publish the event is logged
// (together with its cause) and is not returned to the caller.
func (a *app) reschedule(ctx context.Context, logger *logrus.Entry, spec *models.MicroVM) error {
	spec.Status.Retry++
	waitTime := time.Duration(spec.Status.Retry*backoffBaseInSeconds) * time.Second
	spec.Status.NotBefore = time.Now().Add(waitTime).Unix()

	logger.Infof(
		"[%d/%d] reconciliation failed, rescheduled for next attempt at %s",
		spec.Status.Retry,
		a.cfg.MaximumRetry,
		time.Unix(spec.Status.NotBefore, 0),
	)

	if _, err := a.ports.Repo.Save(ctx, spec); err != nil {
		return fmt.Errorf("saving spec failed: %w", err)
	}

	// The ID parts and the delay are passed as arguments so the goroutine
	// does not read spec after this function has returned.
	go func(id, ns string, sleepTime time.Duration) {
		time.Sleep(sleepTime)

		// The publish uses context.Background() rather than ctx, so it is
		// not affected by cancellation of the caller's context.
		err := a.ports.EventService.Publish(
			context.Background(),
			defaults.TopicMicroVMEvents,
			&events.MicroVMSpecUpdated{
				ID:        id,
				Namespace: ns,
			},
		)
		if err != nil {
			// Attach the underlying error; without it the log line gives
			// no hint about why the publish failed.
			logger.WithError(err).Errorf("failed to publish an update event for %s/%s", ns, id)
		}
	}(spec.ID.Name(), spec.ID.Namespace(), waitTime)

	return nil
}

func (a *app) reconcile(ctx context.Context, spec *models.MicroVM, logger *logrus.Entry) error {
l := logger.WithField("vmid", spec.ID.String())
l.Info("Starting reconciliation")
localLogger := logger.WithField("vmid", spec.ID.String())
localLogger.Info("Starting reconciliation")

plan, planErr := a.plan(spec, l)
if planErr != nil {
return planErr
if spec.Status.Retry > a.cfg.MaximumRetry {
spec.Status.State = models.FailedState

logger.Error(reachedMaximumRetryError{vmid: spec.ID, retries: spec.Status.Retry})

return nil
}

if spec.Status.NotBefore > 0 && time.Now().Before(time.Unix(spec.Status.NotBefore, 0)) {
return nil
}

plan := a.plan(spec, localLogger)

execCtx := portsctx.WithPorts(ctx, a.ports)

executionID, err := a.ports.IdentifierService.GenerateRandom()
if err != nil {
if scheduleErr := a.reschedule(ctx, localLogger, spec); scheduleErr != nil {
return fmt.Errorf("rescheduling failed: %w", scheduleErr)
}

return fmt.Errorf("generating plan execution id: %w", err)
}

actuator := planner.NewActuator()

stepCount, err := actuator.Execute(execCtx, plan, executionID)
if err != nil {
if scheduleErr := a.reschedule(ctx, localLogger, spec); scheduleErr != nil {
return fmt.Errorf("rescheduling failed: %w", scheduleErr)
}

return fmt.Errorf("executing plan: %w", err)
}

Expand All @@ -109,6 +162,9 @@ func (a *app) reconcile(ctx context.Context, spec *models.MicroVM, logger *logru
return nil
}

spec.Status.Retry = 0
spec.Status.NotBefore = 0

if _, err := a.ports.Repo.Save(ctx, spec); err != nil {
return fmt.Errorf("saving spec after plan execution: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions core/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
ErrUnsupportedIfaceType = errors.New("unsupported network interface type")
ErrIfaceNotFound = errors.New("network interface not found")
ErrMissingStatusInfo = errors.New("status is not defined")
ErrUnableToBoot = errors.New("microvm is unable to boot")
)

// TopicNotFoundError is an error created when a topic with a specific name isn't found.
Expand Down
2 changes: 2 additions & 0 deletions core/models/microvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type MicroVMStatus struct {
NetworkInterfaces NetworkInterfaceStatuses `json:"network_interfaces"`
// Retry is a counter about how many times we retried to reconcile.
Retry int `json:"retry"`
// NotBefore tells the system not to reconcile until the given Unix timestamp.
NotBefore int64 `json:"not_before" validate:"omitempty"`
}

// Kernel is the specification of the kernel and its arguments.
Expand Down
2 changes: 2 additions & 0 deletions core/plans/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ package plans
const (
// MicroVMDeletePlanName is the name of the plan that deletes a MicroVM.
MicroVMDeletePlanName = "microvm_delete"
// MicroVMCreateOrUpdatePlanName is the name of the plan that creates or updates a MicroVM.
MicroVMCreateOrUpdatePlanName = "microvm_create_update"

// microVMBootTime is passed to the MicroVM start step when the
// create/update plan is built.
// NOTE(review): presumably the number of seconds to wait before checking
// that the MicroVM is still running; confirm the unit against NewStartStep.
microVMBootTime = 5
)
2 changes: 1 addition & 1 deletion core/plans/microvm_create_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *microvmCreateOrUpdatePlan) Create(ctx context.Context) ([]planner.Proce
}

// MicroVM provider start
if err := p.addStep(ctx, microvm.NewStartStep(p.vm, ports.Provider)); err != nil {
if err := p.addStep(ctx, microvm.NewStartStep(p.vm, ports.Provider, microVMBootTime)); err != nil {
return nil, fmt.Errorf("adding microvm start step: %w", err)
}

Expand Down
4 changes: 4 additions & 0 deletions core/steps/event/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ func (s *eventPublish) Do(ctx context.Context) ([]planner.Procedure, error) {

return nil, nil
}

// Verify is a no-op for the event publish step: it performs no check and
// always returns nil.
func (s *eventPublish) Verify(ctx context.Context) error {
return nil
}
4 changes: 4 additions & 0 deletions core/steps/event/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ func TestNewPublish(t *testing.T) {
// now than crying later if the system does not do what we want.
shouldDo, _ := step.ShouldDo(ctx)
subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeTrue())
g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewPublish_eventServiceFailure(t *testing.T) {
Expand All @@ -67,7 +69,9 @@ func TestNewPublish_eventServiceFailure(t *testing.T) {
step := event.NewPublish(testTopic, evt, eventService)

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
4 changes: 4 additions & 0 deletions core/steps/microvm/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ func (s *createStep) Do(ctx context.Context) ([]planner.Procedure, error) {

return nil, nil
}

// Verify is a no-op for the create step: it performs no check and always
// returns nil.
func (s *createStep) Verify(ctx context.Context) error {
return nil
}
10 changes: 10 additions & 0 deletions core/steps/microvm/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ func TestNewCreateStep(t *testing.T) {

shouldDo, shouldErr := step.ShouldDo(ctx)
subSteps, doErr := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeTrue())
g.Expect(shouldErr).To(g.BeNil())
g.Expect(subSteps).To(g.BeEmpty())
g.Expect(doErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewCreateStep_StateCheck(t *testing.T) {
Expand Down Expand Up @@ -96,9 +98,11 @@ func TestNewCreateStep_StateCheck(t *testing.T) {
Return(testCase.State, nil)

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.Equal(testCase.ExpectToRun))
g.Expect(shouldErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
}

Expand All @@ -119,9 +123,11 @@ func TestNewCreateStep_StateCheckError(t *testing.T) {
Return(ports.MicroVMStateUnknown, errors.New("i have no idea"))

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeFalse())
g.Expect(shouldErr).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewCreateStep_VMIsNotDefined(t *testing.T) {
Expand All @@ -137,9 +143,11 @@ func TestNewCreateStep_VMIsNotDefined(t *testing.T) {
step := microvm.NewCreateStep(vm, microVMService)

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).To(g.MatchError(internalerr.ErrSpecRequired))
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewCreateStep_ServiceCreateError(t *testing.T) {
Expand All @@ -159,7 +167,9 @@ func TestNewCreateStep_ServiceCreateError(t *testing.T) {
Return(errors.New("ensuring state dir: ...."))

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
4 changes: 4 additions & 0 deletions core/steps/microvm/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ func (s *deleteStep) Do(ctx context.Context) ([]planner.Procedure, error) {

return nil, nil
}

// Verify is a no-op for the delete step: it performs no check and always
// returns nil.
func (s *deleteStep) Verify(ctx context.Context) error {
return nil
}
10 changes: 10 additions & 0 deletions core/steps/microvm/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ func TestNewDeleteStep(t *testing.T) {

shouldDo, shouldErr := step.ShouldDo(ctx)
subSteps, doErr := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeTrue())
g.Expect(shouldErr).To(g.BeNil())
g.Expect(subSteps).To(g.BeEmpty())
g.Expect(doErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewDeleteStep_StateCheck(t *testing.T) {
Expand Down Expand Up @@ -97,9 +99,11 @@ func TestNewDeleteStep_StateCheck(t *testing.T) {
Return(testCase.State, nil)

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.Equal(testCase.ExpectToRun))
g.Expect(shouldErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
}

Expand All @@ -120,9 +124,11 @@ func TestNewDeleteStep_StateCheckError(t *testing.T) {
Return(ports.MicroVMStateUnknown, errors.New("i have no idea"))

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeFalse())
g.Expect(shouldErr).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewDeleteStep_VMIsNotDefined(t *testing.T) {
Expand All @@ -138,9 +144,11 @@ func TestNewDeleteStep_VMIsNotDefined(t *testing.T) {
step := microvm.NewDeleteStep(vm, microVMService)

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).To(g.MatchError(internalerr.ErrSpecRequired))
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewDeleteStep_ServiceDeleteError(t *testing.T) {
Expand All @@ -160,7 +168,9 @@ func TestNewDeleteStep_ServiceDeleteError(t *testing.T) {
Return(errors.New("ensuring state dir: ...."))

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
Loading

0 comments on commit 6ec5c7d

Please sign in to comment.