Skip to content

Commit 8d95a13

Browse files
authored
feat: introduce -fail-fast flag for src batch (#1154)
1 parent 912ba5c commit 8d95a13

File tree

6 files changed

+97
-65
lines changed

6 files changed

+97
-65
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ All notable changes to `src-cli` are documented in this file.
1111

1212
## Unreleased
1313

14+
### Added
15+
16+
- Batch Changes: Added `-fail-fast` flag to `src batch preview` and `src batch apply` that causes execution to immediately halt on the first error instead of continuing with other repositories. This enables faster iteration on batch specs. [#1154](https://github.com/sourcegraph/src-cli/pull/1154)
17+
1418
## 6.1.0
1519
- Support uploading GZIP compressed SCIP indexes [1146](https://github.com/sourcegraph/src-cli/pull/1146)
1620
- Remove deprecated `lsif` subcommand, and remove LSIF->SCIP conversion support [1147](https://github.com/sourcegraph/src-cli/pull/1147)

cmd/src/batch_common.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ type batchExecuteFlags struct {
9898
skipErrors bool
9999
runAsRoot bool
100100

101+
// If true, fail fast on first error instead of continuing execution
102+
failFast bool
103+
101104
// EXPERIMENTAL
102105
textOnly bool
103106
}
@@ -164,6 +167,11 @@ func newBatchExecuteFlags(flagSet *flag.FlagSet, cacheDir, tempDir string) *batc
164167
"If true, forces all step containers to run as root.",
165168
)
166169

170+
flagSet.BoolVar(
171+
&caf.failFast, "fail-fast", false,
172+
"Halts execution immediately upon first error instead of continuing with other tasks.",
173+
)
174+
167175
return caf
168176
}
169177

@@ -430,6 +438,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error
430438
TempDir: opts.flags.tempDir,
431439
GlobalEnv: os.Environ(),
432440
ForceRoot: opts.flags.runAsRoot,
441+
FailFast: opts.flags.failFast,
433442
BinaryDiffs: ffs.BinaryDiffs,
434443
},
435444
Logger: logManager,

internal/batches/executor/coordinator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
type taskExecutor interface {
1616
Start(context.Context, []*Task, TaskExecutionUI)
17-
Wait(context.Context) ([]taskResult, error)
17+
Wait() ([]taskResult, error)
1818
}
1919

2020
// Coordinator coordinates the execution of Tasks. It makes use of an executor,
@@ -182,7 +182,7 @@ func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batch
182182

183183
// Run executor.
184184
c.exec.Start(ctx, tasks, ui)
185-
results, errs := c.exec.Wait(ctx)
185+
results, errs := c.exec.Wait()
186186

187187
// Write all step cache results to the cache.
188188
for _, res := range results {

internal/batches/executor/coordinator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func assertCacheSize(t *testing.T, cache *inMemoryExecutionCache, want int) {
481481
// assertNoCachedResult returns a function that can be used as a
482482
// startCallback on dummyExecutor to assert that the first Task has no cached results.
483483
func assertNoCachedResult(t *testing.T) func(context.Context, []*Task, TaskExecutionUI) {
484-
return func(c context.Context, tasks []*Task, ui TaskExecutionUI) {
484+
return func(_ context.Context, tasks []*Task, ui TaskExecutionUI) {
485485
t.Helper()
486486

487487
task := tasks[0]
@@ -562,7 +562,7 @@ func (d *dummyExecutor) Start(ctx context.Context, ts []*Task, ui TaskExecutionU
562562
// "noop noop noop", the crowd screams
563563
}
564564

565-
func (d *dummyExecutor) Wait(context.Context) ([]taskResult, error) {
565+
func (d *dummyExecutor) Wait() ([]taskResult, error) {
566566
return d.results, d.waitErr
567567
}
568568

internal/batches/executor/executor.go

Lines changed: 29 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ package executor
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"time"
87

9-
"github.com/neelance/parallel"
8+
"github.com/sourcegraph/conc/pool"
109

1110
"github.com/sourcegraph/sourcegraph/lib/errors"
1211

@@ -69,26 +68,22 @@ type NewExecutorOpts struct {
6968
IsRemote bool
7069
GlobalEnv []string
7170
ForceRoot bool
71+
FailFast bool
7272

7373
BinaryDiffs bool
7474
}
7575

7676
type executor struct {
7777
opts NewExecutorOpts
7878

79-
par *parallel.Run
79+
workPool *pool.ResultContextPool[*taskResult]
8080
doneEnqueuing chan struct{}
81-
82-
results []taskResult
83-
resultsMu sync.Mutex
8481
}
8582

8683
func NewExecutor(opts NewExecutorOpts) *executor {
8784
return &executor{
88-
opts: opts,
89-
85+
opts: opts,
9086
doneEnqueuing: make(chan struct{}),
91-
par: parallel.NewRun(opts.Parallelism),
9287
}
9388
}
9489

@@ -97,55 +92,45 @@ func NewExecutor(opts NewExecutorOpts) *executor {
9792
func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) {
9893
defer func() { close(x.doneEnqueuing) }()
9994

95+
x.workPool = pool.NewWithResults[*taskResult]().WithMaxGoroutines(x.opts.Parallelism).WithContext(ctx)
96+
if x.opts.FailFast {
97+
x.workPool = x.workPool.WithCancelOnError()
98+
}
99+
100100
for _, task := range tasks {
101101
select {
102102
case <-ctx.Done():
103103
return
104104
default:
105105
}
106106

107-
x.par.Acquire()
108-
109-
go func(task *Task, ui TaskExecutionUI) {
110-
defer x.par.Release()
111-
112-
select {
113-
case <-ctx.Done():
114-
return
115-
default:
116-
err := x.do(ctx, task, ui)
117-
if err != nil {
118-
x.par.Error(err)
119-
}
120-
}
121-
}(task, ui)
107+
x.workPool.Go(func(c context.Context) (*taskResult, error) {
108+
return x.do(c, task, ui)
109+
})
122110
}
123111
}
124112

125113
// Wait blocks until all Tasks enqueued with Start have been executed.
126-
func (x *executor) Wait(ctx context.Context) ([]taskResult, error) {
114+
func (x *executor) Wait() ([]taskResult, error) {
127115
<-x.doneEnqueuing
128116

129-
result := make(chan error, 1)
130-
131-
go func(ch chan error) {
132-
ch <- x.par.Wait()
133-
}(result)
134-
135-
select {
136-
case <-ctx.Done():
137-
return x.results, ctx.Err()
138-
case err := <-result:
139-
close(result)
140-
if err != nil {
141-
return x.results, err
117+
r, err := x.workPool.Wait()
118+
results := make([]taskResult, len(r))
119+
for i, r := range r {
120+
if r == nil {
121+
results[i] = taskResult{
122+
task: nil,
123+
stepResults: nil,
124+
err: err,
125+
}
126+
} else {
127+
results[i] = *r
142128
}
143129
}
144-
145-
return x.results, nil
130+
return results, err
146131
}
147132

148-
func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err error) {
133+
func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (result *taskResult, err error) {
149134
// Ensure that the status is updated when we're done.
150135
defer func() {
151136
ui.TaskFinished(task, err)
@@ -157,7 +142,7 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err
157142
// Let's set up our logging.
158143
l, err := x.opts.Logger.AddTask(util.SlugForPathInRepo(task.Repository.Name, task.Repository.Rev(), task.Path))
159144
if err != nil {
160-
return errors.Wrap(err, "creating log file")
145+
return nil, errors.Wrap(err, "creating log file")
161146
}
162147
defer l.Close()
163148

@@ -196,18 +181,10 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err
196181
}
197182
l.MarkErrored()
198183
}
199-
x.addResult(task, stepResults, err)
200-
201-
return err
202-
}
203-
204-
func (x *executor) addResult(task *Task, stepResults []execution.AfterStepResult, err error) {
205-
x.resultsMu.Lock()
206-
defer x.resultsMu.Unlock()
207184

208-
x.results = append(x.results, taskResult{
185+
return &taskResult{
209186
task: task,
210187
stepResults: stepResults,
211188
err: err,
212-
})
189+
}, err
213190
}

internal/batches/executor/executor_test.go

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ func TestExecutor_Integration(t *testing.T) {
7878
wantFinishedWithErr int
7979

8080
wantCacheCount int
81+
82+
failFast bool
8183
}{
8284
{
8385
name: "success",
@@ -316,6 +318,39 @@ func TestExecutor_Integration(t *testing.T) {
316318
wantFinishedWithErr: 1,
317319
wantCacheCount: 2,
318320
},
321+
{
322+
name: "fail fast mode",
323+
archives: []mock.RepoArchive{
324+
{RepoName: testRepo1.Name, Commit: testRepo1.Rev(), Files: map[string]string{
325+
"README.md": "# Welcome to the README\n",
326+
}},
327+
{RepoName: testRepo2.Name, Commit: testRepo2.Rev(), Files: map[string]string{
328+
"README.md": "# Sourcegraph README\n",
329+
}},
330+
},
331+
steps: []batcheslib.Step{
332+
{
333+
Run: `exit 1`,
334+
// We must fail for the first repository, so that the other repo's work is cancelled in fail fast mode.
335+
If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo1.Name),
336+
},
337+
{
338+
// We introduce an artificial delay for the second repository, so that it can't complete before the failure of the first one.
339+
Run: `sleep 0.1`,
340+
If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo2.Name),
341+
},
342+
{Run: `echo -e "foobar\n" >> README.md`},
343+
},
344+
tasks: []*Task{
345+
{Repository: testRepo1},
346+
{Repository: testRepo2},
347+
},
348+
wantErrInclude: "execution in github.com/sourcegraph/src-cli failed: run: exit 1",
349+
// In fail fast mode, we expect that other steps are cancelled.
350+
wantFinished: 0,
351+
wantFinishedWithErr: 2,
352+
failFast: true,
353+
},
319354
{
320355
name: "mount path",
321356
archives: []mock.RepoArchive{
@@ -376,17 +411,23 @@ func TestExecutor_Integration(t *testing.T) {
376411
// Temp dir for log files and downloaded archives
377412
testTempDir := t.TempDir()
378413

379-
cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images)
414+
ctx := context.Background()
415+
cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images)
380416
// Setup executor
417+
parallelism := 0
418+
if tc.failFast {
419+
parallelism = 1
420+
}
381421
opts := NewExecutorOpts{
382422
Creator: cr,
383423
RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false),
384424
Logger: mock.LogNoOpManager{},
385425
EnsureImage: imageMapEnsurer(images),
386426

387427
TempDir: testTempDir,
388-
Parallelism: runtime.GOMAXPROCS(0),
428+
Parallelism: runtime.GOMAXPROCS(parallelism),
389429
Timeout: tc.executorTimeout,
430+
FailFast: tc.failFast,
390431
}
391432

392433
if opts.Timeout == 0 {
@@ -397,9 +438,9 @@ func TestExecutor_Integration(t *testing.T) {
397438
executor := NewExecutor(opts)
398439

399440
// Run executor
400-
executor.Start(context.Background(), tc.tasks, dummyUI)
441+
executor.Start(ctx, tc.tasks, dummyUI)
401442

402-
results, err := executor.Wait(context.Background())
443+
results, err := executor.Wait()
403444
if tc.wantErrInclude == "" {
404445
if err != nil {
405446
t.Fatalf("execution failed: %s", err)
@@ -490,10 +531,10 @@ func TestExecutor_Integration(t *testing.T) {
490531

491532
// Make sure that all the Tasks have been updated correctly
492533
if have, want := len(dummyUI.finished), tc.wantFinished; have != want {
493-
t.Fatalf("wrong number of finished tasks. want=%d, have=%d", want, have)
534+
t.Fatalf("wrong number of UI finished tasks. want=%d, have=%d", want, have)
494535
}
495536
if have, want := len(dummyUI.finishedWithErr), tc.wantFinishedWithErr; have != want {
496-
t.Fatalf("wrong number of finished-with-err tasks. want=%d, have=%d", want, have)
537+
t.Fatalf("wrong number of UI finished-with-err tasks. want=%d, have=%d", want, have)
497538
}
498539
})
499540
}
@@ -797,7 +838,8 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive)
797838
}
798839
}
799840

800-
cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images)
841+
ctx := context.Background()
842+
cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images)
801843
// Setup executor
802844
executor := NewExecutor(NewExecutorOpts{
803845
Creator: cr,
@@ -810,8 +852,8 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive)
810852
Timeout: 30 * time.Second,
811853
})
812854

813-
executor.Start(context.Background(), tasks, newDummyTaskExecutionUI())
814-
return executor.Wait(context.Background())
855+
executor.Start(ctx, tasks, newDummyTaskExecutionUI())
856+
return executor.Wait()
815857
}
816858

817859
func imageMapEnsurer(m map[string]docker.Image) imageEnsurer {

0 commit comments

Comments
 (0)