Skip to content

Commit b42341d

Browse files
authored
[Flight] Use cacheController instead of abortListeners for Streams (#33633)
Now that we have `cacheSignal()` we can just use that instead of the `abortListeners` concept which was really just the same thing for cancelling the streams (ReadableStream, Blob, AsyncIterable).
1 parent 7a3ffef commit b42341d

File tree

1 file changed

+59
-93
lines changed

1 file changed

+59
-93
lines changed

packages/react-server/src/ReactFlightServer.js

Lines changed: 59 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,6 @@ export type Request = {
433433
nextChunkId: number,
434434
pendingChunks: number,
435435
hints: Hints,
436-
abortListeners: Set<(reason: mixed) => void>,
437436
abortableTasks: Set<Task>,
438437
pingedTasks: Array<Task>,
439438
completedImportChunks: Array<Chunk>,
@@ -547,7 +546,6 @@ function RequestInstance(
547546
this.nextChunkId = 0;
548547
this.pendingChunks = 0;
549548
this.hints = hints;
550-
this.abortListeners = new Set();
551549
this.abortableTasks = abortSet;
552550
this.pingedTasks = pingedTasks;
553551
this.completedImportChunks = ([]: Array<Chunk>);
@@ -839,13 +837,11 @@ function serializeThenable(
839837
if (request.status === ABORTING) {
840838
// We can no longer accept any resolved values
841839
request.abortableTasks.delete(newTask);
842-
newTask.status = ABORTED;
843840
if (enableHalt && request.type === PRERENDER) {
844-
request.pendingChunks--;
841+
haltTask(newTask, request);
845842
} else {
846843
const errorId: number = (request.fatalError: any);
847-
const model = stringify(serializeByValueID(errorId));
848-
emitModelChunk(request, newTask.id, model);
844+
abortTask(newTask, request, errorId);
849845
}
850846
return newTask.id;
851847
}
@@ -936,29 +932,26 @@ function serializeReadableStream(
936932
__DEV__ ? task.debugStack : null,
937933
__DEV__ ? task.debugTask : null,
938934
);
939-
request.abortableTasks.delete(streamTask);
940-
941-
request.pendingChunks++; // The task represents the Start row. This adds a Stop row.
942935

936+
// The task represents the Stop row. This adds a Start row.
937+
request.pendingChunks++;
943938
const startStreamRow =
944939
streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n';
945940
request.completedRegularChunks.push(stringToChunk(startStreamRow));
946941

947-
// There's a race condition between when the stream is aborted and when the promise
948-
// resolves so we track whether we already aborted it to avoid writing twice.
949-
let aborted = false;
950942
function progress(entry: {done: boolean, value: ReactClientValue, ...}) {
951-
if (aborted) {
943+
if (streamTask.status !== PENDING) {
952944
return;
953945
}
954946

955947
if (entry.done) {
948+
streamTask.status = COMPLETED;
956949
const endStreamRow = streamTask.id.toString(16) + ':C\n';
957950
request.completedRegularChunks.push(stringToChunk(endStreamRow));
951+
request.abortableTasks.delete(streamTask);
952+
request.cacheController.signal.removeEventListener('abort', abortStream);
958953
enqueueFlush(request);
959-
request.abortListeners.delete(abortStream);
960954
callOnAllReadyIfReady(request);
961-
aborted = true;
962955
} else {
963956
try {
964957
streamTask.model = entry.value;
@@ -972,34 +965,36 @@ function serializeReadableStream(
972965
}
973966
}
974967
function error(reason: mixed) {
975-
if (aborted) {
968+
if (streamTask.status !== PENDING) {
976969
return;
977970
}
978-
aborted = true;
979-
request.abortListeners.delete(abortStream);
971+
request.cacheController.signal.removeEventListener('abort', abortStream);
980972
erroredTask(request, streamTask, reason);
981973
enqueueFlush(request);
982974

983975
// $FlowFixMe should be able to pass mixed
984976
reader.cancel(reason).then(error, error);
985977
}
986-
function abortStream(reason: mixed) {
987-
if (aborted) {
978+
function abortStream() {
979+
if (streamTask.status !== PENDING) {
988980
return;
989981
}
990-
aborted = true;
991-
request.abortListeners.delete(abortStream);
982+
const signal = request.cacheController.signal;
983+
signal.removeEventListener('abort', abortStream);
984+
const reason = signal.reason;
992985
if (enableHalt && request.type === PRERENDER) {
993-
request.pendingChunks--;
986+
haltTask(streamTask, request);
987+
request.abortableTasks.delete(streamTask);
994988
} else {
989+
// TODO: Make this use abortTask() instead.
995990
erroredTask(request, streamTask, reason);
996991
enqueueFlush(request);
997992
}
998993
// $FlowFixMe should be able to pass mixed
999994
reader.cancel(reason).then(error, error);
1000995
}
1001996

1002-
request.abortListeners.add(abortStream);
997+
request.cacheController.signal.addEventListener('abort', abortStream);
1003998
reader.read().then(progress, error);
1004999
return serializeByValueID(streamTask.id);
10051000
}
@@ -1028,10 +1023,9 @@ function serializeAsyncIterable(
10281023
__DEV__ ? task.debugStack : null,
10291024
__DEV__ ? task.debugTask : null,
10301025
);
1031-
request.abortableTasks.delete(streamTask);
1032-
1033-
request.pendingChunks++; // The task represents the Start row. This adds a Stop row.
10341026

1027+
// The task represents the Stop row. This adds a Start row.
1028+
request.pendingChunks++;
10351029
const startStreamRow =
10361030
streamTask.id.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n';
10371031
request.completedRegularChunks.push(stringToChunk(startStreamRow));
@@ -1043,19 +1037,17 @@ function serializeAsyncIterable(
10431037
}
10441038
}
10451039

1046-
// There's a race condition between when the stream is aborted and when the promise
1047-
// resolves so we track whether we already aborted it to avoid writing twice.
1048-
let aborted = false;
10491040
function progress(
10501041
entry:
10511042
| {done: false, +value: ReactClientValue, ...}
10521043
| {done: true, +value: ReactClientValue, ...},
10531044
) {
1054-
if (aborted) {
1045+
if (streamTask.status !== PENDING) {
10551046
return;
10561047
}
10571048

10581049
if (entry.done) {
1050+
streamTask.status = COMPLETED;
10591051
let endStreamRow;
10601052
if (entry.value === undefined) {
10611053
endStreamRow = streamTask.id.toString(16) + ':C\n';
@@ -1075,10 +1067,13 @@ function serializeAsyncIterable(
10751067
}
10761068
}
10771069
request.completedRegularChunks.push(stringToChunk(endStreamRow));
1070+
request.abortableTasks.delete(streamTask);
1071+
request.cacheController.signal.removeEventListener(
1072+
'abort',
1073+
abortIterable,
1074+
);
10781075
enqueueFlush(request);
1079-
request.abortListeners.delete(abortIterable);
10801076
callOnAllReadyIfReady(request);
1081-
aborted = true;
10821077
} else {
10831078
try {
10841079
streamTask.model = entry.value;
@@ -1097,11 +1092,10 @@ function serializeAsyncIterable(
10971092
}
10981093
}
10991094
function error(reason: mixed) {
1100-
if (aborted) {
1095+
if (streamTask.status !== PENDING) {
11011096
return;
11021097
}
1103-
aborted = true;
1104-
request.abortListeners.delete(abortIterable);
1098+
request.cacheController.signal.removeEventListener('abort', abortIterable);
11051099
erroredTask(request, streamTask, reason);
11061100
enqueueFlush(request);
11071101
if (typeof (iterator: any).throw === 'function') {
@@ -1110,16 +1104,19 @@ function serializeAsyncIterable(
11101104
iterator.throw(reason).then(error, error);
11111105
}
11121106
}
1113-
function abortIterable(reason: mixed) {
1114-
if (aborted) {
1107+
function abortIterable() {
1108+
if (streamTask.status !== PENDING) {
11151109
return;
11161110
}
1117-
aborted = true;
1118-
request.abortListeners.delete(abortIterable);
1111+
const signal = request.cacheController.signal;
1112+
signal.removeEventListener('abort', abortIterable);
1113+
const reason = signal.reason;
11191114
if (enableHalt && request.type === PRERENDER) {
1120-
request.pendingChunks--;
1115+
haltTask(streamTask, request);
1116+
request.abortableTasks.delete(streamTask);
11211117
} else {
1122-
erroredTask(request, streamTask, reason);
1118+
// TODO: Make this use abortTask() instead.
1119+
erroredTask(request, streamTask, signal.reason);
11231120
enqueueFlush(request);
11241121
}
11251122
if (typeof (iterator: any).throw === 'function') {
@@ -1128,7 +1125,7 @@ function serializeAsyncIterable(
11281125
iterator.throw(reason).then(error, error);
11291126
}
11301127
}
1131-
request.abortListeners.add(abortIterable);
1128+
request.cacheController.signal.addEventListener('abort', abortIterable);
11321129
if (__DEV__) {
11331130
callIteratorInDEV(iterator, progress, error);
11341131
} else {
@@ -2675,16 +2672,14 @@ function serializeBlob(request: Request, blob: Blob): string {
26752672

26762673
const reader = blob.stream().getReader();
26772674

2678-
let aborted = false;
26792675
function progress(
26802676
entry: {done: false, value: Uint8Array} | {done: true, value: void},
26812677
): Promise<void> | void {
2682-
if (aborted) {
2678+
if (newTask.status !== PENDING) {
26832679
return;
26842680
}
26852681
if (entry.done) {
2686-
request.abortListeners.delete(abortBlob);
2687-
aborted = true;
2682+
request.cacheController.signal.removeEventListener('abort', abortBlob);
26882683
pingTask(request, newTask);
26892684
return;
26902685
}
@@ -2694,33 +2689,34 @@ function serializeBlob(request: Request, blob: Blob): string {
26942689
return reader.read().then(progress).catch(error);
26952690
}
26962691
function error(reason: mixed) {
2697-
if (aborted) {
2692+
if (newTask.status !== PENDING) {
26982693
return;
26992694
}
2700-
aborted = true;
2701-
request.abortListeners.delete(abortBlob);
2695+
request.cacheController.signal.removeEventListener('abort', abortBlob);
27022696
erroredTask(request, newTask, reason);
27032697
enqueueFlush(request);
27042698
// $FlowFixMe should be able to pass mixed
27052699
reader.cancel(reason).then(error, error);
27062700
}
2707-
function abortBlob(reason: mixed) {
2708-
if (aborted) {
2701+
function abortBlob() {
2702+
if (newTask.status !== PENDING) {
27092703
return;
27102704
}
2711-
aborted = true;
2712-
request.abortListeners.delete(abortBlob);
2705+
const signal = request.cacheController.signal;
2706+
signal.removeEventListener('abort', abortBlob);
2707+
const reason = signal.reason;
27132708
if (enableHalt && request.type === PRERENDER) {
2714-
request.pendingChunks--;
2709+
haltTask(newTask, request);
27152710
} else {
2711+
// TODO: Make this use abortTask() instead.
27162712
erroredTask(request, newTask, reason);
27172713
enqueueFlush(request);
27182714
}
27192715
// $FlowFixMe should be able to pass mixed
27202716
reader.cancel(reason).then(error, error);
27212717
}
27222718

2723-
request.abortListeners.add(abortBlob);
2719+
request.cacheController.signal.addEventListener('abort', abortBlob);
27242720

27252721
// $FlowFixMe[incompatible-call]
27262722
reader.read().then(progress).catch(error);
@@ -5005,16 +5001,15 @@ function retryTask(request: Request, task: Task): void {
50055001
} catch (thrownValue) {
50065002
if (request.status === ABORTING) {
50075003
request.abortableTasks.delete(task);
5008-
task.status = ABORTED;
5004+
task.status = PENDING;
50095005
if (enableHalt && request.type === PRERENDER) {
50105006
// When aborting a prerener with halt semantics we don't emit
50115007
// anything into the slot for a task that aborts, it remains unresolved
5012-
request.pendingChunks--;
5008+
haltTask(task, request);
50135009
} else {
50145010
// Otherwise we emit an error chunk into the task slot.
50155011
const errorId: number = (request.fatalError: any);
5016-
const model = stringify(serializeByValueID(errorId));
5017-
emitModelChunk(request, task.id, model);
5012+
abortTask(task, request, errorId);
50185013
}
50195014
return;
50205015
}
@@ -5257,8 +5252,9 @@ function enqueueFlush(request: Request): void {
52575252
}
52585253

52595254
function callOnAllReadyIfReady(request: Request): void {
5260-
if (request.abortableTasks.size === 0 && request.abortListeners.size === 0) {
5261-
request.onAllReady();
5255+
if (request.abortableTasks.size === 0) {
5256+
const onAllReady = request.onAllReady;
5257+
onAllReady();
52625258
}
52635259
}
52645260

@@ -5294,6 +5290,7 @@ export function abort(request: Request, reason: mixed): void {
52945290
if (request.status <= OPEN) {
52955291
request.status = ABORTING;
52965292
request.cacheController.abort(reason);
5293+
callOnAllReadyIfReady(request);
52975294
}
52985295
const abortableTasks = request.abortableTasks;
52995296
if (abortableTasks.size > 0) {
@@ -5345,37 +5342,6 @@ export function abort(request: Request, reason: mixed): void {
53455342
callOnAllReadyIfReady(request);
53465343
}
53475344
}
5348-
const abortListeners = request.abortListeners;
5349-
if (abortListeners.size > 0) {
5350-
let error;
5351-
if (
5352-
enablePostpone &&
5353-
typeof reason === 'object' &&
5354-
reason !== null &&
5355-
(reason: any).$$typeof === REACT_POSTPONE_TYPE
5356-
) {
5357-
// We aborted with a Postpone but since we're passing this to an
5358-
// external handler, passing this object would leak it outside React.
5359-
// We create an alternative reason for it instead.
5360-
error = new Error('The render was aborted due to being postponed.');
5361-
} else {
5362-
error =
5363-
reason === undefined
5364-
? new Error(
5365-
'The render was aborted by the server without a reason.',
5366-
)
5367-
: typeof reason === 'object' &&
5368-
reason !== null &&
5369-
typeof reason.then === 'function'
5370-
? new Error(
5371-
'The render was aborted by the server with a promise.',
5372-
)
5373-
: reason;
5374-
}
5375-
abortListeners.forEach(callback => callback(error));
5376-
abortListeners.clear();
5377-
callOnAllReadyIfReady(request);
5378-
}
53795345
if (request.destination !== null) {
53805346
flushCompletedChunks(request, request.destination);
53815347
}

0 commit comments

Comments
 (0)