Skip to content

Commit

Permalink
[chore][exporter/elasticsearch] Add benchmark for traces consumer (#3…
Browse files Browse the repository at this point in the history
…3087)

**Description:** Similar to
#33035
the PR adds benchmarks for `ConsumeTraces`.

The PR also updates the mock es receiver to handle `traces` properly in
order to have better readability of the tests/benchmarks and avoid
confusion with the special handling -- this is done by using separate
index configuration for traces and logs to distinguish the items in the
bulk indexer payload. The PR also adds an extra metric to the benchmarks
(`Consume{Traces, Logs}`) for reporting events consumed per second.
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:**
[32504](#32504)

**Testing:** `cd exporter/elasticsearchexporter/integrationtest && go
test -bench=BenchmarkExporter -run=^$ ./...`

**Documentation:** N/A
  • Loading branch information
lahsivjar authored May 22, 2024
1 parent 1910ae0 commit c2b0082
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 93 deletions.
175 changes: 132 additions & 43 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,85 @@ import (
"net/http"
"net/url"
"testing"
"time"

"github.com/elastic/go-docappender/v2/docappendertest"
"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

const (
// TestLogsIndex is used by the mock ES data receiver to indentify log events.
// Exporter LogsIndex configuration must be configured with TestLogsIndex for
// the data receiver to work properly
TestLogsIndex = "logs-test-idx"

// TestTracesIndex is used by the mock ES data receiver to indentify trace
// events. Exporter TracesIndex configuration must be configured with
// TestTracesIndex for the data receiver to work properly
TestTracesIndex = "traces-test-idx"
)

type esDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Logs
endpoint string
receiver receiver.Logs
endpoint string
decodeBulkRequest bool
t testing.TB
}

func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver {
func newElasticsearchDataReceiver(t testing.TB, decodeBulkRequest bool) *esDataReceiver {
return &esDataReceiver{
DataReceiverBase: testbed.DataReceiverBase{},
endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
DataReceiverBase: testbed.DataReceiverBase{},
endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
decodeBulkRequest: decodeBulkRequest,
t: t,
}
}

func (es *esDataReceiver) Start(_ consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
factory := receiver.NewFactory(
component.MustNewType("mockelasticsearch"),
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
)
esURL, err := url.Parse(es.endpoint)
if err != nil {
return fmt.Errorf("invalid ES URL specified %s: %w", es.endpoint, err)
}
cfg := factory.CreateDefaultConfig().(*config)
cfg.ESEndpoint = es.endpoint
cfg.ServerConfig.Endpoint = esURL.Host
cfg.DecodeBulkRequests = es.decodeBulkRequest

var err error
set := receivertest.NewNopCreateSettings()
// Use an actual logger to log errors.
set.Logger = zap.Must(zap.NewDevelopment())
es.receiver, err = factory.CreateLogsReceiver(context.Background(), set, cfg, lc)
logsReceiver, err := factory.CreateLogsReceiver(context.Background(), set, cfg, lc)
if err != nil {
return err
return fmt.Errorf("failed to create logs receiver: %w", err)
}
tracesReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
if err != nil {
return fmt.Errorf("failed to create traces receiver: %w", err)
}

// Since we use SharedComponent both receivers should be same
require.Same(es.t, logsReceiver, tracesReceiver)
es.receiver = logsReceiver

return es.receiver.Start(context.Background(), componenttest.NewNopHost())
}

Expand All @@ -72,6 +105,8 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
cfgFormat := `
elasticsearch:
endpoints: [%s]
logs_index: %s
traces_index: %s
flush:
interval: 1s
sending_queue:
Expand All @@ -80,20 +115,30 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
enabled: true
max_requests: 10000
`
return fmt.Sprintf(cfgFormat, es.endpoint)
return fmt.Sprintf(cfgFormat, es.endpoint, TestLogsIndex, TestTracesIndex)
}

func (es *esDataReceiver) ProtocolName() string {
return "elasticsearch"
}

type config struct {
ESEndpoint string
confighttp.ServerConfig

// DecodeBulkRequests controls decoding of the bulk request in the mock
// ES receiver. Decoding requests would consume resources and might
// pollute the benchmark results. Note that if decode bulk request is
// set to false then the consumers will not consume any events and the
// bulk request will always return http.StatusOK.
DecodeBulkRequests bool
}

func createDefaultConfig() component.Config {
return &config{
ESEndpoint: "127.0.0.1:9200",
ServerConfig: confighttp.ServerConfig{
Endpoint: "127.0.0.1:9200",
},
DecodeBulkRequests: true,
}
}

Expand All @@ -103,20 +148,61 @@ func createLogsReceiver(
rawCfg component.Config,
next consumer.Logs,
) (receiver.Logs, error) {
cfg := rawCfg.(*config)
return newMockESReceiver(params, cfg, next)
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).logsConsumer = next
return receiver, nil
}

func createTracesReceiver(
_ context.Context,
params receiver.CreateSettings,
rawCfg component.Config,
next consumer.Traces,
) (receiver.Traces, error) {
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).tracesConsumer = next
return receiver, nil
}

type mockESReceiver struct {
server *http.Server
params receiver.CreateSettings
config *config

tracesConsumer consumer.Traces
logsConsumer consumer.Logs

server *http.Server
}

func newMockESReceiver(params receiver.CreateSettings, cfg *config) receiver.Logs {
return &mockESReceiver{
params: params,
config: cfg,
}
}

func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) {
func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error {
if es.server != nil {
return nil
}

ln, err := es.config.ToListener(ctx)
if err != nil {
return fmt.Errorf("failed to bind to address %s: %w", es.config.Endpoint, err)
}

// Ideally bulk request items should be converted to the corresponding event record
// however, since we only assert count for now there is no need to do the actual
// translation. Instead we use a pre-initialized empty logs and traces model to
// reduce allocation impact on tests and benchmarks.
emptyLogs := plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords().AppendEmpty()
emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
emptyTrace := ptrace.NewTraces()
emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()

r := mux.NewRouter()
r.Use(func(next http.Handler) http.Handler {
Expand All @@ -129,51 +215,54 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
r.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
if !es.config.DecodeBulkRequests {
w.WriteHeader(http.StatusOK)
return
}
_, response := docappendertest.DecodeBulkRequest(r)
for _, itemMap := range response.Items {
for k, item := range itemMap {
// Ideally bulk request should be converted to log record
// however, since we only assert count for now there is no
// need to do the actual translation. We use a pre-initialized
// empty plog.Logs to reduce allocation impact on tests and
// benchmarks due to this.
if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil {
var consumeErr error
switch item.Index {
case TestLogsIndex:
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
case TestTracesIndex:
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
}
if consumeErr != nil {
response.HasErrors = true
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
item.Error.Reason = err.Error()
item.Error.Reason = consumeErr.Error()
}
itemMap[k] = item
}
}
if err := json.NewEncoder(w).Encode(response); err != nil {
if jsonErr := json.NewEncoder(w).Encode(response); jsonErr != nil {
w.WriteHeader(http.StatusInternalServerError)
}
})

esURL, err := url.Parse(cfg.ESEndpoint)
es.server, err = es.config.ToServer(ctx, host, es.params.TelemetrySettings, r)
if err != nil {
return nil, fmt.Errorf("failed to parse Elasticsearch endpoint: %w", err)
return fmt.Errorf("failed to create mock ES server: %w", err)
}
return &mockESReceiver{
server: &http.Server{
Addr: esURL.Host,
Handler: r,
ReadHeaderTimeout: 20 * time.Second,
},
params: params,
}, nil
}

func (es *mockESReceiver) Start(_ context.Context, _ component.Host) error {
go func() {
if err := es.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
es.params.Logger.Error("failed while running mock ES receiver", zap.Error(err))
if err := es.server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
es.params.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
return nil
}

func (es *mockESReceiver) Shutdown(ctx context.Context) error {
if es.server == nil {
return nil
}
return es.server.Shutdown(ctx)
}

// mockESReceiver serves both, traces and logs. Shared component allows for a single
// instance of mockESReceiver to serve all supported event types.
var receivers = sharedcomponent.NewSharedComponents()
Loading

0 comments on commit c2b0082

Please sign in to comment.