CHANGELOG.md (1 addition, 0 deletions)
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support system generated search pipeline. ([#19128](https://github.com/opensearch-project/OpenSearch/pull/19128))
- Add `epoch_micros` date format ([#14669](https://github.com/opensearch-project/OpenSearch/issues/14669))
- Grok processor supports capturing multiple values for same field name ([#18799](https://github.com/opensearch-project/OpenSearch/pull/18799))
- Add support for search tie-breaking by `_shard_doc` ([#18924](https://github.com/opensearch-project/OpenSearch/pull/18924))
- Upgrade opensearch-protobufs dependency to 0.13.0 and update transport-grpc module compatibility ([#19007](https://github.com/opensearch-project/OpenSearch/issues/19007))
- Add new extensible method to DocRequest to specify type ([#19313](https://github.com/opensearch-project/OpenSearch/pull/19313))
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
org/opensearch/benchmark/search/sort/ShardDocComparatorBenchmark.java (new file)
@@ -0,0 +1,148 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.benchmark.search.sort;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* JMH microbenchmarks for the _shard_doc composite key path:
* key = (shardKeyPrefix | (docBase + doc))
*
* Mirrors hot operations in ShardDocFieldComparatorSource without needing Lucene classes.
*/
@Fork(3)
@Warmup(iterations = 5)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class ShardDocComparatorBenchmark {

    @Param({ "1", "4", "16" })
    public int segments;

    @Param({ "50000" })
    public int docsPerSegment;

    @Param({ "7" })
    public int shardId;

    private long shardKeyPrefix;
    private int[] docBases;
    private int[] docs;
    private long[] keys; // precomputed composite keys

    // per-doc global doc (docBase + doc) for doc-only baseline
    private int[] globalDocs;

    @Setup
    public void setup() {
        shardKeyPrefix = ((long) shardId) << 32; // must mirror ShardDocFieldComparatorSource.shardKeyPrefix

        docBases = new int[segments];
        for (int i = 1; i < segments; i++) {
            docBases[i] = docBases[i - 1] + docsPerSegment;
        }

        int total = segments * docsPerSegment;
        docs = new int[total];
        keys = new long[total];
        globalDocs = new int[total];

        Random r = new Random(42);
        int pos = 0;
        for (int s = 0; s < segments; s++) {
            int base = docBases[s];
            for (int d = 0; d < docsPerSegment; d++) {
                int doc = r.nextInt(docsPerSegment);
                docs[pos] = doc;
                keys[pos] = computeGlobalDocKey(base, doc);
                globalDocs[pos] = base + doc;
                pos++;
            }
        }
    }

    /** Baseline: compare only the global doc id. */
    @Benchmark
    public long compareDocOnlyAsc() {
        long acc = 0;
        for (int i = 1; i < globalDocs.length; i++) {
            acc += Integer.compare(globalDocs[i - 1], globalDocs[i]);
        }
        return acc;
    }

    /** Raw key-packing cost. */
    @Benchmark
    public void packKey(Blackhole bh) {
        int idx = 0;
        for (int s = 0; s < segments; s++) {
            int base = docBases[s];
            for (int d = 0; d < docsPerSegment; d++) {
                long k = computeGlobalDocKey(base, docs[idx++]);
                bh.consume(k);
            }
        }
    }

    /** Compare already-packed keys, ascending. */
    @Benchmark
    public long compareAsc() {
        long acc = 0;
        for (int i = 1; i < keys.length; i++) {
            acc += Long.compare(keys[i - 1], keys[i]);
        }
        return acc;
    }

    /** Compare already-packed keys, descending. */
    @Benchmark
    public long compareDesc() {
        long acc = 0;
        for (int i = 1; i < keys.length; i++) {
            acc += Long.compare(keys[i], keys[i - 1]); // reversed
        }
        return acc;
    }

    /** Rough "collector loop" mix: copy every doc plus an occasional compareBottom. */
    @Benchmark
    public int copyAndCompareBottomAsc() {
        long bottom = Long.MIN_VALUE;
        int worse = 0;
        for (int i = 0; i < keys.length; i++) {
            long v = keys[i]; // simulate copy(slot, doc)
            if ((i & 31) == 0) bottom = v; // simulate setBottom every 32 items
            if (Long.compare(bottom, v) < 0) worse++;
        }
        return worse;
    }

    // Must mirror ShardDocFieldComparatorSource.computeGlobalDocKey: (shardId << 32) | (docBase + doc)
    private long computeGlobalDocKey(int docBase, int doc) {
        return shardKeyPrefix | (docBase + doc);
    }
}
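For readers skimming the diff, here is a standalone sketch of the key packing the benchmark measures. All concrete values are hypothetical, and ShardDocKeyDemo is an illustrative name, not part of this PR; only the packing formula mirrors computeGlobalDocKey above.

// Illustrates the (shardId << 32) | (docBase + doc) packing.
// Hypothetical values; the formula mirrors computeGlobalDocKey.
public class ShardDocKeyDemo {
    public static void main(String[] args) {
        int shardId = 7;      // the benchmark's @Param default
        int docBase = 50000;  // base of the second segment when docsPerSegment = 50000
        int doc = 3;          // segment-local doc id

        long shardKeyPrefix = ((long) shardId) << 32;
        long key = shardKeyPrefix | (docBase + doc);
        System.out.printf("key = 0x%016X%n", key); // key = 0x000000070000C353

        // Unpacking: the high 32 bits hold the shard id, the low 32 bits the
        // global doc id, so plain long ordering compares shard first and then
        // global doc, which is exactly the tie-break order.
        System.out.println((key >>> 32) + " / " + (int) key); // 7 / 50003
    }
}

Because a single long comparison replaces a two-field comparison, the compareAsc/compareDesc benchmarks above isolate exactly that cost against the doc-only baseline.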
rest-api-spec test for _shard_doc paging (new file)
@@ -0,0 +1,191 @@
---
setup:
  - skip:
      version: " - 3.2.99"
      reason: "introduced in 3.3.0"

  # Multi-shard index
  - do:
      indices.create:
        index: sharddoc_paging
        body:
          settings:
            number_of_shards: 4
            number_of_replicas: 0
          mappings:
            properties:
              id: { type: integer }
              txt: { type: keyword }
  - do:
      cluster.health:
        wait_for_status: green
        index: sharddoc_paging
  - do:
      bulk:
        refresh: true
        index: sharddoc_paging
        body: |
          {"index":{}}
          {"id":1,"txt":"a"}
          {"index":{}}
          {"id":2,"txt":"b"}
          {"index":{}}
          {"id":3,"txt":"c"}
          {"index":{}}
          {"id":4,"txt":"d"}
          {"index":{}}
          {"id":5,"txt":"e"}
          {"index":{}}
          {"id":6,"txt":"f"}
          {"index":{}}
          {"id":7,"txt":"g"}
          {"index":{}}
          {"id":8,"txt":"h"}
          {"index":{}}
          {"id":9,"txt":"i"}
          {"index":{}}
          {"id":10,"txt":"j"}
          {"index":{}}
          {"id":11,"txt":"k"}
          {"index":{}}
          {"id":12,"txt":"l"}
          {"index":{}}
          {"id":13,"txt":"m"}
          {"index":{}}
          {"id":14,"txt":"n"}
          {"index":{}}
          {"id":15,"txt":"o"}
          {"index":{}}
          {"id":16,"txt":"p"}
          {"index":{}}
          {"id":17,"txt":"q"}
          {"index":{}}
          {"id":18,"txt":"r"}
          {"index":{}}
          {"id":19,"txt":"s"}
          {"index":{}}
          {"id":20,"txt":"t"}
          {"index":{}}
          {"id":21,"txt":"u"}
          {"index":{}}
          {"id":22,"txt":"v"}

# -------------------------------------------------------------------
# VALIDATION
# -------------------------------------------------------------------

---
"reject _shard_doc without PIT":
  - do:
      catch: bad_request
      search:
        index: sharddoc_paging
        body:
          sort:
            - _shard_doc
  - match: { status: 400 }
  - match: { error.type: action_request_validation_exception }
  - match: { error.reason: "/.*_shard_doc is only supported with point-in-time.*|.*PIT.*/" }

---
"detect _shard_doc via FieldSortBuilder-style object without PIT":
  - do:
      catch: bad_request
      search:
        index: sharddoc_paging
        body:
          sort:
            - _shard_doc: { } # object form, still invalid without PIT
  - match: { status: 400 }
  - match: { error.type: action_request_validation_exception }
  - match: { error.reason: "/.*_shard_doc is only supported with point-in-time.*|.*PIT.*/" }

# -------------------------------------------------------------------
# HAPPY PATH: PAGINATION WITH PIT ON MULTI-SHARD INDEX
# -------------------------------------------------------------------

---
"accept _shard_doc with PIT + paginate with search_after (multi-shard)":
  - do:
      create_pit:
        index: sharddoc_paging
        keep_alive: 1m
  - set: { pit_id: pit_id }

  # Page 1
  - do:
      search:
        body:
          size: 10
          pit: { id: "$pit_id", keep_alive: "1m" }
          sort:
            - _shard_doc: {}
  - match: { _shards.failed: 0 }
  - length: { hits.hits: 10 }
  - is_true: hits.hits.9.sort

  - set: { hits.hits.9.sort: after1 }

  # Page 2
  - do:
      search:
        body:
          size: 10
          pit: { id: "$pit_id", keep_alive: "1m" }
          sort:
            - _shard_doc: { }
          search_after: $after1

  - match: { _shards.failed: 0 }
  - length: { hits.hits: 10 }
  - is_true: hits.hits.9.sort

  - set: { hits.hits.9.sort: after2 }
  - set: { hits.hits.9.sort.0: last_value_page2 }

  # Check that the sort values increase from one hit to the next without ever decreasing.
  - set: { hits.hits.0.sort.0: prev }
  - gt: { hits.hits.1.sort.0: $prev }

  - set: { hits.hits.1.sort.0: prev }
  - gt: { hits.hits.2.sort.0: $prev }

  - set: { hits.hits.2.sort.0: prev }
  - gt: { hits.hits.3.sort.0: $prev }

  - set: { hits.hits.3.sort.0: prev }
  - gt: { hits.hits.4.sort.0: $prev }

  - set: { hits.hits.4.sort.0: prev }
  - gt: { hits.hits.5.sort.0: $prev }

  - set: { hits.hits.5.sort.0: prev }
  - gt: { hits.hits.6.sort.0: $prev }

  - set: { hits.hits.6.sort.0: prev }
  - gt: { hits.hits.7.sort.0: $prev }

  - set: { hits.hits.7.sort.0: prev }
  - gt: { hits.hits.8.sort.0: $prev }

  - set: { hits.hits.8.sort.0: prev }
  - gt: { hits.hits.9.sort.0: $prev }

  # Page 3: drain the rest (22 docs total => 10 + 10 + 2)
  - do:
      search:
        body:
          size: 10
          pit: { id: "$pit_id", keep_alive: "1m" }
          sort:
            - _shard_doc: {}
          search_after: $after2

  - match: { _shards.failed: 0 }
  - length: { hits.hits: 2 }

  - do:
      delete_pit:
        body:
          pit_id: [ "$pit_id" ]
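For orientation, the paging flow these assertions exercise looks roughly like this on the client side. This is a minimal sketch, not code from this PR: ShardDocPagingSketch and drainWithShardDoc are illustrative names, it assumes the high-level REST client and an already-created PIT id, and it assumes _shard_doc is accepted as a plain sort field name; exact builder signatures may vary by version.

import java.io.IOException;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

// Hypothetical helper mirroring the YAML test's three pages (10 + 10 + 2).
public class ShardDocPagingSketch {

    static void drainWithShardDoc(RestHighLevelClient client, String pitId) throws IOException {
        Object[] after = null;
        while (true) {
            SearchSourceBuilder source = new SearchSourceBuilder().size(10)
                .sort("_shard_doc") // the new tie-breaker; rejected without the PIT below
                .pointInTimeBuilder(new PointInTimeBuilder(pitId));
            if (after != null) {
                source.searchAfter(after); // resume after the last hit of the previous page
            }
            SearchHit[] hits = client.search(new SearchRequest().source(source), RequestOptions.DEFAULT)
                .getHits()
                .getHits();
            if (hits.length == 0) {
                break;
            }
            // ... consume hits; sort values increase strictly within and across pages ...
            after = hits[hits.length - 1].getSortValues(); // the _shard_doc sort value
            if (hits.length < 10) {
                break; // short page: nothing left to drain
            }
        }
    }
}

The per-hit gt assertions in the YAML encode the same invariant the sketch relies on: since the packed (shardId, globalDoc) key is unique per document, _shard_doc gives a total order, so search_after never skips or repeats a hit.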