Skip to content

Commit 06f142d

Browse files
authored
[Ingest Manager] Fix config rollout move to limit concurrent config change instead of config per second (#72931)
1 parent 9a22b95 commit 06f142d

File tree

6 files changed

+80
-45
lines changed

6 files changed

+80
-45
lines changed

x-pack/plugins/ingest_manager/common/types/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ export interface IngestManagerConfigType {
2222
host?: string;
2323
ca_sha256?: string;
2424
};
25-
agentConfigRollupRateLimitIntervalMs: number;
26-
agentConfigRollupRateLimitRequestPerInterval: number;
25+
agentConfigRolloutConcurrency: number;
2726
};
2827
}
2928

x-pack/plugins/ingest_manager/server/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ export const config = {
3535
host: schema.maybe(schema.string()),
3636
ca_sha256: schema.maybe(schema.string()),
3737
}),
38-
agentConfigRollupRateLimitIntervalMs: schema.number({ defaultValue: 5000 }),
39-
agentConfigRollupRateLimitRequestPerInterval: schema.number({ defaultValue: 50 }),
38+
agentConfigRolloutConcurrency: schema.number({ defaultValue: 10 }),
4039
}),
4140
}),
4241
};
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import * as Rx from 'rxjs';
8+
import { share } from 'rxjs/operators';
9+
import { createSubscriberConcurrencyLimiter } from './rxjs_utils';
10+
11+
function createSpyObserver(o: Rx.Observable<any>): [Rx.Subscription, jest.Mock] {
12+
const spy = jest.fn();
13+
const observer = o.subscribe(spy);
14+
return [observer, spy];
15+
}
16+
17+
describe('createSubscriberConcurrencyLimiter', () => {
18+
it('should not publish to more than n concurrent subscriber', async () => {
19+
const subject = new Rx.Subject<any>();
20+
const sharedObservable = subject.pipe(share());
21+
22+
const limiter = createSubscriberConcurrencyLimiter(2);
23+
24+
const [observer1, spy1] = createSpyObserver(sharedObservable.pipe(limiter()));
25+
const [observer2, spy2] = createSpyObserver(sharedObservable.pipe(limiter()));
26+
const [observer3, spy3] = createSpyObserver(sharedObservable.pipe(limiter()));
27+
const [observer4, spy4] = createSpyObserver(sharedObservable.pipe(limiter()));
28+
subject.next('test1');
29+
30+
expect(spy1).toBeCalled();
31+
expect(spy2).toBeCalled();
32+
expect(spy3).not.toBeCalled();
33+
expect(spy4).not.toBeCalled();
34+
35+
observer1.unsubscribe();
36+
expect(spy3).toBeCalled();
37+
expect(spy4).not.toBeCalled();
38+
39+
observer2.unsubscribe();
40+
expect(spy4).toBeCalled();
41+
42+
observer3.unsubscribe();
43+
observer4.unsubscribe();
44+
});
45+
});

x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,49 +43,32 @@ export const toPromiseAbortable = <T>(
4343
}
4444
});
4545

46-
export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerInterval: number) {
47-
function createCurrentInterval() {
48-
return {
49-
startedAt: Rx.asyncScheduler.now(),
50-
numRequests: 0,
51-
};
52-
}
53-
54-
let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval();
46+
export function createSubscriberConcurrencyLimiter(maxConcurrency: number) {
5547
let observers: Array<[Rx.Subscriber<any>, any]> = [];
56-
let timerSubscription: Rx.Subscription | undefined;
48+
let activeObservers: Array<Rx.Subscriber<any>> = [];
5749

58-
function createTimeout() {
59-
if (timerSubscription) {
50+
function processNext() {
51+
if (activeObservers.length >= maxConcurrency) {
6052
return;
6153
}
62-
timerSubscription = Rx.asyncScheduler.schedule(() => {
63-
timerSubscription = undefined;
64-
currentInterval = createCurrentInterval();
65-
for (const [waitingObserver, value] of observers) {
66-
if (currentInterval.numRequests >= ratelimitRequestPerInterval) {
67-
createTimeout();
68-
continue;
69-
}
70-
currentInterval.numRequests++;
71-
waitingObserver.next(value);
72-
}
73-
}, ratelimitIntervalMs);
54+
const observerValuePair = observers.shift();
55+
56+
if (!observerValuePair) {
57+
return;
58+
}
59+
60+
const [observer, value] = observerValuePair;
61+
activeObservers.push(observer);
62+
observer.next(value);
7463
}
7564

7665
return function limit<T>(): Rx.MonoTypeOperatorFunction<T> {
7766
return (observable) =>
7867
new Rx.Observable<T>((observer) => {
7968
const subscription = observable.subscribe({
8069
next(value) {
81-
if (currentInterval.numRequests < ratelimitRequestPerInterval) {
82-
currentInterval.numRequests++;
83-
observer.next(value);
84-
return;
85-
}
86-
8770
observers = [...observers, [observer, value]];
88-
createTimeout();
71+
processNext();
8972
},
9073
error(err) {
9174
observer.error(err);
@@ -96,8 +79,10 @@ export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerIn
9679
});
9780

9881
return () => {
82+
activeObservers = activeObservers.filter((o) => o !== observer);
9983
observers = observers.filter((o) => o[0] !== observer);
10084
subscription.unsubscribe();
85+
processNext();
10186
};
10287
});
10388
};

x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants';
1313

1414
function agentCheckinStateFactory() {
1515
const agentConnected = agentCheckinStateConnectedAgentsFactory();
16-
const newActions = agentCheckinStateNewActionsFactory();
16+
let newActions: ReturnType<typeof agentCheckinStateNewActionsFactory>;
1717
let interval: NodeJS.Timeout;
18+
1819
function start() {
20+
newActions = agentCheckinStateNewActionsFactory();
1921
interval = setInterval(async () => {
2022
try {
2123
await agentConnected.updateLastCheckinAt();
@@ -31,15 +33,20 @@ function agentCheckinStateFactory() {
3133
}
3234
}
3335
return {
34-
subscribeToNewActions: (
36+
subscribeToNewActions: async (
3537
soClient: SavedObjectsClientContract,
3638
agent: Agent,
3739
options?: { signal: AbortSignal }
38-
) =>
39-
agentConnected.wrapPromise(
40+
) => {
41+
if (!newActions) {
42+
throw new Error('Agent checkin state not initialized');
43+
}
44+
45+
return agentConnected.wrapPromise(
4046
agent.id,
4147
newActions.subscribeToNewActions(soClient, agent, options)
42-
),
48+
);
49+
},
4350
start,
4451
stop,
4552
};

x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys';
2828
import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants';
2929
import { createAgentAction, getNewActionsSince } from '../actions';
3030
import { appContextService } from '../../app_context';
31-
import { toPromiseAbortable, AbortError, createLimiter } from './rxjs_utils';
31+
import { toPromiseAbortable, AbortError, createSubscriberConcurrencyLimiter } from './rxjs_utils';
3232

3333
function getInternalUserSOClient() {
3434
const fakeRequest = ({
@@ -134,9 +134,8 @@ export function agentCheckinStateNewActionsFactory() {
134134
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
135135
const newActions$ = createNewActionsSharedObservable();
136136
// Rx operators
137-
const rateLimiter = createLimiter(
138-
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitIntervalMs || 5000,
139-
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitRequestPerInterval || 50
137+
const concurrencyLimiter = createSubscriberConcurrencyLimiter(
138+
appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10
140139
);
141140

142141
async function subscribeToNewActions(
@@ -155,10 +154,11 @@ export function agentCheckinStateNewActionsFactory() {
155154
if (!agentConfig$) {
156155
throw new Error(`Invalid state no observable for config ${configId}`);
157156
}
157+
158158
const stream$ = agentConfig$.pipe(
159159
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
160160
filter((config) => shouldCreateAgentConfigAction(agent, config)),
161-
rateLimiter(),
161+
concurrencyLimiter(),
162162
mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)),
163163
merge(newActions$),
164164
mergeMap(async (data) => {

0 commit comments

Comments
 (0)