Use go-docappender bulk indexer
carsonip committed Apr 15, 2024
1 parent 03a96a4 commit 1ecda8e
Showing 8 changed files with 359 additions and 84 deletions.
195 changes: 133 additions & 62 deletions exporter/elasticsearchexporter/elasticsearch_bulk.go
@@ -9,25 +9,26 @@ import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/elastic/go-docappender"

Check failure on line 18 in exporter/elasticsearchexporter/elasticsearch_bulk.go (GitHub Actions: collector-build, docker-build): missing go.sum entry for module providing package github.com/elastic/go-docappender (imported by github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter); to add:
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
-	esutil7 "github.com/elastic/go-elasticsearch/v7/esutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

Check failure on line 21 in exporter/elasticsearchexporter/elasticsearch_bulk.go (GitHub Actions: collector-build, docker-build): missing go.sum entry for module providing package golang.org/x/sync/errgroup (imported by github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter); to add:

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
)

type esClientCurrent = elasticsearch7.Client
type esConfigCurrent = elasticsearch7.Config
-type esBulkIndexerCurrent = esutil7.BulkIndexer
-type esBulkIndexerItem = esutil7.BulkIndexerItem
-type esBulkIndexerResponseItem = esutil7.BulkIndexerResponseItem

type esBulkIndexerCurrent = BulkIndexerPool

type esBulkIndexerItem = docappender.BulkIndexerItem

// clientLogger implements the estransport.Logger interface
// that is required by the Elasticsearch client for logging.
@@ -136,22 +137,6 @@ func newTransport(config *Config, tlsCfg *tls.Config) *http.Transport {
return transport
}

-func newBulkIndexer(logger *zap.Logger, client *elasticsearch7.Client, config *Config) (esBulkIndexerCurrent, error) {
-	// TODO: add debug logger
-	return esutil7.NewBulkIndexer(esutil7.BulkIndexerConfig{
-		NumWorkers:    config.NumWorkers,
-		FlushBytes:    config.Flush.Bytes,
-		FlushInterval: config.Flush.Interval,
-		Client:        client,
-		Pipeline:      config.Pipeline,
-		Timeout:       config.Timeout,

-		OnError: func(_ context.Context, err error) {
-			logger.Error(fmt.Sprintf("Bulk indexer error: %v", err))
-		},
-	})
-}

func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Duration {
if !config.Enabled {
return nil
@@ -175,52 +160,138 @@ func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Duration {
}
}

-func shouldRetryEvent(status int) bool {
-	for _, retryable := range retryOnStatus {
-		if status == retryable {
-			return true
-		}
-	}
-	return false
-}

func pushDocuments(ctx context.Context, index string, document []byte, bulkIndexer *esBulkIndexerCurrent) error {
return bulkIndexer.Add(ctx, index, bytes.NewReader(document))
}
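// Note: the per-document retry and drop handling that pushDocuments used to
// implement through an OnFailure callback on each item is now performed inside
// the docappender.BulkIndexer itself, bounded by maxDocRetry in newBulkIndexer below.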

func newBulkIndexer(logger *zap.Logger, client *elasticsearch7.Client, config *Config) (*esBulkIndexerCurrent, error) {
numWorkers := config.NumWorkers
if numWorkers == 0 {
numWorkers = 1
}

flushInterval := config.Flush.Interval
if flushInterval == 0 {
flushInterval = 30 * time.Second
}

flushBytes := config.Flush.Bytes
if flushBytes == 0 {
flushBytes = 5e+6
}

var maxDocRetry int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
}
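	// For example, with config.Retry.MaxRequests = 3 the initial attempt plus up to
	// two document-level retries are allowed, i.e. maxDocRetry = 2.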
group := &errgroup.Group{}
items := make(chan esBulkIndexerItem, config.NumWorkers)
stats := bulkIndexerStats{}

for i := 0; i < numWorkers; i++ {
w := worker{
indexer: docappender.NewBulkIndexer(client, 0, maxDocRetry),
items: items,
flushInterval: flushInterval,
flushTimeout: config.Timeout,
flushBytes: flushBytes,
logger: logger,
stats: &stats,
}
group.Go(w.run)
}
return &BulkIndexerPool{
items: items,
errgroup: group,
stats: &stats,
}, nil
}
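// The pool fans a single items channel out to numWorkers workers; each worker owns
// its own docappender.BulkIndexer and flushes it once the buffer reaches flushBytes
// or flushInterval elapses, whichever happens first.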

type bulkIndexerStats struct {
docsIndexed atomic.Int64
}

type BulkIndexerPool struct {
items chan esBulkIndexerItem
errgroup *errgroup.Group
stats *bulkIndexerStats
}

-func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int) error {
-	attempts := 1
-	body := bytes.NewReader(document)
-	item := esBulkIndexerItem{Action: createAction, Index: index, Body: body}
-	// Setup error handler. The handler handles the per item response status based on the
-	// selective ACKing in the bulk response.
-	item.OnFailure = func(ctx context.Context, item esBulkIndexerItem, resp esBulkIndexerResponseItem, err error) {
-		switch {
-		case attempts < maxAttempts && shouldRetryEvent(resp.Status):
-			logger.Debug("Retrying to index",
-				zap.String("name", index),
-				zap.Int("attempt", attempts),
-				zap.Int("status", resp.Status),
-				zap.NamedError("reason", err))

-			attempts++
-			_, _ = body.Seek(0, io.SeekStart)
-			_ = bulkIndexer.Add(ctx, item)

-		case resp.Status == 0 && err != nil:
-			// Encoding error. We didn't even attempt to send the event
-			logger.Error("Drop docs: failed to add docs to the bulk request buffer.",
-				zap.NamedError("reason", err))

-		case err != nil:
-			logger.Error("Drop docs: failed to index",
-				zap.String("name", index),
-				zap.Int("attempt", attempts),
-				zap.Int("status", resp.Status),
-				zap.NamedError("reason", err))

-		default:
-			logger.Error(fmt.Sprintf("Drop docs: failed to index: %#v", resp.Error),
-				zap.Int("attempt", attempts),
-				zap.Int("status", resp.Status))
-		}
-	}

-	return bulkIndexer.Add(ctx, item)
-}

func (p *BulkIndexerPool) Add(ctx context.Context, index string, document io.WriterTo) error {
item := esBulkIndexerItem{
Index: index,
Body: document,
}
select {
case <-ctx.Done():
return ctx.Err()
case p.items <- item:
return nil
}
}

func (p *BulkIndexerPool) Close(ctx context.Context) error {
close(p.items)
doneCh := make(chan struct{})
go func() {
p.errgroup.Wait()
close(doneCh)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-doneCh:
return nil
}
}
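// Closing the items channel makes each worker flush whatever it has buffered and
// return; Close then waits for the workers via the errgroup, or returns ctx.Err()
// if the context is done first.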

type worker struct {
indexer *docappender.BulkIndexer
items chan esBulkIndexerItem
flushInterval time.Duration
flushTimeout time.Duration
flushBytes int

stats *bulkIndexerStats

logger *zap.Logger
}

func (w *worker) run() error {
flushTick := time.NewTicker(w.flushInterval)
for {
select {
case item := <-w.items:
// check if BulkIndexer is closing
zero := esBulkIndexerItem{}
if item == zero {
w.flush()
return nil
}

w.indexer.Add(item)
// w.indexer.Len() can be either compressed or uncompressed bytes
if w.indexer.Len() >= w.flushBytes {
w.flush()
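				// restart the interval timer so the next timed flush is measured from this size-based flush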
flushTick.Reset(w.flushInterval)
}
case <-flushTick.C:
// bulk indexer needs to be flushed every flush interval because
// there may be pending bytes in bulk indexer buffer due to e.g. document level 429
w.flush()
}
}
}

func (w *worker) flush() error {
ctx, cancel := context.WithTimeout(context.Background(), w.flushTimeout)
defer cancel()
stat, err := w.indexer.Flush(ctx)
w.stats.docsIndexed.Add(stat.Indexed)
if err != nil {
w.logger.Error("bulk indexer flush error", zap.Error(err))
}
return err
}
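For orientation, the snippet below is a minimal sketch (not part of the commit) of how code inside the elasticsearchexporter package might wire the new pieces together. It relies on the imports already present in elasticsearch_bulk.go; the endpoint, index name, document body, and Config values are hypothetical placeholders chosen to mirror the tests that follow.

// Sketch only: hypothetical wiring of the new bulk indexer pool.
func exampleBulkIndexerUsage() error {
	client, err := elasticsearch7.NewClient(esConfigCurrent{
		Addresses: []string{"http://localhost:9200"}, // hypothetical endpoint
	})
	if err != nil {
		return err
	}

	cfg := &Config{
		NumWorkers: 2,
		Flush:      FlushSettings{Interval: 30 * time.Second, Bytes: 5e6},
	}
	cfg.Timeout = 90 * time.Second // bounds each worker flush via flushTimeout

	indexer, err := newBulkIndexer(zap.NewNop(), client, cfg)
	if err != nil {
		return err
	}

	// Enqueue a document; a worker flushes it on size or interval.
	if err := pushDocuments(context.Background(), "logs-example", []byte(`{"message":"hello"}`), indexer); err != nil {
		return err
	}

	// Drain the queue, flush remaining documents, and wait for the workers.
	return indexer.Close(context.Background())
}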
161 changes: 161 additions & 0 deletions exporter/elasticsearchexporter/elasticsearch_bulk_test.go
@@ -0,0 +1,161 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter

import (
"context"
"errors"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

var defaultRoundTripFunc = func(*http.Request) (*http.Response, error) {
return &http.Response{
Body: io.NopCloser(strings.NewReader("{}")),
}, nil
}

type mockTransport struct {
RoundTripFunc func(*http.Request) (*http.Response, error)
}

func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if t.RoundTripFunc == nil {
return defaultRoundTripFunc(req)
}
return t.RoundTripFunc(req)
}

const successResp = `{
"took": 30,
"errors": false,
"items": [
{
"create": {
"_index": "foo",
"status": 201
}
}
]
}`
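// The canned response above reports a single item created with status 201; after a
// successful flush the tests below expect it to surface as docsIndexed == 1.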

func TestBulkIndexer_flushOnClose(t *testing.T) {
cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 2 << 30}}
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
RoundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
}, nil
},
}})
require.NoError(t, err)
bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &cfg)
require.NoError(t, err)
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
assert.NoError(t, bulkIndexer.Close(context.Background()))
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
}

func TestBulkIndexer_flush(t *testing.T) {
tests := []struct {
name string
config Config
}{
{
name: "flush.bytes",
config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}},
},
{
name: "flush.interval",
config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: 50 * time.Millisecond, Bytes: 2 << 30}},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
RoundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
}, nil
},
}})
require.NoError(t, err)
bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &tt.config)
require.NoError(t, err)
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
assert.NoError(t, bulkIndexer.Close(context.Background()))
})
}
}

func TestBulkIndexer_flush_error(t *testing.T) {
tests := []struct {
name string
roundTripFunc func(*http.Request) (*http.Response, error)
}{
{
name: "500",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 500,
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader("error")),
}, nil
},
},
{
name: "429",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 429,
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader("error")),
}, nil
},
},
{
name: "transport error",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return nil, errors.New("transport error")
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}}
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
RoundTripFunc: tt.roundTripFunc,
}})
require.NoError(t, err)
core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))
bulkIndexer, err := newBulkIndexer(zap.New(core), client, &cfg)
require.NoError(t, err)
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
assert.NoError(t, bulkIndexer.Close(context.Background()))
assert.Equal(t, 1, observed.FilterMessage("bulk indexer flush error").Len())
})
}
}
(remaining 6 changed files not shown)
