Skip to content

[Rule-based segments] Handle streaming notification #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "message",
"data": "{\"id\":\"mc4i3NENoA:0:0\",\"clientId\":\"NDEzMTY5Mzg0MA==:MTM2ODE2NDMxNA==\",\"timestamp\":1457552621899,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_NDEzMjQ1MzA0Nw==_splits\",\"data\":\"{\\\"type\\\":\\\"RB_SEGMENT_UPDATE\\\",\\\"changeNumber\\\":1457552620999}\"}"
}
2 changes: 1 addition & 1 deletion src/logger/messages/warn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const codesWarn: [number, string][] = codesError.concat([
[c.WARN_SDK_KEY, c.LOG_PREFIX_SETTINGS + ': You already have %s. We recommend keeping only one instance of the factory at all times (Singleton pattern) and reusing it throughout your application'],

[c.STREAMING_PARSING_MEMBERSHIPS_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching Memberships due to an error processing %s notification: %s'],
[c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing SPLIT_UPDATE notification: %s'],
[c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing %s notification: %s'],
[c.WARN_INVALID_FLAGSET, '%s: you passed %s, flag set must adhere to the regular expressions %s. This means a flag set must start with a letter or number, be in lowercase, alphanumeric and have a max length of 50 characters. %s was discarded.'],
[c.WARN_LOWERCASE_FLAGSET, '%s: flag set %s should be all lowercase - converting string to lowercase.'],
[c.WARN_FLAGSET_WITHOUT_FLAGS, '%s: you passed %s flag set that does not contain cached feature flag names. Please double check what flag sets are in use in the Split user interface.'],
Expand Down
4 changes: 2 additions & 2 deletions src/sync/polling/types.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { ISplit } from '../../dtos/types';
import { IRBSegment, ISplit } from '../../dtos/types';
import { IReadinessManager } from '../../readiness/types';
import { IStorageSync } from '../../storages/types';
import { MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../streaming/types';
import { ITask, ISyncTask } from '../types';

export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { }
export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit | IRBSegment, changeNumber: number }], boolean> { }

export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { }

Expand Down
2 changes: 1 addition & 1 deletion src/sync/polling/updaters/segmentChangesUpdater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export function segmentChangesUpdaterFactory(
* Returned promise will not be rejected.
*
* @param fetchOnlyNew - if true, only fetch the segments that not exists, i.e., which `changeNumber` is equal to -1.
* This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE notifications.
* This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE or RB_SEGMENT_UPDATE notifications.
* @param segmentName - segment name to fetch. By passing `undefined` it fetches the list of segments registered at the storage
* @param noCache - true to revalidate data to fetch on a SEGMENT_UPDATE notifications.
* @param till - till target for the provided segmentName, for CDN bypass.
Expand Down
7 changes: 6 additions & 1 deletion src/sync/streaming/SSEHandler/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// @ts-nocheck
import { SSEHandlerFactory } from '..';
import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants';
import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants';
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';

// update messages
import splitUpdateMessage from '../../../../__tests__/mocks/message.SPLIT_UPDATE.1457552620999.json';
import rbsegmentUpdateMessage from '../../../../__tests__/mocks/message.RB_SEGMENT_UPDATE.1457552620999.json';
import splitKillMessage from '../../../../__tests__/mocks/message.SPLIT_KILL.1457552650000.json';
import segmentUpdateMessage from '../../../../__tests__/mocks/message.SEGMENT_UPDATE.1457552640000.json';

Expand Down Expand Up @@ -144,6 +145,10 @@ test('`handlerMessage` for update notifications (NotificationProcessor) and stre
sseHandler.handleMessage(splitUpdateMessage);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(SPLIT_UPDATE, ...expectedParams); // must emit SPLIT_UPDATE with the message change number

expectedParams = [{ type: 'RB_SEGMENT_UPDATE', changeNumber: 1457552620999 }];
sseHandler.handleMessage(rbsegmentUpdateMessage);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(RB_SEGMENT_UPDATE, ...expectedParams); // must emit RB_SEGMENT_UPDATE with the message change number

expectedParams = [{ type: 'SPLIT_KILL', changeNumber: 1457552650000, splitName: 'whitelist', defaultTreatment: 'not_allowed' }];
sseHandler.handleMessage(splitKillMessage);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(SPLIT_KILL, ...expectedParams); // must emit SPLIT_KILL with the message change number, split name and default treatment
Expand Down
3 changes: 2 additions & 1 deletion src/sync/streaming/SSEHandler/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { errorParser, messageParser } from './NotificationParser';
import { notificationKeeperFactory } from './NotificationKeeper';
import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE } from '../constants';
import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RB_SEGMENT_UPDATE } from '../constants';
import { IPushEventEmitter } from '../types';
import { ISseEventHandler } from '../SSEClient/types';
import { INotificationError, INotificationMessage } from './types';
Expand Down Expand Up @@ -84,6 +84,7 @@ export function SSEHandlerFactory(log: ILogger, pushEmitter: IPushEventEmitter,
case MEMBERSHIPS_MS_UPDATE:
case MEMBERSHIPS_LS_UPDATE:
case SPLIT_KILL:
case RB_SEGMENT_UPDATE:
pushEmitter.emit(parsedData.type, parsedData);
break;

Expand Down
4 changes: 2 additions & 2 deletions src/sync/streaming/SSEHandler/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ControlType } from '../constants';
import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../types';
import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE, RB_SEGMENT_UPDATE } from '../types';

export enum Compression {
None = 0,
Expand Down Expand Up @@ -42,7 +42,7 @@ export interface ISegmentUpdateData {
}

export interface ISplitUpdateData {
type: SPLIT_UPDATE,
type: SPLIT_UPDATE | RB_SEGMENT_UPDATE,
changeNumber: number,
pcn?: number,
d?: string,
Expand Down
159 changes: 93 additions & 66 deletions src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ISplit } from '../../../dtos/types';
import { IRBSegment, ISplit } from '../../../dtos/types';
import { STREAMING_PARSING_SPLIT_UPDATE } from '../../../logger/constants';
import { ILogger } from '../../../logger/types';
import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants';
import { ISplitsEventEmitter } from '../../../readiness/types';
Expand All @@ -7,94 +8,120 @@ import { ITelemetryTracker } from '../../../trackers/types';
import { Backoff } from '../../../utils/Backoff';
import { SPLITS } from '../../../utils/constants';
import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types';
import { RB_SEGMENT_UPDATE } from '../constants';
import { parseFFUpdatePayload } from '../parseUtils';
import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types';
import { FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT, FETCH_BACKOFF_MAX_RETRIES } from './constants';
import { IUpdateWorker } from './types';

/**
* SplitsUpdateWorker factory
*/
export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData, payload?: ISplit]> & { killSplit(event: ISplitKillData): void } {
export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData]> & { killSplit(event: ISplitKillData): void } {

let maxChangeNumber = 0;
let handleNewEvent = false;
let isHandlingEvent: boolean;
let cdnBypass: boolean;
let payload: ISplit | undefined;
const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT);
function SplitsUpdateWorker() {
let maxChangeNumber = 0;
let handleNewEvent = false;
let isHandlingEvent: boolean;
let cdnBypass: boolean;
let payload: ISplit | IRBSegment | undefined;
const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT);

function __handleSplitUpdateCall() {
isHandlingEvent = true;
if (maxChangeNumber > splitsCache.getChangeNumber()) {
handleNewEvent = false;
const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined;
// fetch splits revalidating data if cached
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => {
if (!isHandlingEvent) return; // halt if `stop` has been called
if (handleNewEvent) {
__handleSplitUpdateCall();
} else {
if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS);
// fetch new registered segments for server-side API. Not retrying on error
if (segmentsSyncTask) segmentsSyncTask.execute(true);
function __handleSplitUpdateCall() {
isHandlingEvent = true;
if (maxChangeNumber > splitsCache.getChangeNumber()) {
handleNewEvent = false;
const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined;
// fetch splits revalidating data if cached
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => {
if (!isHandlingEvent) return; // halt if `stop` has been called
if (handleNewEvent) {
__handleSplitUpdateCall();
} else {
if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS);
// fetch new registered segments for server-side API. Not retrying on error
if (segmentsSyncTask) segmentsSyncTask.execute(true);

const attempts = backoff.attempts + 1;
const attempts = backoff.attempts + 1;

if (maxChangeNumber <= splitsCache.getChangeNumber()) {
log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`);
isHandlingEvent = false;
return;
}
if (maxChangeNumber <= splitsCache.getChangeNumber()) {
log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`);
isHandlingEvent = false;
return;
}

if (attempts < FETCH_BACKOFF_MAX_RETRIES) {
backoff.scheduleCall();
return;
}
if (attempts < FETCH_BACKOFF_MAX_RETRIES) {
backoff.scheduleCall();
return;
}

if (cdnBypass) {
log.debug(`No changes fetched after ${attempts} attempts with CDN bypassed.`);
isHandlingEvent = false;
} else {
backoff.reset();
cdnBypass = true;
__handleSplitUpdateCall();
if (cdnBypass) {
log.debug(`No changes fetched after ${attempts} attempts with CDN bypassed.`);
isHandlingEvent = false;
} else {
backoff.reset();
cdnBypass = true;
__handleSplitUpdateCall();
}
}
}
});
} else {
isHandlingEvent = false;
});
} else {
isHandlingEvent = false;
}
}
}

/**
* Invoked by NotificationProcessor on SPLIT_UPDATE event
*
* @param changeNumber - change number of the SPLIT_UPDATE notification
*/
function put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit) {
const currentChangeNumber = splitsCache.getChangeNumber();
return {
/**
* Invoked by NotificationProcessor on SPLIT_UPDATE or RB_SEGMENT_UPDATE event
*
* @param changeNumber - change number of the notification
*/
put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit | IRBSegment) {
const currentChangeNumber = splitsCache.getChangeNumber();

if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return;
if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return;

maxChangeNumber = changeNumber;
handleNewEvent = true;
cdnBypass = false;
payload = undefined;
maxChangeNumber = changeNumber;
handleNewEvent = true;
cdnBypass = false;
payload = undefined;

if (_payload && currentChangeNumber === pcn) {
payload = _payload;
}
if (_payload && currentChangeNumber === pcn) {
payload = _payload;
}

if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall();
backoff.reset();
if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall();
backoff.reset();
},
stop() {
isHandlingEvent = false;
backoff.reset();
}
};
}

const ff = SplitsUpdateWorker();
const rbs = SplitsUpdateWorker();

return {
put,
put(parsedData) {
if (parsedData.d && parsedData.c !== undefined) {
try {
const payload = parseFFUpdatePayload(parsedData.c, parsedData.d);
if (payload) {
(parsedData.type === RB_SEGMENT_UPDATE ? rbs : ff).put(parsedData, payload);
return;
}
} catch (e) {
log.warn(STREAMING_PARSING_SPLIT_UPDATE, [parsedData.type, e]);
}
}
(parsedData.type === RB_SEGMENT_UPDATE ? rbs : ff).put(parsedData);
},
/**
* Invoked by NotificationProcessor on SPLIT_KILL event
*
* @param changeNumber - change number of the SPLIT_UPDATE notification
* @param changeNumber - change number of the notification
* @param splitName - name of split to kill
* @param defaultTreatment - default treatment value
*/
Expand All @@ -104,12 +131,12 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync,
splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true);
}
// queues the SplitChanges fetch (only if changeNumber is newer)
put({ changeNumber } as ISplitUpdateData);
ff.put({ changeNumber } as ISplitUpdateData);
},

stop() {
isHandlingEvent = false;
backoff.reset();
ff.stop();
rbs.stop();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ describe('SplitsUpdateWorker', () => {
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
const payload = notification.decoded;
const changeNumber = payload.changeNumber;
splitUpdateWorker.put({ changeNumber, pcn }, payload); // queued
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); // queued
expect(splitsSyncTask.execute).toBeCalledTimes(1);
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { changeNumber, payload }]);
});
Expand All @@ -237,7 +237,7 @@ describe('SplitsUpdateWorker', () => {

let splitsSyncTask = splitsSyncTaskMock(cache);
let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded);
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
expect(splitsSyncTask.execute).toBeCalledTimes(1);
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
splitsSyncTask.execute.mockClear();
Expand All @@ -250,7 +250,7 @@ describe('SplitsUpdateWorker', () => {

splitsSyncTask = splitsSyncTaskMock(cache);
splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded);
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
expect(splitsSyncTask.execute).toBeCalledTimes(1);
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
splitsSyncTask.execute.mockClear();
Expand All @@ -263,7 +263,7 @@ describe('SplitsUpdateWorker', () => {

splitsSyncTask = splitsSyncTaskMock(cache);
splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded);
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
expect(splitsSyncTask.execute).toBeCalledTimes(1);
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { payload: notification.decoded, changeNumber }]);

Expand Down
1 change: 1 addition & 0 deletions src/sync/streaming/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const MEMBERSHIPS_LS_UPDATE = 'MEMBERSHIPS_LS_UPDATE';
export const SEGMENT_UPDATE = 'SEGMENT_UPDATE';
export const SPLIT_KILL = 'SPLIT_KILL';
export const SPLIT_UPDATE = 'SPLIT_UPDATE';
export const RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE';

// Control-type push notifications, handled by NotificationKeeper
export const CONTROL = 'CONTROL';
Expand Down
4 changes: 2 additions & 2 deletions src/sync/streaming/parseUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { algorithms } from '../../utils/decompress';
import { decodeFromBase64 } from '../../utils/base64';
import { hash } from '../../utils/murmur3/murmur3';
import { Compression, IMembershipMSUpdateData, KeyList } from './SSEHandler/types';
import { ISplit } from '../../dtos/types';
import { IRBSegment, ISplit } from '../../dtos/types';

const GZIP = 1;
const ZLIB = 2;
Expand Down Expand Up @@ -82,7 +82,7 @@ export function isInBitmap(bitmap: Uint8Array, hash64hex: string) {
/**
* Parse feature flags notifications for instant feature flag updates
*/
export function parseFFUpdatePayload(compression: Compression, data: string): ISplit | undefined {
export function parseFFUpdatePayload(compression: Compression, data: string): ISplit | IRBSegment | undefined {
return compression > 0 ?
parseKeyList(data, compression, false) :
JSON.parse(decodeFromBase64(data));
Expand Down
Loading