Skip to content

Commit

Permalink
Setting random portion of cloud deployments to use mget
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Oct 15, 2024
1 parent fc3ce54 commit fbd5e8c
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 19 deletions.
3 changes: 0 additions & 3 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
Expand Down Expand Up @@ -77,7 +76,6 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
Expand Down Expand Up @@ -138,7 +136,6 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export const configSchema = schema.object(
max: 100,
min: 1,
}),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_UPDATE_BY_QUERY }),
claim_strategy: schema.maybe(schema.string()),
request_timeouts: requestTimeoutsConfig,
auto_calculate_default_ech_capacity: schema.boolean({ defaultValue: false }),
},
Expand Down
197 changes: 197 additions & 0 deletions x-pack/plugins/task_manager/server/lib/set_claim_strategy.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
CLAIM_STRATEGY_MGET,
CLAIM_STRATEGY_UPDATE_BY_QUERY,
DEFAULT_POLL_INTERVAL,
MGET_DEFAULT_POLL_INTERVAL,
} from '../config';
import { mockLogger } from '../test_utils';
import { setClaimStrategy } from './set_claim_strategy';

const getConfigWithoutClaimStrategy = () => ({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
level: 'debug' as const,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 4000,
monitored_stats_running_average_window: 50,
request_capacity: 1000,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
request_timeouts: {
update_by_query: 1000,
},
poll_interval: DEFAULT_POLL_INTERVAL,
auto_calculate_default_ech_capacity: false,
});

const logger = mockLogger();

const deploymentIdUpdateByQuery = 'd2f0e7c6bc464a9b8b16e5730b9c40f9';
const deploymentIdMget = 'ab4e88d139f93d43024837d96144e7d4';
describe('setClaimStrategy', () => {
beforeEach(() => {
jest.clearAllMocks();
});
for (const isServerless of [true, false]) {
for (const isCloud of [true, false]) {
for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) {
for (const configuredStrategy of [CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY]) {
test(`should return config as is when claim strategy is already defined: isServerless=${isServerless}, isCloud=${isCloud}, deploymentId=${deploymentId}`, () => {
const config = {
...getConfigWithoutClaimStrategy(),
claim_strategy: configuredStrategy,
};

const returnedConfig = setClaimStrategy({
config,
logger,
isCloud,
isServerless,
deploymentId,
});

expect(returnedConfig).toStrictEqual(config);
if (deploymentId) {
expect(logger.info).toHaveBeenCalledWith(
`Using claim strategy ${configuredStrategy} as configured for deployment ${deploymentId}`
);
} else {
expect(logger.info).toHaveBeenCalledWith(
`Using claim strategy ${configuredStrategy} as configured`
);
}
});
}
}
}
}

for (const isCloud of [true, false]) {
for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) {
test(`should set claim strategy to mget if in serverless: isCloud=${isCloud}, deploymentId=${deploymentId}`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud,
isServerless: true,
deploymentId,
});

expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET);
expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL);

if (deploymentId) {
expect(logger.info).toHaveBeenCalledWith(
`Setting claim strategy to mget for serverless deployment ${deploymentId}`
);
} else {
expect(logger.info).toHaveBeenCalledWith(`Setting claim strategy to mget`);
}
});
}
}

test(`should set claim strategy to update_by_query if not cloud and not serverless`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: false,
isServerless: false,
});

expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);

expect(logger.info).not.toHaveBeenCalled();
});

test(`should set claim strategy to update_by_query if cloud and not serverless with undefined deploymentId`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: true,
isServerless: false,
});

expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);

expect(logger.info).not.toHaveBeenCalled();
});

test(`should set claim strategy to update_by_query if cloud and not serverless and deploymentId does not start with a or b`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: true,
isServerless: false,
deploymentId: deploymentIdUpdateByQuery,
});

expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);

expect(logger.info).toHaveBeenCalledWith(
`Setting claim strategy to update_by_query for deployment ${deploymentIdUpdateByQuery}`
);
});

test(`should set claim strategy to mget if cloud and not serverless and deploymentId starts with a or b`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: true,
isServerless: false,
deploymentId: deploymentIdMget,
});

expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET);
expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL);

expect(logger.info).toHaveBeenCalledWith(
`Setting claim strategy to mget for deployment ${deploymentIdMget}`
);
});
});
76 changes: 76 additions & 0 deletions x-pack/plugins/task_manager/server/lib/set_claim_strategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Logger } from '@kbn/core/server';
import {
CLAIM_STRATEGY_MGET,
CLAIM_STRATEGY_UPDATE_BY_QUERY,
DEFAULT_POLL_INTERVAL,
MGET_DEFAULT_POLL_INTERVAL,
TaskManagerConfig,
} from '../config';

interface SetClaimStrategyOpts {
config: TaskManagerConfig;
deploymentId?: string;
isServerless: boolean;
isCloud: boolean;
logger: Logger;
}

export function setClaimStrategy(opts: SetClaimStrategyOpts): TaskManagerConfig {
// if the claim strategy is already defined, return immediately
if (opts.config.claim_strategy) {
opts.logger.info(
`Using claim strategy ${opts.config.claim_strategy} as configured${
opts.deploymentId ? ` for deployment ${opts.deploymentId}` : ''
}`
);
return opts.config;
}

if (opts.isServerless) {
// use mget for serverless
opts.logger.info(
`Setting claim strategy to mget${
opts.deploymentId ? ` for serverless deployment ${opts.deploymentId}` : ''
}`
);
return {
...opts.config,
claim_strategy: CLAIM_STRATEGY_MGET,
poll_interval: MGET_DEFAULT_POLL_INTERVAL,
};
}

let defaultToMget = false;

if (opts.isCloud && !opts.isServerless && opts.deploymentId) {
defaultToMget = opts.deploymentId.startsWith('a') || opts.deploymentId.startsWith('b');
if (defaultToMget) {
opts.logger.info(`Setting claim strategy to mget for deployment ${opts.deploymentId}`);
} else {
opts.logger.info(
`Setting claim strategy to update_by_query for deployment ${opts.deploymentId}`
);
}
}

if (defaultToMget) {
return {
...opts.config,
claim_strategy: CLAIM_STRATEGY_MGET,
poll_interval: MGET_DEFAULT_POLL_INTERVAL,
};
}

return {
...opts.config,
claim_strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
poll_interval: DEFAULT_POLL_INTERVAL,
};
}
21 changes: 15 additions & 6 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
ServiceStatusLevels,
CoreStatus,
} from '@kbn/core/server';
import type { CloudStart } from '@kbn/cloud-plugin/server';
import type { CloudSetup, CloudStart } from '@kbn/cloud-plugin/server';
import {
registerDeleteInactiveNodesTaskDefinition,
scheduleDeleteInactiveNodesTaskDefinition,
Expand All @@ -45,6 +45,7 @@ import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
import { TaskPartitioner } from './lib/task_partitioner';
import { getDefaultCapacity } from './lib/get_default_capacity';
import { setClaimStrategy } from './lib/set_claim_strategy';

export interface TaskManagerSetupContract {
/**
Expand Down Expand Up @@ -126,18 +127,26 @@ export class TaskManagerPlugin

public setup(
core: CoreSetup<TaskManagerStartContract, unknown>,
plugins: { usageCollection?: UsageCollectionSetup }
plugins: { cloud?: CloudSetup; usageCollection?: UsageCollectionSetup }
): TaskManagerSetupContract {
this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$);

this.config = setClaimStrategy({
config: this.config,
deploymentId: plugins.cloud?.deploymentId,
isServerless: this.initContext.env.packageInfo.buildFlavor === 'serverless',
isCloud: plugins.cloud?.isCloudEnabled ?? false,
logger: this.logger,
});

core.metrics
.getOpsMetrics$()
.pipe(distinctUntilChanged())
.subscribe((metrics) => {
this.heapSizeLimit = metrics.process.memory.heap.size_limit;
});

setupSavedObjects(core.savedObjects, this.config);
setupSavedObjects(core.savedObjects);
this.taskManagerId = this.initContext.env.instanceUuid;

if (!this.taskManagerId) {
Expand Down Expand Up @@ -301,9 +310,9 @@ export class TaskManagerPlugin
this.config!.claim_strategy
} isBackgroundTaskNodeOnly=${this.isNodeBackgroundTasksOnly()} heapSizeLimit=${
this.heapSizeLimit
} defaultCapacity=${defaultCapacity} autoCalculateDefaultEchCapacity=${
this.config.auto_calculate_default_ech_capacity
}`
} defaultCapacity=${defaultCapacity} pollingInterval=${
this.config!.poll_interval
} autoCalculateDefaultEchCapacity=${this.config.auto_calculate_default_ech_capacity}`
);

const managedConfiguration = createManagedConfiguration({
Expand Down
Loading

0 comments on commit fbd5e8c

Please sign in to comment.