Skip to content

Commit

Permalink
Fix flaky test MoreExpressionIT.testSpecialValueVariable in concurren…
Browse files Browse the repository at this point in the history
…t search path (#11088)

* Fix flaky test MoreExpressionIT.testSpecialValueVariable in concurrent search path

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Rename ReplaceableConstDoubleValueSource class to PerThreadReplaceableConstDoubleValueSource

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

---------

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>
  • Loading branch information
sohami authored Nov 17, 2023
1 parent 03a9650 commit f4b25d6
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,6 @@ public void testInvalidFieldMember() {
}

public void testSpecialValueVariable() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/10079",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
// i.e. _value for aggregations
createIndex("test");
ensureGreen("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ class ExpressionAggregationScript implements AggregationScript.LeafFactory {
final SimpleBindings bindings;
final DoubleValuesSource source;
final boolean needsScore;
final ReplaceableConstDoubleValueSource specialValue; // _value
final PerThreadReplaceableConstDoubleValueSource specialValue; // _value

ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, ReplaceableConstDoubleValueSource v) {
ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, PerThreadReplaceableConstDoubleValueSource v) {
exprScript = e;
bindings = b;
source = exprScript.getDoubleValuesSource(bindings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ private static AggregationScript.LeafFactory newAggregationScript(
// instead of complicating SimpleBindings (which should stay simple)
SimpleBindings bindings = new SimpleBindings();
boolean needsScores = false;
ReplaceableConstDoubleValueSource specialValue = null;
PerThreadReplaceableConstDoubleValueSource specialValue = null;
for (String variable : expr.variables) {
try {
if (variable.equals("_score")) {
bindings.add("_score", DoubleValuesSource.SCORES);
needsScores = true;
} else if (variable.equals("_value")) {
specialValue = new ReplaceableConstDoubleValueSource();
specialValue = new PerThreadReplaceableConstDoubleValueSource();
bindings.add("_value", specialValue);
// noop: _value is special for aggregations, and is handled in ExpressionScriptBindings
// TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a
Expand Down Expand Up @@ -388,15 +388,15 @@ private static ScoreScript.LeafFactory newScoreScript(Expression expr, SearchLoo
// NOTE: if we need to do anything complicated with bindings in the future, we can just extend Bindings,
// instead of complicating SimpleBindings (which should stay simple)
SimpleBindings bindings = new SimpleBindings();
ReplaceableConstDoubleValueSource specialValue = null;
PerThreadReplaceableConstDoubleValueSource specialValue = null;
boolean needsScores = false;
for (String variable : expr.variables) {
try {
if (variable.equals("_score")) {
bindings.add("_score", DoubleValuesSource.SCORES);
needsScores = true;
} else if (variable.equals("_value")) {
specialValue = new ReplaceableConstDoubleValueSource();
specialValue = new PerThreadReplaceableConstDoubleValueSource();
bindings.add("_value", specialValue);
// noop: _value is special for aggregations, and is handled in ExpressionScriptBindings
// TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@
import org.apache.lucene.search.IndexSearcher;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double.
* A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double. This is made
* thread-safe for concurrent segment search use case by keeping the {@link DoubleValues} per thread. Any update to the value happens in
* thread specific {@link DoubleValuesSource} instance.
*/
final class ReplaceableConstDoubleValueSource extends DoubleValuesSource {
final ReplaceableConstDoubleValues fv;
final class PerThreadReplaceableConstDoubleValueSource extends DoubleValuesSource {
// Multiple slices can be processed by same thread but that will be sequential, so keeping per thread is fine
final Map<Long, ReplaceableConstDoubleValues> perThreadDoubleValues;

ReplaceableConstDoubleValueSource() {
fv = new ReplaceableConstDoubleValues();
PerThreadReplaceableConstDoubleValueSource() {
perThreadDoubleValues = new ConcurrentHashMap<>();
}

@Override
public DoubleValues getValues(LeafReaderContext ctx, DoubleValues scores) throws IOException {
return fv;
return perThreadDoubleValues.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ReplaceableConstDoubleValues());
}

@Override
Expand All @@ -62,7 +67,11 @@ public boolean needsScores() {

@Override
public Explanation explain(LeafReaderContext ctx, int docId, Explanation scoreExplanation) throws IOException {
if (fv.advanceExact(docId)) return Explanation.match((float) fv.doubleValue(), "ReplaceableConstDoubleValues");
final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(
Thread.currentThread().getId(),
threadId -> new ReplaceableConstDoubleValues()
);
if (currentFv.advanceExact(docId)) return Explanation.match((float) currentFv.doubleValue(), "ReplaceableConstDoubleValues");
else return Explanation.noMatch("ReplaceableConstDoubleValues");
}

Expand All @@ -77,7 +86,11 @@ public int hashCode() {
}

public void setValue(double v) {
fv.setValue(v);
final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(
Thread.currentThread().getId(),
threadId -> new ReplaceableConstDoubleValues()
);
currentFv.setValue(v);
}

@Override
Expand Down

0 comments on commit f4b25d6

Please sign in to comment.