Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.function.FunctionProperties;

Expand All @@ -39,6 +40,10 @@ public class CalcitePlanContext {
/** This thread local variable is only used to skip script encoding in script pushdown. */
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);

/** Thread-local switch that tells whether the current query prefers legacy behavior. */
private static final ThreadLocal<Boolean> legacyPreferredFlag =
ThreadLocal.withInitial(() -> true);

@Getter @Setter private boolean isResolvingJoinCondition = false;
@Getter @Setter private boolean isResolvingSubquery = false;
@Getter @Setter private boolean inCoalesceFunction = false;
Expand Down Expand Up @@ -105,6 +110,27 @@ public static CalcitePlanContext create(
return new CalcitePlanContext(config, querySizeLimit, queryType);
}

/**
* Executes {@code action} with the thread-local legacy flag set according to the supplied
* settings.
*/
public static void run(Runnable action, Settings settings) {
Boolean preferred = settings.getSettingValue(Settings.Key.PPL_SYNTAX_LEGACY_PREFERRED);
legacyPreferredFlag.set(preferred);
try {
action.run();
} finally {
legacyPreferredFlag.remove();
}
}

/**
* @return {@code true} when the current planning prefer legacy behavior.
*/
public static boolean isLegacyPreferred() {
return legacyPreferredFlag.get();
}

public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
this.rexLambdaRefMap.putAll(candidateMap);
}
Expand Down
122 changes: 67 additions & 55 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,68 +90,80 @@ public void executeWithCalcite(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode, context);
RelNode calcitePlan = convertToCalcitePlan(optimized);
executionEngine.execute(calcitePlan, context, listener);
return null;
});
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
log.warn("Fallback to V2 query engine since got exception", t);
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
if (t instanceof Exception) {
listener.onFailure((Exception) t);
} else if (t instanceof VirtualMachineError) {
// throw and fast fail the VM errors such as OOM (same with v2).
throw t;
} else {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
}
}
}
CalcitePlanContext.run(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap with CalcitePlanContext.run, No code logic change.

() -> {
try {
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode, context);
RelNode calcitePlan = convertToCalcitePlan(optimized);
executionEngine.execute(calcitePlan, context, listener);
return null;
});
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
log.warn("Fallback to V2 query engine since got exception", t);
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
if (t instanceof Exception) {
listener.onFailure((Exception) t);
} else if (t instanceof VirtualMachineError) {
// throw and fast fail the VM errors such as OOM (same with v2).
throw t;
} else {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
}
}
}
},
settings);
}

public void explainWithCalcite(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
Explain.ExplainFormat format) {
try {
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode, context);
RelNode calcitePlan = convertToCalcitePlan(optimized);
executionEngine.explain(calcitePlan, format, context, listener);
return null;
});
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t)) {
log.warn("Fallback to V2 query engine since got exception", t);
explainWithLegacy(plan, queryType, listener, format, Optional.of(t));
} else {
if (t instanceof Error) {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage()));
} else {
listener.onFailure((Exception) t);
}
}
}
CalcitePlanContext.run(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap with CalcitePlanContext.run, No code logic change.

() -> {
try {
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
context.run(
() -> {
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode, context);
RelNode calcitePlan = convertToCalcitePlan(optimized);
executionEngine.explain(calcitePlan, format, context, listener);
},
settings);
return null;
});
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t)) {
log.warn("Fallback to V2 query engine since got exception", t);
explainWithLegacy(plan, queryType, listener, format, Optional.of(t));
} else {
if (t instanceof Error) {
// Calcite may throw AssertError during query execution.
listener.onFailure(new CalciteUnsupportedException(t.getMessage()));
} else {
listener.onFailure((Exception) t);
}
}
}
},
settings);
}

public void executeWithLegacy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,20 @@ protected void registerOperator(
typeChecker);
}

protected void registerDivideFunction(BuiltinFunctionName functionName) {
register(
functionName,
(FunctionImp2)
(builder, left, right) -> {
SqlOperator operator =
CalcitePlanContext.isLegacyPreferred()
? PPLBuiltinOperators.DIVIDE
: SqlLibraryOperators.SAFE_DIVIDE;
return builder.makeCall(operator, left, right);
},
PPLTypeChecker.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC));
}

void populate() {
// register operators for comparison
registerOperator(NOTEQUAL, PPLBuiltinOperators.NOT_EQUALS_IP, SqlStdOperatorTable.NOT_EQUALS);
Expand Down Expand Up @@ -733,8 +747,8 @@ void populate() {
registerOperator(MODULUS, PPLBuiltinOperators.MOD);
registerOperator(MODULUSFUNCTION, PPLBuiltinOperators.MOD);
registerOperator(CRC32, PPLBuiltinOperators.CRC32);
registerOperator(DIVIDE, PPLBuiltinOperators.DIVIDE);
registerOperator(DIVIDEFUNCTION, PPLBuiltinOperators.DIVIDE);
registerDivideFunction(DIVIDE);
registerDivideFunction(DIVIDEFUNCTION);
registerOperator(SHA2, PPLBuiltinOperators.SHA2);
registerOperator(CIDRMATCH, PPLBuiltinOperators.CIDRMATCH);
registerOperator(INTERNAL_GROK, PPLBuiltinOperators.GROK);
Expand Down
19 changes: 18 additions & 1 deletion docs/user/ppl/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ This configuration is introduced since 3.3.0 which is used to switch some behavi
The behaviours it controlled includes:

- The default value of argument ``bucket_nullable`` in ``stats`` command. Check `stats command <../cmd/stats.rst>`_ for details.
- The return value of ``divide`` and ``/`` operator. Check `expressions <../functions/expressions.rst>`_ for details.

Example
Example 1
-------

You can update the setting with a new value like this.
Expand All @@ -227,6 +228,22 @@ PPL query::
}
}

Example 2
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be removed. #4441

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR #4449

---------

Reset to default (true) by setting to null:

PPL query::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient" : {"plugins.ppl.syntax.legacy.preferred" : null}}'
{
"acknowledged": true,
"persistent": {},
"transient": {}
}

plugins.ppl.values.max.limit
============================

Expand Down
6 changes: 4 additions & 2 deletions docs/user/ppl/functions/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ Arithmetic expression is an expression formed by numeric literals and binary ari
1. ``+``: Add.
2. ``-``: Subtract.
3. ``*``: Multiply.
4. ``/``: Divide. For integers, the result is an integer with fractional part discarded. Returns NULL when dividing by zero.
4. ``/``: Divide. Integer operands follow the legacy truncating result when
`plugins.ppl.syntax.legacy.preferred <../admin/settings.rst>`_ is ``true`` (default). When the
setting is ``false`` the operands are promoted to floating point, preserving
the fractional part. Division by zero still returns ``NULL``.
5. ``%``: Modulo. This can be used with integers only with remainder of the division as result.

Precedence
Expand Down Expand Up @@ -172,4 +175,3 @@ NOT operator ::
| 36 |
| 28 |
+-----+

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
setup:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: true
plugins.ppl.syntax.legacy.preferred: true
- do:
indices.create:
index: test_divide_settings
body:
settings:
number_of_shards: 1
number_of_replicas: 0
- do:
bulk:
index: test_divide_settings
refresh: true
body:
- '{"index": {}}'
- '{"id": 1}'

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: false
plugins.ppl.syntax.legacy.preferred: true
- do:
indices.delete:
index: test_divide_settings

---
"legacy division retains integer truncation":
- skip:
features:
- headers
- allowed_warnings
- do:
allowed_warnings: []
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_divide_settings | eval a=4/2 | eval b=2/4 | eval c=2/40 | fields a,b,c
- match: { total: 1 }
- match: { datarows: [[2,0,0]] }

---
"non-legacy division returns floating values":
- skip:
features:
- headers
- allowed_warnings
- do:
query.settings:
body:
transient:
plugins.ppl.syntax.legacy.preferred: false
- do:
allowed_warnings: []
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_divide_settings | eval a=4/2 | eval b=2/4 | eval c=2/40 | fields a,b,c
- match: { total: 1 }
- match: { datarows: [[2.0,0.5,0.05]] }
Loading