Skip to content

[Rule-based segments] Update splitsChangesUpdater and splitsChangesFetcher #392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,655 changes: 1,330 additions & 1,325 deletions src/__tests__/mocks/splitchanges.since.-1.json

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions src/dtos/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,16 @@ export type ISplitPartial = Pick<ISplit, 'conditions' | 'configurations' | 'traf

/** Interface of the parsed JSON response of `/splitChanges` */
export interface ISplitChangesResponse {
till: number,
splits: ISplit[]
ff?: {
t: number,
s?: number,
d: ISplit[]
},
rbs?: {
t: number,
s?: number,
d: IRBSegment[]
}
}

/** Interface of the parsed JSON response of `/segmentChanges/{segmentName}` */
Expand Down
1 change: 1 addition & 0 deletions src/logger/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const RETRIEVE_MANAGER = 29;
export const SYNC_OFFLINE_DATA = 30;
export const SYNC_SPLITS_FETCH = 31;
export const SYNC_SPLITS_UPDATE = 32;
export const SYNC_RBS_UPDATE = 33;
export const STREAMING_NEW_MESSAGE = 35;
export const SYNC_TASK_START = 36;
export const SYNC_TASK_EXECUTE = 37;
Expand Down
5 changes: 3 additions & 2 deletions src/logger/messages/debug.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ export const codesDebug: [number, string][] = codesInfo.concat([
[c.RETRIEVE_MANAGER, 'Retrieving manager instance.'],
// synchronizer
[c.SYNC_OFFLINE_DATA, c.LOG_PREFIX_SYNC_OFFLINE + 'Feature flags data: \n%s'],
[c.SYNC_SPLITS_FETCH, c.LOG_PREFIX_SYNC_SPLITS + 'Spin up feature flags update using since = %s'],
[c.SYNC_SPLITS_UPDATE, c.LOG_PREFIX_SYNC_SPLITS + 'New feature flags %s. Removed feature flags %s. Segment names collected %s'],
[c.SYNC_SPLITS_FETCH, c.LOG_PREFIX_SYNC_SPLITS + 'Spin up feature flags update using since = %s and rbSince = %s.'],
[c.SYNC_SPLITS_UPDATE, c.LOG_PREFIX_SYNC_SPLITS + 'New feature flags %s. Removed feature flags %s.'],
[c.SYNC_RBS_UPDATE, c.LOG_PREFIX_SYNC_SPLITS + 'New rule-based segments %s. Removed rule-based segments %s.'],
[c.STREAMING_NEW_MESSAGE, c.LOG_PREFIX_SYNC_STREAMING + 'New SSE message received, with data: %s.'],
[c.SYNC_TASK_START, c.LOG_PREFIX_SYNC + ': Starting %s. Running each %s millis'],
[c.SYNC_TASK_EXECUTE, c.LOG_PREFIX_SYNC + ': Running %s'],
Expand Down
8 changes: 4 additions & 4 deletions src/services/__tests__/splitApi.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ describe('splitApi', () => {
assertHeaders(settings, headers);
expect(url).toBe('sdk/segmentChanges/segmentName?since=-1&till=90');

splitApi.fetchSplitChanges(-1, false, 100);
splitApi.fetchSplitChanges(-1, false, 100, -1);
[url, { headers }] = fetchMock.mock.calls[3];
assertHeaders(settings, headers);
expect(url).toBe(expecteFlagsUrl(-1, 100, settings.validateFilters || false, settings));
expect(url).toBe(expectedFlagsUrl(-1, 100, settings.validateFilters || false, settings, -1));

splitApi.postEventsBulk('fake-body');
assertHeaders(settings, fetchMock.mock.calls[4][1].headers);
Expand All @@ -66,9 +66,9 @@ describe('splitApi', () => {
fetchMock.mockClear();


function expecteFlagsUrl(since: number, till: number, usesFilter: boolean, settings: ISettings) {
function expectedFlagsUrl(since: number, till: number, usesFilter: boolean, settings: ISettings, rbSince?: number) {
const filterQueryString = settings.sync.__splitFiltersValidation && settings.sync.__splitFiltersValidation.queryString;
return `sdk/splitChanges?s=1.1&since=${since}${usesFilter ? filterQueryString : ''}${till ? '&till=' + till : ''}`;
return `sdk/splitChanges?s=1.1&since=${since}${rbSince ? '&rbSince=' + rbSince : ''}${usesFilter ? filterQueryString : ''}${till ? '&till=' + till : ''}`;
}
});

Expand Down
4 changes: 2 additions & 2 deletions src/services/splitApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ export function splitApiFactory(
return splitHttpClient(url, undefined, telemetryTracker.trackHttp(TOKEN));
},

fetchSplitChanges(since: number, noCache?: boolean, till?: number) {
const url = `${urls.sdk}/splitChanges?s=${flagSpecVersion}&since=${since}${filterQueryString || ''}${till ? '&till=' + till : ''}`;
fetchSplitChanges(since: number, noCache?: boolean, till?: number, rbSince?: number) {
const url = `${urls.sdk}/splitChanges?s=${flagSpecVersion}&since=${since}${rbSince ? '&rbSince=' + rbSince : ''}${filterQueryString || ''}${till ? '&till=' + till : ''}`;
return splitHttpClient(url, noCache ? noCacheHeaderOptions : undefined, telemetryTracker.trackHttp(SPLITS))
.catch((err) => {
if (err.statusCode === 414) settings.log.error(ERROR_TOO_MANY_SETS);
Expand Down
2 changes: 1 addition & 1 deletion src/services/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export type ISplitHttpClient = (url: string, options?: IRequestOptions, latencyT

export type IFetchAuth = (userKeys?: string[]) => Promise<IResponse>

export type IFetchSplitChanges = (since: number, noCache?: boolean, till?: number) => Promise<IResponse>
export type IFetchSplitChanges = (since: number, noCache?: boolean, till?: number, rbSince?: number) => Promise<IResponse>

export type IFetchSegmentChanges = (since: number, segmentName: string, noCache?: boolean, till?: number) => Promise<IResponse>

Expand Down
3 changes: 2 additions & 1 deletion src/sync/polling/fetchers/splitChangesFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ export function splitChangesFetcherFactory(fetchSplitChanges: IFetchSplitChanges
since: number,
noCache?: boolean,
till?: number,
rbSince?: number,
// Optional decorator for `fetchSplitChanges` promise, such as timeout or time tracker
decorator?: (promise: Promise<IResponse>) => Promise<IResponse>
) {

let splitsPromise = fetchSplitChanges(since, noCache, till);
let splitsPromise = fetchSplitChanges(since, noCache, till, rbSince);
if (decorator) splitsPromise = decorator(splitsPromise);

return splitsPromise.then(resp => resp.json());
Expand Down
1 change: 1 addition & 0 deletions src/sync/polling/fetchers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type ISplitChangesFetcher = (
since: number,
noCache?: boolean,
till?: number,
rbSince?: number,
decorator?: (promise: Promise<IResponse>) => Promise<IResponse>
) => Promise<ISplitChangesResponse>

Expand Down
62 changes: 46 additions & 16 deletions src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { ISplit } from '../../../../dtos/types';
import { IRBSegment, ISplit } from '../../../../dtos/types';
import { readinessManagerFactory } from '../../../../readiness/readinessManager';
import { splitApiFactory } from '../../../../services/splitApi';
import { SegmentsCacheInMemory } from '../../../../storages/inMemory/SegmentsCacheInMemory';
import { SplitsCacheInMemory } from '../../../../storages/inMemory/SplitsCacheInMemory';
import { splitChangesFetcherFactory } from '../../fetchers/splitChangesFetcher';
import { splitChangesUpdaterFactory, parseSegments, computeSplitsMutation } from '../splitChangesUpdater';
import { splitChangesUpdaterFactory, parseSegments, computeMutation } from '../splitChangesUpdater';
import splitChangesMock1 from '../../../../__tests__/mocks/splitchanges.since.-1.json';
import fetchMock from '../../../../__tests__/testUtils/fetchMock';
import { fullSettings, settingsSplitApi } from '../../../../utils/settingsValidation/__tests__/settings.mocks';
import { EventEmitter } from '../../../../utils/MinEvents';
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';
import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker';
import { splitNotifications } from '../../../streaming/__tests__/dataMocks';
import { RBSegmentsCacheInMemory } from '../../../../storages/inMemory/RBSegmentsCacheInMemory';

const ARCHIVED_FF = 'ARCHIVED';

Expand Down Expand Up @@ -94,58 +95,60 @@ test('splitChangesUpdater / segments parser', () => {
test('splitChangesUpdater / compute splits mutation', () => {
const splitFiltersValidation = { queryString: null, groupedFilters: { bySet: [], byName: [], byPrefix: [] }, validFilters: [] };

let splitsMutation = computeSplitsMutation([activeSplitWithSegments, archivedSplit] as ISplit[], splitFiltersValidation);
let segments = new Set<string>();
let splitsMutation = computeMutation([activeSplitWithSegments, archivedSplit] as ISplit[], segments, splitFiltersValidation);

expect(splitsMutation.added).toEqual([activeSplitWithSegments]);
expect(splitsMutation.removed).toEqual([archivedSplit]);
expect(splitsMutation.segments).toEqual(['A', 'B']);
expect(Array.from(segments)).toEqual(['A', 'B']);

// SDK initialization without sets
// should process all the notifications
splitsMutation = computeSplitsMutation([testFFSetsAB, test2FFSetsX] as ISplit[], splitFiltersValidation);
segments = new Set<string>();
splitsMutation = computeMutation([testFFSetsAB, test2FFSetsX] as ISplit[], segments, splitFiltersValidation);

expect(splitsMutation.added).toEqual([testFFSetsAB, test2FFSetsX]);
expect(splitsMutation.removed).toEqual([]);
expect(splitsMutation.segments).toEqual([]);
expect(Array.from(segments)).toEqual([]);
});

test('splitChangesUpdater / compute splits mutation with filters', () => {
// SDK initialization with sets: [set_a, set_b]
let splitFiltersValidation = { queryString: '&sets=set_a,set_b', groupedFilters: { bySet: ['set_a', 'set_b'], byName: ['name_1'], byPrefix: [] }, validFilters: [] };

// fetching new feature flag in sets A & B
let splitsMutation = computeSplitsMutation([testFFSetsAB], splitFiltersValidation);
let splitsMutation = computeMutation([testFFSetsAB], new Set(), splitFiltersValidation);

// should add it to mutations
expect(splitsMutation.added).toEqual([testFFSetsAB]);
expect(splitsMutation.removed).toEqual([]);

// fetching existing test feature flag removed from set B
splitsMutation = computeSplitsMutation([testFFRemoveSetB], splitFiltersValidation);
splitsMutation = computeMutation([testFFRemoveSetB], new Set(), splitFiltersValidation);

expect(splitsMutation.added).toEqual([testFFRemoveSetB]);
expect(splitsMutation.removed).toEqual([]);

// fetching existing test feature flag removed from set B
splitsMutation = computeSplitsMutation([testFFRemoveSetA], splitFiltersValidation);
splitsMutation = computeMutation([testFFRemoveSetA], new Set(), splitFiltersValidation);

expect(splitsMutation.added).toEqual([]);
expect(splitsMutation.removed).toEqual([testFFRemoveSetA]);

// fetching existing test feature flag removed from set B
splitsMutation = computeSplitsMutation([testFFEmptySet], splitFiltersValidation);
splitsMutation = computeMutation([testFFEmptySet], new Set(), splitFiltersValidation);

expect(splitsMutation.added).toEqual([]);
expect(splitsMutation.removed).toEqual([testFFEmptySet]);

// SDK initialization with names: ['test2']
splitFiltersValidation = { queryString: '&names=test2', groupedFilters: { bySet: [], byName: ['test2'], byPrefix: [] }, validFilters: [] };
splitsMutation = computeSplitsMutation([testFFSetsAB], splitFiltersValidation);
splitsMutation = computeMutation([testFFSetsAB], new Set(), splitFiltersValidation);

expect(splitsMutation.added).toEqual([]);
expect(splitsMutation.removed).toEqual([testFFSetsAB]);

splitsMutation = computeSplitsMutation([test2FFSetsX, testFFEmptySet], splitFiltersValidation);
splitsMutation = computeMutation([test2FFSetsX, testFFEmptySet], new Set(), splitFiltersValidation);

expect(splitsMutation.added).toEqual([test2FFSetsX]);
expect(splitsMutation.removed).toEqual([testFFEmptySet]);
Expand All @@ -161,10 +164,13 @@ describe('splitChangesUpdater', () => {
const splits = new SplitsCacheInMemory();
const updateSplits = jest.spyOn(splits, 'update');

const rbSegments = new RBSegmentsCacheInMemory();
const updateRbSegments = jest.spyOn(rbSegments, 'update');

const segments = new SegmentsCacheInMemory();
const registerSegments = jest.spyOn(segments, 'registerSegments');

const storage = { splits, segments };
const storage = { splits, rbSegments, segments };

const readinessManager = readinessManagerFactory(EventEmitter, fullSettings);
const splitsEmitSpy = jest.spyOn(readinessManager.splits, 'emit');
Expand All @@ -179,22 +185,29 @@ describe('splitChangesUpdater', () => {

test('test without payload', async () => {
const result = await splitChangesUpdater();

expect(fetchSplitChanges).toBeCalledTimes(1);
expect(fetchSplitChanges).lastCalledWith(-1, undefined, undefined, -1);
expect(updateSplits).toBeCalledTimes(1);
expect(updateSplits).lastCalledWith(splitChangesMock1.splits, [], splitChangesMock1.till);
expect(updateSplits).lastCalledWith(splitChangesMock1.ff.d, [], splitChangesMock1.ff.t);
expect(updateRbSegments).toBeCalledTimes(0); // no rbSegments to update
expect(registerSegments).toBeCalledTimes(1);
expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived');
expect(result).toBe(true);
});

test('test with payload', async () => {
test('test with ff payload', async () => {
let index = 0;
for (const notification of splitNotifications) {
const payload = notification.decoded as Pick<ISplit, 'name' | 'changeNumber' | 'killed' | 'defaultTreatment' | 'trafficTypeName' | 'conditions' | 'status' | 'seed' | 'trafficAllocation' | 'trafficAllocationSeed' | 'configurations'>;
const changeNumber = payload.changeNumber;

await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true);
// fetch not being called

// fetch and RBSegments.update not being called
expect(fetchSplitChanges).toBeCalledTimes(0);
expect(updateRbSegments).toBeCalledTimes(0);

expect(updateSplits).toBeCalledTimes(index + 1);
// Change number being updated
expect(updateSplits.mock.calls[index][2]).toEqual(changeNumber);
Expand All @@ -209,6 +222,23 @@ describe('splitChangesUpdater', () => {
}
});

test('test with rbsegment payload', async () => {
const payload = { name: 'rbsegment', status: 'ACTIVE', changeNumber: 1684329854385, conditions: [] } as unknown as IRBSegment;
const changeNumber = payload.changeNumber;

await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true);

// fetch and Splits.update not being called
expect(fetchSplitChanges).toBeCalledTimes(0);
expect(updateSplits).toBeCalledTimes(0);

expect(updateRbSegments).toBeCalledTimes(1);
expect(updateRbSegments).toBeCalledWith([payload], [], changeNumber);

expect(registerSegments).toBeCalledTimes(1);
expect(registerSegments).toBeCalledWith([]);
});

test('flag sets splits-arrived emission', async () => {
const payload = splitNotifications[3].decoded as Pick<ISplit, 'name' | 'changeNumber' | 'killed' | 'defaultTreatment' | 'trafficTypeName' | 'conditions' | 'status' | 'seed' | 'trafficAllocation' | 'trafficAllocationSeed' | 'configurations'>;
const setMocks = [
Expand Down
Loading