Skip to content

[Rule-based segments] Handle streaming notification #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Split has built and maintains SDKs for:
* .NET [Github](https://github.com/splitio/dotnet-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK)
* Android [Github](https://github.com/splitio/android-client) [Docs](https://help.split.io/hc/en-us/articles/360020343291-Android-SDK)
* Angular [Github](https://github.com/splitio/angular-sdk-plugin) [Docs](https://help.split.io/hc/en-us/articles/6495326064397-Angular-utilities)
* Elixir thin-client [Github](https://github.com/splitio/elixir-thin-client) [Docs](https://help.split.io/hc/en-us/articles/26988707417869-Elixir-Thin-Client-SDK)
* Flutter [Github](https://github.com/splitio/flutter-sdk-plugin) [Docs](https://help.split.io/hc/en-us/articles/8096158017165-Flutter-plugin)
* GO [Github](https://github.com/splitio/go-client) [Docs](https://help.split.io/hc/en-us/articles/360020093652-Go-SDK)
* iOS [Github](https://github.com/splitio/ios-client) [Docs](https://help.split.io/hc/en-us/articles/360020401491-iOS-SDK)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "message",
"data": "{\"id\":\"mc4i3NENoA:0:0\",\"clientId\":\"NDEzMTY5Mzg0MA==:MTM2ODE2NDMxNA==\",\"timestamp\":1457552621899,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_NDEzMjQ1MzA0Nw==_splits\",\"data\":\"{\\\"type\\\":\\\"RB_SEGMENT_UPDATE\\\",\\\"changeNumber\\\":1457552620999}\"}"
}
2 changes: 1 addition & 1 deletion src/logger/messages/warn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const codesWarn: [number, string][] = codesError.concat([
[c.WARN_SDK_KEY, c.LOG_PREFIX_SETTINGS + ': You already have %s. We recommend keeping only one instance of the factory at all times (Singleton pattern) and reusing it throughout your application'],

[c.STREAMING_PARSING_MEMBERSHIPS_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching Memberships due to an error processing %s notification: %s'],
[c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing SPLIT_UPDATE notification: %s'],
[c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing %s notification: %s'],
[c.WARN_INVALID_FLAGSET, '%s: you passed %s, flag set must adhere to the regular expressions %s. This means a flag set must start with a letter or number, be in lowercase, alphanumeric and have a max length of 50 characters. %s was discarded.'],
[c.WARN_LOWERCASE_FLAGSET, '%s: flag set %s should be all lowercase - converting string to lowercase.'],
[c.WARN_FLAGSET_WITHOUT_FLAGS, '%s: you passed %s flag set that does not contain cached feature flag names. Please double check what flag sets are in use in the Split user interface.'],
Expand Down
12 changes: 6 additions & 6 deletions src/storages/__tests__/KeyBuilder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,16 @@ test('KEYS / latency and exception keys (telemetry)', () => {
test('getStorageHash', () => {
expect(getStorageHash({
core: { authorizationKey: '<fake-token-rfc>' },
sync: { __splitFiltersValidation: { queryString: '&names=p1__split,p2__split' }, flagSpecVersion: '1.2' }
} as ISettings)).toBe('7ccd6b31');
sync: { __splitFiltersValidation: { queryString: '&names=p1__split,p2__split' }, flagSpecVersion: '1.3' }
} as ISettings)).toBe('2ce5cc38');

expect(getStorageHash({
core: { authorizationKey: '<fake-token-rfc>' },
sync: { __splitFiltersValidation: { queryString: '&names=p2__split,p3__split' }, flagSpecVersion: '1.2' }
} as ISettings)).toBe('2a25d0e1');
sync: { __splitFiltersValidation: { queryString: '&names=p2__split,p3__split' }, flagSpecVersion: '1.3' }
} as ISettings)).toBe('e65079c6');

expect(getStorageHash({
core: { authorizationKey: '<fake-token-rfc>' },
sync: { __splitFiltersValidation: { queryString: null }, flagSpecVersion: '1.2' }
} as ISettings)).toBe('db8943b4');
sync: { __splitFiltersValidation: { queryString: null }, flagSpecVersion: '1.3' }
} as ISettings)).toBe('193e6f3f');
});
4 changes: 2 additions & 2 deletions src/storages/__tests__/RBSegmentsCacheSync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ describe.each([cacheInMemory, cacheInLocal])('Rule-based segments cache sync (Me
});

test('usesSegments should track segments usage correctly', () => {
expect(cache.usesSegments()).toBe(true); // Initially true when changeNumber is -1
expect(cache.usesSegments()).toBe(false); // No rbSegments, so false

cache.update([rbSegment], [], 1); // rbSegment doesn't have IN_SEGMENT matcher
expect(cache.usesSegments()).toBe(false);
Expand All @@ -70,6 +70,6 @@ describe.each([cacheInMemory, cacheInLocal])('Rule-based segments cache sync (Me
expect(cache.usesSegments()).toBe(true);

cache.clear();
expect(cache.usesSegments()).toBe(true); // True after clear since changeNumber is -1
expect(cache.usesSegments()).toBe(false); // False after clear since there are no rbSegments
});
});
6 changes: 0 additions & 6 deletions src/storages/inLocalStorage/RBSegmentsCacheInLocal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync {

private readonly keys: KeyBuilderCS;
private readonly log: ILogger;
private hasSync?: boolean;

constructor(settings: ISettings, keys: KeyBuilderCS) {
this.keys = keys;
Expand All @@ -22,7 +21,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync {
clear() {
this.getNames().forEach(name => this.remove(name));
localStorage.removeItem(this.keys.buildRBSegmentsTillKey());
this.hasSync = false;
}

update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): boolean {
Expand All @@ -35,7 +33,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync {
try {
localStorage.setItem(this.keys.buildRBSegmentsTillKey(), changeNumber + '');
localStorage.setItem(this.keys.buildLastUpdatedKey(), Date.now() + '');
this.hasSync = true;
} catch (e) {
this.log.error(LOG_PREFIX + e);
}
Expand Down Expand Up @@ -128,9 +125,6 @@ export class RBSegmentsCacheInLocal implements IRBSegmentsCacheSync {
}

usesSegments(): boolean {
// If cache hasn't been synchronized, assume we need segments
if (!this.hasSync) return true;

const storedCount = localStorage.getItem(this.keys.buildSplitsWithSegmentCountKey());
const splitsWithSegmentsCount = storedCount === null ? 0 : toNumber(storedCount);

Expand Down
2 changes: 1 addition & 1 deletion src/storages/inMemory/RBSegmentsCacheInMemory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class RBSegmentsCacheInMemory implements IRBSegmentsCacheSync {
}

usesSegments(): boolean {
return this.getChangeNumber() === -1 || this.segmentsCount > 0;
return this.segmentsCount > 0;
}

}
4 changes: 2 additions & 2 deletions src/sync/polling/types.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { ISplit } from '../../dtos/types';
import { IRBSegment, ISplit } from '../../dtos/types';
import { IReadinessManager } from '../../readiness/types';
import { IStorageSync } from '../../storages/types';
import { MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../streaming/types';
import { ITask, ISyncTask } from '../types';

export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { }
export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit | IRBSegment, changeNumber: number }], boolean> { }

export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { }

Expand Down
2 changes: 1 addition & 1 deletion src/sync/polling/updaters/segmentChangesUpdater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export function segmentChangesUpdaterFactory(
* Returned promise will not be rejected.
*
* @param fetchOnlyNew - if true, only fetch the segments that not exists, i.e., which `changeNumber` is equal to -1.
* This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE notifications.
* This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE or RB_SEGMENT_UPDATE notifications.
* @param segmentName - segment name to fetch. By passing `undefined` it fetches the list of segments registered at the storage
* @param noCache - true to revalidate data to fetch on a SEGMENT_UPDATE notifications.
* @param till - till target for the provided segmentName, for CDN bypass.
Expand Down
7 changes: 6 additions & 1 deletion src/sync/streaming/SSEHandler/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// @ts-nocheck
import { SSEHandlerFactory } from '..';
import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants';
import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants';
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';

// update messages
import splitUpdateMessage from '../../../../__tests__/mocks/message.SPLIT_UPDATE.1457552620999.json';
import rbsegmentUpdateMessage from '../../../../__tests__/mocks/message.RB_SEGMENT_UPDATE.1457552620999.json';
import splitKillMessage from '../../../../__tests__/mocks/message.SPLIT_KILL.1457552650000.json';
import segmentUpdateMessage from '../../../../__tests__/mocks/message.SEGMENT_UPDATE.1457552640000.json';

Expand Down Expand Up @@ -144,6 +145,10 @@ test('`handlerMessage` for update notifications (NotificationProcessor) and stre
sseHandler.handleMessage(splitUpdateMessage);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(SPLIT_UPDATE, ...expectedParams); // must emit SPLIT_UPDATE with the message change number

expectedParams = [{ type: 'RB_SEGMENT_UPDATE', changeNumber: 1457552620999 }];
sseHandler.handleMessage(rbsegmentUpdateMessage);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(RB_SEGMENT_UPDATE, ...expectedParams); // must emit RB_SEGMENT_UPDATE with the message change number

expectedParams = [{ type: 'SPLIT_KILL', changeNumber: 1457552650000, splitName: 'whitelist', defaultTreatment: 'not_allowed' }];
sseHandler.handleMessage(splitKillMessage);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(SPLIT_KILL, ...expectedParams); // must emit SPLIT_KILL with the message change number, split name and default treatment
Expand Down
3 changes: 2 additions & 1 deletion src/sync/streaming/SSEHandler/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { errorParser, messageParser } from './NotificationParser';
import { notificationKeeperFactory } from './NotificationKeeper';
import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE } from '../constants';
import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RB_SEGMENT_UPDATE } from '../constants';
import { IPushEventEmitter } from '../types';
import { ISseEventHandler } from '../SSEClient/types';
import { INotificationError, INotificationMessage } from './types';
Expand Down Expand Up @@ -84,6 +84,7 @@ export function SSEHandlerFactory(log: ILogger, pushEmitter: IPushEventEmitter,
case MEMBERSHIPS_MS_UPDATE:
case MEMBERSHIPS_LS_UPDATE:
case SPLIT_KILL:
case RB_SEGMENT_UPDATE:
pushEmitter.emit(parsedData.type, parsedData);
break;

Expand Down
4 changes: 2 additions & 2 deletions src/sync/streaming/SSEHandler/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ControlType } from '../constants';
import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../types';
import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE, RB_SEGMENT_UPDATE } from '../types';

export enum Compression {
None = 0,
Expand Down Expand Up @@ -42,7 +42,7 @@ export interface ISegmentUpdateData {
}

export interface ISplitUpdateData {
type: SPLIT_UPDATE,
type: SPLIT_UPDATE | RB_SEGMENT_UPDATE,
changeNumber: number,
pcn?: number,
d?: string,
Expand Down
Loading