Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] job saved objects initialization #82639

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
01b7621
[ML] job saved objects initialization
jgowdyelastic Nov 3, 2020
f934953
fixing job count logic
jgowdyelastic Nov 3, 2020
46d0081
adding missing files
jgowdyelastic Nov 3, 2020
a6b230c
attempting to fix build crash
jgowdyelastic Nov 3, 2020
d80b4a7
fixing kibana.json
jgowdyelastic Nov 3, 2020
eb8a4f6
changes based on review
jgowdyelastic Nov 4, 2020
27efbe3
removing accidentally added export
jgowdyelastic Nov 5, 2020
9388579
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine Nov 5, 2020
43cd5f6
adding intialization promise
jgowdyelastic Nov 5, 2020
84fa0f9
use finally so errors dont stop initialization
jgowdyelastic Nov 6, 2020
0b1bbf7
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine Nov 6, 2020
8c6751f
function rename
jgowdyelastic Nov 6, 2020
b9d45ff
removing duplicate header
jgowdyelastic Nov 6, 2020
a959ee7
adding job initialization count to log message
jgowdyelastic Nov 6, 2020
d2800ab
adding error to log message
jgowdyelastic Nov 6, 2020
c323370
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine Nov 9, 2020
63769d4
moving initialization file
jgowdyelastic Nov 9, 2020
fb6a991
moving intialization file back again to fix git stash issues
jgowdyelastic Nov 9, 2020
216a9d2
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine Nov 9, 2020
3701886
removing .kibana index search
jgowdyelastic Nov 10, 2020
f7f1e6a
Merge branch 'master' into job-saved-object-initialization-2
kibanamachine Nov 10, 2020
5c3d751
creating internal saved object client
jgowdyelastic Nov 10, 2020
b839622
code clean up
jgowdyelastic Nov 10, 2020
9324bdd
removing commented code
jgowdyelastic Nov 10, 2020
b81496e
adding check for spaces enabled
jgowdyelastic Nov 10, 2020
51bbb2b
adding ids to saved objects
jgowdyelastic Nov 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugins/ml/common/types/saved_objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
*/

export type JobType = 'anomaly-detector' | 'data-frame-analytics';
export const ML_SAVED_OBJECT_TYPE = 'ml-job';
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import { HttpService } from '../http_service';

import { basePath } from './index';
import { JobType } from '../../../../common/types/saved_objects';

export const savedObjectsApiProvider = (httpService: HttpService) => ({
jobsSpaces() {
Expand All @@ -17,4 +18,20 @@ export const savedObjectsApiProvider = (httpService: HttpService) => ({
method: 'GET',
});
},
assignJobToSpace(jobType: JobType, jobIds: string[], spaces: string[]) {
const body = JSON.stringify({ jobType, jobIds, spaces });
return httpService.http<any>({
path: `${basePath()}/saved_objects/assign_job_to_space`,
method: 'POST',
body,
});
},
removeJobFromSpace(jobType: JobType, jobIds: string[], spaces: string[]) {
const body = JSON.stringify({ jobType, jobIds, spaces });
return httpService.http<any>({
path: `${basePath()}/saved_objects/remove_job_from_space`,
method: 'POST',
body,
});
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { KibanaRequest } from 'kibana/server';
import type { MlClient } from '../../lib/ml_client';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import {
MlCapabilities,
adminMlCapabilities,
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/ml/server/lib/capabilities/upgrade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import type { MlClient } from '../../lib/ml_client';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';

export function upgradeCheckProvider(mlClient: MlClient) {
async function isUpgradeInProgress(): Promise<boolean> {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/ml/server/lib/check_annotations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { IScopedClusterClient } from 'kibana/server';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';

import {
ML_ANNOTATIONS_INDEX_ALIAS_READ,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { cloneDeep, each, remove, sortBy, get } from 'lodash';

import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';

import { INTERVALS } from './intervals';
import { singleSeriesCheckerFactory } from './single_series_checker';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* Bucket spans: 5m, 10m, 30m, 1h, 3h
*/

import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import { INTERVALS, LONG_INTERVALS } from './intervals';

export function singleSeriesCheckerFactory({ asCurrentUser }) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
prefixDatafeedId,
splitIndexPatternNames,
} from '../../../common/util/job_utils';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import { calculateModelMemoryLimitProvider } from '../calculate_model_memory_limit';
import { fieldsServiceProvider } from '../fields_service';
import { jobServiceProvider } from '../job_service';
Expand Down
11 changes: 8 additions & 3 deletions x-pack/plugins/ml/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { PLUGIN_ID } from '../common/constants/app';
import { MlCapabilities } from '../common/types/capabilities';

import { initMlTelemetry } from './lib/telemetry';
import { initMlServerLog } from './client/log';
import { initMlServerLog } from './lib/log';
import { initSampleDataSets } from './lib/sample_data_sets';

import { annotationRoutes } from './routes/annotations';
Expand All @@ -50,7 +50,7 @@ import { getPluginPrivileges } from '../common/types/capabilities';
import { setupCapabilitiesSwitcher } from './lib/capabilities';
import { registerKibanaSettings } from './lib/register_settings';
import { trainedModelsRoutes } from './routes/trained_models';
import { setupSavedObjects } from './saved_objects';
import { setupSavedObjects, jobSavedObjectsInitializationFactory } from './saved_objects';
import { RouteGuard } from './lib/route_guard';

export type MlPluginSetup = SharedServices;
Expand Down Expand Up @@ -181,10 +181,15 @@ export class MlServerPlugin implements Plugin<MlPluginSetup, MlPluginStart, Plug
};
}

public start(coreStart: CoreStart): MlPluginStart {
public async start(coreStart: CoreStart): Promise<MlPluginStart> {
this.capabilities = coreStart.capabilities;
this.clusterClient = coreStart.elasticsearch.client;
this.savedObjectsStart = coreStart.savedObjects;

// check whether the job saved objects exist
// and create them if needed.
const { initializeJobs } = jobSavedObjectsInitializationFactory(coreStart);
await initializeJobs();
}

public stop() {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/ml/server/routes/saved_objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { wrapError } from '../client/error_wrapper';
import { RouteInitialization } from '../types';
import { checksFactory } from '../saved_objects';
import { checksFactory, repairFactory } from '../saved_objects';
import { jobsAndSpaces, repairJobObjects } from './schemas/saved_objects';

/**
Expand Down Expand Up @@ -67,7 +67,7 @@ export function savedObjectsRoutes({ router, routeGuard }: RouteInitialization)
routeGuard.fullLicenseAPIGuard(async ({ client, request, response, jobSavedObjectService }) => {
try {
const { simulate } = request.query;
const { repairJobs } = checksFactory(client, jobSavedObjectService);
const { repairJobs } = repairFactory(client, jobSavedObjectService);
const savedObjects = await repairJobs(simulate);

return response.ok({
Expand Down Expand Up @@ -100,7 +100,7 @@ export function savedObjectsRoutes({ router, routeGuard }: RouteInitialization)
routeGuard.fullLicenseAPIGuard(async ({ client, request, response, jobSavedObjectService }) => {
try {
const { simulate } = request.query;
const { initSavedObjects } = checksFactory(client, jobSavedObjectService);
const { initSavedObjects } = repairFactory(client, jobSavedObjectService);
const savedObjects = await initSavedObjects(simulate);

return response.ok({
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/ml/server/routes/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { schema } from '@kbn/config-schema';
import { Request } from '@hapi/hapi';
import { IScopedClusterClient } from 'kibana/server';
import { wrapError } from '../client/error_wrapper';
import { mlLog } from '../client/log';
import { mlLog } from '../lib/log';
import { capabilitiesProvider } from '../lib/capabilities';
import { spacesUtilsProvider } from '../lib/spaces_utils';
import { RouteInitialization, SystemRouteDeps } from '../types';
Expand Down
205 changes: 4 additions & 201 deletions x-pack/plugins/ml/server/saved_objects/checks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
* you may not use this file except in compliance with the Elastic License.
*/

import Boom from '@hapi/boom';
import { IScopedClusterClient } from 'kibana/server';
import { SearchResponse } from 'elasticsearch';
import type { JobObject, JobSavedObjectService } from './service';
import { ML_SAVED_OBJECT_TYPE } from './saved_objects';
import { JobType } from '../../common/types/saved_objects';
import type { JobSavedObjectService } from './service';
import { JobType, ML_SAVED_OBJECT_TYPE } from '../../common/types/saved_objects';

import { Job } from '../../common/types/anomaly_detection_jobs';
import { Datafeed } from '../../common/types/anomaly_detection_jobs';
Expand Down Expand Up @@ -163,201 +161,6 @@ export function checksFactory(
};
}

async function repairJobs(simulate: boolean = false) {
type Result = Record<string, { success: boolean; error?: any }>;
const results: {
savedObjectsCreated: Result;
savedObjectsDeleted: Result;
datafeedsAdded: Result;
datafeedsRemoved: Result;
} = {
savedObjectsCreated: {},
savedObjectsDeleted: {},
datafeedsAdded: {},
datafeedsRemoved: {},
};

const { body: datafeeds } = await client.asInternalUser.ml.getDatafeeds<{
datafeeds: Datafeed[];
}>();

const tasks: Array<() => Promise<void>> = [];

const status = await checkStatus();
for (const job of status.jobs['anomaly-detector']) {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.savedObjectsCreated[job.jobId] = { success: true };
} else {
// create AD saved objects for jobs which are missing them
const jobId = job.jobId;
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await jobSavedObjectService.createAnomalyDetectionJob(jobId, datafeedId ?? undefined);
results.savedObjectsCreated[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsCreated[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.jobs['data-frame-analytics']) {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.savedObjectsCreated[job.jobId] = { success: true };
} else {
// create DFA saved objects for jobs which are missing them
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.createDataFrameAnalyticsJob(jobId);
results.savedObjectsCreated[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsCreated[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}

for (const job of status.savedObjects['anomaly-detector']) {
if (job.checks.jobExists === false) {
if (simulate === true) {
results.savedObjectsDeleted[job.jobId] = { success: true };
} else {
// Delete AD saved objects for jobs which no longer exist
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteAnomalyDetectionJob(jobId);
results.savedObjectsDeleted[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsDeleted[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.savedObjects['data-frame-analytics']) {
if (job.checks.jobExists === false) {
if (simulate === true) {
results.savedObjectsDeleted[job.jobId] = { success: true };
} else {
// Delete DFA saved objects for jobs which no longer exist
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteDataFrameAnalyticsJob(jobId);
results.savedObjectsDeleted[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsDeleted[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}

for (const job of status.savedObjects['anomaly-detector']) {
if (job.checks.datafeedExists === true && job.datafeedId === null) {
// add datafeed id for jobs where the datafeed exists but the id is missing from the saved object
if (simulate === true) {
results.datafeedsAdded[job.jobId] = { success: true };
} else {
const df = datafeeds.datafeeds.find((d) => d.job_id === job.jobId);
const jobId = job.jobId;
const datafeedId = df?.datafeed_id;

tasks.push(async () => {
try {
if (datafeedId !== undefined) {
await jobSavedObjectService.addDatafeed(datafeedId, jobId);
}
results.datafeedsAdded[job.jobId] = { success: true };
} catch (error) {
results.datafeedsAdded[job.jobId] = { success: false, error };
}
});
}
} else if (
job.checks.jobExists === true &&
job.checks.datafeedExists === false &&
job.datafeedId !== null &&
job.datafeedId !== undefined
) {
// remove datafeed id for jobs where the datafeed no longer exists but the id is populated in the saved object
if (simulate === true) {
results.datafeedsRemoved[job.jobId] = { success: true };
} else {
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteDatafeed(datafeedId);
results.datafeedsRemoved[job.jobId] = { success: true };
} catch (error) {
results.datafeedsRemoved[job.jobId] = { success: false, error: error.body ?? error };
}
});
}
}
}
await Promise.allSettled(tasks.map((t) => t()));
return results;
}

async function initSavedObjects(simulate: boolean = false, namespaces: string[] = ['*']) {
const results: { jobs: Array<{ id: string; type: string }>; success: boolean; error?: any } = {
jobs: [],
success: true,
};
const status = await checkStatus();

const jobs: JobObject[] = [];
const types: JobType[] = ['anomaly-detector', 'data-frame-analytics'];

types.forEach((type) => {
status.jobs[type].forEach((job) => {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.jobs.push({ id: job.jobId, type });
} else {
jobs.push({
job_id: job.jobId,
datafeed_id: job.datafeedId ?? null,
type,
});
}
}
});
});
try {
const createResults = await jobSavedObjectService.bulkCreateJobs(jobs, namespaces);
createResults.saved_objects.forEach(({ attributes }) => {
results.jobs.push({
id: attributes.job_id,
type: attributes.type,
});
});
} catch (error) {
results.success = false;
results.error = Boom.boomify(error).output;
}
return results;
}

async function _loadAllJobSavedObjects() {
const { body } = await client.asInternalUser.search<SearchResponse<SavedObjectJob>>({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I overlooked this in the first PR. We need to be using the saved objects client to retrieve all jobs, instead of the raw ES client to query the index.

The way this is currently written, it won't support users who have changed their kibana index, and it may return results from indices that match this pattern that aren't actually the "kibana index".

Using the Saved Objects Client will also ensure that the Kibana index has been fully initialized, and is ready for plugins to access.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a search here because I need a list of all saved objects across all spaces, even the ones the user doesn't have access to. As far as i'm aware the saved object client can't do that?
checkStatus needs to know if a job has a saved object, otherwise we'll end up with duplicates when repairing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@legrego i've updated these searches so it now uses the saved objects client

index: '.kibana*',
Expand All @@ -369,7 +172,7 @@ export function checksFactory(
filter: [
{
term: {
type: 'ml-job',
type: ML_SAVED_OBJECT_TYPE,
},
},
],
Expand All @@ -388,5 +191,5 @@ export function checksFactory(
});
}

return { checkStatus, repairJobs, initSavedObjects };
return { checkStatus };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we name it getCurrentStatus instead?

}
2 changes: 2 additions & 0 deletions x-pack/plugins/ml/server/saved_objects/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@
export { setupSavedObjects } from './saved_objects';
export { JobObject, JobSavedObjectService, jobSavedObjectServiceFactory } from './service';
export { checksFactory } from './checks';
export { repairFactory } from './repair';
export { jobSavedObjectsInitializationFactory } from './initialization';
Loading