Skip to content

Commit

Permalink
Detect grpc size limit error to avoid infinite retry (#34762)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Mar 9, 2023
1 parent b6aa375 commit 02e71a0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
21 changes: 20 additions & 1 deletion libbeat/outputs/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,26 @@ func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error {
Events: toSend,
})

if status.Code(err) != codes.OK {
if err != nil {
if status.Code(err) == codes.ResourceExhausted {
// This error can only come from the gRPC connection, and
// most likely indicates this request exceeds the shipper's
// RPC size limit. The correct thing to do here is split
// the batch as in https://github.com/elastic/beats/issues/29778.
// Since this isn't supported yet, we drop this batch to avoid
// permanently blocking the pipeline.
s.log.Errorf("dropping %d events because of RPC failure: %v", len(events), err)
batch.Drop()
s.observer.Dropped(len(events))
return nil
}
// All other known errors are, in theory, retryable once the
// RPC connection is successfully restarted, and don't depend on
// the contents of the request. We should be cautious, though: if an
// error is deterministic based on the contents of a publish
// request, then cancelling here (instead of dropping or retrying)
// will cause an infinite retry loop, wedging the pipeline.

batch.Cancelled() // does not decrease the TTL
s.observer.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events
return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(toSend), err)
Expand Down
15 changes: 15 additions & 0 deletions libbeat/outputs/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -210,6 +212,19 @@ func TestPublish(t *testing.T) {
serverError: errors.New("some error"),
expError: "failed to publish the batch to the shipper, none of the 3 events were accepted",
},
{
name: "drops the batch on resource exceeded error",
events: events,
expSignals: []outest.BatchSignal{
{
Tag: outest.BatchDrop,
},
},
marshalMethod: toShipperEvent,
qSize: 3,
observerExpected: &TestObserver{batch: 3, dropped: 3},
serverError: status.Error(codes.ResourceExhausted, "rpc size limit exceeded"),
},
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
Expand Down

0 comments on commit 02e71a0

Please sign in to comment.