Skip to content

Commit

Permalink
resubscribe on more specific error
Browse files Browse the repository at this point in the history
  • Loading branch information
kuiperda committed Sep 16, 2024
1 parent 2d49493 commit 961aa95
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/windows/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,14 @@ func (i *Input) readToEnd(ctx context.Context) {
}
}

var errHandleInvalid = errors.New("The handle is invalid.")

// read will read events from the subscription.
func (i *Input) read(ctx context.Context) int {
events, err := i.subscription.Read(i.maxReads)
if err != nil {
i.Logger().Error("Failed to read events from subscription", zap.Error(err))
if i.isRemote() {
if i.isRemote() && (err == errHandleInvalid || err == errSubscriptionNotOpen) {
i.Logger().Info("Resubscribing, closing subscription")
closeErr := i.subscription.Close()
if closeErr != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/windows/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ func (s *Subscription) Close() error {
return nil
}

var errSubscriptionNotOpen = errors.New("subscription handle is not open")

// Read will read events from the subscription.
func (s *Subscription) Read(maxReads int) ([]Event, error) {
if s.handle == 0 {
return nil, fmt.Errorf("subscription handle is not open")
return nil, errSubscriptionNotOpen
}

if maxReads < 1 {
Expand Down

0 comments on commit 961aa95

Please sign in to comment.