diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 34dd5f1c6fbffe..aefbdaa9c8c560 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -14,7 +14,6 @@ describe('config validation', () => { Object { "allow_reading_invalid_state": true, "auto_calculate_default_ech_capacity": false, - "claim_strategy": "update_by_query", "discovery": Object { "active_nodes_lookback": "30s", "interval": 10000, @@ -77,7 +76,6 @@ describe('config validation', () => { Object { "allow_reading_invalid_state": true, "auto_calculate_default_ech_capacity": false, - "claim_strategy": "update_by_query", "discovery": Object { "active_nodes_lookback": "30s", "interval": 10000, @@ -138,7 +136,6 @@ describe('config validation', () => { Object { "allow_reading_invalid_state": true, "auto_calculate_default_ech_capacity": false, - "claim_strategy": "update_by_query", "discovery": Object { "active_nodes_lookback": "30s", "interval": 10000, diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index f640ed2165f220..3eff1b507107cf 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -202,7 +202,7 @@ export const configSchema = schema.object( max: 100, min: 1, }), - claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_UPDATE_BY_QUERY }), + claim_strategy: schema.maybe(schema.string()), request_timeouts: requestTimeoutsConfig, auto_calculate_default_ech_capacity: schema.boolean({ defaultValue: false }), }, diff --git a/x-pack/plugins/task_manager/server/lib/set_claim_strategy.test.ts b/x-pack/plugins/task_manager/server/lib/set_claim_strategy.test.ts new file mode 100644 index 00000000000000..bb3d679299d339 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/set_claim_strategy.test.ts @@ -0,0 +1,197 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + CLAIM_STRATEGY_MGET, + CLAIM_STRATEGY_UPDATE_BY_QUERY, + DEFAULT_POLL_INTERVAL, + MGET_DEFAULT_POLL_INTERVAL, +} from '../config'; +import { mockLogger } from '../test_utils'; +import { setClaimStrategy } from './set_claim_strategy'; + +const getConfigWithoutClaimStrategy = () => ({ + discovery: { + active_nodes_lookback: '30s', + interval: 10000, + }, + kibanas_per_partition: 2, + capacity: 10, + max_attempts: 9, + allow_reading_invalid_state: false, + version_conflict_threshold: 80, + monitored_aggregated_stats_refresh_rate: 60000, + monitored_stats_health_verbose_log: { + enabled: false, + level: 'debug' as const, + warn_delayed_task_start_in_seconds: 60, + }, + monitored_stats_required_freshness: 4000, + monitored_stats_running_average_window: 50, + request_capacity: 1000, + monitored_task_execution_thresholds: { + default: { + error_threshold: 90, + warn_threshold: 80, + }, + custom: {}, + }, + ephemeral_tasks: { + enabled: true, + request_capacity: 10, + }, + unsafe: { + exclude_task_types: [], + authenticate_background_task_utilization: true, + }, + event_loop_delay: { + monitor: true, + warn_threshold: 5000, + }, + worker_utilization_running_average_window: 5, + metrics_reset_interval: 3000, + request_timeouts: { + update_by_query: 1000, + }, + poll_interval: DEFAULT_POLL_INTERVAL, + auto_calculate_default_ech_capacity: false, +}); + +const logger = mockLogger(); + +const deploymentIdUpdateByQuery = 'd2f0e7c6bc464a9b8b16e5730b9c40f9'; +const deploymentIdMget = 'ab4e88d139f93d43024837d96144e7d4'; +describe('setClaimStrategy', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + for (const isServerless of [true, false]) { + for (const isCloud of [true, false]) { + for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) { + for (const configuredStrategy of [CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY]) { + test(`should return config as is when claim strategy is already defined: isServerless=${isServerless}, isCloud=${isCloud}, deploymentId=${deploymentId}`, () => { + const config = { + ...getConfigWithoutClaimStrategy(), + claim_strategy: configuredStrategy, + }; + + const returnedConfig = setClaimStrategy({ + config, + logger, + isCloud, + isServerless, + deploymentId, + }); + + expect(returnedConfig).toStrictEqual(config); + if (deploymentId) { + expect(logger.info).toHaveBeenCalledWith( + `Using claim strategy ${configuredStrategy} as configured for deployment ${deploymentId}` + ); + } else { + expect(logger.info).toHaveBeenCalledWith( + `Using claim strategy ${configuredStrategy} as configured` + ); + } + }); + } + } + } + } + + for (const isCloud of [true, false]) { + for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) { + test(`should set claim strategy to mget if in serverless: isCloud=${isCloud}, deploymentId=${deploymentId}`, () => { + const config = getConfigWithoutClaimStrategy(); + const returnedConfig = setClaimStrategy({ + config, + logger, + isCloud, + isServerless: true, + deploymentId, + }); + + expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET); + expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL); + + if (deploymentId) { + expect(logger.info).toHaveBeenCalledWith( + `Setting claim strategy to mget for serverless deployment ${deploymentId}` + ); + } else { + expect(logger.info).toHaveBeenCalledWith(`Setting claim strategy to mget`); + } + }); + } + } + + test(`should set claim strategy to update_by_query if not cloud and not serverless`, () => { + const config = getConfigWithoutClaimStrategy(); + const returnedConfig = setClaimStrategy({ + config, + logger, + isCloud: false, + isServerless: false, + }); + + expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY); + expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL); + + expect(logger.info).not.toHaveBeenCalled(); + }); + + test(`should set claim strategy to update_by_query if cloud and not serverless with undefined deploymentId`, () => { + const config = getConfigWithoutClaimStrategy(); + const returnedConfig = setClaimStrategy({ + config, + logger, + isCloud: true, + isServerless: false, + }); + + expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY); + expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL); + + expect(logger.info).not.toHaveBeenCalled(); + }); + + test(`should set claim strategy to update_by_query if cloud and not serverless and deploymentId does not start with a or b`, () => { + const config = getConfigWithoutClaimStrategy(); + const returnedConfig = setClaimStrategy({ + config, + logger, + isCloud: true, + isServerless: false, + deploymentId: deploymentIdUpdateByQuery, + }); + + expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY); + expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL); + + expect(logger.info).toHaveBeenCalledWith( + `Setting claim strategy to update_by_query for deployment ${deploymentIdUpdateByQuery}` + ); + }); + + test(`should set claim strategy to mget if cloud and not serverless and deploymentId starts with a or b`, () => { + const config = getConfigWithoutClaimStrategy(); + const returnedConfig = setClaimStrategy({ + config, + logger, + isCloud: true, + isServerless: false, + deploymentId: deploymentIdMget, + }); + + expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET); + expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL); + + expect(logger.info).toHaveBeenCalledWith( + `Setting claim strategy to mget for deployment ${deploymentIdMget}` + ); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/set_claim_strategy.ts b/x-pack/plugins/task_manager/server/lib/set_claim_strategy.ts new file mode 100644 index 00000000000000..52d71d25c7387d --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/set_claim_strategy.ts @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger } from '@kbn/core/server'; +import { + CLAIM_STRATEGY_MGET, + CLAIM_STRATEGY_UPDATE_BY_QUERY, + DEFAULT_POLL_INTERVAL, + MGET_DEFAULT_POLL_INTERVAL, + TaskManagerConfig, +} from '../config'; + +interface SetClaimStrategyOpts { + config: TaskManagerConfig; + deploymentId?: string; + isServerless: boolean; + isCloud: boolean; + logger: Logger; +} + +export function setClaimStrategy(opts: SetClaimStrategyOpts): TaskManagerConfig { + // if the claim strategy is already defined, return immediately + if (opts.config.claim_strategy) { + opts.logger.info( + `Using claim strategy ${opts.config.claim_strategy} as configured${ + opts.deploymentId ? ` for deployment ${opts.deploymentId}` : '' + }` + ); + return opts.config; + } + + if (opts.isServerless) { + // use mget for serverless + opts.logger.info( + `Setting claim strategy to mget${ + opts.deploymentId ? ` for serverless deployment ${opts.deploymentId}` : '' + }` + ); + return { + ...opts.config, + claim_strategy: CLAIM_STRATEGY_MGET, + poll_interval: MGET_DEFAULT_POLL_INTERVAL, + }; + } + + let defaultToMget = false; + + if (opts.isCloud && !opts.isServerless && opts.deploymentId) { + defaultToMget = opts.deploymentId.startsWith('a') || opts.deploymentId.startsWith('b'); + if (defaultToMget) { + opts.logger.info(`Setting claim strategy to mget for deployment ${opts.deploymentId}`); + } else { + opts.logger.info( + `Setting claim strategy to update_by_query for deployment ${opts.deploymentId}` + ); + } + } + + if (defaultToMget) { + return { + ...opts.config, + claim_strategy: CLAIM_STRATEGY_MGET, + poll_interval: MGET_DEFAULT_POLL_INTERVAL, + }; + } + + return { + ...opts.config, + claim_strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY, + poll_interval: DEFAULT_POLL_INTERVAL, + }; +} diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 45960195be2165..61731c4ae82f30 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -18,7 +18,7 @@ import { ServiceStatusLevels, CoreStatus, } from '@kbn/core/server'; -import type { CloudStart } from '@kbn/cloud-plugin/server'; +import type { CloudSetup, CloudStart } from '@kbn/cloud-plugin/server'; import { registerDeleteInactiveNodesTaskDefinition, scheduleDeleteInactiveNodesTaskDefinition, @@ -45,6 +45,7 @@ import { metricsStream, Metrics } from './metrics'; import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector'; import { TaskPartitioner } from './lib/task_partitioner'; import { getDefaultCapacity } from './lib/get_default_capacity'; +import { setClaimStrategy } from './lib/set_claim_strategy'; export interface TaskManagerSetupContract { /** @@ -126,10 +127,18 @@ export class TaskManagerPlugin public setup( core: CoreSetup, - plugins: { usageCollection?: UsageCollectionSetup } + plugins: { cloud?: CloudSetup; usageCollection?: UsageCollectionSetup } ): TaskManagerSetupContract { this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$); + this.config = setClaimStrategy({ + config: this.config, + deploymentId: plugins.cloud?.deploymentId, + isServerless: this.initContext.env.packageInfo.buildFlavor === 'serverless', + isCloud: plugins.cloud?.isCloudEnabled ?? false, + logger: this.logger, + }); + core.metrics .getOpsMetrics$() .pipe(distinctUntilChanged()) @@ -137,7 +146,7 @@ export class TaskManagerPlugin this.heapSizeLimit = metrics.process.memory.heap.size_limit; }); - setupSavedObjects(core.savedObjects, this.config); + setupSavedObjects(core.savedObjects); this.taskManagerId = this.initContext.env.instanceUuid; if (!this.taskManagerId) { @@ -301,9 +310,9 @@ export class TaskManagerPlugin this.config!.claim_strategy } isBackgroundTaskNodeOnly=${this.isNodeBackgroundTasksOnly()} heapSizeLimit=${ this.heapSizeLimit - } defaultCapacity=${defaultCapacity} autoCalculateDefaultEchCapacity=${ - this.config.auto_calculate_default_ech_capacity - }` + } defaultCapacity=${defaultCapacity} pollingInterval=${ + this.config!.poll_interval + } autoCalculateDefaultEchCapacity=${this.config.auto_calculate_default_ech_capacity}` ); const managedConfiguration = createManagedConfiguration({ diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 7d8be75c2330cf..3cb6802f43eb12 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -14,7 +14,7 @@ import type { Logger, ExecutionContextStart } from '@kbn/core/server'; import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type'; import { ManagedConfiguration } from './lib/create_managed_configuration'; -import { TaskManagerConfig, CLAIM_STRATEGY_UPDATE_BY_QUERY } from './config'; +import { CLAIM_STRATEGY_UPDATE_BY_QUERY, TaskManagerConfig } from './config'; import { TaskMarkRunning, @@ -141,7 +141,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter this.currentPollInterval, }); }; diff --git a/x-pack/plugins/task_manager/server/saved_objects/index.ts b/x-pack/plugins/task_manager/server/saved_objects/index.ts index dc1cd976777674..5c0f8b9a0776d7 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/index.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/index.ts @@ -9,7 +9,6 @@ import type { SavedObjectsServiceSetup } from '@kbn/core/server'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { backgroundTaskNodeMapping, taskMappings } from './mappings'; import { getMigrations } from './migrations'; -import { TaskManagerConfig } from '../config'; import { getOldestIdleActionTask } from '../queries/oldest_idle_action_task'; import { TASK_MANAGER_INDEX } from '../constants'; import { backgroundTaskNodeModelVersions, taskModelVersions } from './model_versions'; @@ -17,10 +16,7 @@ import { backgroundTaskNodeModelVersions, taskModelVersions } from './model_vers export const TASK_SO_NAME = 'task'; export const BACKGROUND_TASK_NODE_SO_NAME = 'background-task-node'; -export function setupSavedObjects( - savedObjects: SavedObjectsServiceSetup, - config: TaskManagerConfig -) { +export function setupSavedObjects(savedObjects: SavedObjectsServiceSetup) { savedObjects.registerType({ name: TASK_SO_NAME, namespaceType: 'agnostic',