Skip to content

Commit

Permalink
Cleanup cancelled requests (#2387)
Browse files Browse the repository at this point in the history
  • Loading branch information
ofpiyush authored Dec 30, 2020
1 parent 611186c commit 3d7242d
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 112 deletions.
109 changes: 83 additions & 26 deletions app/client/src/sagas/evaluationsSaga.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
import {
all,
call,
fork,
put,
select,
take,
takeLatest,
} from "redux-saga/effects";
import { actionChannel, call, put, select, take } from "redux-saga/effects";

import {
EvaluationReduxAction,
Expand Down Expand Up @@ -37,6 +29,7 @@ import { Variant } from "components/ads/common";
import { Toaster } from "components/ads/Toast";
import * as Sentry from "@sentry/react";
import { EXECUTION_PARAM_KEY } from "../constants/ActionConstants";
import { Action } from "redux";

let widgetTypeConfigMap: WidgetTypeConfigMap;

Expand Down Expand Up @@ -187,7 +180,14 @@ export function* validateProperty(
});
}

const FIRST_EVAL_REDUX_ACTIONS = [
// Pages
ReduxActionTypes.FETCH_PAGE_SUCCESS,
ReduxActionTypes.FETCH_PUBLISHED_PAGE_SUCCESS,
];

const EVALUATE_REDUX_ACTIONS = [
...FIRST_EVAL_REDUX_ACTIONS,
// Actions
ReduxActionTypes.FETCH_ACTIONS_SUCCESS,
ReduxActionTypes.FETCH_ACTIONS_VIEW_MODE_SUCCESS,
Expand Down Expand Up @@ -218,21 +218,42 @@ const EVALUATE_REDUX_ACTIONS = [
// Widget Meta
ReduxActionTypes.SET_META_PROP,
ReduxActionTypes.RESET_WIDGET_META,
// Pages
ReduxActionTypes.FETCH_PAGE_SUCCESS,
ReduxActionTypes.FETCH_PUBLISHED_PAGE_SUCCESS,
// Batches
ReduxActionTypes.BATCH_UPDATES_SUCCESS,
];

function* evaluationChangeListenerSaga() {
yield fork(worker.start);
widgetTypeConfigMap = WidgetFactory.getWidgetTypeConfigMap();
yield fork(evaluateTreeSaga);
while (true) {
const action: EvaluationReduxAction<unknown | unknown[]> = yield take(
EVALUATE_REDUX_ACTIONS,
);
function evalQueueBuffer() {
let initialised = false;
let takable = false;
let postEvalActions: any = [];
const take = () => {
if (takable) {
const resp = postEvalActions;
postEvalActions = [];
takable = false;
return { postEvalActions: resp, type: "FAKE_ACTION" };
}
};
const flush = () => {
if (takable) {
return [take() as Action];
}

return [];
};

const put = (action: EvaluationReduxAction<unknown | unknown[]>) => {
if (!initialised) {
if (
![
ReduxActionTypes.FETCH_PAGE_SUCCESS,
ReduxActionTypes.FETCH_PUBLISHED_PAGE_SUCCESS,
].includes(action.type)
) {
return;
}
initialised = true;
}
// When batching success action happens, we need to only evaluate
// if the batch had any action we need to evaluate properties for
if (
Expand All @@ -245,17 +266,53 @@ function* evaluationChangeListenerSaga() {
if (
_.intersection(EVALUATE_REDUX_ACTIONS, batchedActionTypes).length === 0
) {
continue;
return;
}
}
log.debug(`Evaluating`, { action });
yield fork(evaluateTreeSaga, action.postEvalActions);

takable = true;
// TODO: If the action is the same as before, we can send only one and ignore duplicates.
if (action.postEvalActions) {
postEvalActions.push(...action.postEvalActions);
}
};

return {
take,
put,
isEmpty: () => {
return !takable;
},
flush,
};
}

function* evaluationChangeListenerSaga() {
// Explicitly shutdown old worker if present
yield call(worker.shutdown);
yield call(worker.start);
widgetTypeConfigMap = WidgetFactory.getWidgetTypeConfigMap();
const evtActionChannel = yield actionChannel(
EVALUATE_REDUX_ACTIONS,
evalQueueBuffer(),
);
while (true) {
const action: EvaluationReduxAction<unknown | unknown[]> = yield take(
evtActionChannel,
);
yield call(evaluateTreeSaga, action.postEvalActions);
}
// TODO(hetu) need an action to stop listening and evaluate (exit app)
}

export default function* evaluationSagaListeners() {
yield all([
takeLatest(ReduxActionTypes.START_EVALUATION, evaluationChangeListenerSaga),
]);
yield take(ReduxActionTypes.START_EVALUATION);
while (true) {
try {
yield call(evaluationChangeListenerSaga);
} catch (e) {
log.error(e);
Sentry.captureException(e);
}
}
}
47 changes: 44 additions & 3 deletions app/client/src/utils/WorkerUtil.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { GracefulWorkerService } from "./WorkerUtil";
import { runSaga } from "redux-saga";
import WebpackWorker from "worker-loader!";

const MessageType = "message";
class MockWorker {
class MockWorker implements WebpackWorker {
// Implement interface
onmessage: any;
onmessageerror: any;
dispatchEvent: any;
onerror: any;

callback: CallableFunction;
noop: CallableFunction;
messages: Array<any>;
Expand All @@ -26,12 +33,12 @@ class MockWorker {
this.running = true;
}

addEventListener(msgType: string, callback: CallableFunction) {
addEventListener(msgType: string, callback: any) {
expect(msgType).toEqual(MessageType);
this.callback = callback;
}

removeEventListener(msgType: string, callback: CallableFunction) {
removeEventListener(msgType: string, callback: any) {
expect(msgType).toEqual(MessageType);
expect(callback).toEqual(this.callback);
this.callback = this.noop;
Expand Down Expand Up @@ -168,4 +175,38 @@ describe("GracefulWorkerService", () => {
// The new worker should get the correct message
expect(await result2.toPromise()).toEqual(message2);
});

test("Cancelling saga before starting up should not crash", async () => {
const w = new GracefulWorkerService(MockWorker);
const message = { tree: "hello" };

const task = await runSaga({}, w.request, "cancel_test", message);
// Start shutting down
const shutdown = await runSaga({}, w.shutdown);
task.cancel();
// wait for shutdown
await shutdown.toPromise();
expect(await task.toPromise()).not.toEqual(message);
});

test("Cancelled saga should clean up", async () => {
const w = new GracefulWorkerService(MockWorker);
const message = { tree: "hello" };
await runSaga({}, w.start);

// Need this to work with eslint
if (MockWorker.instance === undefined) {
expect(MockWorker.instance).toBeDefined();
return;
}
// Make sure we get a chance to cancel before the worker can respond
MockWorker.instance.delayMilliSeconds = 100;
const task = await runSaga({}, w.request, "cancel_test", message);
// Start shutting down
const shutdown = await runSaga({}, w.shutdown);
task.cancel();
// wait for shutdown
await shutdown.toPromise();
expect(await task.toPromise()).not.toEqual(message);
});
});
Loading

0 comments on commit 3d7242d

Please sign in to comment.