Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add Reset by Channel feature #290

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions engine/memcached_state_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class MemcachedStateStore {
await this.initAsync(id, initData);
}

async resetAllAsync() {
console.error("Shared Storage Reset Failed.\nMemcache-client: Flush All Command Not Implemented Yet");
}

async getAsync(id, key) {
const storeKey = "" + this.keyPrefix + id + key;
const data = await this.client.get(storeKey);
Expand Down
20 changes: 18 additions & 2 deletions engine/memory_state_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,31 @@ class MemoryStateStore {
async initAsync(id, initData) {
if (!this.sharedStates[id]) {
this.sharedStates[id] = {};
Object.keys(initData).forEach(key => {
Object.keys(initData).forEach((key) => {
this.sharedStates[id][key] = initData[key];
});
}
return this.sharedStates[id];
}

async resetAsync(id, initData) {
this.sharedStates[id] = null;
await this.initAsync(id, initData);
}

async resetAllAsync() {
this.sharedStates = {};
this.globalSharedStates = {};
}

async getAsync(id, key) {
let value;
if (id === "" || id === null) {
value = this.globalSharedStates[key];
} else {
if (!this.sharedStates[id]) {
return null;
}
value = this.sharedStates[id][key];
}
return value;
Expand All @@ -30,6 +43,9 @@ class MemoryStateStore {
if (id === "" || id === null) {
this.globalSharedStates[key] = value;
} else {
if (!this.sharedStates[id]) {
this.sharedStates[id] = {};
}
this.sharedStates[id][key] = value;
return this.sharedStates[id][key];
}
Expand All @@ -41,7 +57,7 @@ class MemoryStateStore {

async removeAsync(id, key) {
delete this.sharedStates[id][key];
}
}
}

module.exports = MemoryStateStore;
19 changes: 12 additions & 7 deletions engine/redis_state_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class RedisStateStore {
this.volatileKeyTTL = DEFAULT_VOLATILE_KEY_TTL;
if (opts.volatileKeyTTL) {
debug(`Overriding default, volatileKeyTTL=${opts.volatileKeyTTL}s`);
this.volatileKeyTTL = opts.volatileKeyTTL;
this.volatileKeyTTL = opts.volatileKeyTTL;
}
this.client = createClient(opts.redisUrl);
}
Expand All @@ -23,14 +23,14 @@ class RedisStateStore {
const isInitiated = await this.getAsync(id, "_initiated");
let data = {};
if (!isInitiated) {
for(const key of Object.keys(initData)) {
for (const key of Object.keys(initData)) {
debug(`${this.keyPrefix}:${id}: Initiating key ${key} with init data`);
data[key] = await this.setAsync(id, key, initData[key]);
}
await this.setAsync(id, "_initiated", true);
} else {
debug(`${this.keyPrefix}:${id}: Already initiated, not initiating with init data`);
for(const key of Object.keys(initData)) {
for (const key of Object.keys(initData)) {
debug(`${this.keyPrefix}:${id}: Initiating key ${key} with data from store`);
data[key] = await this.getAsync(id, key);
}
Expand All @@ -39,8 +39,13 @@ class RedisStateStore {
}

async resetAsync(id, initData) {
await this.setAsync(id, "_initiated", false);
await this.initAsync(id, initData);
}

async resetAllAsync() {
const resetAsync = new Promise((resolve, reject) => {
this.client.flushall((err, reply) => {
this.client.flushall((err, reply) => {
if (!err) {
console.log("Flushed Redis db: ", reply);
resolve();
Expand All @@ -64,7 +69,7 @@ class RedisStateStore {
} else {
reject(err);
}
});
});
});
const data = await getAsync;
return data;
Expand All @@ -76,7 +81,7 @@ class RedisStateStore {
const setAsync = new Promise((resolve, reject) => {
this.client.set(storeKey, JSON.stringify(value), (err, res) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? 'REDISSLOW!' : ''}`);
debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
if (!err) {
resolve(value);
} else {
Expand Down Expand Up @@ -110,7 +115,7 @@ class RedisStateStore {
const delAsync = new Promise((resolve, reject) => {
this.client.del(storeKey, (err, res) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? 'REDISSLOW!' : ''}`);
debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
if (!err) {
resolve();
} else {
Expand Down
76 changes: 74 additions & 2 deletions engine/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export interface ChannelEngineOpts {
noSessionDataTags?: boolean;
volatileKeyTTL?: number;
autoCreateSession?: boolean;
sessionResetKey?:string;
}

interface StreamerOpts {
Expand Down Expand Up @@ -203,6 +204,7 @@ export class ChannelEngine {
private adCopyMgrUri?: string;
private adXchangeUri?: string;
private autoCreateSession: boolean = false;
private sessionResetKey: string = "";

constructor(assetMgr: IAssetManager, options?: ChannelEngineOpts) {
this.options = options;
Expand All @@ -221,6 +223,10 @@ export class ChannelEngine {
this.dummySubtitleEndpoint = (options && options.dummySubtitleEndpoint) ? options.dummySubtitleEndpoint : DefaultDummySubtitleEndpointPath;
this.subtitleSliceEndpoint = (options && options.subtitleSliceEndpoint) ? options.subtitleSliceEndpoint : DefaultSubtitleSpliceEndpointPath;

this.sessionResetKey = "";
if (options && options.sessionResetKey) {
this.sessionResetKey = options.sessionResetKey;
}
this.alwaysNewSegments = false;
if (options && options.alwaysNewSegments) {
this.alwaysNewSegments = true;
Expand Down Expand Up @@ -361,7 +367,8 @@ export class ChannelEngine {
this.server.get('/status/:sessionId', this._handleStatus.bind(this));
this.server.get('/health', this._handleAggregatedSessionHealth.bind(this));
this.server.get('/health/:sessionId', this._handleSessionHealth.bind(this));
this.server.get('/reset', this._handleSessionReset.bind(this));
this.server.get('/reset', this._handleSessionsReset.bind(this));
this.server.get('/reset/:sessionId', this._handleSessionReset.bind(this));
this.server.get(DefaultDummySubtitleEndpointPath, this._handleDummySubtitleEndpoint.bind(this));
this.server.get(DefaultSubtitleSpliceEndpointPath, this._handleSubtitleSliceEndpoint.bind(this));

Expand Down Expand Up @@ -466,6 +473,7 @@ export class ChannelEngine {
subtitleSliceEndpoint: this.subtitleSliceEndpoint,
useVTTSubtitles: this.useVTTSubtitles,
alwaysNewSegments: options.alwaysNewSegments,
sessionResetKey: options.sessionResetKey,
partialStoreHLSVod: options.partialStoreHLSVod,
alwaysMapBandwidthByNearest: options.alwaysMapBandwidthByNearest,
noSessionDataTags: options.noSessionDataTags,
Expand Down Expand Up @@ -1005,8 +1013,18 @@ export class ChannelEngine {
}
}

async _handleSessionReset(req, res, next) {
async _handleSessionsReset(req, res, next) {
debug(`req.url=${req.url}`);
if (this.sessionResetKey && req.query.key !== this.sessionResetKey) {
res.sendRaw(403, JSON.stringify({ "message": "Invalid Session-Reset-Key" }),
{
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
});
next();
return;
}
let sessionResets = [];
for (const sessionId of Object.keys(sessions)) {
const session = sessions[sessionId];
Expand All @@ -1027,6 +1045,60 @@ export class ChannelEngine {
});
}

async _handleSessionReset(req, res, next) {
debug(`req.url=${req.url}`);
if (this.sessionResetKey && req.query.key !== this.sessionResetKey) {
res.sendRaw(403, JSON.stringify({ "message": "Invalid Session-Reset-Key" }),
{
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
});
next();
return;
}
try {
let sessionId;
if (req.params && req.params.sessionId) {
sessionId = req.params.sessionId;
}
let sessionResets = [];
const session = sessions[sessionId];
const sessionLive = sessionsLive[sessionId];
if (session && sessionLive) {
await session.resetAsync(sessionId);
sessionResets.push(sessionId);
} else {
res.sendRaw(400, JSON.stringify({ "message": "Invalid Session ID" }),
{
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
});
next();
return;
}

res.sendRaw(200, JSON.stringify({ "status": "ok", "instanceId": this.instanceId, "resets": sessionResets }),
{
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
});
next();
} catch (e) {
res.sendRaw(500, JSON.stringify({ "error": e}),
{
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
});
const err = new errs.NotFoundError('Session Reset Failed');
next(err);
}
}


_gracefulErrorHandler(errMsg) {
console.error(errMsg);
const err = new errs.NotFoundError(errMsg);
Expand Down
10 changes: 7 additions & 3 deletions engine/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,13 @@ class Session {
}
}

async resetAsync() {
await this._sessionStateStore.reset(this._sessionId);
await this._playheadStateStore.reset(this._sessionId);
async resetAsync(id) {
if (id) {
await this._sessionStateStore.reset(id);
await this._playheadStateStore.reset(id);
}
await this._sessionStateStore.resetAll();
await this._playheadStateStore.resetAll();
}

async getSessionState() {
Expand Down
4 changes: 4 additions & 0 deletions engine/shared_state_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class SharedStateStore {
await this.store.resetAsync(id, this.initData);
}

async resetAll() {
await this.store.resetAllAsync();
}

async get(id, key) {
//debug(`${this.type}:${id}:${key} Reading from shared store`);
let data = await this.store.getAsync(id, key);
Expand Down