Skip to content

[Rule-based segments] Add storage interface and implementations for Redis and Pluggable storages #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/dtos/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ export interface ISplitCondition {
conditionType: 'ROLLOUT' | 'WHITELIST'
}

export interface IRBSegment {
name: string,
changeNumber: number,
status: 'ACTIVE' | 'ARCHIVED',
excluded: {
keys: string[],
segments: string[]
},
conditions: ISplitCondition[],
}

export interface ISplit {
name: string,
changeNumber: number,
Expand Down
12 changes: 12 additions & 0 deletions src/storages/KeyBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ export class KeyBuilder {
return `${this.prefix}.split.`;
}

buildRBSegmentKey(splitName: string) {
return `${this.prefix}.rbsegment.${splitName}`;
}

buildRBSegmentsTillKey() {
return `${this.prefix}.rbsegments.till`;
}

buildRBSegmentKeyPrefix() {
return `${this.prefix}.rbsegment.`;
}

buildSegmentNameKey(segmentName: string) {
return `${this.prefix}.segment.${segmentName}`;
}
Expand Down
4 changes: 4 additions & 0 deletions src/storages/KeyBuilderSS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export class KeyBuilderSS extends KeyBuilder {
return `${this.buildSplitKeyPrefix()}*`;
}

searchPatternForRBSegmentKeys() {
return `${this.buildRBSegmentKeyPrefix()}*`;
}

/* Telemetry keys */

buildLatencyKey(method: Method, bucket: number) {
Expand Down
59 changes: 59 additions & 0 deletions src/storages/__tests__/RBSegmentsCacheAsync.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { RBSegmentsCacheInRedis } from '../inRedis/RBSegmentsCacheInRedis';
import { RBSegmentsCachePluggable } from '../pluggable/RBSegmentsCachePluggable';
import { KeyBuilderSS } from '../KeyBuilderSS';
import { rbSegment, rbSegmentWithInSegmentMatcher } from '../__tests__/testUtils';
import { loggerMock } from '../../logger/__tests__/sdkLogger.mock';
import { metadata } from './KeyBuilder.spec';
import { RedisAdapter } from '../inRedis/RedisAdapter';
import { wrapperMockFactory } from '../pluggable/__tests__/wrapper.mock';

const keys = new KeyBuilderSS('RBSEGMENT', metadata);

const redisClient = new RedisAdapter(loggerMock);
const cacheInRedis = new RBSegmentsCacheInRedis(loggerMock, keys, redisClient);

const storageWrapper = wrapperMockFactory();
const cachePluggable = new RBSegmentsCachePluggable(loggerMock, keys, storageWrapper);

describe.each([{ cache: cacheInRedis, wrapper: redisClient }, { cache: cachePluggable, wrapper: storageWrapper }])('Rule-based segments cache async (Redis & Pluggable)', ({ cache, wrapper }) => {

afterAll(async () => {
await wrapper.del(keys.buildRBSegmentsTillKey());
await wrapper.disconnect();
});

test('update should add and remove segments correctly', async () => {
// Add segments
expect(await cache.update([rbSegment, rbSegmentWithInSegmentMatcher], [], 1)).toBe(true);
expect(await cache.get(rbSegment.name)).toEqual(rbSegment);
expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toEqual(rbSegmentWithInSegmentMatcher);
expect(await cache.getChangeNumber()).toBe(1);

// Remove a segment
expect(await cache.update([], [rbSegment], 2)).toBe(true);
expect(await cache.get(rbSegment.name)).toBeNull();
expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toEqual(rbSegmentWithInSegmentMatcher);
expect(await cache.getChangeNumber()).toBe(2);

// Remove remaining segment
expect(await cache.update([], [rbSegmentWithInSegmentMatcher], 3)).toBe(true);
expect(await cache.get(rbSegment.name)).toBeNull();
expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toBeNull();
expect(await cache.getChangeNumber()).toBe(3);

// No changes
expect(await cache.update([], [rbSegmentWithInSegmentMatcher], 4)).toBe(false);
expect(await cache.getChangeNumber()).toBe(4);
});

test('contains should check for segment existence correctly', async () => {
await cache.update([rbSegment, rbSegmentWithInSegmentMatcher], [], 1);

expect(await cache.contains(new Set([rbSegment.name]))).toBe(true);
expect(await cache.contains(new Set([rbSegment.name, rbSegmentWithInSegmentMatcher.name]))).toBe(true);
expect(await cache.contains(new Set(['nonexistent']))).toBe(false);
expect(await cache.contains(new Set([rbSegment.name, 'nonexistent']))).toBe(false);

await cache.update([], [rbSegment, rbSegmentWithInSegmentMatcher], 2);
});
});
8 changes: 7 additions & 1 deletion src/storages/__tests__/testUtils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ISplit } from '../../dtos/types';
import { IRBSegment, ISplit } from '../../dtos/types';
import { IStorageSync, IStorageAsync, IImpressionsCacheSync, IEventsCacheSync } from '../types';

// Assert that instances created by storage factories have the expected interface
Expand Down Expand Up @@ -45,3 +45,9 @@ export const featureFlagTwo: ISplit = { name: 'ff_two', sets: ['t','w','o'] };
export const featureFlagThree: ISplit = { name: 'ff_three', sets: ['t','h','r','e'] };
//@ts-ignore
export const featureFlagWithoutFS: ISplit = { name: 'ff_four' };

// Rule-based segments
//@ts-ignore
export const rbSegment: IRBSegment = { name: 'rb_segment', conditions: [{ matcherGroup: { matchers: [{ matcherType: 'EQUAL_TO', unaryNumericMatcherData: { value: 10 } }] } }] };
//@ts-ignore
export const rbSegmentWithInSegmentMatcher: IRBSegment = { name: 'rb_segment_with_in_segment_matcher', conditions: [{ matcherGroup: { matchers: [{ matcherType: 'IN_SEGMENT', userDefinedSegmentMatcherData: { segmentName: 'employees' } }] } }] };
79 changes: 79 additions & 0 deletions src/storages/inRedis/RBSegmentsCacheInRedis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { isNaNNumber } from '../../utils/lang';
import { IRBSegmentsCacheAsync } from '../types';
import { ILogger } from '../../logger/types';
import { IRBSegment } from '../../dtos/types';
import { LOG_PREFIX } from './constants';
import { setToArray } from '../../utils/lang/sets';
import { RedisAdapter } from './RedisAdapter';
import { KeyBuilderSS } from '../KeyBuilderSS';

export class RBSegmentsCacheInRedis implements IRBSegmentsCacheAsync {

private readonly log: ILogger;
private readonly keys: KeyBuilderSS;
private readonly redis: RedisAdapter;

constructor(log: ILogger, keys: KeyBuilderSS, redis: RedisAdapter) {
this.log = log;
this.keys = keys;
this.redis = redis;
}

get(name: string): Promise<IRBSegment | null> {
return this.redis.get(this.keys.buildRBSegmentKey(name))
.then(maybeRBSegment => maybeRBSegment && JSON.parse(maybeRBSegment));
}

private getNames(): Promise<string[]> {
return this.redis.keys(this.keys.searchPatternForRBSegmentKeys()).then(
(listOfKeys) => listOfKeys.map(this.keys.extractKey)
);
}

contains(names: Set<string>): Promise<boolean> {
const namesArray = setToArray(names);
return this.getNames().then(namesInStorage => {
return namesArray.every(name => namesInStorage.includes(name));
});
}

update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise<boolean> {
return Promise.all([
this.setChangeNumber(changeNumber),
Promise.all(toAdd.map(toAdd => {
const key = this.keys.buildRBSegmentKey(toAdd.name);
const stringifiedNewRBSegment = JSON.stringify(toAdd);
return this.redis.set(key, stringifiedNewRBSegment).then(() => true);
})),
Promise.all(toRemove.map(toRemove => {
const key = this.keys.buildRBSegmentKey(toRemove.name);
return this.redis.del(key).then(status => status === 1);
}))
]).then(([, added, removed]) => {
return added.some(result => result) || removed.some(result => result);
});
}

setChangeNumber(changeNumber: number) {
return this.redis.set(this.keys.buildRBSegmentsTillKey(), changeNumber + '').then(
status => status === 'OK'
);
}

getChangeNumber(): Promise<number> {
return this.redis.get(this.keys.buildRBSegmentsTillKey()).then((value: string | null) => {
const i = parseInt(value as string, 10);

return isNaNNumber(i) ? -1 : i;
}).catch((e) => {
this.log.error(LOG_PREFIX + 'Could not retrieve changeNumber from storage. Error: ' + e);
return -1;
});
}

// @TODO implement if required by DataLoader or producer mode
clear() {
return Promise.resolve();
}

}
76 changes: 76 additions & 0 deletions src/storages/pluggable/RBSegmentsCachePluggable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { isNaNNumber } from '../../utils/lang';
import { KeyBuilder } from '../KeyBuilder';
import { IPluggableStorageWrapper, IRBSegmentsCacheAsync } from '../types';
import { ILogger } from '../../logger/types';
import { IRBSegment } from '../../dtos/types';
import { LOG_PREFIX } from './constants';
import { setToArray } from '../../utils/lang/sets';

export class RBSegmentsCachePluggable implements IRBSegmentsCacheAsync {

private readonly log: ILogger;
private readonly keys: KeyBuilder;
private readonly wrapper: IPluggableStorageWrapper;

constructor(log: ILogger, keys: KeyBuilder, wrapper: IPluggableStorageWrapper) {
this.log = log;
this.keys = keys;
this.wrapper = wrapper;
}

get(name: string): Promise<IRBSegment | null> {
return this.wrapper.get(this.keys.buildRBSegmentKey(name))
.then(maybeRBSegment => maybeRBSegment && JSON.parse(maybeRBSegment));
}

private getNames(): Promise<string[]> {
return this.wrapper.getKeysByPrefix(this.keys.buildRBSegmentKeyPrefix()).then(
(listOfKeys) => listOfKeys.map(this.keys.extractKey)
);
}

contains(names: Set<string>): Promise<boolean> {
const namesArray = setToArray(names);
return this.getNames().then(namesInStorage => {
return namesArray.every(name => namesInStorage.includes(name));
});
}

update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise<boolean> {
return Promise.all([
this.setChangeNumber(changeNumber),
Promise.all(toAdd.map(toAdd => {
const key = this.keys.buildRBSegmentKey(toAdd.name);
const stringifiedNewRBSegment = JSON.stringify(toAdd);
return this.wrapper.set(key, stringifiedNewRBSegment).then(() => true);
})),
Promise.all(toRemove.map(toRemove => {
const key = this.keys.buildRBSegmentKey(toRemove.name);
return this.wrapper.del(key);
}))
]).then(([, added, removed]) => {
return added.some(result => result) || removed.some(result => result);
});
}

setChangeNumber(changeNumber: number) {
return this.wrapper.set(this.keys.buildRBSegmentsTillKey(), changeNumber + '');
}

getChangeNumber(): Promise<number> {
return this.wrapper.get(this.keys.buildRBSegmentsTillKey()).then((value) => {
const i = parseInt(value as string, 10);

return isNaNNumber(i) ? -1 : i;
}).catch((e) => {
this.log.error(LOG_PREFIX + 'Could not retrieve changeNumber from storage. Error: ' + e);
return -1;
});
}

// @TODO implement if required by DataLoader or producer mode
clear() {
return Promise.resolve();
}

}
30 changes: 29 additions & 1 deletion src/storages/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import SplitIO from '../../types/splitio';
import { MaybeThenable, ISplit, IMySegmentsResponse } from '../dtos/types';
import { MaybeThenable, ISplit, IRBSegment, IMySegmentsResponse } from '../dtos/types';
import { MySegmentsData } from '../sync/polling/types';
import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../sync/submitters/types';
import { ISettings } from '../types';
Expand Down Expand Up @@ -225,6 +225,34 @@ export interface ISplitsCacheAsync extends ISplitsCacheBase {
getNamesByFlagSets(flagSets: string[]): Promise<Set<string>[]>
}

/** Rule-Based Segments cache */

export interface IRBSegmentsCacheBase {
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): MaybeThenable<boolean>,
get(name: string): MaybeThenable<IRBSegment | null>,
getChangeNumber(): MaybeThenable<number>,
clear(): MaybeThenable<boolean | void>,
contains(names: Set<string>): MaybeThenable<boolean>,
}

export interface IRBSegmentsCacheSync extends IRBSegmentsCacheBase {
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): boolean,
get(name: string): IRBSegment | null,
getChangeNumber(): number,
clear(): void,
contains(names: Set<string>): boolean,
// Used only for smart pausing in client-side standalone. Returns true if the storage contains a RBSegment using segments or large segments matchers
usesSegments(): boolean,
}

export interface IRBSegmentsCacheAsync extends IRBSegmentsCacheBase {
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise<boolean>,
get(name: string): Promise<IRBSegment | null>,
getChangeNumber(): Promise<number>,
clear(): Promise<boolean | void>,
contains(names: Set<string>): Promise<boolean>,
}

/** Segments cache */

export interface ISegmentsCacheBase {
Expand Down