From f0153b2b89b162a6713f3c7804a3d217a4862e23 Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Mon, 19 Jul 2021 17:25:38 -0400 Subject: [PATCH] EP Meta Telemetry Perf (#104396) (#106147) * Add comments for other developers. * Move OS information into meta key. * Refmt endpoint metrics. * Add helper funcs to batch sending. * Add test to ensure opt in status. * Add helpers test. * Finish reshaping the document based on feedback. * Add better type safety. Add policy package version to output. * Fix sender implementation for aggregating EP datastreams. * Fix type issues. * Fix cadence inference + miss default agent id. * Dynamically control search ranges for metrics + policy responses. * Set back to 24h. * Add comment for ignoring the default policy id. * explicitly type the sub agg search query. * Improve type safety. * Add additional type safety + try/catch the last block. * Remove unneeded optional chaining. * Destructure host metrics. Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Pete Hampton --- .../server/lib/telemetry/diagnostic_task.ts | 4 +- .../lib/telemetry/endpoint_task.test.ts | 34 ++- .../server/lib/telemetry/endpoint_task.ts | 248 +++++++++++++----- .../server/lib/telemetry/helpers.task.ts | 37 --- .../server/lib/telemetry/helpers.test.ts | 127 +++++++++ .../server/lib/telemetry/helpers.ts | 64 ++++- .../server/lib/telemetry/mocks.ts | 17 ++ .../server/lib/telemetry/sender.ts | 36 ++- .../server/lib/telemetry/types.ts | 26 +- 9 files changed, 458 insertions(+), 135 deletions(-) delete mode 100644 x-pack/plugins/security_solution/server/lib/telemetry/helpers.task.ts create mode 100644 x-pack/plugins/security_solution/server/lib/telemetry/helpers.test.ts diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/diagnostic_task.ts b/x-pack/plugins/security_solution/server/lib/telemetry/diagnostic_task.ts index 05d7396031a5f9..c83f37593a0363 100644 
--- a/x-pack/plugins/security_solution/server/lib/telemetry/diagnostic_task.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/diagnostic_task.ts @@ -12,7 +12,7 @@ import { TaskManagerSetupContract, TaskManagerStartContract, } from '../../../../task_manager/server'; -import { getLastTaskExecutionTimestamp } from './helpers'; +import { getPreviousDiagTaskTimestamp } from './helpers'; import { TelemetryEventsSender, TelemetryEvent } from './sender'; export const TelemetryDiagTaskConstants = { @@ -44,7 +44,7 @@ export class TelemetryDiagTask { return { run: async () => { const executeTo = moment().utc().toISOString(); - const executeFrom = getLastTaskExecutionTimestamp( + const executeFrom = getPreviousDiagTaskTimestamp( executeTo, taskInstance.state?.lastExecutionTimestamp ); diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.test.ts b/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.test.ts index a056ef783f6cf6..48c996d1e9eff4 100644 --- a/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.test.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.test.ts @@ -6,9 +6,11 @@ */ import { loggingSystemMock } from 'src/core/server/mocks'; +import { TaskStatus } from '../../../../task_manager/server'; import { taskManagerMock } from '../../../../task_manager/server/mocks'; -import { TelemetryEndpointTask } from './endpoint_task'; -import { createMockTelemetryEventsSender } from './mocks'; + +import { TelemetryEndpointTask, TelemetryEndpointTaskConstants } from './endpoint_task'; +import { createMockTelemetryEventsSender, MockTelemetryEndpointTask } from './mocks'; describe('test', () => { let logger: ReturnType; @@ -18,7 +20,7 @@ describe('test', () => { }); describe('endpoint alert telemetry checks', () => { - test('the task can register', () => { + test('the endpoint task can register', () => { const telemetryEndpointTask = new TelemetryEndpointTask( logger, 
taskManagerMock.createSetup(), @@ -48,4 +50,30 @@ describe('test', () => { await telemetryEndpointTask.start(mockTaskManagerStart); expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled(); }); + + test('endpoint task should not query elastic if telemetry is not opted in', async () => { + const mockSender = createMockTelemetryEventsSender(false); + const mockTaskManager = taskManagerMock.createSetup(); + new MockTelemetryEndpointTask(logger, mockTaskManager, mockSender); + + const mockTaskInstance = { + id: TelemetryEndpointTaskConstants.TYPE, + runAt: new Date(), + attempts: 0, + ownerId: '', + status: TaskStatus.Running, + startedAt: new Date(), + scheduledAt: new Date(), + retryAt: new Date(), + params: {}, + state: {}, + taskType: TelemetryEndpointTaskConstants.TYPE, + }; + const createTaskRunner = + mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryEndpointTaskConstants.TYPE] + .createTaskRunner; + const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance }); + await taskRunner.run(); + expect(mockSender.fetchDiagnosticAlerts).not.toHaveBeenCalled(); + }); }); diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.ts b/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.ts index cac92983b38783..71a105d1e58f5c 100644 --- a/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.ts @@ -12,23 +12,43 @@ import { TaskManagerSetupContract, TaskManagerStartContract, } from '../../../../task_manager/server'; -import { getLastTaskExecutionTimestamp } from './helpers'; +import { + batchTelemetryRecords, + getPreviousEpMetaTaskTimestamp, + isPackagePolicyList, +} from './helpers'; import { TelemetryEventsSender } from './sender'; -import { FullAgentPolicyInput } from '../../../../fleet/common/types/models/agent_policy'; +import { PolicyData } from '../../../common/endpoint/types'; +import { 
FLEET_ENDPOINT_PACKAGE } from '../../../../fleet/common'; import { EndpointMetricsAggregation, EndpointPolicyResponseAggregation, EndpointPolicyResponseDocument, - FleetAgentCacheItem, } from './types'; export const TelemetryEndpointTaskConstants = { TIMEOUT: '5m', TYPE: 'security:endpoint-meta-telemetry', - INTERVAL: '24m', + INTERVAL: '24h', VERSION: '1.0.0', }; +// Endpoint agent uses this Policy ID while it's installing. +const DefaultEndpointPolicyIdToIgnore = '00000000-0000-0000-0000-000000000000'; + +const EmptyFleetAgentResponse = { + agents: [], + total: 0, + page: 0, + perPage: 0, +}; + +/** Telemetry Endpoint Task + * + * The Endpoint Telemetry task is a daily batch job that collects and transmits non-sensitive + * endpoint performance and policy logs to Elastic Security Data Engineering. It is used to + * identify bugs or common UX issues with the Elastic Security Endpoint agent. + */ export class TelemetryEndpointTask { private readonly logger: Logger; private readonly sender: TelemetryEventsSender; @@ -50,17 +70,21 @@ export class TelemetryEndpointTask { return { run: async () => { - const executeTo = moment().utc().toISOString(); - const lastExecutionTimestamp = getLastTaskExecutionTimestamp( - executeTo, + const taskExecutionTime = moment().utc().toISOString(); + const lastExecutionTimestamp = getPreviousEpMetaTaskTimestamp( + taskExecutionTime, taskInstance.state?.lastExecutionTimestamp ); - const hits = await this.runTask(taskInstance.id); + const hits = await this.runTask( + taskInstance.id, + lastExecutionTimestamp, + taskExecutionTime + ); return { state: { - lastExecutionTimestamp, + lastExecutionTimestamp: taskExecutionTime, runs: (state.runs || 0) + 1, hits, }, @@ -94,23 +118,25 @@ export class TelemetryEndpointTask { return `${TelemetryEndpointTaskConstants.TYPE}:${TelemetryEndpointTaskConstants.VERSION}`; }; - private async fetchEndpointData() { - const [epMetricsResponse, fleetAgentsResponse, policyResponse] = await Promise.allSettled([ - 
this.sender.fetchEndpointMetrics(), + private async fetchEndpointData(executeFrom: string, executeTo: string) { + const [fleetAgentsResponse, epMetricsResponse, policyResponse] = await Promise.allSettled([ this.sender.fetchFleetAgents(), - this.sender.fetchFailedEndpointPolicyResponses(), + this.sender.fetchEndpointMetrics(executeFrom, executeTo), + this.sender.fetchEndpointPolicyResponses(executeFrom, executeTo), ]); return { + fleetAgentsResponse: + fleetAgentsResponse.status === 'fulfilled' + ? fleetAgentsResponse.value + : EmptyFleetAgentResponse, endpointMetrics: epMetricsResponse.status === 'fulfilled' ? epMetricsResponse.value : undefined, - fleetAgentsResponse: - fleetAgentsResponse.status === 'fulfilled' ? fleetAgentsResponse.value : undefined, epPolicyResponse: policyResponse.status === 'fulfilled' ? policyResponse.value : undefined, }; } - public runTask = async (taskId: string) => { + public runTask = async (taskId: string, executeFrom: string, executeTo: string) => { if (taskId !== this.getTaskId()) { this.logger.debug(`Outdated task running: ${taskId}`); return 0; @@ -122,99 +148,179 @@ export class TelemetryEndpointTask { return 0; } - const endpointData = await this.fetchEndpointData(); + const endpointData = await this.fetchEndpointData(executeFrom, executeTo); + + /** STAGE 1 - Fetch Endpoint Agent Metrics + * + * Reads Endpoint Agent metrics out of the `.ds-metrics-endpoint.metrics` data stream + * and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will + * report its metrics once per day OR every time a policy change has occurred. 
If + * a metric document(s) exists for an EP agent we map to fleet agent and policy + */ + if (endpointData.endpointMetrics === undefined) { + this.logger.debug(`no endpoint metrics to report`); + return 0; + } const { body: endpointMetricsResponse } = (endpointData.endpointMetrics as unknown) as { body: EndpointMetricsAggregation; }; - if (endpointMetricsResponse.aggregations === undefined) { - this.logger.debug(`No endpoint metrics`); - return 0; - } const endpointMetrics = endpointMetricsResponse.aggregations.endpoint_agents.buckets.map( (epMetrics) => { return { endpoint_agent: epMetrics.latest_metrics.hits.hits[0]._source.agent.id, + endpoint_version: epMetrics.latest_metrics.hits.hits[0]._source.agent.version, endpoint_metrics: epMetrics.latest_metrics.hits.hits[0]._source, }; } ); - if (endpointMetrics.length === 0) { - this.logger.debug('no reported endpoint metrics'); - return 0; - } - + /** STAGE 2 - Fetch Fleet Agent Config + * + * As the policy id + policy version does not exist on the Endpoint Metrics document + * we need to fetch information about the Fleet Agent and sync the metrics document + * with the Fleet agent's policy data. + * + * 7.14 ~ An issue was created with the Endpoint agent team to add the policy id + + * policy version to the metrics document to circumvent and refactor away from + * this expensive join operation. 
+ */ const agentsResponse = endpointData.fleetAgentsResponse; if (agentsResponse === undefined) { - this.logger.debug('no agents to report'); return 0; } + const fleetAgents = agentsResponse.agents.reduce((cache, agent) => { + if (agent.id === DefaultEndpointPolicyIdToIgnore) { + return cache; + } + + if (agent.policy_id !== null && agent.policy_id !== undefined) { + cache.set(agent.id, agent.policy_id); + } - const fleetAgents = agentsResponse?.agents.reduce((cache, agent) => { - cache.set(agent.id, { policy_id: agent.policy_id, policy_version: agent.policy_revision }); return cache; - }, new Map()); + }, new Map()); - const endpointPolicyCache = new Map(); + const endpointPolicyCache = new Map(); for (const policyInfo of fleetAgents.values()) { - if ( - policyInfo.policy_id !== null && - policyInfo.policy_id !== undefined && - !endpointPolicyCache.has(policyInfo.policy_id) - ) { - const packagePolicies = await this.sender.fetchEndpointPolicyConfigs(policyInfo.policy_id); - packagePolicies?.inputs.forEach((input) => { - if (input.type === 'endpoint' && policyInfo.policy_id !== undefined) { - endpointPolicyCache.set(policyInfo.policy_id, input); - } - }); + if (policyInfo !== null && policyInfo !== undefined && !endpointPolicyCache.has(policyInfo)) { + const agentPolicy = await this.sender.fetchPolicyConfigs(policyInfo); + const packagePolicies = agentPolicy?.package_policies; + + if (packagePolicies !== undefined && isPackagePolicyList(packagePolicies)) { + packagePolicies + .map((pPolicy) => pPolicy as PolicyData) + .forEach((pPolicy) => { + if (pPolicy.inputs[0].config !== undefined) { + pPolicy.inputs.forEach((input) => { + if ( + input.type === FLEET_ENDPOINT_PACKAGE && + input.config !== undefined && + policyInfo !== undefined + ) { + endpointPolicyCache.set(policyInfo, pPolicy); + } + }); + } + }); + } } } + /** STAGE 3 - Fetch Endpoint Policy Responses + * + * Reads Endpoint Agent policy responses out of the `.ds-metrics-endpoint.policy*` data + * stream 
and creates a local K/V structure that stores the policy response (V) with + * the Endpoint Agent Id (K). A value will only exist if there has been a endpoint + * enrolled in the last 24 hours OR a policy change has occurred. We only send + * non-successful responses. If the field is null, we assume no responses in + * the last 24h or no failures/warnings in the policy applied. + * + */ const { body: failedPolicyResponses } = (endpointData.epPolicyResponse as unknown) as { body: EndpointPolicyResponseAggregation; }; const policyResponses = failedPolicyResponses.aggregations.policy_responses.buckets.reduce( - (cache, bucket) => { - const doc = bucket.latest_response.hits.hits[0]; - cache.set(bucket.key, doc); + (cache, endpointAgentId) => { + const doc = endpointAgentId.latest_response.hits.hits[0]; + cache.set(endpointAgentId.key, doc); return cache; }, new Map() ); - const telemetryPayloads = endpointMetrics.map((endpoint) => { - let policyConfig = null; - let failedPolicy = null; + /** STAGE 4 - Create the telemetry log records + * + * Iterates through the endpoint metrics documents at STAGE 1 and joins them together + * to form the telemetry log that is sent back to Elastic Security developers to + * make improvements to the product. 
+ * + */ + try { + const telemetryPayloads = endpointMetrics.map((endpoint) => { + let policyConfig = null; + let failedPolicy = null; + + const fleetAgentId = endpoint.endpoint_metrics.elastic.agent.id; + const endpointAgentId = endpoint.endpoint_agent; - const fleetAgentId = endpoint.endpoint_metrics.elastic.agent.id; - const endpointAgentId = endpoint.endpoint_agent; + const policyInformation = fleetAgents.get(fleetAgentId); + if (policyInformation) { + policyConfig = endpointPolicyCache.get(policyInformation); - const policyInformation = fleetAgents.get(fleetAgentId); - if (policyInformation?.policy_id) { - policyConfig = endpointPolicyCache.get(policyInformation?.policy_id); - if (policyConfig) { - failedPolicy = policyResponses.get(policyConfig?.id); + if (policyConfig) { + failedPolicy = policyResponses.get(policyConfig?.id); + } } - } - return { - agent_id: fleetAgentId, - endpoint_id: endpointAgentId, - endpoint_metrics: { - os: endpoint.endpoint_metrics.host.os, - cpu: endpoint.endpoint_metrics.Endpoint.metrics.cpu, - memory: endpoint.endpoint_metrics.Endpoint.metrics.memory, - uptime: endpoint.endpoint_metrics.Endpoint.metrics.uptime, - }, - policy_config: policyConfig, - policy_failure: failedPolicy, - }; - }); + const { cpu, memory, uptime } = endpoint.endpoint_metrics.Endpoint.metrics; + + return { + '@timestamp': executeTo, + agent_id: fleetAgentId, + endpoint_id: endpointAgentId, + endpoint_version: endpoint.endpoint_version, + endpoint_package_version: policyConfig?.package?.version || null, + endpoint_metrics: { + cpu: cpu.endpoint, + memory: memory.endpoint.private, + uptime, + }, + endpoint_meta: { + os: endpoint.endpoint_metrics.host.os, + }, + policy_config: policyConfig !== null ? policyConfig?.inputs[0].config.policy : {}, + policy_response: + failedPolicy !== null && failedPolicy !== undefined + ? 
{ + agent_policy_status: failedPolicy._source.event.agent_id_status, + manifest_version: + failedPolicy._source.Endpoint.policy.applied.artifacts.global.version, + status: failedPolicy._source.Endpoint.policy.applied.status, + actions: failedPolicy._source.Endpoint.policy.applied.actions + .map((action) => (action.status !== 'success' ? action : null)) + .filter((action) => action !== null), + } + : {}, + telemetry_meta: { + metrics_timestamp: endpoint.endpoint_metrics['@timestamp'], + }, + }; + }); - this.sender.sendOnDemand('endpoint-metadata', telemetryPayloads); - return telemetryPayloads.length; + /** + * STAGE 5 - Send the documents + * + * Send the documents in batches of 100 + */ + batchTelemetryRecords(telemetryPayloads, 100).forEach((telemetryBatch) => + this.sender.sendOnDemand('endpoint-metadata', telemetryBatch) + ); + return telemetryPayloads.length; + } catch (err) { + this.logger.error('Could not send endpoint alert telemetry'); + return 0; + } }; } diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/helpers.task.ts b/x-pack/plugins/security_solution/server/lib/telemetry/helpers.task.ts deleted file mode 100644 index ec81f3d0a5fa4b..00000000000000 --- a/x-pack/plugins/security_solution/server/lib/telemetry/helpers.task.ts +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. 
- */ - -import moment from 'moment'; -import { getLastTaskExecutionTimestamp } from './helpers'; - -describe('test scheduled task helpers', () => { - test('test -5 mins is returned when there is no previous task run', async () => { - const executeTo = moment().utc().toISOString(); - const executeFrom = undefined; - const newExecuteFrom = getLastTaskExecutionTimestamp(executeTo, executeFrom); - - expect(newExecuteFrom).toEqual(moment(executeTo).subtract(5, 'minutes').toISOString()); - }); - - test('test -6 mins is returned when there was a previous task run', async () => { - const executeTo = moment().utc().toISOString(); - const executeFrom = moment(executeTo).subtract(6, 'minutes').toISOString(); - const newExecuteFrom = getLastTaskExecutionTimestamp(executeTo, executeFrom); - - expect(newExecuteFrom).toEqual(executeFrom); - }); - - // it's possible if Kibana is down for a prolonged period the stored lastRun would have drifted - // if that is the case we will just roll it back to a 10 min search window - test('test 10 mins is returned when previous task run took longer than 10 minutes', async () => { - const executeTo = moment().utc().toISOString(); - const executeFrom = moment(executeTo).subtract(142, 'minutes').toISOString(); - const newExecuteFrom = getLastTaskExecutionTimestamp(executeTo, executeFrom); - - expect(newExecuteFrom).toEqual(moment(executeTo).subtract(10, 'minutes').toISOString()); - }); -}); diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/helpers.test.ts b/x-pack/plugins/security_solution/server/lib/telemetry/helpers.test.ts new file mode 100644 index 00000000000000..bee673fc8725f7 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/telemetry/helpers.test.ts @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import moment from 'moment'; +import { createMockPackagePolicy } from './mocks'; +import { + getPreviousDiagTaskTimestamp, + getPreviousEpMetaTaskTimestamp, + batchTelemetryRecords, + isPackagePolicyList, +} from './helpers'; + +describe('test diagnostic telemetry scheduled task timing helper', () => { + test('test -5 mins is returned when there is no previous task run', async () => { + const executeTo = moment().utc().toISOString(); + const executeFrom = undefined; + const newExecuteFrom = getPreviousDiagTaskTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(moment(executeTo).subtract(5, 'minutes').toISOString()); + }); + + test('test -6 mins is returned when there was a previous task run', async () => { + const executeTo = moment().utc().toISOString(); + const executeFrom = moment(executeTo).subtract(6, 'minutes').toISOString(); + const newExecuteFrom = getPreviousDiagTaskTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(executeFrom); + }); + + // it's possible if Kibana is down for a prolonged period the stored lastRun would have drifted + // if that is the case we will just roll it back to a 10 min search window + test('test 10 mins is returned when previous task run took longer than 10 minutes', async () => { + const executeTo = moment().utc().toISOString(); + const executeFrom = moment(executeTo).subtract(142, 'minutes').toISOString(); + const newExecuteFrom = getPreviousDiagTaskTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(moment(executeTo).subtract(10, 'minutes').toISOString()); + }); +}); + +describe('test endpoint meta telemetry scheduled task timing helper', () => { + test('test -24 hours is returned when there is no previous task run', async () => { + const executeTo = moment().utc().toISOString(); + const executeFrom = undefined; + const newExecuteFrom = 
getPreviousEpMetaTaskTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(moment(executeTo).subtract(24, 'hours').toISOString()); + }); + + test('test -24 hours is returned when there was a previous task run', async () => { + const executeTo = moment().utc().toISOString(); + const executeFrom = moment(executeTo).subtract(24, 'hours').toISOString(); + const newExecuteFrom = getPreviousEpMetaTaskTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(executeFrom); + }); + + // it's possible if Kibana is down for a prolonged period the stored lastRun would have drifted + // if that is the case we will just roll it back to a 24 hour search window + test('test 24 hours is returned when previous task run took longer than 24 hours', async () => { + const executeTo = moment().utc().toISOString(); + const executeFrom = moment(executeTo).subtract(72, 'hours').toISOString(); // down 3 days + const newExecuteFrom = getPreviousEpMetaTaskTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(moment(executeTo).subtract(24, 'hours').toISOString()); + }); +}); + +describe('telemetry batching logic', () => { + test('records can be batched oddly as they are sent to the telemetry channel', async () => { + const stubTelemetryRecords = [...Array(10).keys()]; + const batchSize = 3; + + const records = batchTelemetryRecords(stubTelemetryRecords, batchSize); + expect(records.length).toEqual(4); + }); + + test('records can be batched evenly as they are sent to the telemetry channel', async () => { + const stubTelemetryRecords = [...Array(299).keys()]; + const batchSize = 100; + + const records = batchTelemetryRecords(stubTelemetryRecords, batchSize); + expect(records.length).toEqual(3); + }); + + test('empty telemetry records wont be batched', async () => { + const stubTelemetryRecords = [...Array(0).keys()]; + const batchSize = 100; + + const records = batchTelemetryRecords(stubTelemetryRecords, batchSize); + expect(records.length).toEqual(0); 
+ }); +}); + +describe('test package policy type guard', () => { + test('string records are not package policies', async () => { + const arr = ['a', 'b', 'c']; + const result = isPackagePolicyList(arr); + + expect(result).toEqual(false); + }); + + test('empty array are not package policies', () => { + const arr: string[] = []; + const result = isPackagePolicyList(arr); + + expect(result).toEqual(false); + }); + + test('undefined is not a list of package policies', () => { + const arr = undefined; + const result = isPackagePolicyList(arr); + + expect(result).toEqual(false); + }); + + test('package policies are list of package policies', () => { + const arr = [createMockPackagePolicy(), createMockPackagePolicy(), createMockPackagePolicy()]; + const result = isPackagePolicyList(arr); + + expect(result).toEqual(true); + }); +}); diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/helpers.ts b/x-pack/plugins/security_solution/server/lib/telemetry/helpers.ts index e820116462fa20..b32bd64a073378 100644 --- a/x-pack/plugins/security_solution/server/lib/telemetry/helpers.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/helpers.ts @@ -6,8 +6,16 @@ */ import moment from 'moment'; +import { PackagePolicy } from '../../../../fleet/common/types/models/package_policy'; -export const getLastTaskExecutionTimestamp = ( +/** + * Determines the when the last run was in order to execute to. + * + * @param executeTo + * @param lastExecutionTimestamp + * @returns the timestamp to search from + */ +export const getPreviousDiagTaskTimestamp = ( executeTo: string, lastExecutionTimestamp?: string ) => { @@ -21,3 +29,57 @@ export const getLastTaskExecutionTimestamp = ( return lastExecutionTimestamp; }; + +/** + * Determines the when the last run was in order to execute to. 
+ * + * @param executeTo + * @param lastExecutionTimestamp + * @returns the timestamp to search from + */ +export const getPreviousEpMetaTaskTimestamp = ( + executeTo: string, + lastExecutionTimestamp?: string +) => { + if (lastExecutionTimestamp === undefined) { + return moment(executeTo).subtract(24, 'hours').toISOString(); + } + + if (moment(executeTo).diff(lastExecutionTimestamp, 'hours') >= 24) { + return moment(executeTo).subtract(24, 'hours').toISOString(); + } + + return lastExecutionTimestamp; +}; + +/** + * Chunks an Array into an Array> + * This is to prevent overloading the telemetry channel + user resources + * + * @param telemetryRecords + * @param batchSize + * @returns the batch of records + */ +export const batchTelemetryRecords = ( + telemetryRecords: unknown[], + batchSize: number +): unknown[][] => + [...Array(Math.ceil(telemetryRecords.length / batchSize))].map((_) => + telemetryRecords.splice(0, batchSize) + ); + +/** + * User defined type guard for PackagePolicy + * + * @param data the union type of package policies + * @returns type confirmation + */ +export function isPackagePolicyList( + data: string[] | PackagePolicy[] | undefined +): data is PackagePolicy[] { + if (data === undefined || data.length < 1) { + return false; + } + + return (data as PackagePolicy[])[0].inputs !== undefined; +} diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts b/x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts index 6738113da103d5..f27d22287c9d7c 100644 --- a/x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts @@ -9,6 +9,7 @@ import { TelemetryEventsSender } from './sender'; import { TelemetryDiagTask } from './diagnostic_task'; import { TelemetryEndpointTask } from './endpoint_task'; +import { PackagePolicy } from '../../../../fleet/common/types/models/package_policy'; /** * Creates a mocked Telemetry Events Sender @@ -33,6 +34,22 @@ export 
const createMockTelemetryEventsSender = ( } as unknown) as jest.Mocked; }; +/** + * Creates a mocked package policy + */ +export const createMockPackagePolicy = (): jest.Mocked => { + return ({ + id: jest.fn(), + inputs: jest.fn(), + version: jest.fn(), + revision: jest.fn(), + updated_at: jest.fn(), + updated_by: jest.fn(), + created_at: jest.fn(), + created_by: jest.fn(), + } as unknown) as jest.Mocked; +}; + /** * Creates a mocked Telemetry Diagnostic Task */ diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts b/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts index 302f56802a5a49..6f9279d04b3480 100644 --- a/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts @@ -7,6 +7,7 @@ import { cloneDeep } from 'lodash'; import axios from 'axios'; +import { SearchRequest } from '@elastic/elasticsearch/api/types'; import { LegacyAPICaller, SavedObjectsClientContract } from 'kibana/server'; import { URL } from 'url'; import { CoreStart, ElasticsearchClient, Logger } from 'src/core/server'; @@ -139,22 +140,30 @@ export class TelemetryEventsSender { return callCluster('search', query); } - public async fetchEndpointMetrics() { + public async fetchEndpointMetrics(executeFrom: string, executeTo: string) { if (this.esClient === undefined) { throw Error('could not fetch policy responses. 
es client is not available'); } - const query = { + const query: SearchRequest = { expand_wildcards: 'open,hidden', - index: `.ds-metrics-endpoint.metrics*`, + index: `.ds-metrics-endpoint.metrics-*`, ignore_unavailable: false, size: 0, // no query results required - only aggregation quantity body: { + query: { + range: { + '@timestamp': { + gte: executeFrom, + lt: executeTo, + }, + }, + }, aggs: { endpoint_agents: { terms: { + field: 'agent.id', size: this.max_records, - field: 'agent.id.keyword', }, aggs: { latest_metrics: { @@ -175,7 +184,6 @@ export class TelemetryEventsSender { }, }; - // @ts-expect-error The types of 'body.aggs' are incompatible between these types. return this.esClient.search(query); } @@ -192,35 +200,38 @@ export class TelemetryEventsSender { }); } - public async fetchEndpointPolicyConfigs(id: string) { + public async fetchPolicyConfigs(id: string) { if (this.savedObjectClient === undefined) { throw Error('could not fetch endpoint policy configs. saved object client is not available'); } - return this.agentPolicyService?.getFullAgentPolicy(this.savedObjectClient, id); + return this.agentPolicyService?.get(this.savedObjectClient, id); } - public async fetchFailedEndpointPolicyResponses() { + public async fetchEndpointPolicyResponses(executeFrom: string, executeTo: string) { if (this.esClient === undefined) { throw Error('could not fetch policy responses. 
es client is not available'); } - const query = { + const query: SearchRequest = { expand_wildcards: 'open,hidden', index: `.ds-metrics-endpoint.policy*`, ignore_unavailable: false, size: 0, // no query results required - only aggregation quantity body: { query: { - match: { - 'Endpoint.policy.applied.status': 'failure', + range: { + '@timestamp': { + gte: executeFrom, + lt: executeTo, + }, }, }, aggs: { policy_responses: { terms: { size: this.max_records, - field: 'Endpoint.policy.applied.id.keyword', + field: 'Endpoint.policy.applied.id', }, aggs: { latest_response: { @@ -241,7 +252,6 @@ export class TelemetryEventsSender { }, }; - // @ts-expect-error The types of 'body.aggs' are incompatible between these types. return this.esClient.search(query); } diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/types.ts b/x-pack/plugins/security_solution/server/lib/telemetry/types.ts index 435f3cf49d1f1b..355393145fa0b3 100644 --- a/x-pack/plugins/security_solution/server/lib/telemetry/types.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/types.ts @@ -5,13 +5,6 @@ * 2.0. */ -// Sec Sol Kbn telemetry instrumentation specific - -export interface FleetAgentCacheItem { - policy_id: string | undefined; - policy_version: number | undefined | null; -} - // EP Policy Response export interface EndpointPolicyResponseAggregation { @@ -45,7 +38,23 @@ export interface EndpointPolicyResponseDocument { event: { agent_id_status: string; }; - Endpoint: {}; + Endpoint: { + policy: { + applied: { + actions: Array<{ + name: string; + message: string; + status: string; + }>; + artifacts: { + global: { + version: string; + }; + }; + status: string; + }; + }; + }; }; } @@ -74,6 +83,7 @@ interface EndpointMetricDocument { '@timestamp': string; agent: { id: string; + version: string; }; Endpoint: { metrics: EndpointMetrics;