Skip to content

Commit cd00cfb

Browse files
committed
feat: TransformStream implement
1 parent fa28d9d commit cd00cfb

File tree

5 files changed

+102
-58
lines changed

5 files changed

+102
-58
lines changed

packages/webrtc/client/app/modal.tsx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import {
1717
FILE_STATE,
1818
ID_SIZE,
1919
STEAM_TYPE,
20-
destructureChunk,
20+
deserializeChunk,
2121
getMaxMessageSize,
22-
getNextChunk,
22+
serializeNextChunk,
2323
sendChunkMessage,
2424
} from "../utils/binary";
2525
import { WorkerEvent } from "../worker/event";
@@ -96,7 +96,7 @@ export const TransferModal: FC<{
9696
const { id, series, total } = data;
9797
const progress = Math.floor((series / total) * 100);
9898
updateFileProgress(id, progress);
99-
const nextChunk = getNextChunk(rtc, id, series);
99+
const nextChunk = serializeNextChunk(rtc, id, series);
100100
// 通知 接收方 发送块数据
101101
sendChunkMessage(rtc, nextChunk);
102102
} else if (data.key === MESSAGE_TYPE.FILE_FINISH) {
@@ -108,7 +108,7 @@ export const TransferModal: FC<{
108108
} else {
109109
// Binary - 接收 发送方 ArrayBuffer 数据
110110
const blob = event.data;
111-
destructureChunk(blob).then(({ id, series, data }) => {
111+
deserializeChunk(blob).then(({ id, series, data }) => {
112112
const state = FILE_STATE.get(id);
113113
if (!state) return void 0;
114114
const { size, total } = state;
@@ -121,7 +121,7 @@ export const TransferModal: FC<{
121121
} else {
122122
// 数据块序列号 [0, TOTAL)
123123
if (stream) {
124-
WorkerEvent.post(blob);
124+
WorkerEvent.post(id, data);
125125
} else {
126126
// 在内存中存储块数据
127127
const mapper = FILE_MAPPER.get(id) || [];

packages/webrtc/client/utils/binary.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export const getMaxMessageSize = (
2323
return maxSize - (ID_SIZE + CHUNK_SIZE);
2424
};
2525

26-
export const getNextChunk = (
26+
export const serializeNextChunk = (
2727
instance: React.MutableRefObject<WebRTCApi | null>,
2828
id: string,
2929
series: number
@@ -75,7 +75,7 @@ export const sendChunkMessage = async (
7575
!isSending && start(rtc);
7676
};
7777

78-
export const destructureChunk = async (chunk: BufferType) => {
78+
export const deserializeChunk = async (chunk: BufferType) => {
7979
const buffer = chunk instanceof Blob ? await chunk.arrayBuffer() : chunk;
8080
const id = new Uint8Array(buffer.slice(0, ID_SIZE));
8181
const series = new Uint8Array(buffer.slice(ID_SIZE, ID_SIZE + CHUNK_SIZE));

packages/webrtc/client/worker/event.ts

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import type { BufferType } from "../../types/client";
2-
import { HEADER_KEY } from "../../types/worker";
1+
import type { MessageType } from "../../types/worker";
2+
import { HEADER_KEY, MESSAGE_TYPE } from "../../types/worker";
33

44
export class WorkerEvent {
5+
public static channel: MessageChannel | null = null;
56
public static worker: ServiceWorkerRegistration | null = null;
7+
public static writer: Map<string, WritableStreamDefaultWriter<Uint8Array>> = new Map();
68

79
public static async register(): Promise<ServiceWorkerRegistration> {
810
try {
@@ -28,6 +30,29 @@ export class WorkerEvent {
2830
}
2931

3032
public static start(fileId: string, fileName: string, fileSize: number, fileTotal: number) {
33+
if (!WorkerEvent.channel) {
34+
WorkerEvent.channel = new MessageChannel();
35+
WorkerEvent.channel.port1.onmessage = event => {
36+
console.log("WorkerEvent", event.data);
37+
};
38+
WorkerEvent.worker?.active?.postMessage({ type: MESSAGE_TYPE.INIT_CHANNEL }, [
39+
WorkerEvent.channel.port2,
40+
]);
41+
}
42+
const ts = new TransformStream({
43+
transform(chunk, controller) {
44+
controller.enqueue(chunk);
45+
},
46+
});
47+
WorkerEvent.channel.port1.postMessage(
48+
<MessageType>{
49+
key: MESSAGE_TYPE.TRANSFER_START,
50+
id: fileId,
51+
readable: ts.readable,
52+
},
53+
[ts.readable]
54+
);
55+
WorkerEvent.writer.set(fileId, ts.writable.getWriter());
3156
const newFileName = encodeURIComponent(fileName)
3257
.replace(/['()]/g, escape)
3358
.replace(/\*/g, "%2A");
@@ -44,13 +69,20 @@ export class WorkerEvent {
4469
document.body.appendChild(iframe);
4570
}
4671

47-
public static post(data: BufferType) {
48-
if (!WorkerEvent.worker) return;
49-
WorkerEvent.worker.active?.postMessage(data);
72+
public static post(fileId: string, data: ArrayBuffer) {
73+
const ts = WorkerEvent.writer.get(fileId);
74+
ts?.write(new Uint8Array(data));
5075
}
5176

5277
public static close(fileId: string) {
5378
const iframe = document.getElementById(fileId);
5479
iframe && iframe.remove();
80+
WorkerEvent.channel?.port1.postMessage(<MessageType>{
81+
key: MESSAGE_TYPE.TRANSFER_CLOSE,
82+
id: fileId,
83+
});
84+
const ts = WorkerEvent.writer.get(fileId);
85+
ts?.close();
86+
WorkerEvent.writer.delete(fileId);
5587
}
5688
}
Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
declare let self: ServiceWorkerGlobalScope;
2-
import type { BufferType } from "../../types/client";
3-
import { HEADER_KEY } from "../../types/worker";
4-
import { destructureChunk } from "../utils/binary";
2+
import type { MessageType } from "../../types/worker";
3+
import { HEADER_KEY, MESSAGE_TYPE } from "../../types/worker";
54

65
self.addEventListener("install", () => {
76
// 跳过等待 直接激活
@@ -15,24 +14,26 @@ self.addEventListener("activate", event => {
1514
console.log("Service Worker Activate");
1615
});
1716

18-
type StreamTuple = [ReadableStream, ReadableStreamDefaultController<BufferType>, number];
17+
type StreamTuple = [ReadableStream<Uint8Array>];
1918
const map = new Map<string, StreamTuple>();
2019

2120
self.onmessage = event => {
22-
const data = <BufferType>event.data;
23-
destructureChunk(data).then(({ id, series, data }) => {
24-
const stream = map.get(id);
25-
if (!stream) return void 0;
26-
const [, controller, size] = stream;
27-
// 需要处理 BackPressure 而 TransformStream 解决了问题
28-
// 不能直接写入 ArrayBuffer 必须要写入 TypedArray 类型
29-
controller.enqueue(new Uint8Array(data));
30-
// 数据块序列号 [0, TOTAL)
31-
if (series === size - 1) {
32-
controller.close();
21+
const port = event.ports[0];
22+
if (!port) return void 0;
23+
port.onmessage = event => {
24+
const payload = event.data as MessageType;
25+
if (!payload) return void 0;
26+
if (payload.key === MESSAGE_TYPE.TRANSFER_START) {
27+
// 直接使用 ReadableStream 需要处理 BackPressure 而 TransformStream 解决了问题
28+
// controller.enqueue 不能直接写入 ArrayBuffer 必须要写入 TypedArray 类型
29+
const { id, readable } = payload;
30+
map.set(id, [readable]);
31+
}
32+
if (payload.key === MESSAGE_TYPE.TRANSFER_CLOSE) {
33+
const { id } = payload;
3334
map.delete(id);
3435
}
35-
});
36+
};
3637
};
3738

3839
self.onfetch = event => {
@@ -42,35 +43,30 @@ self.onfetch = event => {
4243
const fileName = search.get(HEADER_KEY.FILE_NAME);
4344
const fileSize = search.get(HEADER_KEY.FILE_SIZE);
4445
const fileTotal = search.get(HEADER_KEY.FILE_TOTAL);
45-
if (fileId && fileName && fileSize && fileTotal) {
46-
const newFileName = decodeURIComponent(fileName);
47-
let controller: ReadableStreamDefaultController | null = null;
48-
const readable = new ReadableStream({
49-
start(ctr) {
50-
controller = ctr;
51-
},
52-
cancel(reason) {
53-
console.log("ReadableStream Aborted", reason);
54-
},
55-
});
56-
map.set(fileId, [readable, controller!, Number(fileTotal)]);
57-
const responseHeader = new Headers({
58-
[HEADER_KEY.FILE_ID]: fileId,
59-
[HEADER_KEY.FILE_SIZE]: fileSize,
60-
[HEADER_KEY.FILE_NAME]: newFileName,
61-
"Content-Type": "application/octet-stream; charset=utf-8",
62-
"Content-Security-Policy": "default-src 'none'",
63-
"X-Content-Security-Policy": "default-src 'none'",
64-
"X-WebKit-CSP": "default-src 'none'",
65-
"X-XSS-Protection": "1; mode=block",
66-
"Cross-Origin-Embedder-Policy": "require-corp",
67-
"Content-Disposition": "attachment; filename*=UTF-8''" + newFileName,
68-
"Content-Length": fileSize,
69-
});
70-
const response = new Response(readable, {
71-
headers: responseHeader,
72-
});
73-
return event.respondWith(response);
46+
if (!fileId || !fileName || !fileSize || !fileTotal) {
47+
return fetch(event.request);
48+
}
49+
const transfer = map.get(fileId);
50+
if (!transfer) {
51+
return null;
7452
}
75-
return fetch(event.request);
53+
const [readable] = transfer;
54+
const newFileName = decodeURIComponent(fileName);
55+
const responseHeader = new Headers({
56+
[HEADER_KEY.FILE_ID]: fileId,
57+
[HEADER_KEY.FILE_SIZE]: fileSize,
58+
[HEADER_KEY.FILE_NAME]: newFileName,
59+
"Content-Type": "application/octet-stream; charset=utf-8",
60+
"Content-Security-Policy": "default-src 'none'",
61+
"X-Content-Security-Policy": "default-src 'none'",
62+
"X-WebKit-CSP": "default-src 'none'",
63+
"X-XSS-Protection": "1; mode=block",
64+
"Cross-Origin-Embedder-Policy": "require-corp",
65+
"Content-Disposition": "attachment; filename*=UTF-8''" + newFileName,
66+
"Content-Length": fileSize,
67+
});
68+
const response = new Response(readable, {
69+
headers: responseHeader,
70+
});
71+
return event.respondWith(response);
7672
};

packages/webrtc/types/worker.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
1+
import type { Spread } from "./client";
2+
13
export const HEADER_KEY = {
24
FILE_ID: "X-File-Id",
35
FILE_NAME: "X-File-Name",
46
FILE_SIZE: "X-File-Size",
57
FILE_TOTAL: "X-File-Total",
68
};
9+
10+
export const MESSAGE_TYPE = {
11+
INIT_CHANNEL: "INIT_CHANNEL",
12+
TRANSFER_START: "TRANSFER_START",
13+
TRANSFER_CLOSE: "TRANSFER_CLOSE",
14+
} as const;
15+
16+
export type MessageTypeMap = {
17+
[MESSAGE_TYPE.INIT_CHANNEL]: Record<string, never>;
18+
[MESSAGE_TYPE.TRANSFER_START]: { id: string; readable: ReadableStream<Uint8Array> };
19+
[MESSAGE_TYPE.TRANSFER_CLOSE]: { id: string };
20+
};
21+
22+
export type MessageType = Spread<MessageTypeMap>;

0 commit comments

Comments
 (0)