Skip to content

Commit 8119afe

Browse files
authored
fix: Discard events from closed connections. (#867)
This PR + #842 should help to prevent issues like #840.
1 parent c659e87 commit 8119afe

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

packages/shared/sdk-client/src/polling/PollingProcessor.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import { Flags } from '../types';
1313

1414
export type PollingErrorHandler = (err: LDPollingError) => void;
1515

16+
function reportClosed(logger?: LDLogger) {
17+
logger?.debug(`Poll completed after the processor was closed. Skipping processing.`);
18+
}
19+
1620
/**
1721
* @internal
1822
*/
@@ -50,6 +54,12 @@ export default class PollingProcessor implements subsystem.LDStreamProcessor {
5054
try {
5155
const res = await this._requestor.requestPayload();
5256
try {
57+
// If the processor has been stopped, we discard the response.
58+
// This response could be for a no longer active context.
59+
if (this._stopped) {
60+
reportClosed(this._logger);
61+
return;
62+
}
5363
const flags = JSON.parse(res);
5464
try {
5565
this._dataHandler?.(flags);
@@ -60,6 +70,12 @@ export default class PollingProcessor implements subsystem.LDStreamProcessor {
6070
reportJsonError(res);
6171
}
6272
} catch (err) {
73+
// If the processor has been stopped, we discard this error.
74+
// The original caller would consider this connection no longer active.
75+
if (this._stopped) {
76+
reportClosed(this._logger);
77+
return;
78+
}
6379
const requestError = err as LDRequestError;
6480
if (requestError.status !== undefined) {
6581
if (!isHttpRecoverable(requestError.status)) {

packages/shared/sdk-client/src/streaming/StreamingProcessor.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,21 @@ const reportJsonError = (
3333
);
3434
};
3535

36+
function reportEventClosed(eventName: string, logger?: LDLogger) {
37+
logger?.debug(`Received ${eventName} event after processor was closed. Skipping processing.`);
38+
}
39+
40+
function reportPingClosed(logger?: LDLogger) {
41+
logger?.debug('Ping completed after processor was closed. Skipping processing.');
42+
}
43+
3644
class StreamingProcessor implements subsystem.LDStreamProcessor {
3745
private readonly _headers: { [key: string]: string | string[] };
3846
private readonly _streamUri: string;
3947

4048
private _eventSource?: EventSource;
4149
private _connectionAttemptStartTime?: number;
50+
private _stopped = false;
4251

4352
constructor(
4453
private readonly _plainContextString: string,
@@ -157,6 +166,13 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
157166

158167
this._listeners.forEach(({ deserializeData, processJson }, eventName) => {
159168
eventSource.addEventListener(eventName, (event) => {
169+
// If an event comes in after the processor has been stopped, we skip processing it.
170+
// This event could be for a context which is no longer active.
171+
if (this._stopped) {
172+
reportEventClosed(eventName, this._logger);
173+
return;
174+
}
175+
160176
this._logger?.debug(`Received ${eventName} event`);
161177

162178
if (event?.data) {
@@ -186,6 +202,12 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
186202
try {
187203
const res = await this._pollingRequestor.requestPayload();
188204
try {
205+
// If the ping completes after the processor has been stopped, then we discard it.
206+
// This event could be for a context which is no longer active.
207+
if (this._stopped) {
208+
reportPingClosed(this._logger);
209+
return;
210+
}
189211
const payload = JSON.parse(res);
190212
try {
191213
// forward the payload on to the PUT listener
@@ -204,6 +226,12 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
204226
);
205227
}
206228
} catch (err) {
229+
if (this._stopped) {
230+
// If the ping errors after the processor has been stopped, then we discard it.
231+
// The original caller would consider this connection no longer active.
232+
reportPingClosed(this._logger);
233+
return;
234+
}
207235
const requestError = err as LDRequestError;
208236
this._errorHandler?.(
209237
new LDPollingError(
@@ -219,6 +247,7 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
219247
stop() {
220248
this._eventSource?.close();
221249
this._eventSource = undefined;
250+
this._stopped = true;
222251
}
223252

224253
close() {

0 commit comments

Comments
 (0)