Skip to content

Commit

Permalink
Add a call stack to prevent infinite WF invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
tjerman authored and darh committed Oct 1, 2021
1 parent e89160b commit f865d63
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 10 deletions.
7 changes: 5 additions & 2 deletions automation/service/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type (
session chan *wfexec.Session
graph *wfexec.Graph
trace bool
callStack []uint64
}

sessionAccessController interface {
Expand Down Expand Up @@ -182,7 +183,7 @@ func (svc *session) Start(g *wfexec.Graph, i auth.Identifiable, ssp types.Sessio

var (
ctx = auth.SetIdentityToContext(context.Background(), i)
ses = svc.spawn(g, ssp.WorkflowID, ssp.Trace)
ses = svc.spawn(g, ssp.WorkflowID, ssp.Trace, ssp.CallStack)
)

ses.CreatedAt = *now()
Expand Down Expand Up @@ -230,12 +231,13 @@ func (svc *session) Resume(sessionID, stateID uint64, i auth.Identifiable, input
//
// We need initial context for the session because we want to catch all cancellations or timeouts from there
// and not from any potential HTTP requests or similar temporary context that can prematurely destroy a workflow session
func (svc *session) spawn(g *wfexec.Graph, workflowID uint64, trace bool) (ses *types.Session) {
func (svc *session) spawn(g *wfexec.Graph, workflowID uint64, trace bool, callStack []uint64) (ses *types.Session) {
s := &spawn{
workflowID: workflowID,
session: make(chan *wfexec.Session, 1),
graph: g,
trace: trace,
callStack: callStack,
}

// Send new-session request
Expand Down Expand Up @@ -266,6 +268,7 @@ func (svc *session) Watch(ctx context.Context) {
case s := <-svc.spawnQueue:
opts := []wfexec.SessionOpt{
wfexec.SetWorkflowID(s.workflowID),
wfexec.SetCallStack(s.callStack...),
wfexec.SetHandler(svc.stateChangeHandler(ctx)),
}

Expand Down
2 changes: 1 addition & 1 deletion automation/service/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (svc trigger) canManageTrigger(ctx context.Context, res *types.Trigger, per
// registers all triggers on all given workflows
// before registering triggers on a workflow, all workflow triggers are unregistered
func (svc *trigger) registerWorkflows(ctx context.Context, workflows ...*types.Workflow) error {
// load ALL workflows directly from store
// load ALL triggers directly from store
tt, _, err := store.SearchAutomationTriggers(ctx, svc.store, types.TriggerFilter{
WorkflowID: types.WorkflowSet(workflows).IDs(),
Deleted: filter.StateInclusive,
Expand Down
19 changes: 18 additions & 1 deletion automation/service/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type (
triggers *trigger
session *session

opt options.WorkflowOpt

log *zap.Logger

// maps resolved workflow graphs to workflow ID (key, uint64)
Expand Down Expand Up @@ -79,9 +81,10 @@ const (
workflowDefChanged workflowChanges = 4
)

func Workflow(log *zap.Logger, corredorOpt options.CorredorOpt) *workflow {
func Workflow(log *zap.Logger, corredorOpt options.CorredorOpt, opt options.WorkflowOpt) *workflow {
return &workflow{
log: log,
opt: opt,
actionlog: DefaultActionlog,
store: DefaultStore,
ac: DefaultAccessControl,
Expand Down Expand Up @@ -552,11 +555,18 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl
// Start with workflow scope
scope := wf.Scope.MustMerge()

callStack := wfexec.GetContextCallStack(ctx)
if len(callStack) > svc.opt.CallStackSize {
return WorkflowErrMaximumCallStackSizeExceeded()
}

ssp := types.SessionStartParams{
WorkflowID: wf.ID,
KeepFor: wf.KeepSessions,
Trace: wf.Trace || p.Trace,
StepID: p.StepID,

CallStack: callStack,
}

if !p.Trace {
Expand Down Expand Up @@ -697,6 +707,11 @@ func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger
return WorkflowErrNotAllowedToExecute()
}

callStack := wfexec.GetContextCallStack(ctx)
if len(callStack) > s.opt.CallStackSize {
return WorkflowErrMaximumCallStackSizeExceeded()
}

wait, err = s.Start(g, runAs, types.SessionStartParams{
WorkflowID: wf.ID,
KeepFor: wf.KeepSessions,
Expand All @@ -705,6 +720,8 @@ func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger
StepID: t.StepID,
EventType: t.EventType,
ResourceType: t.ResourceType,

CallStack: callStack,
})

if err != nil {
Expand Down
36 changes: 36 additions & 0 deletions automation/service/workflow_actions.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions automation/service/workflow_actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@ errors:
- error: notAllowedToExecuteCorredorStep
message: "not allowed to run corredorExec function, corredor is disabled"
log: "failed to execute {{workflow}} with corredorExec function step; corredor is disabled"

- error: maximumCallStackSizeExceeded
message: "maximum call stack size exceeded"
log: "maximum call stack size exceeded"
2 changes: 2 additions & 0 deletions automation/types/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type (
StepID uint64
EventType string
ResourceType string

CallStack []uint64
}

SessionFilter struct {
Expand Down
10 changes: 6 additions & 4 deletions pkg/options/workflow.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/options/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ props:
type: bool
default: false
description: Enables verbose logging for workflow execution

- name: callStackSize
type: int
default: 16
description: Defines the maximum call stack size between workflows
30 changes: 28 additions & 2 deletions pkg/wfexec/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type (
dumpStacktraceOnPanic bool

eventHandler StateChangeHandler

callStack []uint64
}

StateChangeHandler func(SessionStatus, *State, *Session)
Expand Down Expand Up @@ -106,6 +108,8 @@ type (
}

SessionStatus int

callStackCtxKey struct{}
)

const (
Expand Down Expand Up @@ -180,6 +184,8 @@ func NewSession(ctx context.Context, g *Graph, oo ...SessionOpt) *Session {
s.log = s.log.
With(zap.Uint64("sessionID", s.id))

s.callStack = append(s.callStack, s.id)

go s.worker(ctx)

return s
Expand Down Expand Up @@ -624,9 +630,10 @@ func (s *Session) exec(ctx context.Context, log *zap.Logger, st *State) (nxt []*
// Context received in exec() wil not have the identity we're expecting
// so we need to pull it from state owner and add it to new context
// that is set to step exec function
ctxWithIdentity := auth.SetIdentityToContext(ctx, st.owner)
stepCtx := auth.SetIdentityToContext(ctx, st.owner)
stepCtx = SetContextCallStack(stepCtx, s.callStack)

result, st.err = st.step.Exec(ctxWithIdentity, st.MakeRequest())
result, st.err = st.step.Exec(stepCtx, st.MakeRequest())

if iterator, isIterator := result.(Iterator); isIterator && st.err == nil {
// Exec fn returned an iterator, adding loop to stack
Expand Down Expand Up @@ -851,6 +858,12 @@ func SetDumpStacktraceOnPanic(dump bool) SessionOpt {
}
}

func SetCallStack(id ...uint64) SessionOpt {
return func(s *Session) {
s.callStack = id
}
}

func (ss Steps) hash() map[Step]bool {
out := make(map[Step]bool)
for _, s := range ss {
Expand Down Expand Up @@ -883,3 +896,16 @@ func (ss Steps) IDs() []uint64 {

return ids
}

func SetContextCallStack(ctx context.Context, ss []uint64) context.Context {
return context.WithValue(ctx, callStackCtxKey{}, ss)
}

func GetContextCallStack(ctx context.Context) []uint64 {
v := ctx.Value(callStackCtxKey{})
if v == nil {
return nil
}

return v.([]uint64)
}

0 comments on commit f865d63

Please sign in to comment.