Skip to content

Conversation

af-md
Copy link
Collaborator

@af-md af-md commented Sep 4, 2025

closes #90

Summary

Adds support for running steps inside goroutines with deterministic step ID generation.

This is by no means the final solution, it's a PR to get feedback.

Problem

Currently, running steps inside goroutines causes non-deterministic step ID generation due to race conditions:

Solution

Pre-generate step IDs before launching goroutines:

  • Added Go() function that pre-generates step IDs and runs steps in goroutines
  • Added WithNextStepID option to pass pre-generated IDs to RunAsStep
  • Modified RunAsStep to use pre-generated IDs when provided
    result, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
        return processItem(ctx, item)
    })

Open Questions

  • Does this satisfy the UX you had in mind @maxdml ? I am a bit unsure as to what you had in mind but gave it a go anyway.
  • Might be worth giving me fuller example of how you think the UX would work inside the workflow

ToDos

  • I still need to add some docs
  • Still need to write tests for it as well

@af-md af-md requested a review from maxdml September 4, 2025 20:02
@maxdml
Copy link
Collaborator

maxdml commented Sep 5, 2025

@af-md thanks for the PR!

The approach has the right fundamentals, i.e., dbos.Go generates a step ID and calls RunAsStep in a goroutine.

I think we can make this simpler by simply having a package level Go() that calls the package level RunAsStep inside a goroutine. That way we don't have to repeat all the logic from the package level RunAsStep into Go.

With respect to what dbos.Go returns to the user: it should be non blocking and return to the caller a channel (typed with the output type of the step). That way I can write:

chans = make([]chan int, 3)
for := range 10 {
    resChan1, err := dbos.Go(ctx, StepFnClosure)
    // handle err
}

// Read from each channel here

The res channel can hold types similar to the workflow outcome chan:

type stepOutcome[R any] struct {
	result R
	err    error
}

@af-md
Copy link
Collaborator Author

af-md commented Sep 7, 2025

@maxdml does this feature have any conflict with what @apoliakov said about pre generating stepIDs: https://discord.com/channels/1156433345631232100/1166779411920597002/1413954852618244267

It makes sense to run steps inside Go routines - as they tend to be better performant compared to standard execution - however the users should be advised to write their code to wait for a step to complete (committed into DB) and then move onto the next step? probably that's what you were thinking of anyway...

@af-md af-md changed the title Run as step inside go routines run a step inside a go routine Sep 7, 2025
@apoliakov
Copy link
Member

@maxdml does this feature have any conflict with what @apoliakov said about pre generating stepIDs: https://discord.com/channels/1156433345631232100/1166779411920597002/1413954852618244267

It makes sense to run steps inside Go routines - as they tend to be better performant compared to standard execution - however the users should be told to write their code to wait for a step to complete (committed into DB) and then move onto the next step? probably that's what you were thinking of anyway...

Ah... what I said was a comment on how Python works. Here we may have an opportunity to make it act differently. But Max or Peter will need to opine on that

@maxdml
Copy link
Collaborator

maxdml commented Sep 8, 2025

@maxdml @apoliakov there is a small misunderstanding here. The problem we are solving with this PR is the non deterministic generation of stepIDs, resulting from the execution of steps in goroutines.

What this PR will do is to serialize the generation of step IDs from within the workflow. That way, step IDs will be generated deterministically, before the RunAsStep code executes, and regardless of the order in which they complete, they'll always have the same stepID.

@af-md af-md marked this pull request as ready for review September 10, 2025 08:05
@af-md af-md self-assigned this Sep 10, 2025
Copy link
Collaborator

@maxdml maxdml left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The low level implementation looks good, see my comments for the test.

I am realizing we need to change the API to support mocking. Specifically we should have a mirror Go method on the DBOSContext interface, that would be typeless (returns (stepOutcome[any], error)). The reason we've been doing this for all DBOS methods is to allow the mocking of DBOSContext in users' tests.

The package-level Go would, like the package-level RunAsStep does with its interface counterpart, call the interface Go with a typed-erased function and set the stepName in the options.

The interface level Go will do the step introspection, increment the stepID, then call the interface level RunAsStep and return a typeless (stepOutcome[any]) channel which we can pipe to the generic one (see an example in RunWorkflow.)

}

// StepOutcome holds the result and error from a step execution
type stepOutcome[R any] struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type stepOutcome[R any] struct {
type StepOutcome[R any] struct {

This should be made public.

Also let's add a bit more description in the comment as to where it is used, so pkg.go.dev picks it up.

Comment on lines +655 to +661
// Test step IDs are deterministic and in the order of execution
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
require.NoError(t, err, "failed to get workflow steps")
require.Len(t, steps, numSteps, "expected %d steps, got %d", numSteps, len(steps))
for i := 0; i < numSteps; i++ {
assert.Equal(t, i, steps[i].StepID, "expected step ID to be %d, got %d", i, steps[i].StepID)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not test what you think it does: GetWorkflowSteps returns all the workflow steps sorted by ascending step ID, so you're testing the SQL, not the step ID attribution.

The way to test this would be to have each step take their ID as input and return their ID. Then, you can iterate over the channels and make sure that the iterator number == the step result from the channel.

  • Channels should be ordered by stepID
  • If the correct ID was attributed to the step, the step return value will be equal to the channel iterator

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a second part, we should also exercise the recovery part, either by running recoverPendingWorkflow, or simply by executing the workflow again with the same workflowID. If there was a non determinism step attribution, DBOS would throw an error during the second RunAsStep -- which it shouldn't.

For this to happen, however, we must ensure the workflow stays PENDING and does not return in the first, run, which we can achieve with an event (see this example). (If we don't do that, re-running the workflow will just get the workflow outcome, rather than going through the steps again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support calling runAsStep in goroutines inside workflows
3 participants