Introduce Insights API #1610
base: main
Conversation
Signed-off-by: Jackie <jkhanjob@gmail.com>
Force-pushed from 4883d42 to 77fcda9
Signed-off-by: Jackie <jkhanjob@gmail.com>
Force-pushed from 0eba385 to 49018e0
CI failed due to the jacoco changes in build.gradle. Not sure how to fix it. One naive workaround is to add the correlation request, response, and Action in AD to avoid the ml-commons dependency.
kaituo left a comment
partial review
src/main/java/org/opensearch/ad/rest/RestInsightsJobAction.java (outdated; comment resolved)
builder.startObject();

// Task metadata
builder.field("task_id", "task_" + ADCommonName.INSIGHTS_JOB_NAME + "_" + UUID.randomUUID().toString());
Why do you need a task id? The AD task id is the doc id in the state index.
if (parts.length > 1) {
    String seriesKey = parts[1];
    seriesKeys.add(seriesKey);
Is the entities set redundant with the seriesKeys set?
We don't necessarily need it. I just followed the current practice of having a logical run identifier for the insights generation. It may be useful in the future when we integrate with Investigation, so we can refer to a specific insights run by this id.
If we need it in the future, we can add it later. For now we can remove it.
// Use MAX score if multiple anomalies in same bucket
double currentScore = bucketScores.getOrDefault(bucketIndex, 0.0);
double newScore = anomaly.getAnomalyScore();
bucketScores.put(bucketIndex, Math.max(currentScore, newScore));
Should we consider the interval? Our anomalies are interval anomalies. We can put anomaly scores into all of the buckets overlapping the current interval [data start, data end]. If you have already done this, can you point me to the code? I cannot find it.
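For illustration, a minimal sketch of what spreading an interval anomaly's score over every overlapping bucket could look like. The bucket geometry (windowStartMs, bucketSizeMs, numBuckets) and the AnomalyResult accessors are assumptions based on the snippet above, not the code in this PR.

private void addIntervalAnomalyToBuckets(AnomalyResult anomaly, long windowStartMs, long bucketSizeMs,
        int numBuckets, Map<Integer, Double> bucketScores) {
    // Assumed accessors: data start/end of the anomalous interval as epoch millis.
    long dataStartMs = anomaly.getDataStartTime().toEpochMilli();
    long dataEndMs = anomaly.getDataEndTime().toEpochMilli();
    // Every bucket overlapping [dataStartMs, dataEndMs) gets the score, clamped to the window.
    int firstBucket = (int) Math.max(0, (dataStartMs - windowStartMs) / bucketSizeMs);
    int lastBucket = (int) Math.min(numBuckets - 1L, (dataEndMs - 1 - windowStartMs) / bucketSizeMs);
    double score = anomaly.getAnomalyScore();
    for (int i = firstBucket; i <= lastBucket; i++) {
        // Keep the MAX score if multiple anomalies land in the same bucket, as in the snippet above.
        bucketScores.merge(i, score, Math::max);
    }
}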
synced offline, no changes needed here
I still don't know where your code is. Can you point me to the location?
Signed-off-by: Jackie <jkhanjob@gmail.com>
kaituo left a comment
partial review
src/main/java/org/opensearch/ad/transport/InsightsJobTransportAction.java (comment resolved)
    .sort("generated_at", SortOrder.DESC)
);

client.search(searchRequest, ActionListener.wrap(searchResponse -> {
Do you need to add backend role filtering before the search? Please add security tests with backend role filtering on.
We need a tenant-isolated search, but not necessarily backend role filtering here. Insights generation is a background job, so I followed the existing pattern of using InjectSecurity directly for background work: impersonate the stored user via InjectSecurity, then execute the search directly. For user-facing search APIs, such as the search anomaly result transport action, AD reads the current user from the thread context and then adds backend role filtering.
Adding security tests in the next revision.
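For context, a minimal sketch of the background-job pattern described above, assuming a stored user name and roles list on the job config plus hypothetical jobId, settings, searchRequest, and listener variables; only InjectSecurity#inject and the search call come from the diffs in this PR.

// Impersonate the stored user for the duration of the background search, and make sure
// the injected context is cleared once the listener completes (success or failure).
InjectSecurity injectSecurity = new InjectSecurity(jobId, settings, client.threadPool().getThreadContext());
ActionListener<SearchResponse> contextClearingListener = ActionListener.runBefore(listener, injectSecurity::close);
try {
    injectSecurity.inject(user, roles); // user: String, roles: List<String>, as in the diff below
    client.search(searchRequest, contextClearingListener);
} catch (Exception e) {
    contextClearingListener.onFailure(e);
}

Whether close() is driven by a wrapped listener or a try-with-resources block is a design choice; the key point is that the injected context must always be cleared.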
So we have three kinds of auth now:
- FGAC roles
- backend-role filtering (see the sketch below)
- resource sharing
We will need to cover all three of them.
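For the second item, a rough sketch of what backend-role filtering on the insights search could look like, assuming insights documents embed the owning user under a nested "user" field the way AD detectors and results do; the field names, requestUser, and the surrounding query are assumptions, not code from this PR.

// Restrict results to documents whose stored user shares at least one backend role
// with the calling user (read from the thread context in the transport action).
QueryBuilder existingQuery = searchSourceBuilder.query() == null ? QueryBuilders.matchAllQuery() : searchSourceBuilder.query();
TermsQueryBuilder backendRolesFilter = QueryBuilders.termsQuery("user.backend_roles.keyword", requestUser.getBackendRoles());
NestedQueryBuilder nestedUserFilter = QueryBuilders.nestedQuery("user", backendRolesFilter, ScoreMode.None);
searchSourceBuilder.query(QueryBuilders.boolQuery().must(existingQuery).filter(nestedUserFilter));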
try {
    injectSecurity.inject(user, roles);

    localClient
We should verify whether the mapping has been changed by the customer before writing. If it has, report an error, stop the job, and stop writing.
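A minimal sketch of one way to do that check before indexing, assuming the 2.x/3.x GetMappingsResponse shape; the index name, the expectedInsightsFields set, and the writeInsightsDoc/stopJobWithError hooks are illustrative placeholders rather than what this PR implements.

// Fetch the live mapping of the insights result index and make sure the fields the job
// writes are still present before indexing; otherwise report the error and stop the job.
client.admin().indices().getMappings(new GetMappingsRequest().indices(insightsResultIndex), ActionListener.wrap(response -> {
    MappingMetadata mapping = response.mappings().get(insightsResultIndex);
    @SuppressWarnings("unchecked")
    Map<String, Object> properties = mapping == null
        ? Collections.emptyMap()
        : (Map<String, Object>) mapping.sourceAsMap().getOrDefault("properties", Collections.emptyMap());
    if (!properties.keySet().containsAll(expectedInsightsFields)) {
        // mapping was changed by the customer: stop the job instead of writing
        stopJobWithError(new IllegalStateException("Insights result index mapping has been modified"));
        return;
    }
    writeInsightsDoc();
}, this::stopJobWithError));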
Good catch, updated in the new revision.
src/main/java/org/opensearch/ad/rest/handler/InsightsJobActionHandler.java (comment resolved)
Signed-off-by: Jackie <jkhanjob@gmail.com>
Force-pushed from 3076f15 to 1356920
Signed-off-by: Jackie <jkhanjob@gmail.com>
kaituo left a comment
partial review
src/main/java/org/opensearch/ad/rest/handler/InsightsJobActionHandler.java (4 comments resolved; 3 on outdated code)
log.info("Running Insights job for time window: {} to {}", executionStartTime, executionEndTime);

querySystemResultIndex(jobParameter, lockService, lock, executionStartTime, executionEndTime);
Only querying the system result index would hinder your ability to go GA alone. You have to tie insights to auto AD creation. One route to GA is to add a text box on the AD overview page that adds a summary on top of existing detectors' results.
kaituo left a comment
partial review
    fetchDetectorMetadataAndProceed(allAnomalies, jobParameter, lockService, lock, executionStartTime, executionEndTime);
} else {
    log.info("No anomalies found in time window, skipping ML correlation");
    releaseLock(jobParameter, lockService, lock);
The way releaseLock is scattered through InsightsJobProcessor right now is fragile. It's easy to add a new early-return or error path and forget to release the lock, and it is hard for others to maintain.
How about:
private void runInsightsJob(Job job, LockService lockService, LockModel lock,
        Instant start, Instant end) {
    if (lock == null) {
        log.warn("Can't run Insights job due to null lock for {}", job.getName());
        return;
    }
    ActionListener<Void> lockReleasing = guardedLockReleasingListener(job, lockService, lock);
    // Top-level listener for "anomalies finished"
    ActionListener<List<AnomalyResult>> anomaliesListener = ActionListener.wrap(
        anomalies -> {
            if (anomalies.isEmpty()) {
                log.info("No anomalies, skipping ML correlation");
                lockReleasing.onResponse(null);
                return;
            }
            fetchDetectorMetadataAndProceed(anomalies, job, start, end, lockReleasing);
        },
        lockReleasing::onFailure
    );
    querySystemResultIndex(job, start, end, anomaliesListener);
}

private ActionListener<Void> guardedLockReleasingListener(Job job, LockService lockService, LockModel lock) {
    AtomicBoolean done = new AtomicBoolean(false);
    return ActionListener.wrap(
        r -> {
            if (done.compareAndSet(false, true)) {
                releaseLock(job, lockService, lock);
            } else {
                log.warn("Lock already released for Insights job {}", job.getName());
            }
        },
        e -> {
            if (done.compareAndSet(false, true)) {
                log.error("Insights job failed", e);
                releaseLock(job, lockService, lock);
            } else {
                log.warn("Lock already released for Insights job {} (got extra failure)", job.getName(), e);
            }
        }
    );
}
Now querySystemResultIndex(...) and fetchPagedAnomalies(...) only get an ActionListener<List<AnomalyResult>> listener. They know nothing about locks and nothing about completion; they just follow the normal "always call your listener" rule.
This would reduce the "special" callsites to:
- runInsightsJob (creates lockReleasing)
- a few terminal branches in the top-level logic (no anomalies, ML disabled, write succeeded/failed, etc.)
Refactored; it now has a single guarded lock-releasing listener and lock-free sub-processes.
Signed-off-by: Jackie <jkhanjob@gmail.com>
kaituo left a comment
Finished one round of review.
 * @param listener Action listener for the response
 */
public void startInsightsJob(String frequency, ActionListener<InsightsJobResponse> listener) {
    logger.info("Starting insights job with frequency: {}", frequency);
To be safe, you will need to verify that users have permission to search results by running the search API with their credentials and checking whether it throws a security exception.
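A sketch of the kind of probe suggested here: run a lightweight search as the calling user (before switching into any system or injected context) and fail fast on a security exception. The resultIndex name and the proceed(...) callback are placeholders, not names from this PR.

// Probe the user's permission with a zero-hit search; an OpenSearchSecurityException
// means the user cannot read results and the job should not be started.
SearchRequest permissionProbe = new SearchRequest(resultIndex)
    .source(new SearchSourceBuilder().size(0).query(QueryBuilders.matchAllQuery()));
client.search(permissionProbe, ActionListener.wrap(
    searchResponse -> proceed(frequency, listener),
    e -> {
        if (e instanceof OpenSearchSecurityException) {
            listener.onFailure(new OpenSearchStatusException("User does not have permission to search results", RestStatus.FORBIDDEN));
        } else {
            listener.onFailure(e);
        }
    }
));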
Do you mean we should check whether users have permission to search the insights result index? The _results API checks that; _start is just about starting the job, so no insights-result search is involved.
  // entity level task to track just one specific entity's state, init progress, error etc.
- HISTORICAL_HC_ENTITY;
+ HISTORICAL_HC_ENTITY,
+ INSIGHTS;
Where do you use the new task type?
I added it while implementing but ended up not using it; removed.
src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java (comment resolved)
}

public void testInsightsApisUseSystemContextForJobIndex() throws IOException {
    // Use a non-admin user with AD access (alice) to exercise Insights APIs end-to-end under security
Can you also use another non-admin user (e.g., bobUser) to check whether things would fail?
Added in the new revision.
Response stopResp = TestHelpers.makeRequest(aliceClient, "POST", stopPath, ImmutableMap.of(), "", null);
assertEquals("Stop insights job failed", RestStatus.OK, TestHelpers.restStatus(stopResp));
}
Can you have alice create detectors, check that results are generated, start insights, and check whether any insights are generated after a few minutes; query insights using a normal user; then stop insights?
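A rough shape of that end-to-end scenario, reusing the TestHelpers calls from the existing test above; the detector-creation and wait helpers, bobClient, and the insights REST paths are placeholders rather than names from this PR.

public void testInsightsEndToEndWithAlice() throws Exception {
    // alice (non-admin, with AD access) creates a detector and waits for anomaly results
    String detectorId = createRealtimeDetectorAs(aliceClient); // placeholder helper
    assertTrue(waitUntil(() -> hasAnomalyResults(aliceClient, detectorId), 3, TimeUnit.MINUTES)); // placeholder helper

    // start insights and wait for the background job to produce something
    Response startResp = TestHelpers.makeRequest(aliceClient, "POST", startPath, ImmutableMap.of(), "", null);
    assertEquals(RestStatus.OK, TestHelpers.restStatus(startResp));
    assertTrue(waitUntil(() -> hasInsightsResults(aliceClient), 5, TimeUnit.MINUTES)); // placeholder helper

    // a normal (non-admin) user queries insights
    Response queryResp = TestHelpers.makeRequest(bobClient, "GET", resultsPath, ImmutableMap.of(), "", null);
    assertEquals(RestStatus.OK, TestHelpers.restStatus(queryResp));

    // finally stop the insights job
    Response stopResp = TestHelpers.makeRequest(aliceClient, "POST", stopPath, ImmutableMap.of(), "", null);
    assertEquals(RestStatus.OK, TestHelpers.restStatus(stopResp));
}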
if (bucketIndex >= 0) {
    // Use MAX score if multiple anomalies in same bucket
    double currentScore = bucketScores.getOrDefault(bucketIndex, 0.0);
    double newScore = anomaly.getAnomalyScore();
getAnomalyScore() can be null.
When querying anomaly results, I only query those with anomaly_grade > 0, so anomalyScore won't be null here.
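For reference, the guard that reply relies on looks roughly like the following filter on the anomaly-result query; the anomaly_grade and data_end_time field names follow AD's result schema, while the time-range clause and variable names are assumptions.

// Only fetch real anomalies: anomaly_grade > 0 implies anomaly_score is populated,
// so the bucket-scoring code above never sees a null score.
BoolQueryBuilder anomalyQuery = QueryBuilders.boolQuery()
    .filter(QueryBuilders.rangeQuery("anomaly_grade").gt(0))
    .filter(QueryBuilders.rangeQuery("data_end_time").gte(windowStartMs).lte(windowEndMs));

Even so, a defensive null check before Math.max would be cheap insurance if the query ever changes.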
Any idea why the "Build and Test anomaly detection" CI checks are skipped? (Saw it happen yesterday and today.)

Description
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
Commits are signed per the DCO using --signoff.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.