Skip to content

Commit

Permalink
[Ingest Manager] Internal action for policy reassign (#78493) (#79657)
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet authored Oct 6, 2020
1 parent 0a746fe commit ef824c0
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 19 deletions.
8 changes: 7 additions & 1 deletion x-pack/plugins/ingest_manager/common/types/models/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ export type AgentStatus =
| 'updating'
| 'degraded';

export type AgentActionType = 'POLICY_CHANGE' | 'UNENROLL' | 'UPGRADE';
export type AgentActionType =
| 'POLICY_CHANGE'
| 'UNENROLL'
| 'UPGRADE'
// INTERNAL* actions are mean to interupt long polling calls these actions will not be distributed to the agent
| 'INTERNAL_POLICY_REASSIGN';

export interface NewAgentAction {
type: AgentActionType;
data?: any;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ export async function getAgentActionsForCheckin(
nodeTypes.literal.buildNode(false),
])
),
nodeTypes.function.buildNode(
'not',
nodeTypes.function.buildNodeWithArgumentNodes('is', [
nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.type`),
nodeTypes.literal.buildNode('INTERNAL_POLICY_REASSIGN'),
nodeTypes.literal.buildNode(false),
])
),
nodeTypes.function.buildNodeWithArgumentNodes('is', [
nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.agent_id`),
nodeTypes.literal.buildNode(agentId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import semver from 'semver';
import { timer, from, Observable, TimeoutError } from 'rxjs';
import { timer, from, Observable, TimeoutError, of, EMPTY } from 'rxjs';
import { omit } from 'lodash';
import {
shareReplay,
share,
distinctUntilKeyChanged,
switchMap,
concatMap,
merge,
filter,
timeout,
Expand Down Expand Up @@ -38,6 +41,7 @@ import {
} from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils';
import { getAgent } from '../crud';

function getInternalUserSOClient() {
const fakeRequest = ({
Expand Down Expand Up @@ -69,7 +73,8 @@ function createNewActionsSharedObservable(): Observable<AgentAction[]> {
lastTimestamp = new Date().toISOString();
return from(getNewActionsSince(internalSOClient, timestamp));
}),
shareReplay({ refCount: true, bufferSize: 1 })
filter((data) => data.length > 0),
share()
);
}

Expand Down Expand Up @@ -201,6 +206,18 @@ export function agentCheckinStateNewActionsFactory() {
rateLimiterMaxDelay
);

function getOrCreateAgentPolicyObservable(agentPolicyId: string) {
if (!agentPolicies$.has(agentPolicyId)) {
agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId));
}
const agentPolicy$ = agentPolicies$.get(agentPolicyId);
if (!agentPolicy$) {
throw new Error(`Invalid state, no observable for policy ${agentPolicyId}`);
}

return agentPolicy$;
}

async function subscribeToNewActions(
soClient: SavedObjectsClientContract,
agent: Agent,
Expand All @@ -209,14 +226,7 @@ export function agentCheckinStateNewActionsFactory() {
if (!agent.policy_id) {
throw new Error('Agent does not have a policy');
}
const agentPolicyId = agent.policy_id;
if (!agentPolicies$.has(agentPolicyId)) {
agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId));
}
const agentPolicy$ = agentPolicies$.get(agentPolicyId);
if (!agentPolicy$) {
throw new Error(`Invalid state, no observable for policy ${agentPolicyId}`);
}
const agentPolicy$ = getOrCreateAgentPolicyObservable(agent.policy_id);

const stream$ = agentPolicy$.pipe(
timeout(pollingTimeoutMs),
Expand All @@ -229,25 +239,43 @@ export function agentCheckinStateNewActionsFactory() {
(!agent.policy_revision || action.policy_revision > agent.policy_revision)
),
rateLimiter(),
switchMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)),
concatMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)),
merge(newActions$),
switchMap(async (data) => {
if (!data) {
return;
concatMap((data: AgentAction[] | undefined) => {
if (data === undefined) {
return EMPTY;
}
const newActions = data.filter((action) => action.agent_id === agent.id);
if (newActions.length === 0) {
return;
return EMPTY;
}

return newActions;
const hasConfigReassign = newActions.some(
(action) => action.type === 'INTERNAL_POLICY_REASSIGN'
);
if (hasConfigReassign) {
return from(getAgent(soClient, agent.id)).pipe(
concatMap((refreshedAgent) => {
if (!refreshedAgent.policy_id) {
throw new Error('Agent does not have a policy assigned');
}
const newAgentPolicy$ = getOrCreateAgentPolicyObservable(refreshedAgent.policy_id);
return newAgentPolicy$;
}),
rateLimiter(),
concatMap((policyAction) =>
createAgentActionFromPolicyAction(soClient, agent, policyAction)
)
);
}

return of(newActions);
}),
filter((data) => data !== undefined),
take(1)
);
try {
const data = await toPromiseAbortable(stream$, options?.signal);

return data || [];
} catch (err) {
if (err instanceof TimeoutError || err instanceof AbortError) {
Expand Down
20 changes: 19 additions & 1 deletion x-pack/plugins/ingest_manager/server/services/agents/reassign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { AGENT_SAVED_OBJECT_TYPE } from '../../constants';
import { AgentSOAttributes } from '../../types';
import { agentPolicyService } from '../agent_policy';
import { getAgents, listAllAgents } from './crud';
import { createAgentAction, bulkCreateAgentActions } from './actions';

export async function reassignAgent(
soClient: SavedObjectsClientContract,
Expand All @@ -25,6 +26,12 @@ export async function reassignAgent(
policy_id: newAgentPolicyId,
policy_revision: null,
});

await createAgentAction(soClient, {
agent_id: agentId,
created_at: new Date().toISOString(),
type: 'INTERNAL_POLICY_REASSIGN',
});
}

export async function reassignAgents(
Expand Down Expand Up @@ -56,7 +63,7 @@ export async function reassignAgents(
const agentsToUpdate = agents.filter((agent) => agent.policy_id !== newAgentPolicyId);

// Update the necessary agents
return await soClient.bulkUpdate<AgentSOAttributes>(
const res = await soClient.bulkUpdate<AgentSOAttributes>(
agentsToUpdate.map((agent) => ({
type: AGENT_SAVED_OBJECT_TYPE,
id: agent.id,
Expand All @@ -66,4 +73,15 @@ export async function reassignAgents(
},
}))
);
const now = new Date().toISOString();
await bulkCreateAgentActions(
soClient,
agentsToUpdate.map((agent) => ({
agent_id: agent.id,
created_at: now,
type: 'INTERNAL_POLICY_REASSIGN',
}))
);

return res;
}
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/types/models/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export const NewAgentActionSchema = schema.object({
schema.literal('POLICY_CHANGE'),
schema.literal('UNENROLL'),
schema.literal('UPGRADE'),
schema.literal('INTERNAL_POLICY_REASSIGN'),
]),
data: schema.maybe(schema.any()),
ack_data: schema.maybe(schema.any()),
Expand Down

0 comments on commit ef824c0

Please sign in to comment.