-
Notifications
You must be signed in to change notification settings - Fork 24
run a step inside a go routine #109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
@af-md thanks for the PR! The approach has the right fundamentals, i.e., I think we can make this simpler by simply having a package level With respect to what chans = make([]chan int, 3)
for := range 10 {
resChan1, err := dbos.Go(ctx, StepFnClosure)
// handle err
}
// Read from each channel here The res channel can hold types similar to the workflow outcome chan:
|
…implify result handling
…uce stepWithSleep function
@maxdml does this feature have any conflict with what @apoliakov said about pre generating stepIDs: https://discord.com/channels/1156433345631232100/1166779411920597002/1413954852618244267 It makes sense to run steps inside Go routines - as they tend to be better performant compared to standard execution - however the users should be advised to write their code to wait for a step to complete (committed into DB) and then move onto the next step? probably that's what you were thinking of anyway... |
Ah... what I said was a comment on how Python works. Here we may have an opportunity to make it act differently. But Max or Peter will need to opine on that |
…step ID generation
@maxdml @apoliakov there is a small misunderstanding here. The problem we are solving with this PR is the non deterministic generation of stepIDs, resulting from the execution of steps in goroutines. What this PR will do is to serialize the generation of step IDs from within the workflow. That way, step IDs will be generated deterministically, before the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The low level implementation looks good, see my comments for the test.
I am realizing we need to change the API to support mocking. Specifically we should have a mirror Go
method on the DBOSContext
interface, that would be typeless (returns (stepOutcome[any], error)
). The reason we've been doing this for all DBOS methods is to allow the mocking of DBOSContext
in users' tests.
The package-level Go
would, like the package-level RunAsStep
does with its interface counterpart, call the interface Go
with a typed-erased function and set the stepName in the options.
The interface level Go
will do the step introspection, increment the stepID, then call the interface level RunAsStep
and return a typeless (stepOutcome[any]
) channel which we can pipe to the generic one (see an example in RunWorkflow.)
} | ||
|
||
// StepOutcome holds the result and error from a step execution | ||
type stepOutcome[R any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type stepOutcome[R any] struct { | |
type StepOutcome[R any] struct { |
This should be made public.
Also let's add a bit more description in the comment as to where it is used, so pkg.go.dev
picks it up.
// Test step IDs are deterministic and in the order of execution | ||
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) | ||
require.NoError(t, err, "failed to get workflow steps") | ||
require.Len(t, steps, numSteps, "expected %d steps, got %d", numSteps, len(steps)) | ||
for i := 0; i < numSteps; i++ { | ||
assert.Equal(t, i, steps[i].StepID, "expected step ID to be %d, got %d", i, steps[i].StepID) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not test what you think it does: GetWorkflowSteps
returns all the workflow steps sorted by ascending step ID, so you're testing the SQL, not the step ID attribution.
The way to test this would be to have each step take their ID as input and return their ID. Then, you can iterate over the channels and make sure that the iterator number == the step result from the channel.
- Channels should be ordered by stepID
- If the correct ID was attributed to the step, the step return value will be equal to the channel iterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a second part, we should also exercise the recovery part, either by running recoverPendingWorkflow
, or simply by executing the workflow again with the same workflowID. If there was a non determinism step attribution, DBOS would throw an error during the second RunAsStep
-- which it shouldn't.
For this to happen, however, we must ensure the workflow stays PENDING and does not return in the first, run, which we can achieve with an event (see this example). (If we don't do that, re-running the workflow will just get the workflow outcome, rather than going through the steps again.
closes #90
Summary
Adds support for running steps inside goroutines with deterministic step ID generation.
This is by no means the final solution, it's a PR to get feedback.
Problem
Currently, running steps inside goroutines causes non-deterministic step ID generation due to race conditions:
Solution
Pre-generate step IDs before launching goroutines:
Open Questions
ToDos