Skip to content

Commit

Permalink
Customizable backoff bulk processor
Browse files Browse the repository at this point in the history
See #646 and #650
  • Loading branch information
rwynn authored and olivere committed Dec 27, 2017
1 parent a2982a5 commit ddb058c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
32 changes: 17 additions & 15 deletions bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ type BulkProcessorService struct {
bulkSize int // # of bytes after which to commit
flushInterval time.Duration // periodic flush interval
wantStats bool // indicates whether to gather statistics
initialTimeout time.Duration // initial wait time before retry on errors
maxTimeout time.Duration // max time to wait for retry on errors
backoff Backoff // a custom Backoff to use for errors
}

// NewBulkProcessorService creates a new BulkProcessorService.
Expand All @@ -49,8 +48,10 @@ func NewBulkProcessorService(client *Client) *BulkProcessorService {
numWorkers: 1,
bulkActions: 1000,
bulkSize: 5 << 20, // 5 MB
initialTimeout: time.Duration(200) * time.Millisecond,
maxTimeout: time.Duration(10000) * time.Millisecond,
backoff: NewExponentialBackoff(
time.Duration(200) * time.Millisecond,
time.Duration(10000) * time.Millisecond,
),
}
}

Expand Down Expand Up @@ -120,6 +121,12 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
return s
}

// Set the backoff strategy to use for errors
func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
s.backoff = backoff
return s
}

// Do creates a new BulkProcessor and starts it.
// Consider the BulkProcessor as a running instance that accepts bulk requests
// and commits them to Elasticsearch, spreading the work across one or more
Expand All @@ -146,8 +153,7 @@ func (s *BulkProcessorService) Do(ctx context.Context) (*BulkProcessor, error) {
s.bulkSize,
s.flushInterval,
s.wantStats,
s.initialTimeout,
s.maxTimeout)
s.backoff)

err := p.Start(ctx)
if err != nil {
Expand Down Expand Up @@ -235,8 +241,7 @@ type BulkProcessor struct {
flushInterval time.Duration
flusherStopC chan struct{}
wantStats bool
initialTimeout time.Duration // initial wait time before retry on errors
maxTimeout time.Duration // max time to wait for retry on errors
backoff Backoff

startedMu sync.Mutex // guards the following block
started bool
Expand All @@ -255,8 +260,7 @@ func newBulkProcessor(
bulkSize int,
flushInterval time.Duration,
wantStats bool,
initialTimeout time.Duration,
maxTimeout time.Duration) *BulkProcessor {
backoff Backoff) *BulkProcessor {
return &BulkProcessor{
c: client,
beforeFn: beforeFn,
Expand All @@ -267,8 +271,7 @@ func newBulkProcessor(
bulkSize: bulkSize,
flushInterval: flushInterval,
wantStats: wantStats,
initialTimeout: initialTimeout,
maxTimeout: maxTimeout,
backoff: backoff,
}
}

Expand Down Expand Up @@ -473,7 +476,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
}
// notifyFunc will be called if retry fails
notifyFunc := func(err error) {
w.p.c.errorf("elastic: bulk processor %q failed but will retry: %v", w.p.name, err)
w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
}

id := atomic.AddInt64(&w.p.executionId, 1)
Expand All @@ -494,8 +497,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
}

// Commit bulk requests
policy := NewExponentialBackoff(w.p.initialTimeout, w.p.maxTimeout)
err := RetryNotify(commitFunc, policy, notifyFunc)
err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
w.updateStats(res)
if err != nil {
w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
Expand Down
3 changes: 3 additions & 0 deletions bulk_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func TestBulkProcessorDefaults(t *testing.T) {
if got, want := p.wantStats, false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if p.backoff == nil {
t.Fatalf("expected non-nill backoff; got: %v", p.backoff)
}
}

func TestBulkProcessorCommitOnBulkActions(t *testing.T) {
Expand Down

0 comments on commit ddb058c

Please sign in to comment.