Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EP Meta Telemetry Perf #104396

Merged
merged 25 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
52f8db9
Add comments for other developers.
pjhampton Jul 1, 2021
f2757c5
Move OS infomation into meta key.
pjhampton Jul 1, 2021
763020f
Refmt endpoint metrics.
pjhampton Jul 1, 2021
cf0a7c7
Add helper funcs to batch sending.
pjhampton Jul 1, 2021
4ad2672
Add test to ensure opt in status.
pjhampton Jul 5, 2021
f9291d2
Add helpers test.
pjhampton Jul 5, 2021
3753c12
Finish reshaping the document based on feedback.
pjhampton Jul 6, 2021
b9dde6d
Merge branch 'master' into pjhampton/ep-telemetry-perf
kibanamachine Jul 7, 2021
76ae366
Add better type safety. Add policy package version to output.
pjhampton Jul 7, 2021
bbc6945
Fix sender implementation for aggregating EP datastreams.
pjhampton Jul 8, 2021
1b1b3cc
Fix type issues.
pjhampton Jul 8, 2021
25b7bc4
Merge branch 'master' into pjhampton/ep-telemetry-perf
kibanamachine Jul 8, 2021
d2189d8
Fix cadence inference + miss default agent id.
pjhampton Jul 8, 2021
dea2711
Dynamically control search ranges for metrics + policy responses.
pjhampton Jul 9, 2021
7cad599
Set back to 24h.
pjhampton Jul 9, 2021
a276838
Merge branch 'master' into pjhampton/ep-telemetry-perf
kibanamachine Jul 9, 2021
7c6b7f4
Merge branch 'master' into pjhampton/ep-telemetry-perf
kibanamachine Jul 14, 2021
6afef3a
Merge branch 'master' into pjhampton/ep-telemetry-perf
kibanamachine Jul 16, 2021
65343d8
Add comment for ignoring the default policy id.
pjhampton Jul 16, 2021
d783013
explicitly type the sub agg search query.
pjhampton Jul 16, 2021
7877da2
Improve type safety.
pjhampton Jul 16, 2021
dba608e
Merge branch 'master' into pjhampton/ep-telemetry-perf
kibanamachine Jul 19, 2021
e451818
Add additional type safety + try/catch the last block.
pjhampton Jul 19, 2021
ea4e07e
Remove unneeded optional chaining.
pjhampton Jul 19, 2021
d0cf37d
Destructure host metrics.
pjhampton Jul 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import { getLastTaskExecutionTimestamp } from './helpers';
import { getPreviousDiagTaskTimestamp } from './helpers';
import { TelemetryEventsSender, TelemetryEvent } from './sender';

export const TelemetryDiagTaskConstants = {
Expand Down Expand Up @@ -44,7 +44,7 @@ export class TelemetryDiagTask {
return {
run: async () => {
const executeTo = moment().utc().toISOString();
const executeFrom = getLastTaskExecutionTimestamp(
const executeFrom = getPreviousDiagTaskTimestamp(
executeTo,
taskInstance.state?.lastExecutionTimestamp
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
*/

import { loggingSystemMock } from 'src/core/server/mocks';
import { TaskStatus } from '../../../../task_manager/server';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { TelemetryEndpointTask } from './endpoint_task';
import { createMockTelemetryEventsSender } from './mocks';

import { TelemetryEndpointTask, TelemetryEndpointTaskConstants } from './endpoint_task';
import { createMockTelemetryEventsSender, MockTelemetryEndpointTask } from './mocks';

describe('test', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
Expand All @@ -18,7 +20,7 @@ describe('test', () => {
});

describe('endpoint alert telemetry checks', () => {
test('the task can register', () => {
test('the endpoint task can register', () => {
const telemetryEndpointTask = new TelemetryEndpointTask(
logger,
taskManagerMock.createSetup(),
Expand Down Expand Up @@ -48,4 +50,30 @@ describe('test', () => {
await telemetryEndpointTask.start(mockTaskManagerStart);
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
});

test('endpoint task should not query elastic if telemetry is not opted in', async () => {
const mockSender = createMockTelemetryEventsSender(false);
const mockTaskManager = taskManagerMock.createSetup();
new MockTelemetryEndpointTask(logger, mockTaskManager, mockSender);

const mockTaskInstance = {
id: TelemetryEndpointTaskConstants.TYPE,
runAt: new Date(),
attempts: 0,
ownerId: '',
status: TaskStatus.Running,
startedAt: new Date(),
scheduledAt: new Date(),
retryAt: new Date(),
params: {},
state: {},
taskType: TelemetryEndpointTaskConstants.TYPE,
};
const createTaskRunner =
mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryEndpointTaskConstants.TYPE]
.createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance });
await taskRunner.run();
expect(mockSender.fetchDiagnosticAlerts).not.toHaveBeenCalled();
});
});
188 changes: 140 additions & 48 deletions x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,30 @@ import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import { getLastTaskExecutionTimestamp } from './helpers';
import { batchTelemetryRecords, getPreviousEpMetaTaskTimestamp } from './helpers';
import { TelemetryEventsSender } from './sender';
import { FullAgentPolicyInput } from '../../../../fleet/common/types/models/agent_policy';
import { PolicyData } from '../../../common/endpoint/types';
import { PackagePolicy } from '../../../../fleet/common/types/models/package_policy';
import { FLEET_ENDPOINT_PACKAGE } from '../../../../fleet/common';
import {
EndpointMetricsAggregation,
EndpointPolicyResponseAggregation,
EndpointPolicyResponseDocument,
FleetAgentCacheItem,
} from './types';

export const TelemetryEndpointTaskConstants = {
TIMEOUT: '5m',
TYPE: 'security:endpoint-meta-telemetry',
INTERVAL: '24m',
INTERVAL: '24h',
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
VERSION: '1.0.0',
};

/** Telemetry Endpoint Task
*
* The Endpoint Telemetry task is a daily batch job that collects and transmits non-sensitive
* endpoint performance and policy logs to Elastic Security Data Engineering. It is used to
* identify bugs or common UX issues with the Elastic Security Endpoint agent.
*/
export class TelemetryEndpointTask {
private readonly logger: Logger;
private readonly sender: TelemetryEventsSender;
Expand All @@ -50,17 +57,21 @@ export class TelemetryEndpointTask {

return {
run: async () => {
const executeTo = moment().utc().toISOString();
const lastExecutionTimestamp = getLastTaskExecutionTimestamp(
executeTo,
const taskExecutionTime = moment().utc().toISOString();
const lastExecutionTimestamp = getPreviousEpMetaTaskTimestamp(
taskExecutionTime,
taskInstance.state?.lastExecutionTimestamp
);

const hits = await this.runTask(taskInstance.id);
const hits = await this.runTask(
taskInstance.id,
lastExecutionTimestamp,
taskExecutionTime
);

return {
state: {
lastExecutionTimestamp,
lastExecutionTimestamp: taskExecutionTime,
runs: (state.runs || 0) + 1,
hits,
},
Expand Down Expand Up @@ -94,23 +105,23 @@ export class TelemetryEndpointTask {
return `${TelemetryEndpointTaskConstants.TYPE}:${TelemetryEndpointTaskConstants.VERSION}`;
};

private async fetchEndpointData() {
const [epMetricsResponse, fleetAgentsResponse, policyResponse] = await Promise.allSettled([
this.sender.fetchEndpointMetrics(),
private async fetchEndpointData(executeFrom: string, executeTo: string) {
const [fleetAgentsResponse, epMetricsResponse, policyResponse] = await Promise.allSettled([
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
this.sender.fetchFleetAgents(),
this.sender.fetchFailedEndpointPolicyResponses(),
this.sender.fetchEndpointMetrics(executeFrom, executeTo),
this.sender.fetchEndpointPolicyResponses(executeFrom, executeTo),
]);

return {
endpointMetrics:
epMetricsResponse.status === 'fulfilled' ? epMetricsResponse.value : undefined,
fleetAgentsResponse:
fleetAgentsResponse.status === 'fulfilled' ? fleetAgentsResponse.value : undefined,
endpointMetrics:
epMetricsResponse.status === 'fulfilled' ? epMetricsResponse.value : undefined,
epPolicyResponse: policyResponse.status === 'fulfilled' ? policyResponse.value : undefined,
};
}

public runTask = async (taskId: string) => {
public runTask = async (taskId: string, executeFrom: string, executeTo: string) => {
if (taskId !== this.getTaskId()) {
this.logger.debug(`Outdated task running: ${taskId}`);
return 0;
Expand All @@ -122,8 +133,15 @@ export class TelemetryEndpointTask {
return 0;
}

const endpointData = await this.fetchEndpointData();
const endpointData = await this.fetchEndpointData(executeFrom, executeTo);

/** STAGE 1 - Fetch Endpoint Agent Metrics
*
* Reads Endpoint Agent metrics out of the `.ds-metrics-endpoint.metrics` data stream
* and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will
* report its metrics once per day OR every time a policy change has occured. If
* a metric document(s) exists for an EP agent we map to fleet agent and policy
*/
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
const { body: endpointMetricsResponse } = (endpointData.endpointMetrics as unknown) as {
Copy link
Contributor

@jonathan-buttner jonathan-buttner Jul 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just responding to your question from slack here, I know you didn't change this but yeah if you have time to make a few changes here that'd be great.

I believe it's possible for endpointMetricsResponse to be undefined based on the return value of the this.fetchEndpointData() call. Below I think we can change the if to if (endpointMetricsResponse?.aggregation to address that

I think if we could remove the cast to unknown that'd be awesome. I think we can achieve that by adding something like this:

interface MetricsType {
  key: string;
  doc_count: number;
  latest_metrics: EndpointMetricHits;
}

const endpointAgentsAggs = endpointMetricsResponse.aggregations
      .endpoint_agents as AggregationsMultiBucketAggregate<MetricsType>;

    const endpointMetrics = endpointAgentsAggs.buckets.map((epMetrics) => {
      return {
        endpoint_agent: epMetrics.latest_metrics.hits.hits[0]._source.agent.id,
        endpoint_version: epMetrics.latest_metrics.hits.hits[0]._source.agent.version,
        endpoint_metrics: epMetrics.latest_metrics.hits.hits[0]._source,
      };

As a side note it looks like EndpointMetricsAggregation might need total I think that is already typed as part of the SearchResponse type, so we might be able to get rid of that part.

export interface EndpointMetricsAggregation {
  hits: {
    total: { value: number };
  };

body: EndpointMetricsAggregation;
};
Expand All @@ -136,55 +154,92 @@ export class TelemetryEndpointTask {
(epMetrics) => {
return {
endpoint_agent: epMetrics.latest_metrics.hits.hits[0]._source.agent.id,
endpoint_version: epMetrics.latest_metrics.hits.hits[0]._source.agent.version,
endpoint_metrics: epMetrics.latest_metrics.hits.hits[0]._source,
};
}
);

if (endpointMetrics.length === 0) {
this.logger.debug('no reported endpoint metrics');
return 0;
}

/** STAGE 2 - Fetch Fleet Agent Config
*
* As the policy id + policy version does not exist on the Endpoint Metrics document
* we need to fetch information about the Fleet Agent and sync the metrics document
* with the Fleet agent's policy data.
*
* 7.14 ~ An issue was created with the Endpoint agent team to add the policy id +
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
* policy version to the metrics document to circumvent and refactor away from
* this expensive join operation.
*/
const agentsResponse = endpointData.fleetAgentsResponse;
if (agentsResponse === undefined) {
this.logger.debug('no agents to report');
return 0;
}

const fleetAgents = agentsResponse?.agents.reduce((cache, agent) => {
cache.set(agent.id, { policy_id: agent.policy_id, policy_version: agent.policy_revision });
if (agent.id === '00000000-0000-0000-0000-000000000000') {
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
return cache;
}

if (agent.policy_id !== null && agent.policy_id !== undefined) {
cache.set(agent.id, agent.policy_id);
}

return cache;
}, new Map<string, FleetAgentCacheItem>());
}, new Map<string, string>());

const endpointPolicyCache = new Map<string, FullAgentPolicyInput>();
const endpointPolicyCache = new Map<string, PolicyData>();
for (const policyInfo of fleetAgents.values()) {
if (
policyInfo.policy_id !== null &&
policyInfo.policy_id !== undefined &&
!endpointPolicyCache.has(policyInfo.policy_id)
) {
const packagePolicies = await this.sender.fetchEndpointPolicyConfigs(policyInfo.policy_id);
packagePolicies?.inputs.forEach((input) => {
if (input.type === 'endpoint' && policyInfo.policy_id !== undefined) {
endpointPolicyCache.set(policyInfo.policy_id, input);
}
});
if (policyInfo !== null && policyInfo !== undefined && !endpointPolicyCache.has(policyInfo)) {
const agentPolicy = await this.sender.fetchPolicyConfigs(policyInfo);
const packagePolicies = agentPolicy?.package_policies as PackagePolicy[];
pjhampton marked this conversation as resolved.
Show resolved Hide resolved

packagePolicies
.map((pPolicy) => pPolicy as PolicyData)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the map just to cast the values, or should this be a filter of any null values? If the former could we just remove the map like below? Also, can the pPolicy.inputs array be empty or the config null instead of undefined?

packagePolicies
  .forEach((pPolicy as PolicyData) => {
    if (pPolicy.inputs[0].config !== undefined) { // <-- Also, can this be null too?
      pPolicy.inputs.forEach((input) => {
        if (
          input.type === FLEET_ENDPOINT_PACKAGE &&
          input.config !== undefined &&
          policyInfo !== undefined
        ) {
          endpointPolicyCache.set(policyInfo, pPolicy);
        }
      });
    }
  });

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the map just to cast the values, or should this be a filter of any null values?

It is the former - I am just casting. There shouldn't be any null values (but anything is possible).

Also, can the pPolicy.inputs array be empty or the config null instead of undefined?

I haven't seen it - as these are package policies instead of agent policies I think they should always have an input. But I'm running with the assumption that it is possible.

.forEach((pPolicy) => {
if (pPolicy.inputs[0].config !== undefined) {
pPolicy.inputs.forEach((input) => {
if (
input.type === FLEET_ENDPOINT_PACKAGE &&
input.config !== undefined &&
policyInfo !== undefined
) {
endpointPolicyCache.set(policyInfo, pPolicy);
}
});
}
});
}
}

/** STAGE 3 - Fetch Endpoint Policy Responses
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
*
* Reads Endpoint Agent policy responses out of the `.ds-metrics-endpoint.policy*` data
* stream and creates a local K/V structure that stores the policy response (V) with
* the Endpoint Agent Id (K). A value will only exist if there has been a endpoint
* enrolled in the last 24 hours OR a policy change has occurred. We only send
* non-successful responses. If the field is null, we assume no responses in
* the last 24h or no failures/warnings in the policy applied.
*
*/
const { body: failedPolicyResponses } = (endpointData.epPolicyResponse as unknown) as {
body: EndpointPolicyResponseAggregation;
};
const policyResponses = failedPolicyResponses.aggregations.policy_responses.buckets.reduce(
(cache, bucket) => {
const doc = bucket.latest_response.hits.hits[0];
cache.set(bucket.key, doc);
(cache, endpointAgentId) => {
const doc = endpointAgentId.latest_response.hits.hits[0];
cache.set(endpointAgentId.key, doc);
return cache;
},
new Map<string, EndpointPolicyResponseDocument>()
);

/** STAGE 4 - Create the telemetry log records
*
* Iterates through the endpoint metrics documents at STAGE 1 and joins them together
* to form the telemetry log that is sent back to Elastic Security developers to
* make improvements to the product.
*
*/
const telemetryPayloads = endpointMetrics.map((endpoint) => {
let policyConfig = null;
let failedPolicy = null;
Expand All @@ -193,28 +248,65 @@ export class TelemetryEndpointTask {
const endpointAgentId = endpoint.endpoint_agent;

const policyInformation = fleetAgents.get(fleetAgentId);
if (policyInformation?.policy_id) {
policyConfig = endpointPolicyCache.get(policyInformation?.policy_id);
if (policyInformation) {
policyConfig = endpointPolicyCache.get(policyInformation);

if (policyConfig) {
failedPolicy = policyResponses.get(policyConfig?.id);
}
}

return {
'@timestamp': executeTo,
agent_id: fleetAgentId,
endpoint_id: endpointAgentId,
endpoint_version: endpoint.endpoint_version,
endpoint_package_version: policyConfig?.package?.version || null,
endpoint_metrics: {
cpu: {
histogram: endpoint.endpoint_metrics.Endpoint.metrics.cpu.endpoint.histogram,
latest: endpoint.endpoint_metrics.Endpoint.metrics.cpu.endpoint.latest,
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
mean: endpoint.endpoint_metrics.Endpoint.metrics.cpu.endpoint.mean,
},
memory: {
latest: endpoint.endpoint_metrics.Endpoint.metrics.memory.endpoint.private.latest,
mean: endpoint.endpoint_metrics.Endpoint.metrics.memory.endpoint.private.mean,
},
uptime: {
endpoint: endpoint.endpoint_metrics.Endpoint.metrics.uptime.endpoint,
system: endpoint.endpoint_metrics.Endpoint.metrics.uptime.system,
},
},
endpoint_meta: {
os: endpoint.endpoint_metrics.host.os,
cpu: endpoint.endpoint_metrics.Endpoint.metrics.cpu,
memory: endpoint.endpoint_metrics.Endpoint.metrics.memory,
uptime: endpoint.endpoint_metrics.Endpoint.metrics.uptime,
},
policy_config: policyConfig,
policy_failure: failedPolicy,
policy_config: policyConfig !== null ? policyConfig?.inputs[0].config.policy : {},
policy_response:
failedPolicy !== null && failedPolicy !== undefined
? {
agent_policy_status: failedPolicy?._source.event.agent_id_status,
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
manifest_version:
failedPolicy?._source.Endpoint.policy.applied.artifacts.global.version,
status: failedPolicy?._source.Endpoint.policy.applied.status,
actions: failedPolicy?._source.Endpoint.policy.applied.actions
.map((action) => (action.status !== 'success' ? action : null))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a reduce combining these two be easier? So:

failedPolicy?._source.Endpoint.policy.applied.actions.reduce((memo, action) => {
	if (action.status !== 'success') memo.push(action);
	return memo;
}, []);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a great suggestion, but I prefer how my implementation reads by functional chaining. Hope that is ok :-)

.filter((action) => action !== null),
}
: {},
telemetry_meta: {
metrics_timestamp: endpoint.endpoint_metrics['@timestamp'],
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
},
};
});

this.sender.sendOnDemand('endpoint-metadata', telemetryPayloads);
/**
* STAGE 5 - Send the documents
*
* Send the documents in a batches of 100
*/
batchTelemetryRecords(telemetryPayloads, 100).forEach((telemetryBatch) =>
pjhampton marked this conversation as resolved.
Show resolved Hide resolved
this.sender.sendOnDemand('endpoint-metadata', telemetryBatch)
);
return telemetryPayloads.length;
};
}
Loading