
Conversation

@jackiehanyang (Collaborator)

Description

  • Introduces a new Insights API (see the usage sketch after this list):
    • POST /_plugins/_anomaly_detection/insights/_start - start the insights job
    • GET /_plugins/_anomaly_detection/insights/_status - get the insights job status
    • GET /_plugins/_anomaly_detection/insights/_results - get the latest insights results
    • POST /_plugins/_anomaly_detection/insights/_stop - stop the insights job
  • Introduces a runtime dependency on the ml-commons metrics correlation algorithm:
    • anomaly results are sent to the ml-commons metrics correlation algorithm for analysis
    • the analysis results are written to the insights-results index
    • the frontend reads from this index to display insights on the dashboard
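For reference, calling the new endpoints could look roughly like the sketch below, using the low-level OpenSearch REST client. This is an illustration only, not code from this PR: the cluster address and the printed response bodies are assumptions, and the import paths follow the 2.x opensearch-rest-client and may differ across versions.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;

public class InsightsApiExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // start the insights job
            client.performRequest(new Request("POST", "/_plugins/_anomaly_detection/insights/_start"));

            // check the job status
            Response status = client.performRequest(new Request("GET", "/_plugins/_anomaly_detection/insights/_status"));
            System.out.println(EntityUtils.toString(status.getEntity()));

            // fetch the latest insights results (served from the insights-results index)
            Response results = client.performRequest(new Request("GET", "/_plugins/_anomaly_detection/insights/_results"));
            System.out.println(EntityUtils.toString(results.getEntity()));

            // stop the insights job
            client.performRequest(new Request("POST", "/_plugins/_anomaly_detection/insights/_stop"));
        }
    }
}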

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Jackie <jkhanjob@gmail.com>
@kaituo (Collaborator) commented Nov 13, 2025

CI failed due to the jacoco changes in build.gradle. Not sure how to fix it. One naive way is to add the correlation request, response, and Action classes in AD to avoid the ml-commons dependency.

* What went wrong:
Execution failed for task ':jacocoTestCoverageVerification'.
> A failure occurred while executing org.gradle.internal.jacoco.JacocoCoverageAction
   > Rule violated for class org.opensearch.ad.AnomalyDetectorRunner: branches covered ratio is 0.35, but expected minimum is 0.60
     Rule violated for class org.opensearch.ad.AnomalyDetectorRunner: lines covered ratio is 0.47, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.util.ModelUtil: branches covered ratio is 0.32, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.util.ModelUtil: lines covered ratio is 0.48, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.util.DataUtil: lines covered ratio is 0.72, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.feature.AbstractRetriever: branches covered ratio is 0.55, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.feature.AbstractRetriever: lines covered ratio is 0.63, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.feature.SearchFeatureDao: branches covered ratio is 0.28, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.feature.SearchFeatureDao: lines covered ratio is 0.59, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.handler.ModelValidationActionHandler: branches covered ratio is 0.00, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.handler.ModelValidationActionHandler: lines covered ratio is 0.00, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.handler.ConfigUpdateConfirmer: branches covered ratio is 0.06, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.handler.ConfigUpdateConfirmer: lines covered ratio is 0.19, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.handler.AggregationPrep: branches covered ratio is 0.36, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.handler.AggregationPrep: lines covered ratio is 0.40, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.handler.IntervalCalculation: branches covered ratio is 0.06, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.handler.IntervalCalculation: lines covered ratio is 0.18, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.handler.LatestTimeRetriever: branches covered ratio is 0.00, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.handler.LatestTimeRetriever: lines covered ratio is 0.00, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.handler.IntervalCalculation.IntervalRecommendationListener: branches covered ratio is 0.37, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.handler.IntervalCalculation.IntervalRecommendationListener: lines covered ratio is 0.55, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.ratelimit.ColdStartWorker: branches covered ratio is 0.45, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.ratelimit.ColdStartWorker: lines covered ratio is 0.73, but expected minimum is 0.75
     Rule violated for class org.opensearch.ad.transport.SuggestAnomalyDetectorParamTransportAction: branches covered ratio is 0.00, but expected minimum is 0.60
     Rule violated for class org.opensearch.ad.transport.SuggestAnomalyDetectorParamTransportAction: lines covered ratio is 0.11, but expected minimum is 0.75
     Rule violated for class org.opensearch.ad.transport.ADSuggestName: branches covered ratio is 0.00, but expected minimum is 0.60
     Rule violated for class org.opensearch.ad.transport.ADSuggestName: lines covered ratio is 0.57, but expected minimum is 0.75
     Rule violated for class org.opensearch.ad.transport.ADResultProcessor: branches covered ratio is 0.00, but expected minimum is 0.60
     Rule violated for class org.opensearch.ad.transport.ADResultProcessor: lines covered ratio is 0.61, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.model.InitProgressProfile: branches covered ratio is 0.50, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.model.IntervalTimeConfiguration: branches covered ratio is 0.50, but expected minimum is 0.60
     Rule violated for class org.opensearch.ad.ml.MLCommonsClient: lines covered ratio is 0.62, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.RestValidateAction: branches covered ratio is 0.00, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.RestValidateAction: lines covered ratio is 0.26, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.RestJobAction: branches covered ratio is 0.00, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.rest.RestJobAction: lines covered ratio is 0.25, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.rest.AbstractSearchAction: lines covered ratio is 0.60, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.transport.SuggestConfigParamResponse: branches covered ratio is 0.59, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.transport.SuggestConfigParamRequest: branches covered ratio is 0.50, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.transport.SuggestConfigParamRequest: lines covered ratio is 0.68, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.transport.SuggestConfigParamResponse.Builder: lines covered ratio is 0.57, but expected minimum is 0.75
     Rule violated for class org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler: branches covered ratio is 0.54, but expected minimum is 0.60
     Rule violated for class org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler: lines covered ratio is 0.68, but expected minimum is 0.75
     Rule violated for class org.opensearch.ad.ratelimit.ADSaveResultStrategy: branches covered ratio is 0.43, but expected minimum is 0.60
     Rule violated for class org.opensearch.ad.ratelimit.ADSaveResultStrategy: lines covered ratio is 0.53, but expected minimum is 0.75

@kaituo (Collaborator) left a comment

partial review

builder.startObject();

// Task metadata
builder.field("task_id", "task_" + ADCommonName.INSIGHTS_JOB_NAME + "_" + UUID.randomUUID().toString());
Collaborator:

Why do you need a task id? The AD task id is the doc id of the state index.


if (parts.length > 1) {
    String seriesKey = parts[1];
    seriesKeys.add(seriesKey);
Collaborator:

Is the entities set redundant with the seriesKeys set?

Collaborator Author:

We don't necessarily need it. I just followed the current practice of having a logical run identifier for the insights generation. It may be useful in the future when we integrate with Investigation, so we can refer to a specific insights run by this id.

Collaborator:

If we need it in the future, we can add it later. For now we can remove it.

Collaborator Author:

This field is currently used on the frontend side:
[Screenshot 2025-12-01 at 11:31:49]

// Use MAX score if multiple anomalies in same bucket
double currentScore = bucketScores.getOrDefault(bucketIndex, 0.0);
double newScore = anomaly.getAnomalyScore();
bucketScores.put(bucketIndex, Math.max(currentScore, newScore));
@kaituo (Collaborator) commented Nov 14, 2025:

Should we consider the interval? Our anomalies are interval anomalies. We could put anomaly scores into all of the buckets overlapping the current interval [data start, data end]. If you have already done this, can you point me to the code? I cannot find it.
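A hypothetical sketch of that suggestion (not code from this PR): spread each interval anomaly's score over every bucket that overlaps [data start, data end]. The bucketScores map mirrors the quoted snippet; the window start and bucket width parameters are assumptions.

import java.util.Map;

class BucketSpreadSketch {
    // Spread one interval anomaly's score across all buckets overlapping [dataStart, dataEnd],
    // keeping the MAX score per bucket as the quoted snippet does.
    static void spreadScoreOverBuckets(
        Map<Integer, Double> bucketScores,
        long windowStartMillis,
        long bucketMillis,
        long dataStartMillis,
        long dataEndMillis,
        double anomalyScore
    ) {
        int firstBucket = (int) Math.max(0, (dataStartMillis - windowStartMillis) / bucketMillis);
        int lastBucket = (int) Math.max(firstBucket, (dataEndMillis - windowStartMillis) / bucketMillis);
        for (int bucketIndex = firstBucket; bucketIndex <= lastBucket; bucketIndex++) {
            double currentScore = bucketScores.getOrDefault(bucketIndex, 0.0);
            bucketScores.put(bucketIndex, Math.max(currentScore, anomalyScore));
        }
    }
}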

Collaborator Author:

synced offline, no changes needed here

Collaborator:

I still don't know where your code is. Can you point me to the location?

Signed-off-by: Jackie <jkhanjob@gmail.com>
@kaituo (Collaborator) left a comment

partial review

.sort("generated_at", SortOrder.DESC)
);

client.search(searchRequest, ActionListener.wrap(searchResponse -> {
Collaborator:

Do you need to add backend role filtering before the search? Please add security tests with backend role filtering enabled.

Collaborator Author:

We need a tenant-isolated search, but not necessarily backend role filtering here. Insights generation is a background job, so I followed the existing pattern of using InjectSecurity directly for background work: impersonate the stored user via InjectSecurity, then execute the search directly. For user-facing search APIs like the search anomaly result transport action, AD reads the current user from the thread context and then adds backend role filtering.

Adding security tests in the next revision.
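The background-job pattern described here could be sketched as follows. This is an illustration under stated assumptions (InjectSecurity from common-utils, a node client, the AD result index alias, and placeholder handlers), not the PR's exact code; import paths follow a recent 2.x distribution and may differ across versions.

import java.util.List;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.commons.InjectSecurity;
import org.opensearch.core.action.ActionListener;

class BackgroundSearchSketch {
    // Impersonate the user stored with the insights job, then run the result search directly.
    void searchAsStoredUser(Client client, Settings settings, String jobId, String user, List<String> roles) {
        try (InjectSecurity injectSecurity = new InjectSecurity(jobId, settings, client.threadPool().getThreadContext())) {
            injectSecurity.inject(user, roles);
            SearchRequest searchRequest = new SearchRequest(".opendistro-anomaly-results*");
            client.search(searchRequest, ActionListener.wrap(
                searchResponse -> { /* hand anomaly results to the correlation step */ },
                exception -> { /* log the failure and release any held resources */ }
            ));
        }
    }
}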

Collaborator:

So we have three kinds of auth now:

  1. FGAC roles
  2. backend-role filtering
  3. resource sharing

We will need to cover all three of them.

try {
    injectSecurity.inject(user, roles);

    localClient
Collaborator:

We should verify whether the mapping has been changed by the customer before writing. If it has, report an error, stop the job, and stop writing.

Collaborator Author:

good catch, updated in the new revision

Signed-off-by: Jackie <jkhanjob@gmail.com>
@kaituo (Collaborator) left a comment

partial review


log.info("Running Insights job for time window: {} to {}", executionStartTime, executionEndTime);

querySystemResultIndex(jobParameter, lockService, lock, executionStartTime, executionEndTime);
Collaborator:

Only querying the system result index would hinder your ability to go GA with this alone. You have to tie insights to Auto AD creation. One route to GA is to add a text box on the AD overview page that shows a summary on top of existing detectors' results.

@kaituo (Collaborator) left a comment

partial review

    fetchDetectorMetadataAndProceed(allAnomalies, jobParameter, lockService, lock, executionStartTime, executionEndTime);
} else {
    log.info("No anomalies found in time window, skipping ML correlation");
    releaseLock(jobParameter, lockService, lock);
Collaborator:

The way releaseLock is scattered through InsightsJobProcessor right now is fragile. It's easy to add a new early-return or error path and forget to release the lock, and it is hard for others to maintain.

How about

private void runInsightsJob(Job job, LockService lockService, LockModel lock,
                            Instant start, Instant end) {
    if (lock == null) {
        log.warn("Can't run Insights job due to null lock for {}", job.getName());
        return;
    }

    ActionListener<Void> lockReleasing = guardedLockReleasingListener(job, lockService, lock);

    // Top-level listener for “anomalies finished”
    ActionListener<List<AnomalyResult>> anomaliesListener = ActionListener.wrap(
        anomalies -> {
            if (anomalies.isEmpty()) {
                log.info("No anomalies, skipping ML correlation");
                lockReleasing.onResponse(null);
                return;
            }
            fetchDetectorMetadataAndProceed(anomalies, job, start, end, lockReleasing);
        },
        lockReleasing::onFailure
    );

    querySystemResultIndex(job, start, end, anomaliesListener);
}

private ActionListener<Void> guardedLockReleasingListener(Job job, LockService lockService, LockModel lock) {
    AtomicBoolean done = new AtomicBoolean(false);

    return ActionListener.wrap(
        r -> {
            if (done.compareAndSet(false, true)) {
                releaseLock(job, lockService, lock);
            } else {
                log.warn("Lock already released for Insights job {}", job.getName());
            }
        },
        e -> {
            if (done.compareAndSet(false, true)) {
                log.error("Insights job failed", e);
                releaseLock(job, lockService, lock);
            } else {
                log.warn("Lock already released for Insights job {} (got extra failure)", job.getName(), e);
            }
        }
    );
}


Now:

querySystemResultIndex(...) and fetchPagedAnomalies(...) each only get an ActionListener<List<AnomalyResult>> listener.

They know nothing about locks, nothing about completion – they just follow the normal “always call your listener” rule.

This would reduce the “special” callsites to:

  • runInsightsJob (which creates lockReleasing)
  • a few terminal branches in the top-level logic (no anomalies, ML disabled, write succeeded/failed, etc.)

Collaborator Author:

Refactored: it now has a single guarded lock-releasing listener, and the sub-processes are lock-free.

Signed-off-by: Jackie <jkhanjob@gmail.com>
@kaituo (Collaborator) left a comment

Finished one round of review.

 * @param listener Action listener for the response
 */
public void startInsightsJob(String frequency, ActionListener<InsightsJobResponse> listener) {
    logger.info("Starting insights job with frequency: {}", frequency);
Collaborator:

To be safe, you will need to verify that users have permission to search results by running the search API with their credentials and checking whether there is a security exception. (A rough sketch follows.)
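One hedged way to implement that probe, assuming a zero-size search against the result index under the caller's thread context is enough to surface a security exception; the method, the Runnable, and the listener wiring are placeholder names, not code from this PR.

import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

class StartPermissionProbeSketch {
    // Hypothetical permission probe: only proceed with starting the job if the caller
    // can search the result index; otherwise surface the (security) exception.
    void verifySearchPermission(Client client, String resultIndex, Runnable startJob, ActionListener<?> errorListener) {
        SearchRequest probe = new SearchRequest(resultIndex)
            .source(new SearchSourceBuilder().size(0).query(QueryBuilders.matchAllQuery()));
        client.search(probe, ActionListener.wrap(
            response -> startJob.run(),      // permission confirmed: start the insights job
            errorListener::onFailure         // pass the exception back to the caller
        ));
    }
}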

Collaborator Author:

Do you mean we should check whether users have permission to search the insights result index? The _results API already checks that; _start is just about starting the job, and no search of insights results is involved.

// entity level task to track just one specific entity's state, init progress, error etc.
HISTORICAL_HC_ENTITY;
HISTORICAL_HC_ENTITY,
INSIGHTS;
Collaborator:

Where do you use the new task type?

Collaborator Author:

I added it while implementing but ended up not using it; removed.

}

public void testInsightsApisUseSystemContextForJobIndex() throws IOException {
    // Use a non-admin user with AD access (alice) to exercise Insights APIs end-to-end under security
Collaborator:

Can you also use a non-admin user (e.g., bobUser) to check whether things fail as expected?

@jackiehanyang (Collaborator Author) commented Nov 26, 2025:

added in the new revision

Response stopResp = TestHelpers.makeRequest(aliceClient, "POST", stopPath, ImmutableMap.of(), "", null);
assertEquals("Stop insights job failed", RestStatus.OK, TestHelpers.restStatus(stopResp));
}

Collaborator:

Can you have alice create detectors, check that results are generated, start insights, check whether any insights are generated after a few minutes, query insights as a normal user, and then stop insights? (A rough outline of this flow is sketched below.)
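A hypothetical outline of that flow, following the TestHelpers.makeRequest style used in this test file; detector creation, result verification, and normalUserClient are placeholders not shown in this PR.

public void testInsightsEndToEndAsAlice() throws Exception {
    // 1. alice creates a detector and waits until anomaly results are generated (helpers elided)

    // 2. alice starts the insights job
    Response startResp = TestHelpers
        .makeRequest(aliceClient, "POST", "/_plugins/_anomaly_detection/insights/_start", ImmutableMap.of(), "", null);
    assertEquals("Start insights job failed", RestStatus.OK, TestHelpers.restStatus(startResp));

    // 3. after a few minutes, a normal (non-admin) user queries the latest insights
    Response resultsResp = TestHelpers
        .makeRequest(normalUserClient, "GET", "/_plugins/_anomaly_detection/insights/_results", ImmutableMap.of(), "", null);
    assertEquals("Get insights results failed", RestStatus.OK, TestHelpers.restStatus(resultsResp));

    // 4. alice stops the insights job
    Response stopResp = TestHelpers
        .makeRequest(aliceClient, "POST", "/_plugins/_anomaly_detection/insights/_stop", ImmutableMap.of(), "", null);
    assertEquals("Stop insights job failed", RestStatus.OK, TestHelpers.restStatus(stopResp));
}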

if (bucketIndex >= 0) {
    // Use MAX score if multiple anomalies in same bucket
    double currentScore = bucketScores.getOrDefault(bucketIndex, 0.0);
    double newScore = anomaly.getAnomalyScore();
Collaborator:

getAnomalyScore() can be null.

Collaborator Author:

When querying anomaly results, I only query those with anomaly_grade > 0, so anomalyScore won't be null here.
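For reference, that kind of filter would look roughly like the sketch below (not the PR's exact query); the anomaly_grade and data_end_time field names follow the AD result schema, while the window bounds and page size are assumptions.

import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

class AnomalousResultsQuerySketch {
    // Only fetch results whose anomaly_grade is strictly greater than zero inside the job's window.
    static SearchSourceBuilder anomalousResultsOnly(long windowStartMillis, long windowEndMillis) {
        BoolQueryBuilder query = QueryBuilders.boolQuery()
            .filter(QueryBuilders.rangeQuery("anomaly_grade").gt(0))
            .filter(QueryBuilders.rangeQuery("data_end_time").gte(windowStartMillis).lte(windowEndMillis));
        return new SearchSourceBuilder().query(query).size(1000);
    }
}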



@kaituo (Collaborator) commented Nov 26, 2025

Any idea why the "Build and Test anomaly detection" CI runs are skipped? (I saw it happen yesterday and today.)

Labels

backport 2.x, infra (Changes to infrastructure, testing, CI/CD, pipelines, etc.)