Support wildcard cluster names in anomaly detection data source indices #1612
… detection jobs
- Updated time field and category field validation methods to query all remote indices when wildcard specified for cluster name
- Added @Inject annotation for TransportService in multiple action handlers
Signed-off-by: Tim Baker <baker.timothy96@gmail.com>
Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## main #1612 +/- ##
============================================
- Coverage 81.31% 81.07% -0.25%
+ Complexity 6144 6139 -5
============================================
Files 542 542
Lines 24998 25065 +67
Branches 2543 2557 +14
============================================
- Hits 20328 20321 -7
- Misses 3394 3461 +67
- Partials 1276 1283 +7
I will add a changelog entry and tests 👌
protected void validateTimeField(boolean indexingDryRun, ActionListener<T> listener) {
protected void validateTimeFieldInAllClusters(boolean indexingDryRun, ActionListener<T> listener) {
    List<String> wildcardClusterIndices = config.getIndices().stream().filter(idx -> idx.startsWith("*:")).toList();
There's duplicate code in validateTimeFieldInAllClusters and validateCategoricalFieldsInAllClusters. I suggest extracting the shared validation into one method; if you need different validation issue types, you can pass them in as parameters.
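One possible shape for that extraction, as a minimal sketch rather than the plugin's actual code: a single helper expands the `*:` entries against the registered remote clusters and hands each cluster to a caller-supplied check together with the issue type. `IssueType`, `ClusterCheck`, and `AllClustersValidation` are hypothetical names standing in for the plugin's own types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the plugin's validation issue types.
enum IssueType { TIME_FIELD, CATEGORICAL_FIELD }

// Hypothetical callback: the field-specific check to run against one cluster.
interface ClusterCheck {
    void run(String cluster, List<String> indices, IssueType issueType);
}

final class AllClustersValidation {
    // Maps every registered remote cluster to the index names given as "*:<index>".
    static Map<String, List<String>> expandWildcardClusters(List<String> indices, Set<String> remoteClusters) {
        List<String> names = new ArrayList<>();
        for (String idx : indices) {
            if (idx.startsWith("*:")) {
                names.add(idx.substring(2)); // drop the "*:" prefix
            }
        }
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (!names.isEmpty()) {
            for (String remote : remoteClusters) {
                result.put(remote, List.copyOf(names));
            }
        }
        return result;
    }

    // Shared driver: both validations call this and differ only in issue type and check.
    static void validateInAllClusters(
        List<String> indices,
        Set<String> remoteClusters,
        IssueType issueType,
        ClusterCheck check
    ) {
        expandWildcardClusters(indices, remoteClusters)
            .forEach((cluster, clusterIndices) -> check.run(cluster, clusterIndices, issueType));
    }
}
```

With this shape, the time field and categorical field paths would each pass their own check and issue type, and the wildcard expansion lives in one place.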
for (String remote : remoteClusters) {
    List<String> remoteIndices = wildcardClusterIndices
        .stream()
        .map(idx -> idx.substring(2)) // remove "*:" prefix
nit: if a user passes this as indices:
"indices": [ "*:" ],
then an empty index name gets passed along. We can probably filter or validate earlier here so the error message makes more sense than: Exceptions: [Fail to get the index mapping of []
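A minimal sketch of the earlier check, assuming the prefix is stripped in one place before any remote call is made. `WildcardIndexNames` and `stripWildcardCluster` are hypothetical names, and the sketch throws `IllegalArgumentException` only to stay self-contained; in the plugin the failure would presumably be reported through the listener instead.

```java
import java.util.ArrayList;
import java.util.List;

final class WildcardIndexNames {
    // Returns the index part of every "*:<index>" entry, rejecting entries with nothing after the prefix.
    static List<String> stripWildcardCluster(List<String> indices) {
        List<String> names = new ArrayList<>();
        for (String idx : indices) {
            if (!idx.startsWith("*:")) {
                continue; // not a wildcard-cluster entry, handled elsewhere
            }
            String name = idx.substring(2).trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException(
                    "Index name is missing after the cluster wildcard in \"" + idx + "\""
                );
            }
            names.add(name);
        }
        return names;
    }
}
```

Failing here means the user sees which entry is malformed, rather than a mapping lookup error for an empty index list.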
import org.opensearch.timeseries.util.RestHandlerUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.*;
Expand the wildcard import here; we try to keep all imports specific, for example:
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.RestHandlerUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;
listener
    .onFailure(
        new ValidationException(
            "No indices specified for time field validation.",
I know we are currently inside time field validation, but this error is really about no indices being specified for the given configuration. It could be reported as ValidationIssueType.INDICES, since the error isn't specific to the time field. The same applies to the similar code in the categorical field check. If we move this code into a separate method, we can revisit the error messages we have.
Thank you for making this change!
if (!nonWildcardClusterIndices.isEmpty()) {
    HashMap<String, List<String>> nonWildcardIndicesMap = CrossClusterConfigUtils
        .separateClusterIndexes(nonWildcardClusterIndices, clusterService);
    clusterIndicesMap.putAll(nonWildcardIndicesMap);
When merging wildcard and non-wildcard cluster indices, the putAll() call overwrites entries from the wildcard loop if both target the same cluster. For example, if a user specifies both *:index1 and remote1:index2, only index2 will be validated for remote1. The indices should be combined, not replaced. This affects both time field and categorical field validation.
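A minimal sketch of combining rather than replacing, assuming both maps go from cluster name to a list of index names as in the snippet above. `ClusterIndexMerge` and `mergeInto` are hypothetical names; `Map.merge` is the standard `java.util.Map` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ClusterIndexMerge {
    // Adds every entry of source to target; when a cluster is already present,
    // the two index lists are combined (without duplicates) instead of overwritten.
    static void mergeInto(Map<String, List<String>> target, Map<String, List<String>> source) {
        for (Map.Entry<String, List<String>> entry : source.entrySet()) {
            target.merge(entry.getKey(), new ArrayList<>(entry.getValue()), (existing, incoming) -> {
                // Copy first: the existing list may be unmodifiable (e.g. produced by Stream.toList()).
                List<String> combined = new ArrayList<>(existing);
                for (String idx : incoming) {
                    if (!combined.contains(idx)) {
                        combined.add(idx);
                    }
                }
                return combined;
            });
        }
    }
}
```

For the example in the comment, a target holding `remote1 -> [index1]` merged with `remote1 -> [index2]` keeps both indices for `remote1`.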
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.transport.ValidateConfigResponse;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.*;
don't use wildcard import
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.transport.ValidateConfigResponse;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.*;
don't use wildcard import
// For each remote cluster, add all indices matching the wildcard pattern
for (String remote : remoteClusters) {
    List<String> remoteIndices = wildcardClusterIndices
        .stream()
        .map(idx -> idx.substring(2)) // remove "*:" prefix
        .toList();

    if (!remoteIndices.isEmpty()) {
        clusterIndicesMap.put(remote, remoteIndices);
    }
}
Because the wildcardClusterIndices list doesn’t depend on the loop variable (remote), the same list of stripped index names is recomputed on every iteration. How about:
List<String> remoteIndices = wildcardClusterIndices.stream()
.map(idx -> idx.substring(2))
.toList();
for (String remote : remoteClusters) {
clusterIndicesMap.put(remote, remoteIndices);
}
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
Set<String> remoteClusters = remoteClusterService.getRegisteredRemoteClusterNames();
Can we wrap this call in a try/catch and call listener.onFailure if an exception is thrown?
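A minimal sketch of that wrapping, kept self-contained by using stand-ins: `FailureListener` is a hypothetical substitute for the listener type, and the `Supplier` stands in for the remote cluster lookup shown above. The helper returns an empty `Optional` once the listener has been notified, so the caller can simply return.

```java
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for the listener; only the failure path matters here.
interface FailureListener {
    void onFailure(Exception e);
}

final class RemoteClusterLookup {
    // Runs the lookup and routes any exception to the listener instead of letting it propagate.
    static Optional<Set<String>> remoteClustersOrFail(Supplier<Set<String>> lookup, FailureListener listener) {
        try {
            return Optional.of(lookup.get());
        } catch (Exception e) {
            listener.onFailure(e);
            return Optional.empty(); // the listener has already been completed with the failure
        }
    }
}
```

The calling validation method would check the result and stop when it is empty, so the listener is never completed twice.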
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.inject.*;
remove since you don't need Inject
private final Clock clock;
private final Class<ConfigType> configTypeClass;

@Inject
remove
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.*;
remove
protected boolean runOnce;

@Inject
remove
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.*;
remove
Description
Updates to the validation of time field and categorical field to query all remote clusters when a wildcard cluster prefix is specified.
Related Issues
This is related to the proposal opened in #1606.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.