Skip to content

feat: introduce -fail-fast flag for src batch #1154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ All notable changes to `src-cli` are documented in this file.

## Unreleased

### Added

- Batch Changes: Added `-fail-fast` flag to `src batch preview` and `src batch apply` that causes execution to immediately halt on the first error instead of continuing with other repositories. This enables faster iteration on batch specs. [#1154](https://github.com/sourcegraph/src-cli/pull/1154)

## 6.1.0
- Support uploading GZIP compressed SCIP indexes [1146](https://github.com/sourcegraph/src-cli/pull/1146)
- Remove deprecated `lsif` subcommand, and remove LSIF->SCIP conversion support [1147](https://github.com/sourcegraph/src-cli/pull/1147)
Expand Down
9 changes: 9 additions & 0 deletions cmd/src/batch_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type batchExecuteFlags struct {
skipErrors bool
runAsRoot bool

// If true, fail fast on first error instead of continuing execution
failFast bool

// EXPERIMENTAL
textOnly bool
}
Expand Down Expand Up @@ -164,6 +167,11 @@ func newBatchExecuteFlags(flagSet *flag.FlagSet, cacheDir, tempDir string) *batc
"If true, forces all step containers to run as root.",
)

flagSet.BoolVar(
&caf.failFast, "fail-fast", false,
"Halts execution immediately upon first error instead of continuing with other tasks.",
)

return caf
}

Expand Down Expand Up @@ -430,6 +438,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error
TempDir: opts.flags.tempDir,
GlobalEnv: os.Environ(),
ForceRoot: opts.flags.runAsRoot,
FailFast: opts.flags.failFast,
BinaryDiffs: ffs.BinaryDiffs,
},
Logger: logManager,
Expand Down
4 changes: 2 additions & 2 deletions internal/batches/executor/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

type taskExecutor interface {
Start(context.Context, []*Task, TaskExecutionUI)
Wait(context.Context) ([]taskResult, error)
Wait() ([]taskResult, error)
}

// Coordinator coordinates the execution of Tasks. It makes use of an executor,
Expand Down Expand Up @@ -182,7 +182,7 @@ func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batch

// Run executor.
c.exec.Start(ctx, tasks, ui)
results, errs := c.exec.Wait(ctx)
results, errs := c.exec.Wait()

// Write all step cache results to the cache.
for _, res := range results {
Expand Down
4 changes: 2 additions & 2 deletions internal/batches/executor/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func assertCacheSize(t *testing.T, cache *inMemoryExecutionCache, want int) {
// assertNoCachedResult returns a function that can be used as a
// startCallback on dummyExecutor to assert that the first Task has no cached results.
func assertNoCachedResult(t *testing.T) func(context.Context, []*Task, TaskExecutionUI) {
return func(c context.Context, tasks []*Task, ui TaskExecutionUI) {
return func(_ context.Context, tasks []*Task, ui TaskExecutionUI) {
t.Helper()

task := tasks[0]
Expand Down Expand Up @@ -562,7 +562,7 @@ func (d *dummyExecutor) Start(ctx context.Context, ts []*Task, ui TaskExecutionU
// "noop noop noop", the crowd screams
}

func (d *dummyExecutor) Wait(context.Context) ([]taskResult, error) {
func (d *dummyExecutor) Wait() ([]taskResult, error) {
return d.results, d.waitErr
}

Expand Down
81 changes: 29 additions & 52 deletions internal/batches/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package executor
import (
"context"
"fmt"
"sync"
"time"

"github.com/neelance/parallel"
"github.com/sourcegraph/conc/pool"

"github.com/sourcegraph/sourcegraph/lib/errors"

Expand Down Expand Up @@ -69,26 +68,22 @@ type NewExecutorOpts struct {
IsRemote bool
GlobalEnv []string
ForceRoot bool
FailFast bool

BinaryDiffs bool
}

type executor struct {
opts NewExecutorOpts

par *parallel.Run
workPool *pool.ResultContextPool[*taskResult]
doneEnqueuing chan struct{}

results []taskResult
resultsMu sync.Mutex
}

func NewExecutor(opts NewExecutorOpts) *executor {
return &executor{
opts: opts,

opts: opts,
doneEnqueuing: make(chan struct{}),
par: parallel.NewRun(opts.Parallelism),
}
}

Expand All @@ -97,55 +92,45 @@ func NewExecutor(opts NewExecutorOpts) *executor {
func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) {
defer func() { close(x.doneEnqueuing) }()

x.workPool = pool.NewWithResults[*taskResult]().WithMaxGoroutines(x.opts.Parallelism).WithContext(ctx)
if x.opts.FailFast {
x.workPool = x.workPool.WithCancelOnError()
}

for _, task := range tasks {
select {
case <-ctx.Done():
return
default:
}

x.par.Acquire()

go func(task *Task, ui TaskExecutionUI) {
defer x.par.Release()

select {
case <-ctx.Done():
return
default:
err := x.do(ctx, task, ui)
if err != nil {
x.par.Error(err)
}
}
}(task, ui)
x.workPool.Go(func(c context.Context) (*taskResult, error) {
return x.do(c, task, ui)
})
}
}

// Wait blocks until all Tasks enqueued with Start have been executed.
func (x *executor) Wait(ctx context.Context) ([]taskResult, error) {
func (x *executor) Wait() ([]taskResult, error) {
<-x.doneEnqueuing

result := make(chan error, 1)

go func(ch chan error) {
ch <- x.par.Wait()
}(result)

select {
case <-ctx.Done():
return x.results, ctx.Err()
case err := <-result:
close(result)
if err != nil {
return x.results, err
r, err := x.workPool.Wait()
results := make([]taskResult, len(r))
for i, r := range r {
if r == nil {
results[i] = taskResult{
task: nil,
stepResults: nil,
err: err,
}
} else {
results[i] = *r
}
}

return x.results, nil
return results, err
}

func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err error) {
func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (result *taskResult, err error) {
// Ensure that the status is updated when we're done.
defer func() {
ui.TaskFinished(task, err)
Expand All @@ -157,7 +142,7 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err
// Let's set up our logging.
l, err := x.opts.Logger.AddTask(util.SlugForPathInRepo(task.Repository.Name, task.Repository.Rev(), task.Path))
if err != nil {
return errors.Wrap(err, "creating log file")
return nil, errors.Wrap(err, "creating log file")
}
defer l.Close()

Expand Down Expand Up @@ -196,18 +181,10 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err
}
l.MarkErrored()
}
x.addResult(task, stepResults, err)

return err
}

func (x *executor) addResult(task *Task, stepResults []execution.AfterStepResult, err error) {
x.resultsMu.Lock()
defer x.resultsMu.Unlock()

x.results = append(x.results, taskResult{
return &taskResult{
task: task,
stepResults: stepResults,
err: err,
})
}, err
}
60 changes: 51 additions & 9 deletions internal/batches/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func TestExecutor_Integration(t *testing.T) {
wantFinishedWithErr int

wantCacheCount int

failFast bool
}{
{
name: "success",
Expand Down Expand Up @@ -316,6 +318,39 @@ func TestExecutor_Integration(t *testing.T) {
wantFinishedWithErr: 1,
wantCacheCount: 2,
},
{
name: "fail fast mode",
archives: []mock.RepoArchive{
{RepoName: testRepo1.Name, Commit: testRepo1.Rev(), Files: map[string]string{
"README.md": "# Welcome to the README\n",
}},
{RepoName: testRepo2.Name, Commit: testRepo2.Rev(), Files: map[string]string{
"README.md": "# Sourcegraph README\n",
}},
},
steps: []batcheslib.Step{
{
Run: `exit 1`,
// We must fail for the first repository, so that the other repo's work is cancelled in fail fast mode.
If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo1.Name),
},
{
// We introduce an artificial wait for the second repository, so that it can't complete before the failure of the first one.
Run: `sleep 0.1`,
If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo2.Name),
},
{Run: `echo -e "foobar\n" >> README.md`},
},
tasks: []*Task{
{Repository: testRepo1},
{Repository: testRepo2},
},
wantErrInclude: "execution in github.com/sourcegraph/src-cli failed: run: exit 1",
// In fail fast mode, we expect that other steps are cancelled.
wantFinished: 0,
wantFinishedWithErr: 2,
failFast: true,
},
{
name: "mount path",
archives: []mock.RepoArchive{
Expand Down Expand Up @@ -376,17 +411,23 @@ func TestExecutor_Integration(t *testing.T) {
// Temp dir for log files and downloaded archives
testTempDir := t.TempDir()

cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images)
ctx := context.Background()
cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images)
// Setup executor
parallelism := 0
if tc.failFast {
parallelism = 1
}
opts := NewExecutorOpts{
Creator: cr,
RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false),
Logger: mock.LogNoOpManager{},
EnsureImage: imageMapEnsurer(images),

TempDir: testTempDir,
Parallelism: runtime.GOMAXPROCS(0),
Parallelism: runtime.GOMAXPROCS(parallelism),
Timeout: tc.executorTimeout,
FailFast: tc.failFast,
}

if opts.Timeout == 0 {
Expand All @@ -397,9 +438,9 @@ func TestExecutor_Integration(t *testing.T) {
executor := NewExecutor(opts)

// Run executor
executor.Start(context.Background(), tc.tasks, dummyUI)
executor.Start(ctx, tc.tasks, dummyUI)

results, err := executor.Wait(context.Background())
results, err := executor.Wait()
if tc.wantErrInclude == "" {
if err != nil {
t.Fatalf("execution failed: %s", err)
Expand Down Expand Up @@ -490,10 +531,10 @@ func TestExecutor_Integration(t *testing.T) {

// Make sure that all the Tasks have been updated correctly
if have, want := len(dummyUI.finished), tc.wantFinished; have != want {
t.Fatalf("wrong number of finished tasks. want=%d, have=%d", want, have)
t.Fatalf("wrong number of UI finished tasks. want=%d, have=%d", want, have)
}
if have, want := len(dummyUI.finishedWithErr), tc.wantFinishedWithErr; have != want {
t.Fatalf("wrong number of finished-with-err tasks. want=%d, have=%d", want, have)
t.Fatalf("wrong number of UI finished-with-err tasks. want=%d, have=%d", want, have)
}
})
}
Expand Down Expand Up @@ -797,7 +838,8 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive)
}
}

cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images)
ctx := context.Background()
cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images)
// Setup executor
executor := NewExecutor(NewExecutorOpts{
Creator: cr,
Expand All @@ -810,8 +852,8 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive)
Timeout: 30 * time.Second,
})

executor.Start(context.Background(), tasks, newDummyTaskExecutionUI())
return executor.Wait(context.Background())
executor.Start(ctx, tasks, newDummyTaskExecutionUI())
return executor.Wait()
}

func imageMapEnsurer(m map[string]docker.Image) imageEnsurer {
Expand Down
Loading