QL: retry SQL and EQL requests in a mixed-node (rolling upgrade) cluster #68602
Add mixed-node tests to SQL and EQL
Pinging @elastic/es-ql (Team:QL)
Left a comment regarding a base class between the two redirect listeners - looks good otherwise.
Thanks for extensive tests!
@@ -0,0 +1,66 @@
apply plugin: 'elasticsearch.testclusters'
This must have been fun...
    }
}

private List<String> getSequencesBulkEntries() {
How about externalizing this and reading each entry line by line?
Will do.
@@ -21,6 +22,7 @@
 * the resulting ES document as a field.
 */
public class QlSourceBuilder {
    public static final Version FIELDS_API_INTRODUCTION_VERSION = Version.V_7_10_0;
The name is not very clear - how about USE_FIELD_API_VERSION or FIELD_API_USAGE_VERSION?
The name is not very clear
It'd be great if we could "standardise" on a format, given that these introduction-version constants will only grow in number (nanos, fields API, unsigned long, arrays, plus more to come).
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), e -> {
    // the search request will likely run on nodes with different versions of ES
    // we will retry on a node with an older version that should generate a backwards compatible _search request
    if (e instanceof SearchPhaseExecutionException
        && ((SearchPhaseExecutionException) e).getCause() instanceof VersionMismatchException) {

        SearchPhaseExecutionException spee = (SearchPhaseExecutionException) e;
        if (log.isTraceEnabled()) {
            log.trace("Caught exception type [{}] with cause [{}].", e.getClass().getName(), e.getCause());
        }
        DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
        DiscoveryNode candidateNode = null;
        for (DiscoveryNode node : clusterService.state().nodes()) {
            // find the first node that's older than the current node
            if (node != localNode && node.getVersion().before(localNode.getVersion())) {
                candidateNode = node;
                break;
            }
        }
        if (candidateNode != null) {
This class and the one in EQL can be simplified by moving the common code into a base class in QL.
The exception check, the DiscoveryNode selection, the logging and the retry are the same.
The only differences that I can see are the call to transportService and the planExecutor call, which can be passed as a Runnable or Function in case a property needs to be passed in.
I've moved the common code to a separate method in QL.
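A shared helper of the kind discussed here could look roughly like the sketch below. It is not the code from this PR: `RetrySketch`, `Node`, `VersionMismatch` and the integer version numbers are hypothetical stand-ins for the real `DiscoveryNode`, `VersionMismatchException` and `Version` types, chosen only to keep the example self-contained. The SQL- or EQL-specific parts (running the query, redirecting over transport) are passed in as functions, along the lines of the review suggestion.

```java
import java.util.List;
import java.util.function.Consumer;

class RetrySketch {
    /** Hypothetical stand-in for a cluster node; the version is a plain int here. */
    public static final class Node {
        public final String name;
        public final int version;
        public Node(String name, int version) { this.name = name; this.version = version; }
    }

    /** Hypothetical stand-in for the version mismatch failure. */
    public static class VersionMismatch extends RuntimeException {}

    /**
     * Runs the query; on a version mismatch, redirects to the first older node,
     * or re-runs the original request once if no older node is left.
     */
    public static void executeWithRetry(Node localNode, List<Node> nodes,
                                        Consumer<Exception> onFailure,
                                        Consumer<Consumer<Exception>> queryRunner,
                                        Consumer<Node> redirectTo) {
        queryRunner.accept(e -> {
            if (!(e.getCause() instanceof VersionMismatch)) {
                onFailure.accept(e); // unrelated failure: report it as-is
                return;
            }
            Node candidate = null;
            for (Node node : nodes) {
                // find the first node that is older than the local node
                if (node != localNode && node.version < localNode.version) {
                    candidate = node;
                    break;
                }
            }
            if (candidate != null) {
                redirectTo.accept(candidate);
            } else {
                // no older node found: retry locally, this time without a second retry
                queryRunner.accept(onFailure);
            }
        });
    }
}
```

Passing the query execution as a `Consumer<Consumer<Exception>>` lets the helper decide which failure handler the query gets on each attempt, which is what keeps the retry logic in one place.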
Nice. Only left some small comments and a question on the retrying logic.
@@ -149,12 +150,13 @@ public static SearchRequest prepareRequest(Client client,
                                           SearchSourceBuilder source,
                                           boolean includeFrozen,
                                           String... indices) {
    return client.prepareSearch(indices)
The client function argument can now be removed.
Nicely spotted. Removed.
listener::onFailure));
Holder<Boolean> retrySecondTime = new Holder<Boolean>(false);
planExecutor.eql(cfg, request.query(), params, wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), e -> {
    // the search request will likely run on nodes with different versions of ES
is this actually true? it is possible, but "likely"?
I think I put "likely" there because the actual search might run on a different sub-set of shards than the one at the start of the search process. There is an initial phase in the search process, called canMatch, that is really quick and can evaluate whether a request should actually be executed on a shard or not, based on a timestamp range query. If I am not mistaken, this canMatch phase is also part of this version mismatch check.
The canMatch phase can quickly "discard" some shards, so the actual request truly runs on a sub-set of them. If you want to read more about this, I think this is the initial PR adding it - #25658. There are some other good resources here: https://stackoverflow.com/a/64693782/3498062
Thanks. I think what threw me off is the future tense, in a failure handler :-).
I guess "the search request likely ran on nodes with different versions of ES" might be clearer. Anyways, not super relevant.
Changed
// find the first node that's older than the current node
if (Objects.equals(node, localNode) == false && node.getVersion().before(localNode.getVersion())) {
    candidateNode = node;
    break;
}
I guess the version difference should normally be the same for all nodes (if not null), but would it not make sense to pick the oldest node, to potentially prevent another redirection?
There's a longer discussion about the improvements here... I would keep the algorithm as is for the moment and, maybe, improve it in the next versions if we hit bumps along the way. I'm not sure we support rolling upgrades across multiple (more than two) versions. Our documentation says to upgrade nodes one by one until all are upgraded, but this assumes upgrading from version X to Y; there is no Z in the story.
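For illustration only, the alternative raised in this thread (picking the oldest node instead of the first older one) could be sketched as below. The PR itself keeps the first-older-node selection; `OldestNodeSketch`, `Node` and the integer versions are hypothetical stand-ins for `DiscoveryNode` and `Version`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class OldestNodeSketch {
    /** Hypothetical stand-in for a cluster node; the version is a plain int here. */
    public static final class Node {
        public final String name;
        public final int version;
        public Node(String name, int version) { this.name = name; this.version = version; }
    }

    /** Returns the node with the lowest version among those older than the local node, if any. */
    public static Optional<Node> oldestOlderThan(Node localNode, List<Node> nodes) {
        return nodes.stream()
            .filter(node -> node != localNode && node.version < localNode.version)
            .min(Comparator.comparingInt((Node node) -> node.version));
    }
}
```

In a cluster with only two versions, as assumed in the reply above, this returns a node of the same version as the first-match loop would, so the difference only matters if more than two versions coexist.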
}));
if (retrySecondTime.get()) {
    if (log.isTraceEnabled()) {
        log.trace("No candidate node found, likely all were upgraded in the meantime. Re-trying the original request.");
Was wondering about the trace logging choice here (and above) vs. for instance debug.
( Pico-nit: the repetitive part of the message could maybe simply be extracted as a comment, as a stand-alone message - i.e. not knowing what's going on; and supposedly this being the reason for adding the explanation - it is not quite clear what the "candidate" is. )
I based my decision for trace on the likelihood of this logging actually being used - hopefully very rarely, if ever. Willing to change it for a good reason.
Regarding the tracing messages I think the first one (where retrySecondTime is not necessary.
Changed to debug.
        listener.onFailure(e);
    }
}));
if (retrySecondTime.get()) {
If there is a higher number of nodes in the cluster, the "old" ones are the most likely to be taken offline. So I guess this means there's a higher chance the request will fail when sent to one "old" node (if there's a race between querying for the versions and sending the request).
Would it make sense to reattempt transport-redirecting, for as long as there are old nodes in the cluster?
For the moment, I'd like to keep it as is and try to improve the algorithm in a future PR. There are, for sure, things that can be improved.

final List<String> bulkEntries = getSequencesBulkEntries();
StringBuilder builder = new StringBuilder();
for (int i = 1; i < 16; i++) {
bulkEntries.size()?
I've refactored this to read the bulk entries from a file as per @costin's suggestion.
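A minimal sketch of that approach, assuming the entries live in a newline-delimited file like the one added later in this PR (starting with `{"index":{"_id":1}}`): read the source line by line and append each non-empty line, newline-terminated, to the bulk body. `BulkBodySketch` and `buildBulkBody` are hypothetical names, not the test code from this PR.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;

class BulkBodySketch {
    /** Builds a newline-delimited bulk body from a reader, skipping blank lines. */
    public static String buildBulkBody(Reader source) {
        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // blank lines are not valid bulk entries
                }
                // every action and document line must end with a newline
                body.append(line).append('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return body.toString();
    }
}
```

Reading until the end of the file also removes the need for a hard-coded loop bound such as `i < 16`.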
@elasticmachine run elasticsearch-ci/default-distro
LGTM, Nice stuff!
Left some very minor comments.
TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().query("foo where blah"), "", "", "node_id",
    new ActionListener<>() {
TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().query("foo where blah"), "",
    mock(TransportService.class), mockClusterService, new ActionListener<>() {
minor: since mock(TransportService.class) is reused, you could also assign it to a var.
// we will retry on a node with an older version that should generate a backwards compatible _search request
if (e instanceof SearchPhaseExecutionException
    && ((SearchPhaseExecutionException) e).getCause() instanceof VersionMismatchException) {
    if (log.isTraceEnabled()) {
Not familiar with the strategy here, just double checking if it maybe should be debug instead?
Changed to debug.
@@ -0,0 +1,59 @@
"properties": {
minor: why are these entries indented like this? (a lot of whitespace at the beginning of the line)
@@ -0,0 +1,35 @@
"properties": {
Same here, why the leading whitespace?
…stic/elasticsearch into eql_sql_request_retry
@@ -21,6 +22,7 @@
 * the resulting ES document as a field.
 */
public class QlSourceBuilder {
    public static final Version FIELDS_API_USAGE_VERSION = Version.V_7_10_0;
I personally find this naming less evocative ("is the fields api only used in that version?"), but not a biggie.
The upside is that these are internal constants that we can rename in the future if we find better names.
Any suggestions for better naming? FIELDS_API_MIGRATION_VERSION, MIGRATE_TO_FIELD_API_VERSION, SWITCH_OVER/TO_FIELDS_API_VERSION?
LGTM
LGTM
@@ -0,0 +1,30 @@
{"index":{"_id":1}}
👍
}
executeRequestWithRetryAttempt(clusterService, listener::onFailure,
    onFailure -> planExecutor.eql(cfg, request.query(), params,
        wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), onFailure)),
👍
@@ -21,6 +22,7 @@
 * the resulting ES document as a field.
 */
public class QlSourceBuilder {
    public static final Version FIELDS_API_USAGE_VERSION = Version.V_7_10_0;
The upside is that these are internal constants that we can rename in the future if we find better names.
Any suggestions for better naming? FIELDS_API_MIGRATION_VERSION, MIGRATE_TO_FIELD_API_VERSION, SWITCH_OVER/TO_FIELDS_API_VERSION?
True.
I like this one myself. But yes, it can be done later; it's hair-splitting.
…stic/elasticsearch into eql_sql_request_retry
* Integrate "fields" API into QL (elastic#68467)
* QL: retry SQL and EQL requests in a mixed-node (rolling upgrade) cluster (elastic#68602)
* Adapt nested fields extraction from "fields" API output to the new un-flattened structure (elastic#68745)
(cherry picked from commit ee5cc54)
These changes build on PR #65896, which added a minimum compatibility version to search requests in ES. QL now sets a minimum compatibility version when creating a search request against ES and retries the request if the search turns out to have been executed on at least one incompatible shard.
The retry happens on a node that has an older version; the original SQL/EQL request is sent to that node through the transport layer.
The node receiving the retried request re-parses it and creates another query DSL to be sent to ES.
At the time this PR was created, the introduction of the "fields" API in QL is one such change that needs this feature.
Testing happens in two new, similar QA projects, one for SQL and one for EQL.