Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions x-pack/plugins/alerting/server/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
import { EncryptedSavedObjectsPluginStart } from '../../../plugins/encrypted_saved_objects/server';
import { TaskManagerStartContract } from '../../../plugins/task_manager/server';
import { taskInstanceToAlertTaskInstance } from './task_runner/alert_task_instance';
import { deleteTaskIfItExists } from './lib/delete_task_if_it_exists';

type NormalizedAlertAction = Omit<AlertAction, 'actionTypeId'>;
export type CreateAPIKeyResult =
Expand Down Expand Up @@ -268,7 +269,7 @@ export class AlertsClient {
const removeResult = await this.savedObjectsClient.delete('alert', id);

await Promise.all([
taskIdToRemove ? this.taskManager.remove(taskIdToRemove) : null,
taskIdToRemove ? deleteTaskIfItExists(this.taskManager, taskIdToRemove) : null,
apiKeyToInvalidate ? this.invalidateApiKey({ apiKey: apiKeyToInvalidate }) : null,
]);

Expand Down Expand Up @@ -510,7 +511,9 @@ export class AlertsClient {
);

await Promise.all([
attributes.scheduledTaskId ? this.taskManager.remove(attributes.scheduledTaskId) : null,
attributes.scheduledTaskId
? deleteTaskIfItExists(this.taskManager, attributes.scheduledTaskId)
: null,
apiKeyToInvalidate ? this.invalidateApiKey({ apiKey: apiKeyToInvalidate }) : null,
]);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import { deleteTaskIfItExists } from './delete_task_if_it_exists';

describe('deleteTaskIfItExists', () => {
test('removes the task by its ID', async () => {
const tm = taskManagerMock.createStart();
const id = uuid.v4();

expect(await deleteTaskIfItExists(tm, id)).toBe(undefined);

expect(tm.remove).toHaveBeenCalledWith(id);
});

test('handles 404 errors caused by the task not existing', async () => {
const tm = taskManagerMock.createStart();
const id = uuid.v4();

tm.remove.mockRejectedValue(SavedObjectsErrorHelpers.createGenericNotFoundError('task', id));

expect(await deleteTaskIfItExists(tm, id)).toBe(undefined);

expect(tm.remove).toHaveBeenCalledWith(id);
});

test('throws if any other errro is caused by task removal', async () => {
const tm = taskManagerMock.createStart();
const id = uuid.v4();

const error = SavedObjectsErrorHelpers.createInvalidVersionError(uuid.v4());
tm.remove.mockRejectedValue(error);

expect(deleteTaskIfItExists(tm, id)).rejects.toBe(error);

expect(tm.remove).toHaveBeenCalledWith(id);
});
});
17 changes: 17 additions & 0 deletions x-pack/plugins/alerting/server/lib/delete_task_if_it_exists.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManagerStartContract } from '../../../task_manager/server';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

export async function deleteTaskIfItExists(taskManager: TaskManagerStartContract, taskId: string) {
try {
await taskManager.remove(taskId);
} catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
throw err;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { isAlertSavedObjectNotFoundError } from './is_alert_not_found_error';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import uuid from 'uuid';

describe('isAlertSavedObjectNotFoundError', () => {
test('identifies SavedObjects Not Found errors', () => {
const id = uuid.v4();
// ensure the error created by SO parses as a string with the format we expect
expect(
`${SavedObjectsErrorHelpers.createGenericNotFoundError('alert', id)}`.includes(`alert/${id}`)
).toBe(true);

const errorBySavedObjectsHelper = SavedObjectsErrorHelpers.createGenericNotFoundError(
'alert',
id
);

expect(isAlertSavedObjectNotFoundError(errorBySavedObjectsHelper, id)).toBe(true);
});

test('identifies generic errors', () => {
const id = uuid.v4();
expect(isAlertSavedObjectNotFoundError(new Error(`not found`), id)).toBe(false);
});
});
11 changes: 11 additions & 0 deletions x-pack/plugins/alerting/server/lib/is_alert_not_found_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

export function isAlertSavedObjectNotFoundError(err: Error, alertId: string) {
return SavedObjectsErrorHelpers.isNotFoundError(err) && `${err}`.includes(alertId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels yucky enough that I feel like we should ask for a better helper - like being able to pass alertId into that isNotFoundError() function, rather than text searching the error message ourselves. The alertId values are UUIDs today, which seems safe enough to test for, but who knows what tomorrow holds - action id's can now be user-specified via pre-configured actions ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's fair, but as there seems to be pressure to get this backported to 7.7, I think we'll have to defer that.

}
33 changes: 33 additions & 0 deletions x-pack/plugins/alerting/server/task_runner/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { PluginStartContract as ActionsPluginStart } from '../../../actions/serv
import { actionsMock } from '../../../actions/server/mocks';
import { eventLoggerMock } from '../../../event_log/server/event_logger.mock';
import { IEventLogger } from '../../../event_log/server';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

const alertType = {
id: 'test',
Expand Down Expand Up @@ -665,4 +666,36 @@ describe('Task Runner', () => {
}
`);
});

test('avoids rescheduling a failed Alert Task Runner when it throws due to failing to fetch the alert', async () => {
savedObjectsClient.get.mockImplementation(() => {
throw SavedObjectsErrorHelpers.createGenericNotFoundError('task', '1');
});

const taskRunner = new TaskRunner(
alertType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams
);

encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '1',
type: 'alert',
attributes: {
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});

const runnerResult = await taskRunner.run();

expect(runnerResult).toMatchInlineSnapshot(`
Object {
"runAt": undefined,
"state": Object {
"previousStartedAt": 1970-01-01T00:00:00.000Z,
},
}
`);
});
});
30 changes: 19 additions & 11 deletions x-pack/plugins/alerting/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import { taskInstanceToAlertTaskInstance } from './alert_task_instance';
import { AlertInstances } from '../alert_instance/alert_instance';
import { EVENT_LOG_ACTIONS } from '../plugin';
import { IEvent, IEventLogger } from '../../../event_log/server';
import { isAlertSavedObjectNotFoundError } from '../lib/is_alert_not_found_error';

const FALLBACK_RETRY_INTERVAL: IntervalSchedule = { interval: '5m' };

interface AlertTaskRunResult {
state: AlertTaskState;
runAt: Date;
runAt: Date | undefined;
}

interface AlertTaskInstance extends ConcreteTaskInstance {
Expand Down Expand Up @@ -328,22 +329,29 @@ export class TaskRunner {
};
},
(err: Error) => {
this.logger.error(`Executing Alert "${alertId}" has resulted in Error: ${err.message}`);
const message = `Executing Alert "${alertId}" has resulted in Error: ${err.message}`;
if (isAlertSavedObjectNotFoundError(err, alertId)) {
this.logger.debug(message);
} else {
this.logger.error(message);
}
return {
...originalState,
previousStartedAt,
};
}
),
runAt: resolveErr<Date, Error>(runAt, () =>
getNextRunAt(
new Date(),
// if we fail at this point we wish to recover but don't have access to the Alert's
// attributes, so we'll use a default interval to prevent the underlying task from
// falling into a failed state
FALLBACK_RETRY_INTERVAL
)
),
runAt: resolveErr<Date | undefined, Error>(runAt, err => {
return isAlertSavedObjectNotFoundError(err, alertId)
? undefined
: getNextRunAt(
new Date(),
// if we fail at this point we wish to recover but don't have access to the Alert's
// attributes, so we'll use a default interval to prevent the underlying task from
// falling into a failed state
FALLBACK_RETRY_INTERVAL
);
}),
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/

import moment from 'moment';
import {
getMockCallWithInternal,
getMockConfig,
Expand All @@ -13,6 +12,7 @@ import {
} from '../../../test_utils';
import { visualizationsTaskRunner } from './task_runner';
import { TaskInstance } from '../../../../../task_manager/server';
import { getNextMidnight } from '../../get_next_midnight';

describe('visualizationsTaskRunner', () => {
let mockTaskInstance: TaskInstance;
Expand Down Expand Up @@ -41,12 +41,6 @@ describe('visualizationsTaskRunner', () => {
});

test('Summarizes visualization response data', async () => {
const getNextMidnight = () =>
moment()
.add(1, 'days')
.startOf('day')
.toDate();

const runner = visualizationsTaskRunner(mockTaskInstance, getMockConfig(), getMockEs());
const result = await runner();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { isTaskSavedObjectNotFoundError } from './is_task_not_found_error';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import uuid from 'uuid';

describe('isTaskSavedObjectNotFoundError', () => {
test('identifies SavedObjects Not Found errors', () => {
const id = uuid.v4();
// ensure the error created by SO parses as a string with the format we expect
expect(
`${SavedObjectsErrorHelpers.createGenericNotFoundError('task', id)}`.includes(`task/${id}`)
).toBe(true);

const errorBySavedObjectsHelper = SavedObjectsErrorHelpers.createGenericNotFoundError(
'task',
id
);

expect(isTaskSavedObjectNotFoundError(errorBySavedObjectsHelper, id)).toBe(true);
});

test('identifies generic errors', () => {
const id = uuid.v4();
expect(isTaskSavedObjectNotFoundError(new Error(`not found`), id)).toBe(false);
});
});
11 changes: 11 additions & 0 deletions x-pack/plugins/task_manager/server/lib/is_task_not_found_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

export function isTaskSavedObjectNotFoundError(err: Error, taskId: string) {
return SavedObjectsErrorHelpers.isNotFoundError(err) && `${err}`.includes(taskId);
}
25 changes: 25 additions & 0 deletions x-pack/plugins/task_manager/server/task_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import sinon from 'sinon';
import { TaskPool, TaskPoolRunResult } from './task_pool';
import { mockLogger, resolvable, sleep } from './test_utils';
import { asOk } from './lib/result_type';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';

describe('TaskPool', () => {
test('occupiedWorkers are a sum of running tasks', async () => {
Expand Down Expand Up @@ -101,6 +102,30 @@ describe('TaskPool', () => {
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
});

test('should not log when running a Task fails due to the Task SO having been deleted while in flight', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 3,
logger,
});

const taskFailedToRun = mockTask();
taskFailedToRun.run.mockImplementation(async () => {
throw SavedObjectsErrorHelpers.createGenericNotFoundError('task', taskFailedToRun.id);
});

const result = await pool.run([mockTask(), taskFailedToRun, mockTask()]);

expect(logger.debug.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"Task TaskType \\"shooooo\\" failed in attempt to run: Saved object [task/foo] not found",
]
`);
expect(logger.warn).not.toHaveBeenCalled();

expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
});

test('Running a task which fails still takes up capacity', async () => {
const logger = mockLogger();
const pool = new TaskPool({
Expand Down
13 changes: 12 additions & 1 deletion x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import { performance } from 'perf_hooks';
import { Logger } from './types';
import { TaskRunner } from './task_runner';
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';

interface Opts {
maxWorkers: number;
Expand Down Expand Up @@ -125,7 +126,17 @@ export class TaskPool {
taskRunner
.run()
.catch(err => {
this.logger.warn(`Task ${taskRunner.toString()} failed in attempt to run: ${err.message}`);
// If a task Saved Object can't be found by an in flight task runner
// we asssume the underlying task has been deleted while it was running
// so we will log this as a debug, rather than a warn
const errorLogLine = `Task ${taskRunner.toString()} failed in attempt to run: ${
err.message
}`;
if (isTaskSavedObjectNotFoundError(err, taskRunner.id)) {
this.logger.debug(errorLogLine);
} else {
this.logger.warn(errorLogLine);
}
})
.then(() => this.running.delete(taskRunner));
}
Expand Down