Skip to content

fix: Discard events from closed connections. #867

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/shared/sdk-client/src/polling/PollingProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import { Flags } from '../types';

export type PollingErrorHandler = (err: LDPollingError) => void;

function reportClosed(logger?: LDLogger) {
logger?.debug(`Poll completed after the processor was closed. Skipping processing.`);
}

/**
* @internal
*/
Expand Down Expand Up @@ -50,6 +54,12 @@ export default class PollingProcessor implements subsystem.LDStreamProcessor {
try {
const res = await this._requestor.requestPayload();
try {
// If the processor has been stopped, we discard the response.
// This response could be for a no longer active context.
if (this._stopped) {
reportClosed(this._logger);
return;
}
const flags = JSON.parse(res);
try {
this._dataHandler?.(flags);
Expand All @@ -60,6 +70,12 @@ export default class PollingProcessor implements subsystem.LDStreamProcessor {
reportJsonError(res);
}
} catch (err) {
// If the processor has been stopped, we discard this error.
// The original caller would consider this connection no longer active.
if (this._stopped) {
reportClosed(this._logger);
return;
}
const requestError = err as LDRequestError;
if (requestError.status !== undefined) {
if (!isHttpRecoverable(requestError.status)) {
Expand Down
29 changes: 29 additions & 0 deletions packages/shared/sdk-client/src/streaming/StreamingProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,21 @@ const reportJsonError = (
);
};

function reportEventClosed(eventName: string, logger?: LDLogger) {
logger?.debug(`Received ${eventName} event after processor was closed. Skipping processing.`);
}

function reportPingClosed(logger?: LDLogger) {
logger?.debug('Ping completed after processor was closed. Skipping processing.');
}

class StreamingProcessor implements subsystem.LDStreamProcessor {
private readonly _headers: { [key: string]: string | string[] };
private readonly _streamUri: string;

private _eventSource?: EventSource;
private _connectionAttemptStartTime?: number;
private _stopped = false;

constructor(
private readonly _plainContextString: string,
Expand Down Expand Up @@ -157,6 +166,13 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {

this._listeners.forEach(({ deserializeData, processJson }, eventName) => {
eventSource.addEventListener(eventName, (event) => {
// If an event comes in after the processor has been stopped, we skip processing it.
// This event could be for a context which is no longer active.
if (this._stopped) {
reportEventClosed(eventName, this._logger);
return;
}

this._logger?.debug(`Received ${eventName} event`);

if (event?.data) {
Expand Down Expand Up @@ -186,6 +202,12 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
try {
const res = await this._pollingRequestor.requestPayload();
try {
// If the ping completes after the processor has been stopped, then we discard it.
// This event could be for a context which is no longer active.
if (this._stopped) {
reportPingClosed(this._logger);
return;
}
const payload = JSON.parse(res);
try {
// forward the payload on to the PUT listener
Expand All @@ -204,6 +226,12 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
);
}
} catch (err) {
if (this._stopped) {
// If the ping errors after the processor has been stopped, then we discard it.
// The original caller would consider this connection no longer active.
reportPingClosed(this._logger);
return;
}
const requestError = err as LDRequestError;
this._errorHandler?.(
new LDPollingError(
Expand All @@ -219,6 +247,7 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
stop() {
this._eventSource?.close();
this._eventSource = undefined;
this._stopped = true;
}

close() {
Expand Down