Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ES|QL CCS uses skip_unavailable setting for handling disconnected remote clusters #115266

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ static TransportVersion def(int id) {
public static final TransportVersion REVERT_REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(8_774_00_0);
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED = def(8_775_00_0);
public static final TransportVersion INFERENCE_DONT_PERSIST_ON_READ = def(8_776_00_0);
public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_777_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
Expand Down Expand Up @@ -246,7 +247,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() {

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("no_such_index"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
// TODO: a follow on PR will change this to throw an Exception when the local cluster requests a concrete index that is missing
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(localCluster.getTotalShards(), equalTo(0));
Expand Down Expand Up @@ -803,6 +805,14 @@ Map<String, Object> setupTwoClusters() {
clusterInfo.put("local.index", localIndex);
clusterInfo.put("remote.num_shards", numShardsRemote);
clusterInfo.put("remote.index", remoteIndex);

String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
.getClusterSettings()
.get(skipUnavailableSetting);
clusterInfo.put("remote.skip_unavailable", skipUnavailable);

return clusterInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -281,6 +282,7 @@ public static class Cluster implements ToXContentFragment, Writeable {
private final Integer successfulShards;
private final Integer skippedShards;
private final Integer failedShards;
private final List<ShardSearchFailure> failures;
private final TimeValue took; // search latency for this cluster sub-search

/**
Expand All @@ -300,7 +302,7 @@ public String toString() {
}

public Cluster(String clusterAlias, String indexExpression) {
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null);
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null, null);
}

/**
Expand All @@ -312,7 +314,7 @@ public Cluster(String clusterAlias, String indexExpression) {
* @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings
*/
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable) {
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null);
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null);
}

/**
Expand All @@ -324,7 +326,7 @@ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavaila
* @param status current status of the search on this Cluster
*/
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable, Cluster.Status status) {
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null);
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null, null);
}

public Cluster(
Expand All @@ -336,6 +338,7 @@ public Cluster(
Integer successfulShards,
Integer skippedShards,
Integer failedShards,
List<ShardSearchFailure> failures,
TimeValue took
) {
assert clusterAlias != null : "clusterAlias cannot be null";
Expand All @@ -349,6 +352,11 @@ public Cluster(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.failedShards = failedShards;
if (failures == null) {
this.failures = Collections.emptyList();
} else {
this.failures = failures;
}
this.took = took;
}

Expand All @@ -362,6 +370,11 @@ public Cluster(StreamInput in) throws IOException {
this.failedShards = in.readOptionalInt();
this.took = in.readOptionalTimeValue();
this.skipUnavailable = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure));
} else {
this.failures = Collections.emptyList(); // MP TODO: should this be emptyList or null?
}
}

@Override
Expand All @@ -375,6 +388,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInt(failedShards);
out.writeOptionalTimeValue(took);
out.writeBoolean(skipUnavailable);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
out.writeCollection(failures);
}
}

/**
Expand All @@ -387,12 +403,12 @@ public void writeTo(StreamOutput out) throws IOException {
* All other fields can be set and override the value in the "copyFrom" Cluster.
*/
public static class Builder {
private String indexExpression;
private Cluster.Status status;
private Integer totalShards;
private Integer successfulShards;
private Integer skippedShards;
private Integer failedShards;
private List<ShardSearchFailure> failures;
private TimeValue took;
private final Cluster original;

Expand All @@ -408,22 +424,18 @@ public Builder(Cluster copyFrom) {
public Cluster build() {
return new Cluster(
original.getClusterAlias(),
indexExpression == null ? original.getIndexExpression() : indexExpression,
original.getIndexExpression(),
original.isSkipUnavailable(),
status != null ? status : original.getStatus(),
totalShards != null ? totalShards : original.getTotalShards(),
successfulShards != null ? successfulShards : original.getSuccessfulShards(),
skippedShards != null ? skippedShards : original.getSkippedShards(),
failedShards != null ? failedShards : original.getFailedShards(),
failures != null ? failures : original.getFailures(),
took != null ? took : original.getTook()
);
}

public Cluster.Builder setIndexExpression(String indexExpression) {
this.indexExpression = indexExpression;
return this;
}

public Cluster.Builder setStatus(Cluster.Status status) {
this.status = status;
return this;
Expand All @@ -449,6 +461,11 @@ public Cluster.Builder setFailedShards(int failedShards) {
return this;
}

public Cluster.Builder setFailures(List<ShardSearchFailure> failures) {
this.failures = failures;
return this;
}

public Cluster.Builder setTook(TimeValue took) {
this.took = took;
return this;
Expand All @@ -466,7 +483,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
if (took != null) {
// TODO: change this to took_nanos and call took.nanos?
builder.field(TOOK.getPreferredName(), took.millis());
}
if (totalShards != null) {
Expand All @@ -483,6 +499,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
if (failures != null && failures.size() > 0) {
builder.startArray(RestActions.FAILURES_FIELD.getPreferredName());
for (ShardSearchFailure failure : failures) {
failure.toXContent(builder, params);
}
builder.endArray();
}
}
builder.endObject();
return builder;
Expand Down Expand Up @@ -529,6 +552,10 @@ public Integer getFailedShards() {
return failedShards;
}

public List<ShardSearchFailure> getFailures() {
return failures;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ private void lookupPolicies(
if (remotePolicies.isEmpty() == false) {
for (String cluster : remoteClusters) {
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
// MP TODO: need to track how errors are handled here
getRemoteConnection(
cluster,
lookupListener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,28 @@
*/
package org.elasticsearch.xpack.esql.index;

import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.core.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public final class IndexResolution {

public static IndexResolution valid(EsIndex index, Set<String> unavailableClusters) {
public static IndexResolution valid(EsIndex index, Map<String, FieldCapabilitiesFailure> unavailableClusters) {
Objects.requireNonNull(index, "index must not be null if it was found");
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
return new IndexResolution(index, null, unavailableClusters);
}

public static IndexResolution valid(EsIndex index) {
return valid(index, Collections.emptySet());
return valid(index, Collections.emptyMap());
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Collections.emptySet());
return new IndexResolution(null, invalid, Collections.emptyMap());
}

public static IndexResolution notFound(String name) {
Expand All @@ -39,9 +40,9 @@ public static IndexResolution notFound(String name) {
private final String invalid;

// remote clusters included in the user's index expression that could not be connected to
private final Set<String> unavailableClusters;
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;

private IndexResolution(EsIndex index, @Nullable String invalid, Set<String> unavailableClusters) {
private IndexResolution(EsIndex index, @Nullable String invalid, Map<String, FieldCapabilitiesFailure> unavailableClusters) {
this.index = index;
this.invalid = invalid;
this.unavailableClusters = unavailableClusters;
Expand Down Expand Up @@ -70,7 +71,11 @@ public boolean isValid() {
return invalid == null;
}

public Set<String> getUnavailableClusters() {
/**
* @return Map of unavailable clusters (could not be connected to during field-caps query). Key of map is cluster alias,
* value is the {@link FieldCapabilitiesFailure} describing the issue.
*/
public Map<String, FieldCapabilitiesFailure> getUnavailableClusters() {
return unavailableClusters;
}

Expand Down
Loading