Skip to content

Commit 1029807

Browse files
committed
Merge remote-tracking branch 'origin/main' into verbose-simulate-api
2 parents aede211 + 56d4f2c commit 1029807

File tree

10 files changed

+86
-55
lines changed

10 files changed

+86
-55
lines changed

.github/dependabot.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,7 @@ updates:
1010
reviewers:
1111
- "elastic/ecosystem"
1212
open-pull-requests-limit: 10
13+
groups:
14+
k8s:
15+
patterns:
16+
- "k8s.io/*"

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ test-stack-command-oldest:
5959
./scripts/test-stack-command.sh 7.14.2
6060

6161
test-stack-command-7x:
62-
./scripts/test-stack-command.sh 7.17.18
62+
./scripts/test-stack-command.sh 7.17.19
6363

6464
# Keeping a test for 8.6 because it has an specific configuration file.
6565
test-stack-command-86:
6666
./scripts/test-stack-command.sh 8.6.2
6767

6868
test-stack-command-8x:
69-
./scripts/test-stack-command.sh 8.13.0-SNAPSHOT
69+
./scripts/test-stack-command.sh 8.14.0-SNAPSHOT
7070

7171
test-stack-command-with-apm-server:
7272
APM_SERVER_ENABLED=true ./scripts/test-stack-command.sh

internal/benchrunner/runners/pipeline/benchmark.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package pipeline
66

77
import (
8+
"context"
89
"encoding/json"
910
"errors"
1011
"fmt"
@@ -75,9 +76,9 @@ func (p BenchmarkValue) String() (r string) {
7576
return r
7677
}
7778

78-
func (r *runner) benchmarkPipeline(b *benchmark, entryPipeline string) (*BenchmarkResult, error) {
79+
func (r *runner) benchmarkPipeline(ctx context.Context, b *benchmark, entryPipeline string) (*BenchmarkResult, error) {
7980
// Run benchmark
80-
bench, err := r.benchmarkIngest(b, entryPipeline)
81+
bench, err := r.benchmarkIngest(ctx, b, entryPipeline)
8182
if err != nil {
8283
return nil, fmt.Errorf("failed running benchmark: %w", err)
8384
}
@@ -196,9 +197,9 @@ type ingestResult struct {
196197
numDocs int
197198
}
198199

199-
func (r *runner) benchmarkIngest(b *benchmark, entryPipeline string) (ingestResult, error) {
200+
func (r *runner) benchmarkIngest(ctx context.Context, b *benchmark, entryPipeline string) (ingestResult, error) {
200201
baseDocs := resizeDocs(b.events, b.config.NumDocs)
201-
return r.runSingleBenchmark(entryPipeline, baseDocs)
202+
return r.runSingleBenchmark(ctx, entryPipeline, baseDocs)
202203
}
203204

204205
type processorPerformance struct {
@@ -298,12 +299,12 @@ func (agg aggregation) collect(fn mapFn) ([]BenchmarkValue, error) {
298299
return r, nil
299300
}
300301

301-
func (r *runner) runSingleBenchmark(entryPipeline string, docs []json.RawMessage) (ingestResult, error) {
302+
func (r *runner) runSingleBenchmark(ctx context.Context, entryPipeline string, docs []json.RawMessage) (ingestResult, error) {
302303
if len(docs) == 0 {
303304
return ingestResult{}, errors.New("no docs supplied for benchmark")
304305
}
305306

306-
if _, err := ingest.SimulatePipeline(r.options.API, entryPipeline, docs, "test-generic-default"); err != nil {
307+
if _, err := ingest.SimulatePipeline(ctx, r.options.API, entryPipeline, docs, "test-generic-default"); err != nil {
307308
return ingestResult{}, fmt.Errorf("simulate failed: %w", err)
308309
}
309310

internal/benchrunner/runners/pipeline/runner.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,24 +58,24 @@ func (r *runner) SetUp(ctx context.Context) error {
5858

5959
// TearDown shuts down the pipeline benchmark runner.
6060
func (r *runner) TearDown(ctx context.Context) error {
61-
if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil {
61+
if err := ingest.UninstallPipelines(ctx, r.options.API, r.pipelines); err != nil {
6262
return fmt.Errorf("uninstalling ingest pipelines failed: %w", err)
6363
}
6464
return nil
6565
}
6666

6767
// Run runs the pipeline benchmarks defined under the given folder
6868
func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) {
69-
return r.run()
69+
return r.run(ctx)
7070
}
7171

72-
func (r *runner) run() (reporters.Reportable, error) {
72+
func (r *runner) run(ctx context.Context) (reporters.Reportable, error) {
7373
b, err := r.loadBenchmark()
7474
if err != nil {
7575
return nil, fmt.Errorf("loading benchmark failed: %w", err)
7676
}
7777

78-
benchmark, err := r.benchmarkPipeline(b, r.entryPipeline)
78+
benchmark, err := r.benchmarkPipeline(ctx, b, r.entryPipeline)
7979
if err != nil {
8080
return nil, err
8181
}

internal/benchrunner/runners/rally/runner.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,13 @@ func (r *runner) setUp(ctx context.Context) error {
319319
dataStreamManifest.Name,
320320
)
321321

322-
r.indexTemplateBody, err = r.extractSimulatedTemplate(indexTemplate)
322+
r.indexTemplateBody, err = r.extractSimulatedTemplate(ctx, indexTemplate)
323323
if err != nil {
324324
return fmt.Errorf("error extracting routing path: %s: %w", indexTemplate, err)
325325
}
326326
}
327327

328-
if err := r.wipeDataStreamOnSetup(); err != nil {
328+
if err := r.wipeDataStreamOnSetup(ctx); err != nil {
329329
return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err)
330330
}
331331

@@ -343,8 +343,11 @@ func (r *runner) setUp(ctx context.Context) error {
343343
return nil
344344
}
345345

346-
func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error) {
347-
simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate(r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate))
346+
func (r *runner) extractSimulatedTemplate(ctx context.Context, indexTemplate string) (string, error) {
347+
simulateTemplate, err := r.options.ESAPI.Indices.SimulateTemplate(
348+
r.options.ESAPI.Indices.SimulateTemplate.WithContext(ctx),
349+
r.options.ESAPI.Indices.SimulateTemplate.WithName(indexTemplate),
350+
)
348351
if err != nil {
349352
return "", fmt.Errorf("error simulating template from composable template: %s: %w", indexTemplate, err)
350353
}
@@ -384,18 +387,18 @@ func (r *runner) extractSimulatedTemplate(indexTemplate string) (string, error)
384387
return string(newTemplate), nil
385388
}
386389

387-
func (r *runner) wipeDataStreamOnSetup() error {
390+
func (r *runner) wipeDataStreamOnSetup(ctx context.Context) error {
388391
// Delete old data
389392
logger.Debug("deleting old data in data stream...")
390393
r.wipeDataStreamHandler = func(ctx context.Context) error {
391394
logger.Debugf("deleting data in data stream...")
392-
if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil {
395+
if err := r.deleteDataStreamDocs(ctx, r.runtimeDataStream); err != nil {
393396
return fmt.Errorf("error deleting data in data stream: %w", err)
394397
}
395398
return nil
396399
}
397400

398-
return r.deleteDataStreamDocs(r.runtimeDataStream)
401+
return r.deleteDataStreamDocs(ctx, r.runtimeDataStream)
399402
}
400403

401404
func (r *runner) run(ctx context.Context) (report reporters.Reportable, err error) {
@@ -443,7 +446,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro
443446
return nil, fmt.Errorf("can't summarize metrics: %w", err)
444447
}
445448

446-
if err := r.reindexData(); err != nil {
449+
if err := r.reindexData(ctx); err != nil {
447450
return nil, fmt.Errorf("can't reindex data: %w", err)
448451
}
449452

@@ -529,9 +532,11 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) {
529532
return sum, err
530533
}
531534

532-
func (r *runner) deleteDataStreamDocs(dataStream string) error {
535+
func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error {
533536
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
534-
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
537+
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body,
538+
r.options.ESAPI.DeleteByQuery.WithContext(ctx),
539+
)
535540
if err != nil {
536541
return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
537542
}
@@ -941,7 +946,7 @@ func (r *runner) runRally(ctx context.Context) ([]rallyStat, error) {
941946
}
942947

943948
// reindexData will read all data generated during the benchmark and will reindex it to the metrisctore
944-
func (r *runner) reindexData() error {
949+
func (r *runner) reindexData(ctx context.Context) error {
945950
if !r.options.ReindexData {
946951
return nil
947952
}
@@ -954,6 +959,7 @@ func (r *runner) reindexData() error {
954959
logger.Debug("getting orignal mappings...")
955960
// Get the mapping from the source data stream
956961
mappingRes, err := r.options.ESAPI.Indices.GetMapping(
962+
r.options.ESAPI.Indices.GetMapping.WithContext(ctx),
957963
r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream),
958964
)
959965
if err != nil {
@@ -999,6 +1005,7 @@ func (r *runner) reindexData() error {
9991005

10001006
createRes, err := r.options.ESMetricsAPI.Indices.Create(
10011007
indexName,
1008+
r.options.ESMetricsAPI.Indices.Create.WithContext(ctx),
10021009
r.options.ESMetricsAPI.Indices.Create.WithBody(reader),
10031010
)
10041011
if err != nil {
@@ -1014,6 +1021,7 @@ func (r *runner) reindexData() error {
10141021

10151022
logger.Debug("starting scrolling of events...")
10161023
res, err := r.options.ESAPI.Search(
1024+
r.options.ESAPI.Search.WithContext(ctx),
10171025
r.options.ESAPI.Search.WithIndex(r.runtimeDataStream),
10181026
r.options.ESAPI.Search.WithBody(bodyReader),
10191027
r.options.ESAPI.Search.WithScroll(time.Minute),
@@ -1042,7 +1050,7 @@ func (r *runner) reindexData() error {
10421050
break
10431051
}
10441052

1045-
err := r.bulkMetrics(indexName, sr)
1053+
err := r.bulkMetrics(ctx, indexName, sr)
10461054
if err != nil {
10471055
return err
10481056
}
@@ -1063,7 +1071,7 @@ type searchResponse struct {
10631071
} `json:"hits"`
10641072
}
10651073

1066-
func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
1074+
func (r *runner) bulkMetrics(ctx context.Context, indexName string, sr searchResponse) error {
10671075
var bulkBodyBuilder strings.Builder
10681076
for _, hit := range sr.Hits {
10691077
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
@@ -1077,7 +1085,9 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
10771085

10781086
logger.Debugf("bulk request of %d events...", len(sr.Hits))
10791087

1080-
resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
1088+
resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()),
1089+
r.options.ESMetricsAPI.Bulk.WithContext(ctx),
1090+
)
10811091
if err != nil {
10821092
return fmt.Errorf("error performing the bulk index request: %w", err)
10831093
}
@@ -1091,6 +1101,7 @@ func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
10911101
}
10921102

10931103
resp, err = r.options.ESAPI.Scroll(
1104+
r.options.ESAPI.Scroll.WithContext(ctx),
10941105
r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
10951106
r.options.ESAPI.Scroll.WithScroll(time.Minute),
10961107
)

internal/benchrunner/runners/stream/runner.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (r *runner) setUp(ctx context.Context) error {
199199
return nil
200200
}
201201

202-
if err := r.wipeDataStreamsOnSetup(); err != nil {
202+
if err := r.wipeDataStreamsOnSetup(ctx); err != nil {
203203
return fmt.Errorf("error cleaning up old data in data streams: %w", err)
204204
}
205205

@@ -224,21 +224,21 @@ func (r *runner) setUp(ctx context.Context) error {
224224
return nil
225225
}
226226

227-
func (r *runner) wipeDataStreamsOnSetup() error {
227+
func (r *runner) wipeDataStreamsOnSetup(ctx context.Context) error {
228228
// Delete old data
229229
logger.Debug("deleting old data in data stream...")
230230
r.wipeDataStreamHandler = func(ctx context.Context) error {
231231
logger.Debugf("deleting data in data stream...")
232232
for _, runtimeDataStream := range r.runtimeDataStreams {
233-
if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil {
233+
if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil {
234234
return fmt.Errorf("error deleting data in data stream: %w", err)
235235
}
236236
}
237237
return nil
238238
}
239239

240240
for _, runtimeDataStream := range r.runtimeDataStreams {
241-
if err := r.deleteDataStreamDocs(runtimeDataStream); err != nil {
241+
if err := r.deleteDataStreamDocs(ctx, runtimeDataStream); err != nil {
242242
return fmt.Errorf("error deleting data in data stream: %w", err)
243243
}
244244
}
@@ -278,9 +278,11 @@ func (r *runner) installPackageFromPackageRoot(ctx context.Context) error {
278278
return nil
279279
}
280280

281-
func (r *runner) deleteDataStreamDocs(dataStream string) error {
281+
func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) error {
282282
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
283-
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
283+
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body,
284+
r.options.ESAPI.DeleteByQuery.WithContext(ctx),
285+
)
284286
if err != nil {
285287
return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
286288
}
@@ -484,8 +486,11 @@ func (r *runner) collectBulkRequestBody(indexName, scenarioName string, buf *byt
484486
return bulkBodyBuilder, nil
485487
}
486488

487-
func (r *runner) performBulkRequest(bulkRequest string) error {
488-
resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest))
489+
func (r *runner) performBulkRequest(ctx context.Context, bulkRequest string) error {
490+
resp, err := r.options.ESAPI.Bulk(strings.NewReader(bulkRequest),
491+
r.options.ESAPI.Bulk.WithContext(ctx),
492+
)
493+
489494
if err != nil {
490495
return err
491496
}
@@ -595,7 +600,7 @@ func (r *runner) runStreamGenerator(ctx context.Context, scenarioName string) er
595600
}
596601
}
597602

598-
err := r.performBulkRequest(bulkBodyBuilder.String())
603+
err := r.performBulkRequest(ctx, bulkBodyBuilder.String())
599604
if err != nil {
600605
return fmt.Errorf("error performing bulk request: %w", err)
601606
}
@@ -628,7 +633,7 @@ func (r *runner) runBackfillGenerator(ctx context.Context, scenarioName string)
628633
}
629634
}
630635

631-
return r.performBulkRequest(bulkBodyBuilder.String())
636+
return r.performBulkRequest(ctx, bulkBodyBuilder.String())
632637
}
633638

634639
type benchMeta struct {

0 commit comments

Comments
 (0)