Skip to content

Commit 3711610

Browse files
authored
Add Saga Handler (#192)
The saga pattern describes a saga -- a series of steps -- that may be required for an API request. It's well aware that some intermediate step can go wrong, most likely quotas, and prior steps will need to be compensated for to avoid resource leaks etc. This implements that in a nice generic way so it can be used by all complex handlers that do lots of things at once.
1 parent c55f829 commit 3711610

File tree

2 files changed

+238
-0
lines changed

2 files changed

+238
-0
lines changed

pkg/server/saga/saga.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2025 the Unikorn Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package saga
18+
19+
import (
20+
"context"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/log"
23+
)
24+
25+
// ActionFunc is the signature shared by saga actions and their compensations.
// Implementations will typically be methods bound to a receiver, which lets
// the steps of a saga share state with one another through that receiver.
type ActionFunc func(context.Context) error
29+
30+
// Action is a single step in a saga.
31+
type Action struct {
32+
// name is used for logging so we can see what went wrong.
33+
name string
34+
// action is what is executed on the good path.
35+
action ActionFunc
36+
// compensate is run if a subsequent action in the saga fails
37+
// and can undo any state changes that need to be rewound.
38+
// May be nil.
39+
compensate ActionFunc
40+
}
41+
42+
// NewAction creates a new action.
43+
func NewAction(name string, action, compensate ActionFunc) Action {
44+
return Action{
45+
name: name,
46+
action: action,
47+
compensate: compensate,
48+
}
49+
}
50+
51+
// Handler implements a saga, a set of steps to achieve a desired outcome
52+
// and a set of steps to undo any state changes on failure of an action.
53+
type Handler interface {
54+
Actions() []Action
55+
}
56+
57+
// Run implements the saga algorithm.
58+
func Run(ctx context.Context, handler Handler) error {
59+
log := log.FromContext(ctx)
60+
61+
actions := handler.Actions()
62+
63+
// Do each action in order...
64+
for i := range actions {
65+
if err := actions[i].action(ctx); err != nil {
66+
// If something went wrong we need to undo all prior steps
67+
// to compensate for any changed state e.g. quota allocations.
68+
for j := i - 1; j >= 0; j-- {
69+
if actions[j].compensate == nil {
70+
continue
71+
}
72+
73+
if cerr := actions[j].compensate(ctx); cerr != nil {
74+
// You see this in your logs, you're going to have to
75+
// do some manual unpicking!
76+
// TODO: we could add a retry in here for transient errors
77+
// (and the actual action itself), but we aware the client
78+
// and server will have a response timeout, so perhaps
79+
// adding the compensation action to a log for aysnchronous
80+
// handling may be better in future.
81+
log.Error(cerr, "compensating action failed", "name", actions[j].name)
82+
return err
83+
}
84+
}
85+
86+
// Always return the error that caused failure, which will most likely
87+
// be something useful to user like quota allocation failures.
88+
return err
89+
}
90+
}
91+
92+
return nil
93+
}

pkg/server/saga/saga_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
Copyright 2025 the Unikorn Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package saga_test
18+
19+
import (
20+
"context"
21+
"errors"
22+
"testing"
23+
24+
"github.com/stretchr/testify/require"
25+
26+
"github.com/unikorn-cloud/core/pkg/server/saga"
27+
)
28+
29+
// Sentinel errors used by the tests to distinguish where a failure came from.
var (
	// errFailAction is the error returned by a failing action.
	errFailAction = errors.New("fail action")
	// errFailCompensate is the error returned by a failing compensation.
	errFailCompensate = errors.New("fail compensation")
)
33+
34+
// Handler is a fake saga handler with three actions and two compensations.
// Each function returns a canned result and records that it was called.
type Handler struct {
	// Results returned by the corresponding action.
	action1Result error
	action2Result error
	action3Result error

	// Results returned by the corresponding compensation.
	compensate1Result error
	compensate2Result error

	// Set to true when the corresponding action has been invoked.
	action1Called bool
	action2Called bool
	action3Called bool

	// Set to true when the corresponding compensation has been invoked.
	compensate1Called bool
	compensate2Called bool
}
49+
50+
func (h *Handler) action1(ctx context.Context) error {
51+
h.action1Called = true
52+
return h.action1Result
53+
}
54+
55+
func (h *Handler) action2(ctx context.Context) error {
56+
h.action2Called = true
57+
return h.action2Result
58+
}
59+
60+
func (h *Handler) action3(ctx context.Context) error {
61+
h.action3Called = true
62+
return h.action3Result
63+
}
64+
65+
func (h *Handler) compensate1(ctx context.Context) error {
66+
h.compensate1Called = true
67+
return h.compensate1Result
68+
}
69+
70+
func (h *Handler) compensate2(ctx context.Context) error {
71+
h.compensate2Called = true
72+
return h.compensate2Result
73+
}
74+
75+
func (h *Handler) Actions() []saga.Action {
76+
return []saga.Action{
77+
saga.NewAction("action1", h.action1, h.compensate1),
78+
saga.NewAction("action2", h.action2, h.compensate2),
79+
saga.NewAction("action3", h.action3, nil),
80+
}
81+
}
82+
83+
// TestSaga ensures all actions are called.
84+
func TestSaga(t *testing.T) {
85+
t.Parallel()
86+
87+
h := &Handler{}
88+
89+
require.NoError(t, saga.Run(t.Context(), h))
90+
require.True(t, h.action1Called)
91+
require.True(t, h.action2Called)
92+
require.True(t, h.action3Called)
93+
require.False(t, h.compensate1Called)
94+
require.False(t, h.compensate2Called)
95+
}
96+
97+
// TestSagaFailAction1 ensures compensating actions are correctly run.
98+
func TestSagaFailAction1(t *testing.T) {
99+
t.Parallel()
100+
101+
h := &Handler{
102+
action1Result: errFailAction,
103+
}
104+
105+
require.ErrorIs(t, saga.Run(t.Context(), h), errFailAction)
106+
require.True(t, h.action1Called)
107+
require.False(t, h.action2Called)
108+
require.False(t, h.action3Called)
109+
require.False(t, h.compensate1Called)
110+
require.False(t, h.compensate2Called)
111+
}
112+
113+
// TestSagaFailAction3 ensures compensating actions are correctly run.
114+
func TestSagaFailAction3(t *testing.T) {
115+
t.Parallel()
116+
117+
h := &Handler{
118+
action3Result: errFailAction,
119+
}
120+
121+
require.ErrorIs(t, saga.Run(t.Context(), h), errFailAction)
122+
require.True(t, h.action1Called)
123+
require.True(t, h.action2Called)
124+
require.True(t, h.action3Called)
125+
require.True(t, h.compensate1Called)
126+
require.True(t, h.compensate2Called)
127+
}
128+
129+
// TestSagaFailCompensation2 tests that compensation errors short circuit
130+
// and that the error returned is that of the failing action.
131+
func TestSagaFailCompensation2(t *testing.T) {
132+
t.Parallel()
133+
134+
h := &Handler{
135+
action3Result: errFailAction,
136+
compensate2Result: errFailCompensate,
137+
}
138+
139+
require.ErrorIs(t, saga.Run(t.Context(), h), errFailAction)
140+
require.True(t, h.action1Called)
141+
require.True(t, h.action2Called)
142+
require.True(t, h.action3Called)
143+
require.False(t, h.compensate1Called)
144+
require.True(t, h.compensate2Called)
145+
}

0 commit comments

Comments
 (0)