Bahrmichael/dropped edits 1 #1158

Draft: wants to merge 6 commits into base: main

4,063 changes: 4,063 additions & 0 deletions batch-spec.yaml

Large diffs are not rendered by default.

135 changes: 129 additions & 6 deletions cmd/src/batch_common.go
@@ -13,6 +13,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"syscall"
"time"

@@ -505,8 +506,40 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error
execUI.LogFilesKept(logFiles)
}

specs = append(specs, freshSpecs...)
// Add a mutex to protect specs from race conditions
var specsMutex sync.Mutex

// Function to filter specs with empty diffs
filterValidSpecs := func(inputSpecs []*batcheslib.ChangesetSpec) []*batcheslib.ChangesetSpec {
var validSpecs []*batcheslib.ChangesetSpec
for _, spec := range inputSpecs {
diff, err := spec.Diff()
if err != nil {
// Log the error but continue processing other specs
execUI.ExecutingTasksSkippingErrors(errors.Wrap(err, "error getting diff"))
continue
}
if len(diff) > 0 {
validSpecs = append(validSpecs, spec)
} else {
execUI.ExecutingTasksSkippingErrors(errors.New("skipping spec with empty diff"))
}
}
return validSpecs
}

// Filter both cached and fresh specs
specsMutex.Lock()
// Filter previously cached specs
validCachedSpecs := filterValidSpecs(specs)
// Filter fresh specs
validFreshSpecs := filterValidSpecs(freshSpecs)

// Reset specs and add filtered specs
specs = validCachedSpecs
specs = append(specs, validFreshSpecs...)
specs = append(specs, importedSpecs...)
specsMutex.Unlock()

err = svc.ValidateChangesetSpecs(repos, specs)
if err != nil {
@@ -518,13 +551,103 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error
if len(specs) > 0 {
execUI.UploadingChangesetSpecs(len(specs))

// Create a mutex to synchronize access to the IDs slice
var idsMutex sync.Mutex
// Create a wait group to wait for all goroutines to finish
var wg sync.WaitGroup

// Limit concurrency to avoid overloading the server
semaphore := make(chan struct{}, 3) // Reduced from 5 to 3 to reduce load

// Create a map to track successful uploads
var successfulUploads sync.Map

// Create a slice to hold any errors for skipping
var uploadErrors []error
var errorsMutex sync.Mutex

for i, spec := range specs {
id, err := svc.CreateChangesetSpec(ctx, spec)
if err != nil {
return err
// Capture loop variables
specIndex := i
currentSpec := spec

wg.Add(1)
go func() {
defer wg.Done()
// Acquire semaphore
semaphore <- struct{}{}
defer func() { <-semaphore }()

// Double-check diff is not empty before sending (extra safeguard)
diff, err := currentSpec.Diff()
if err != nil {
errorsMutex.Lock()
uploadErrors = append(uploadErrors, errors.Wrapf(err, "spec %d: error getting diff", specIndex))
errorsMutex.Unlock()
return
}
if len(diff) == 0 {
errorsMutex.Lock()
uploadErrors = append(uploadErrors, errors.Newf("spec %d: diff is empty", specIndex))
errorsMutex.Unlock()
return
}

// Create the changeset spec with retries for transient network errors
const maxRetries = 3
var retryCount int
var id graphql.ChangesetSpecID
var createErr error

for retryCount < maxRetries {
id, createErr = svc.CreateChangesetSpec(ctx, currentSpec)
if createErr == nil {
break // Success
}

// If it's a network error, retry after a short delay
if strings.Contains(createErr.Error(), "Transport") ||
strings.Contains(createErr.Error(), "connection") ||
strings.Contains(createErr.Error(), "timeout") {
retryCount++
time.Sleep(time.Duration(retryCount*500) * time.Millisecond) // Linear backoff: wait 500ms longer after each retry
continue
}

// Non-retryable error
break
}

if createErr != nil {
errorsMutex.Lock()
uploadErrors = append(uploadErrors, errors.Wrapf(createErr, "error creating changeset spec %d", specIndex))
errorsMutex.Unlock()
return
}

// Mark as successful
successfulUploads.Store(specIndex, id)

// Safely update the IDs slice
idsMutex.Lock()
ids[specIndex] = id
idsMutex.Unlock()

execUI.UploadingChangesetSpecsProgress(specIndex+1, len(specs))
}()
}

// Wait for all goroutines to finish
wg.Wait()

// Log any errors
if len(uploadErrors) > 0 && opts.flags.skipErrors {
for _, err := range uploadErrors {
execUI.ExecutingTasksSkippingErrors(err)
}
ids[i] = id
execUI.UploadingChangesetSpecsProgress(i+1, len(specs))
} else if len(uploadErrors) > 0 {
// If we're not skipping errors and we have at least one error, return the first one
return uploadErrors[0]
}

execUI.UploadingChangesetSpecsSuccess(ids)
20 changes: 20 additions & 0 deletions fix-idea-1-buffer-capacity.md
@@ -0,0 +1,20 @@
# Fix Idea 1: Buffer Capacity Issues

## Problem
When processing large batch changes with thousands of directory-branch mappings, the system may hit buffer limits in the diff generation process, resulting in silently dropped edits and empty diffs.

## Proposed Solution
Increase buffer sizes and add explicit buffer overflow checks in critical code paths that handle diff generation:

1. Review all buffer allocations in the diff generation process and increase capacity for large batch operations
2. Add explicit bounds checking when writing diffs to buffers (see the sketch after this list)
3. Implement fallback mechanisms to chunk or paginate extremely large diffs
4. Add more robust error handling for buffer-related failures
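
A minimal sketch of points 2 and 4, failing loudly instead of truncating. The `maxDiffBytes` constant and `appendDiff` helper are hypothetical names for illustration; the real limit would live wherever the diff buffers are allocated:

```go
package main

import (
	"bytes"
	"fmt"
)

// maxDiffBytes is a hypothetical limit; the real value would live wherever the
// diff buffers are allocated (e.g. the internal/batches/diff package).
const maxDiffBytes = 64 << 20 // 64 MiB

// appendDiff writes one file's diff into the shared buffer and fails loudly
// instead of silently truncating when the capacity would be exceeded.
func appendDiff(buf *bytes.Buffer, fileDiff []byte) error {
	if buf.Len()+len(fileDiff) > maxDiffBytes {
		return fmt.Errorf("diff buffer would exceed %d bytes (have %d, adding %d)",
			maxDiffBytes, buf.Len(), len(fileDiff))
	}
	_, err := buf.Write(fileDiff)
	return err
}

func main() {
	var buf bytes.Buffer
	if err := appendDiff(&buf, []byte("diff --git a/foo b/foo\n+hello\n")); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("buffered %d bytes\n", buf.Len())
}
```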

## Implementation Areas
- `internal/batches/diff` package: Review any buffer limits
- `internal/batches/executor/run_steps.go`: Add additional validation of diff content before processing
- Git related functions: Ensure they can handle large diffs without truncation

## Expected Outcome
The system should either properly handle large diffs without dropping edits or fail with a clear error message about size limitations rather than silently dropping content.
21 changes: 21 additions & 0 deletions fix-idea-2-race-conditions.md
@@ -0,0 +1,21 @@
# Fix Idea 2: Race Conditions in Parallel Processing

## Problem
The batch processing system runs multiple operations in parallel, which could lead to race conditions when accessing shared resources or when handling diffs. These race conditions might cause edits to be dropped silently.

## Proposed Solution
Implement stricter concurrency controls and better synchronization:

1. Review all concurrent operations during batch processing, particularly in `RunSteps` and related functions
2. Add mutex locks around critical sections that modify shared data structures
3. Consider using channels for safer communication between concurrent operations (see the sketch after this list)
4. Add transaction-like semantics to ensure all edits within a changeset are processed atomically
5. Implement retry mechanisms for concurrent operations that might fail due to race conditions
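
A minimal sketch of point 3, gathering results through a channel plus a WaitGroup so that no goroutine ever appends to a shared slice directly. `changesetSpec` and `collectSpecs` are illustrative stand-ins, not existing types or functions:

```go
package main

import (
	"fmt"
	"sync"
)

// changesetSpec stands in for batcheslib.ChangesetSpec; only the diff matters here.
type changesetSpec struct {
	diff string
}

// collectSpecs runs one worker per task and gathers results through a channel,
// so no goroutine ever appends to a shared slice directly.
func collectSpecs(tasks []string) []changesetSpec {
	results := make(chan changesetSpec, len(tasks))
	var wg sync.WaitGroup

	for _, task := range tasks {
		wg.Add(1)
		go func(task string) {
			defer wg.Done()
			// Real code would execute the task's steps and compute its diff here.
			results <- changesetSpec{diff: "diff for " + task}
		}(task)
	}

	// Close the channel once every worker is done so the collector loop ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	var specs []changesetSpec
	for spec := range results {
		if spec.diff == "" {
			continue // skip empty diffs explicitly rather than dropping them silently
		}
		specs = append(specs, spec)
	}
	return specs
}

func main() {
	specs := collectSpecs([]string{"repo-a", "repo-b", "repo-c"})
	fmt.Printf("collected %d specs\n", len(specs))
}
```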

## Implementation Areas
- `internal/batches/executor/run_steps.go`: Add synchronization around diff generation and validation
- `cmd/src/batch_common.go`: Review concurrent operations in changeset spec creation
- Consider adding a concurrency limit based on the size of batch specs

## Expected Outcome
Eliminate race conditions that could lead to dropped edits, ensuring that all changes are consistently applied even with high levels of parallelism.
21 changes: 21 additions & 0 deletions fix-idea-3-memory-optimization.md
@@ -0,0 +1,21 @@
# Fix Idea 3: Memory Management and Optimization

## Problem
With large batch specs containing thousands of changesets, the system may experience memory pressure, resulting in edits being dropped due to out-of-memory conditions or aggressive garbage collection cycles.

## Proposed Solution
Improve memory management and implement batch processing optimizations:

1. Implement chunking for very large batch specs, processing them in smaller groups (see the sketch after this list)
2. Add memory usage monitoring during batch processing to detect potential issues
3. Review all large data structures and optimize for memory efficiency
4. Consider streaming approaches for diff handling rather than loading everything into memory
5. Implement configurable limits for batch size with clear error messages when exceeded
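
A minimal sketch of point 1. `chunkSize` and `processInChunks` are hypothetical names; the real chunk size would come from a flag or configuration:

```go
package main

import "fmt"

// chunkSize is a hypothetical, configurable limit on how many changeset specs
// are handled in one pass; the real value would come from a flag or config.
const chunkSize = 500

// processInChunks walks the specs in fixed-size groups so memory use stays
// bounded even for batch specs with thousands of changesets.
func processInChunks(specs []string, handle func([]string) error) error {
	for start := 0; start < len(specs); start += chunkSize {
		end := start + chunkSize
		if end > len(specs) {
			end = len(specs)
		}
		if err := handle(specs[start:end]); err != nil {
			return fmt.Errorf("processing specs %d-%d: %w", start, end-1, err)
		}
	}
	return nil
}

func main() {
	specs := make([]string, 1234)
	if err := processInChunks(specs, func(chunk []string) error {
		fmt.Printf("handling %d specs\n", len(chunk))
		return nil
	}); err != nil {
		fmt.Println("error:", err)
	}
}
```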

## Implementation Areas
- `cmd/src/batch_common.go`: Add chunking logic for large batch specs
- `internal/batches/executor/run_steps.go`: Optimize memory use in diff handling
- Consider adding a memory profiling option for batch operations to identify bottlenecks

## Expected Outcome
The system should handle large batch specs efficiently without running into memory issues that could cause edits to be silently dropped. Any memory-related limitations should result in clear error messages rather than silent failures.
50 changes: 47 additions & 3 deletions internal/batches/executor/coordinator.go
@@ -2,6 +2,7 @@ package executor

import (
"context"
"sync"

"github.com/sourcegraph/sourcegraph/lib/errors"

@@ -24,6 +25,11 @@ type Coordinator struct {
opts NewCoordinatorOpts

exec taskExecutor

// cacheMutex protects concurrent access to the cache
cacheMutex sync.Mutex
// specsMutex protects access to the changesets specs during build
specsMutex sync.Mutex
}

type NewCoordinatorOpts struct {
@@ -66,6 +72,10 @@ func (c *Coordinator) CheckCache(ctx context.Context, batchSpec *batcheslib.Batc
}

func (c *Coordinator) ClearCache(ctx context.Context, tasks []*Task) error {
// Lock to protect cache operations from race conditions
c.cacheMutex.Lock()
defer c.cacheMutex.Unlock()

for _, task := range tasks {
for i := len(task.Steps) - 1; i > -1; i-- {
key := task.CacheKey(c.opts.GlobalEnv, c.opts.ExecOpts.WorkingDirectory, i)
@@ -86,22 +96,39 @@ func (c *Coordinator) checkCacheForTask(ctx context.Context, batchSpec *batchesl
// we build changeset specs and return.
// TODO: This doesn't consider skipped steps.
if task.CachedStepResultFound && task.CachedStepResult.StepIndex == len(task.Steps)-1 {
// Lock to protect cache operations and ensure atomicity
c.cacheMutex.Lock()

// If the cached result resulted in an empty diff, we don't need to
// add it to the list of specs that are displayed to the user and
// send to the server. Instead, we can just report that the task is
// complete and move on.
if len(task.CachedStepResult.Diff) == 0 {
return specs, true, nil
c.cacheMutex.Unlock()
// Force re-execution by clearing cache for this task
key := task.CacheKey(c.opts.GlobalEnv, c.opts.ExecOpts.WorkingDirectory, task.CachedStepResult.StepIndex)
c.opts.Cache.Clear(ctx, key)
return specs, false, nil // Return false to force re-execution
}

specs, err = c.buildChangesetSpecs(task, batchSpec, task.CachedStepResult)
c.cacheMutex.Unlock()
return specs, true, err
}

return specs, false, nil
}

func (c *Coordinator) buildChangesetSpecs(task *Task, batchSpec *batcheslib.BatchSpec, result execution.AfterStepResult) ([]*batcheslib.ChangesetSpec, error) {
// Lock to protect spec building and ensure atomicity
c.specsMutex.Lock()
defer c.specsMutex.Unlock()

// Validate diff is not empty
if len(result.Diff) == 0 {
return nil, errors.New("diff was empty during changeset spec creation")
}

version := 1
if c.opts.BinaryDiffs {
version = 2
@@ -131,6 +158,10 @@ func (c *Coordinator) buildChangesetSpecs(task *Task, batchSpec *batcheslib.Batc
}

func (c *Coordinator) loadCachedStepResults(ctx context.Context, task *Task, globalEnv []string) error {
// Lock to protect cache operations from race conditions
c.cacheMutex.Lock()
defer c.cacheMutex.Unlock()

// We start at the back so that we can find the _last_ cached step,
// then restart execution on the following step.
for i := len(task.Steps) - 1; i > -1; i-- {
Expand All @@ -153,6 +184,10 @@ func (c *Coordinator) loadCachedStepResults(ctx context.Context, task *Task, glo
}

func (c *Coordinator) buildSpecs(ctx context.Context, batchSpec *batcheslib.BatchSpec, taskResult taskResult, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, error) {
// Lock to protect spec building from race conditions
c.specsMutex.Lock()
defer c.specsMutex.Unlock()

if len(taskResult.stepResults) == 0 {
return nil, nil
}
@@ -184,20 +219,29 @@ func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batch
c.exec.Start(ctx, tasks, ui)
results, errs := c.exec.Wait()

// Create a copy of results to safely iterate over during cache operations
resultsCopy := make([]taskResult, len(results))
copy(resultsCopy, results)

// Write all step cache results to the cache.
for _, res := range results {
// Lock to protect cache operations from race conditions
c.cacheMutex.Lock()
for _, res := range resultsCopy {
for _, stepRes := range res.stepResults {
cacheKey := res.task.CacheKey(c.opts.GlobalEnv, c.opts.ExecOpts.WorkingDirectory, stepRes.StepIndex)
if err := c.opts.Cache.Set(ctx, cacheKey, stepRes); err != nil {
c.cacheMutex.Unlock() // Release the lock before returning
return nil, nil, errors.Wrapf(err, "caching result for step %d", stepRes.StepIndex)
}
}
}
c.cacheMutex.Unlock()

var specs []*batcheslib.ChangesetSpec

// Build ChangesetSpecs if possible and add to list.
for _, taskResult := range results {
// Using the copy of results to avoid race conditions
for _, taskResult := range resultsCopy {
// Don't build changeset specs for failed workspaces.
if taskResult.err != nil {
continue
1 change: 1 addition & 0 deletions internal/batches/executor/coordinator_test.go
@@ -341,6 +341,7 @@

// Run with a cold cache.
t.Run("cold cache", func(t *testing.T) {
t.Timeout(60 * time.Second) // Reduce timeout to 60 seconds to fail faster

Check failure on line 344 in internal/batches/executor/coordinator_test.go
GitHub Actions / go-test (1.24.1, ubuntu-latest), go-test (1.24.1, macos-latest), go-lint:
t.Timeout undefined (type *testing.T has no field or method Timeout)
undefined: time

execute(t)
verifyCache(t)
})
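
Note on the failing checks: *testing.T has no Timeout method, which is what CI reports above. A sketch of a conventional alternative, assuming the helpers from coordinator_test.go, is to bound the whole run with "go test -timeout 60s" or to race the test body against a context deadline, as below; the test name and placeholder body are illustrative only:

package executor_test

import (
	"context"
	"testing"
	"time"
)

// TestColdCacheDeadline sketches a per-test bound without t.Timeout: run the
// body in a goroutine and fail if a 60s context deadline expires first.
func TestColdCacheDeadline(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Test body goes here (e.g. the cold-cache execution); report failures
		// with t.Error rather than t.Fatal, since this is not the test goroutine.
	}()

	select {
	case <-done:
		// Finished within the deadline.
	case <-ctx.Done():
		t.Fatalf("test exceeded 60s deadline: %v", ctx.Err())
	}
}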