-
Notifications
You must be signed in to change notification settings - Fork 8.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ML] job saved objects initialization #82639
Merged
jgowdyelastic
merged 26 commits into
elastic:master
from
jgowdyelastic:job-saved-object-initialization-2
Nov 10, 2020
Merged
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
01b7621
[ML] job saved objects initialization
jgowdyelastic f934953
fixing job count logic
jgowdyelastic 46d0081
adding missing files
jgowdyelastic a6b230c
attempting to fix build crash
jgowdyelastic d80b4a7
fixing kibana.json
jgowdyelastic eb8a4f6
changes based on review
jgowdyelastic 27efbe3
removing accidentally added export
jgowdyelastic 9388579
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine 43cd5f6
adding intialization promise
jgowdyelastic 84fa0f9
use finally so errors dont stop initialization
jgowdyelastic 0b1bbf7
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine 8c6751f
function rename
jgowdyelastic b9d45ff
removing duplicate header
jgowdyelastic a959ee7
adding job initialization count to log message
jgowdyelastic d2800ab
adding error to log message
jgowdyelastic c323370
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine 63769d4
moving initialization file
jgowdyelastic fb6a991
moving intialization file back again to fix git stash issues
jgowdyelastic 216a9d2
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine 3701886
removing .kibana index search
jgowdyelastic f7f1e6a
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine 5c3d751
creating internal saved object client
jgowdyelastic b839622
code clean up
jgowdyelastic 9324bdd
removing commented code
jgowdyelastic b81496e
adding check for spaces enabled
jgowdyelastic 51bbb2b
adding ids to saved objects
jgowdyelastic File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,12 +4,10 @@ | |
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import Boom from '@hapi/boom'; | ||
import { IScopedClusterClient } from 'kibana/server'; | ||
import { SearchResponse } from 'elasticsearch'; | ||
import type { JobObject, JobSavedObjectService } from './service'; | ||
import { ML_SAVED_OBJECT_TYPE } from './saved_objects'; | ||
import { JobType } from '../../common/types/saved_objects'; | ||
import type { JobSavedObjectService } from './service'; | ||
import { JobType, ML_SAVED_OBJECT_TYPE } from '../../common/types/saved_objects'; | ||
|
||
import { Job } from '../../common/types/anomaly_detection_jobs'; | ||
import { Datafeed } from '../../common/types/anomaly_detection_jobs'; | ||
|
@@ -163,201 +161,6 @@ export function checksFactory( | |
}; | ||
} | ||
|
||
async function repairJobs(simulate: boolean = false) { | ||
type Result = Record<string, { success: boolean; error?: any }>; | ||
const results: { | ||
savedObjectsCreated: Result; | ||
savedObjectsDeleted: Result; | ||
datafeedsAdded: Result; | ||
datafeedsRemoved: Result; | ||
} = { | ||
savedObjectsCreated: {}, | ||
savedObjectsDeleted: {}, | ||
datafeedsAdded: {}, | ||
datafeedsRemoved: {}, | ||
}; | ||
|
||
const { body: datafeeds } = await client.asInternalUser.ml.getDatafeeds<{ | ||
datafeeds: Datafeed[]; | ||
}>(); | ||
|
||
const tasks: Array<() => Promise<void>> = []; | ||
|
||
const status = await checkStatus(); | ||
for (const job of status.jobs['anomaly-detector']) { | ||
if (job.checks.savedObjectExits === false) { | ||
if (simulate === true) { | ||
results.savedObjectsCreated[job.jobId] = { success: true }; | ||
} else { | ||
// create AD saved objects for jobs which are missing them | ||
const jobId = job.jobId; | ||
const datafeedId = job.datafeedId; | ||
tasks.push(async () => { | ||
try { | ||
await jobSavedObjectService.createAnomalyDetectionJob(jobId, datafeedId ?? undefined); | ||
results.savedObjectsCreated[job.jobId] = { success: true }; | ||
} catch (error) { | ||
results.savedObjectsCreated[job.jobId] = { | ||
success: false, | ||
error: error.body ?? error, | ||
}; | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
for (const job of status.jobs['data-frame-analytics']) { | ||
if (job.checks.savedObjectExits === false) { | ||
if (simulate === true) { | ||
results.savedObjectsCreated[job.jobId] = { success: true }; | ||
} else { | ||
// create DFA saved objects for jobs which are missing them | ||
const jobId = job.jobId; | ||
tasks.push(async () => { | ||
try { | ||
await jobSavedObjectService.createDataFrameAnalyticsJob(jobId); | ||
results.savedObjectsCreated[job.jobId] = { success: true }; | ||
} catch (error) { | ||
results.savedObjectsCreated[job.jobId] = { | ||
success: false, | ||
error: error.body ?? error, | ||
}; | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
|
||
for (const job of status.savedObjects['anomaly-detector']) { | ||
if (job.checks.jobExists === false) { | ||
if (simulate === true) { | ||
results.savedObjectsDeleted[job.jobId] = { success: true }; | ||
} else { | ||
// Delete AD saved objects for jobs which no longer exist | ||
const jobId = job.jobId; | ||
tasks.push(async () => { | ||
try { | ||
await jobSavedObjectService.deleteAnomalyDetectionJob(jobId); | ||
results.savedObjectsDeleted[job.jobId] = { success: true }; | ||
} catch (error) { | ||
results.savedObjectsDeleted[job.jobId] = { | ||
success: false, | ||
error: error.body ?? error, | ||
}; | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
for (const job of status.savedObjects['data-frame-analytics']) { | ||
if (job.checks.jobExists === false) { | ||
if (simulate === true) { | ||
results.savedObjectsDeleted[job.jobId] = { success: true }; | ||
} else { | ||
// Delete DFA saved objects for jobs which no longer exist | ||
const jobId = job.jobId; | ||
tasks.push(async () => { | ||
try { | ||
await jobSavedObjectService.deleteDataFrameAnalyticsJob(jobId); | ||
results.savedObjectsDeleted[job.jobId] = { success: true }; | ||
} catch (error) { | ||
results.savedObjectsDeleted[job.jobId] = { | ||
success: false, | ||
error: error.body ?? error, | ||
}; | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
|
||
for (const job of status.savedObjects['anomaly-detector']) { | ||
if (job.checks.datafeedExists === true && job.datafeedId === null) { | ||
// add datafeed id for jobs where the datafeed exists but the id is missing from the saved object | ||
if (simulate === true) { | ||
results.datafeedsAdded[job.jobId] = { success: true }; | ||
} else { | ||
const df = datafeeds.datafeeds.find((d) => d.job_id === job.jobId); | ||
const jobId = job.jobId; | ||
const datafeedId = df?.datafeed_id; | ||
|
||
tasks.push(async () => { | ||
try { | ||
if (datafeedId !== undefined) { | ||
await jobSavedObjectService.addDatafeed(datafeedId, jobId); | ||
} | ||
results.datafeedsAdded[job.jobId] = { success: true }; | ||
} catch (error) { | ||
results.datafeedsAdded[job.jobId] = { success: false, error }; | ||
} | ||
}); | ||
} | ||
} else if ( | ||
job.checks.jobExists === true && | ||
job.checks.datafeedExists === false && | ||
job.datafeedId !== null && | ||
job.datafeedId !== undefined | ||
) { | ||
// remove datafeed id for jobs where the datafeed no longer exists but the id is populated in the saved object | ||
if (simulate === true) { | ||
results.datafeedsRemoved[job.jobId] = { success: true }; | ||
} else { | ||
const datafeedId = job.datafeedId; | ||
tasks.push(async () => { | ||
try { | ||
await jobSavedObjectService.deleteDatafeed(datafeedId); | ||
results.datafeedsRemoved[job.jobId] = { success: true }; | ||
} catch (error) { | ||
results.datafeedsRemoved[job.jobId] = { success: false, error: error.body ?? error }; | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
await Promise.allSettled(tasks.map((t) => t())); | ||
return results; | ||
} | ||
|
||
async function initSavedObjects(simulate: boolean = false, namespaces: string[] = ['*']) { | ||
const results: { jobs: Array<{ id: string; type: string }>; success: boolean; error?: any } = { | ||
jobs: [], | ||
success: true, | ||
}; | ||
const status = await checkStatus(); | ||
|
||
const jobs: JobObject[] = []; | ||
const types: JobType[] = ['anomaly-detector', 'data-frame-analytics']; | ||
|
||
types.forEach((type) => { | ||
status.jobs[type].forEach((job) => { | ||
if (job.checks.savedObjectExits === false) { | ||
if (simulate === true) { | ||
results.jobs.push({ id: job.jobId, type }); | ||
} else { | ||
jobs.push({ | ||
job_id: job.jobId, | ||
datafeed_id: job.datafeedId ?? null, | ||
type, | ||
}); | ||
} | ||
} | ||
}); | ||
}); | ||
try { | ||
const createResults = await jobSavedObjectService.bulkCreateJobs(jobs, namespaces); | ||
createResults.saved_objects.forEach(({ attributes }) => { | ||
results.jobs.push({ | ||
id: attributes.job_id, | ||
type: attributes.type, | ||
}); | ||
}); | ||
} catch (error) { | ||
results.success = false; | ||
results.error = Boom.boomify(error).output; | ||
} | ||
return results; | ||
} | ||
|
||
async function _loadAllJobSavedObjects() { | ||
const { body } = await client.asInternalUser.search<SearchResponse<SavedObjectJob>>({ | ||
index: '.kibana*', | ||
|
@@ -369,7 +172,7 @@ export function checksFactory( | |
filter: [ | ||
{ | ||
term: { | ||
type: 'ml-job', | ||
type: ML_SAVED_OBJECT_TYPE, | ||
}, | ||
}, | ||
], | ||
|
@@ -388,5 +191,5 @@ export function checksFactory( | |
}); | ||
} | ||
|
||
return { checkStatus, repairJobs, initSavedObjects }; | ||
return { checkStatus }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we name it |
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I overlooked this in the first PR. We need to be using the saved objects client to retrieve all jobs, instead of the raw ES client to query the index.
The way this is currently written, it won't support users who have changed their kibana index, and it may return results from indices that match this pattern that aren't actually the "kibana index".
Using the Saved Objects Client will also ensure that the Kibana index has been fully initialized, and is ready for plugins to access.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used a search here because I need a list of all saved objects across all spaces, even the ones the user doesn't have access to. As far as i'm aware the saved object client can't do that?
checkStatus
needs to know if a job has a saved object, otherwise we'll end up with duplicates when repairing.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@legrego i've updated these searches so it now uses the saved objects client