Skip to content

Commit

Permalink
Fix BES error when bazel doesn't receive all ACKs (#3998)
Browse files Browse the repository at this point in the history
  • Loading branch information
bduffany authored May 22, 2023
1 parent 0ddd3b0 commit 295765a
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 5 deletions.
3 changes: 3 additions & 0 deletions cli/devnull/devnull.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func (c *nullEventChannel) HandleEvent(event *pepb.PublishBuildToolEventStreamRe
func (c *nullEventChannel) GetNumDroppedEvents() uint64 {
return 0
}
func (c *nullEventChannel) GetInitialSequenceNumber() int64 {
return 1
}
func (c *nullEventChannel) Close() {}

func (c *nullEventChannel) Context() context.Context {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ const (

// Exit code in Finished event indicating that the build was interrupted (i.e. killed by user).
InterruptedExitCode = 8

// First sequence number that we expect to see in the ordered build event
// stream.
firstExpectedSequenceNumber = 1
)

var (
Expand Down Expand Up @@ -758,6 +762,7 @@ type EventChannel struct {
bufferedEvents []*inpb.InvocationEvent
unprocessedStartingEvents map[string]struct{}
numDroppedEventsBeforeProcessing uint64
initialSequenceNumber int64
hasReceivedEventWithOptions bool
hasReceivedStartedEvent bool
logWriter *eventlog.EventLogWriter
Expand Down Expand Up @@ -917,6 +922,26 @@ func (e *EventChannel) handleEvent(event *pepb.PublishBuildToolEventStreamReques
streamID := event.OrderedBuildEvent.StreamId
iid := streamID.InvocationId

if e.initialSequenceNumber == 0 {
e.initialSequenceNumber = seqNo
}
// We only allow initial sequence numbers greater than one in the case where
// Bazel failed to receive all of our ACKs after we finalized an invocation
// (marking it complete). In that case we just void the channel and ACK all
// events without doing any work. Otherwise, we treat it as a client error.
if e.initialSequenceNumber > firstExpectedSequenceNumber {
in, err := e.env.GetInvocationDB().LookupInvocation(e.ctx, iid)
if err != nil {
return status.WrapErrorf(err, "build event had unexpected initial sequence number %d, and failed to look up existing invocation", seqNo)
}
if in.InvocationStatus == int64(inspb.InvocationStatus_COMPLETE_INVOCATION_STATUS) {
log.Infof("Voiding EventChannel for invocation %s: invocation is already finalized", iid)
e.isVoid = true
return nil
}
return status.InvalidArgumentErrorf("received initial sequence number %d for build event stream retry of incomplete invocation, but expected sequence number %d", seqNo, firstExpectedSequenceNumber)
}

if isFinalEvent(event.OrderedBuildEvent) {
return nil
}
Expand Down Expand Up @@ -1225,6 +1250,10 @@ func (e *EventChannel) GetNumDroppedEvents() uint64 {
return e.numDroppedEventsBeforeProcessing
}

func (e *EventChannel) GetInitialSequenceNumber() int64 {
return e.initialSequenceNumber
}

func extractOptions(event *build_event_stream.BuildEvent) (string, error) {
switch p := event.Payload.(type) {
case *build_event_stream.BuildEvent_Started:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func TestHandleEventWithUsageTracking(t *testing.T) {
channel := handler.OpenChannel(ctx, testInvocationID)

// Send started event with api key
request := streamRequest(startedEvent("--remote_header='"+testauth.APIKeyHeader+"=USER1' --should_be_redacted=USER1"), testInvocationID, 2)
request := streamRequest(startedEvent("--remote_header='"+testauth.APIKeyHeader+"=USER1' --should_be_redacted=USER1"), testInvocationID, 1)
err = channel.HandleEvent(request)
assert.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,14 @@ func postProcessStream(ctx context.Context, channel interfaces.BuildEventChannel
// everything all at once, which means we don't need to worry about
// cross-server consistency of messages in an invocation.
sort.Sort(sort.IntSlice(acks))
for i, ack := range acks {
if ack != i+1 {
log.CtxWarningf(ctx, "Missing ack: saw %d and wanted %d. Bailing!", ack, i+1)
return status.UnknownErrorf("event sequence number mismatch: received %d, wanted %d", ack, i+1)

expectedSeqNo := channel.GetInitialSequenceNumber()
for _, ack := range acks {
if ack != int(expectedSeqNo) {
log.CtxWarningf(ctx, "Missing ack: saw %d and wanted %d. Bailing!", ack, expectedSeqNo)
return status.UnknownErrorf("event sequence number mismatch: received %d, wanted %d", ack, expectedSeqNo)
}
expectedSeqNo++
}

if channel != nil {
Expand Down
1 change: 1 addition & 0 deletions server/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ type BuildEventChannel interface {
FinalizeInvocation(iid string) error
HandleEvent(event *pepb.PublishBuildToolEventStreamRequest) error
GetNumDroppedEvents() uint64
GetInitialSequenceNumber() int64
Close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,31 @@ func testInjectFailureAfterBazelEvent(t *testing.T, payloadMsg interface{}) {
assert.Equal(t, true, res.Invocation[0].Success)
}

func TestBuildWithRetry_InjectFailureWhileServerIsSendingACKs(t *testing.T) {
app := buildbuddy.Run(t)
bepClient := app.PublishBuildEventClient(t)
proxy := StartBEPProxy(t, bepClient)
proxy.FailOnce(BeforeServerSendsNthACK(2))

ctx := context.Background()
ws := testbazel.MakeTempWorkspace(t, workspaceContents)
buildFlags := append([]string{"//:hello.txt"}, app.BESBazelFlags()...)
buildFlags = append(buildFlags, "--bes_backend="+proxy.GRPCAddress())

result := testbazel.Invoke(ctx, t, ws, "build", buildFlags...)

require.NoError(t, result.Error)
assert.Contains(t, result.Stderr, "Build completed successfully")
bbService := app.BuildBuddyServiceClient(t)
res, err := bbService.GetInvocation(
context.Background(),
&inpb.GetInvocationRequest{Lookup: &inpb.InvocationLookup{InvocationId: result.InvocationID}})
require.NoError(t, err)
require.GreaterOrEqual(t, len(res.Invocation), 0)
assert.Equal(t, inspb.InvocationStatus_COMPLETE_INVOCATION_STATUS, res.Invocation[0].GetInvocationStatus())
assert.Equal(t, true, res.Invocation[0].Success)
}

// StreamEvent represents an event occuring in the stream proxy as it forwards
// messages between the client and server during a streaming RPC. Exactly one
// of ServerRecv or ClientRecv will be non-nil.
Expand Down Expand Up @@ -138,6 +163,20 @@ func AfterForwardBazelEvent(t *testing.T, payloadMsg interface{}) StreamErrorInj
}
}

// BeforeServerSendsNthACK returns an error injector that injects an error just
// before the server sends the n'th ACK.
func BeforeServerSendsNthACK(n int64) StreamErrorInjector {
return func(event *StreamEvent) error {
if event.ServerRecv == nil || event.ServerRecv.res == nil {
return nil
}
if event.ServerRecv.res.SequenceNumber == n {
return status.UnavailableErrorf("Proxy: Injected error before server sends ACK %d", n)
}
return nil
}
}

// BEPProxyServer sits in between Bazel and the build event server, proxying
// messages directly. It allows injecting errors into the stream to ensure that
// they are handled properly.
Expand Down

0 comments on commit 295765a

Please sign in to comment.