Skip to content

Commit

Permalink
Fix some minor issues around docs etc.
Browse files Browse the repository at this point in the history
See #914
  • Loading branch information
olivere committed Sep 27, 2018
1 parent 6560fa1 commit 005b2f1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
20 changes: 14 additions & 6 deletions bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import (
"time"
)

var ErrBulkItemRetry = errors.New("Uncommitted bulk response items")
var (
// ErrBulkItemRetry is returned in BulkProcessor from a worker when
// a response item needs to be retried.
ErrBulkItemRetry = errors.New("elastic: uncommitted bulk response items")

defaultRetryItemStatusCodes = []int{408, 429, 503, 507}
)

// BulkProcessorService allows to easily process bulk requests. It allows setting
// policies when to flush new bulk requests, e.g. based on a number of actions,
Expand All @@ -24,7 +30,9 @@ var ErrBulkItemRetry = errors.New("Uncommitted bulk response items")
// BulkProcessorService, by default, commits either every 1000 requests or when the
// (estimated) size of the bulk requests exceeds 5 MB. However, it does not
// commit periodically. BulkProcessorService also does retry by default, using
// an exponential backoff algorithm.
// an exponential backoff algorithm. It also will automatically re-enqueue items
// returned with a status of 408, 429, 503 or 507. You can change this
// behavior with RetryItemStatusCodes.
//
// The caller is responsible for setting the index and type on every
// bulk request added to BulkProcessorService.
Expand Down Expand Up @@ -57,7 +65,7 @@ func NewBulkProcessorService(client *Client) *BulkProcessorService {
time.Duration(200)*time.Millisecond,
time.Duration(10000)*time.Millisecond,
),
retryItemStatusCodes: []int{408, 429, 503, 507},
retryItemStatusCodes: defaultRetryItemStatusCodes,
}
}

Expand Down Expand Up @@ -134,7 +142,7 @@ func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
}

// RetryItemStatusCodes sets an array of status codes that indicate that a bulk
// response line item may be retried
// response line item should be retried.
func (s *BulkProcessorService) RetryItemStatusCodes(retryItemStatusCodes ...int) *BulkProcessorService {
s.retryItemStatusCodes = retryItemStatusCodes
return s
Expand Down Expand Up @@ -533,7 +541,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
// res.Items will be 1 to 1 with reqs in same order
for i, item := range res.Items {
for _, result := range item {
if _, exists := w.p.retryItemStatusCodes[result.Status]; exists {
if _, found := w.p.retryItemStatusCodes[result.Status]; found {
w.service.Add(reqs[i])
if err == nil {
err = ErrBulkItemRetry
Expand Down Expand Up @@ -602,7 +610,7 @@ func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
return
}
case <-t.C:
client.healthcheck(context.Background(), time.Duration(3)*time.Second, true)
client.healthcheck(context.Background(), 3*time.Second, true)
if client.mustActiveConn() == nil {
// found an active connection
// exit and signal done to the WaitGroup
Expand Down
3 changes: 3 additions & 0 deletions bulk_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func TestBulkProcessorDefaults(t *testing.T) {
if got, want := p.wantStats, false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if got, want := p.retryItemStatusCodes, defaultRetryItemStatusCodes; !reflect.Equal(got, want) {
t.Errorf("expected %v; got: %v", want, got)
}
if p.backoff == nil {
t.Fatalf("expected non-nill backoff; got: %v", p.backoff)
}
Expand Down

0 comments on commit 005b2f1

Please sign in to comment.