Skip to content

Commit 1018103

Browse files
committed
feat(replay): Keep min. 30s of data in buffer & worker mode
1 parent 4800458 commit 1018103

12 files changed

+291
-90
lines changed

packages/replay-worker/src/Compressor.ts

+9-16
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { constants, Deflate, deflate } from 'pako';
1+
import { constants, Deflate } from 'pako';
22

33
/**
44
* A stateful compressor that can be used to batch compress events.
@@ -7,7 +7,7 @@ export class Compressor {
77
/**
88
* pako deflator instance
99
*/
10-
public deflate: Deflate;
10+
private _deflate: Deflate;
1111

1212
/**
1313
* If any events have been added.
@@ -38,7 +38,7 @@ export class Compressor {
3838
// TODO: We may want Z_SYNC_FLUSH or Z_FULL_FLUSH (not sure the difference)
3939
// Using NO_FLUSH here for now as we can create many attachments that our
4040
// web UI will get API rate limited.
41-
this.deflate.push(prefix + data, constants.Z_SYNC_FLUSH);
41+
this._deflate.push(prefix + data, constants.Z_SYNC_FLUSH);
4242

4343
this._hasEvents = true;
4444
}
@@ -48,15 +48,15 @@ export class Compressor {
4848
*/
4949
public finish(): Uint8Array {
5050
// We should always have a list, it can be empty
51-
this.deflate.push(']', constants.Z_FINISH);
51+
this._deflate.push(']', constants.Z_FINISH);
5252

53-
if (this.deflate.err) {
54-
throw this.deflate.err;
53+
if (this._deflate.err) {
54+
throw this._deflate.err;
5555
}
5656

5757
// Copy result before we create a new deflator and return the compressed
5858
// result
59-
const result = this.deflate.result;
59+
const result = this._deflate.result;
6060

6161
this._init();
6262

@@ -68,16 +68,9 @@ export class Compressor {
6868
*/
6969
private _init(): void {
7070
this._hasEvents = false;
71-
this.deflate = new Deflate();
71+
this._deflate = new Deflate();
7272

7373
// Fake an array by adding a `[`
74-
this.deflate.push('[', constants.Z_NO_FLUSH);
74+
this._deflate.push('[', constants.Z_NO_FLUSH);
7575
}
7676
}
77-
78-
/**
79-
* Compress a string.
80-
*/
81-
export function compress(data: string): Uint8Array {
82-
return deflate(data);
83-
}

packages/replay-worker/src/handleMessage.ts

+72-31
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,101 @@
1-
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
2-
import { compress, Compressor } from './Compressor';
3-
4-
const compressor = new Compressor();
1+
import { Compressor } from './Compressor';
52

63
interface Handlers {
7-
clear: () => void;
4+
clear: (mode?: string) => void;
85
addEvent: (data: string) => void;
96
finish: () => Uint8Array;
10-
compress: (data: string) => Uint8Array;
117
}
128

13-
const handlers: Handlers = {
14-
clear: () => {
15-
compressor.clear();
16-
},
9+
class CompressionHandler implements Handlers {
10+
private _compressor: Compressor;
11+
private _bufferCompressor?: Compressor;
12+
13+
public constructor() {
14+
this._compressor = new Compressor();
15+
}
16+
17+
public clear(mode?: string): void {
18+
/*
19+
In buffer mode, we want to make sure to always keep the last round of events around.
20+
So when the time comes and we finish the buffer, we can ensure that we have at least one set of events.
21+
Without this change, it can happen that you finish right after the last checkout (=clear),
22+
and thus have no (or very few) events buffered.
23+
24+
Now, in buffer mode, we basically have to compressors, which are swapped and reset on clear:
25+
* On first `clear` in buffer mode, we initialize the buffer compressor.
26+
The regular compressor keeps running as the "current" one
27+
* On consequitive `clear` calls, we swap the buffer compressor in as the "current" one, and initialize a new buffer compressor
28+
This will clear any events that were buffered before the _last_ clear call.
29+
30+
This sadly means we need to keep the buffer twice in memory. But it's a tradeoff we have to make.
31+
*/
32+
if (mode === 'buffer') {
33+
// This means it is the first clear in buffer mode - just initialize a new compressor for the alternate compressor
34+
if (!this._bufferCompressor) {
35+
this._bufferCompressor = new Compressor();
36+
} else {
37+
// Else, swap the alternative compressor in as "normal" compressor, and initialize a new alterntive compressor
38+
this._compressor = this._bufferCompressor;
39+
this._bufferCompressor = new Compressor();
40+
}
41+
return;
42+
}
43+
44+
/*
45+
In non-buffer mode, we just clear the current compressor (and make sure an eventual buffer compressor is reset)
46+
*/
47+
this._bufferCompressor = undefined;
48+
49+
this._compressor.clear();
50+
}
1751

18-
addEvent: (data: string) => {
19-
return compressor.addEvent(data);
20-
},
52+
public addEvent(data: string): void {
53+
if (this._bufferCompressor) {
54+
this._bufferCompressor.addEvent(data);
55+
}
2156

22-
finish: () => {
23-
return compressor.finish();
24-
},
57+
return this._compressor.addEvent(data);
58+
}
2559

26-
compress: (data: string) => {
27-
return compress(data);
28-
},
29-
};
60+
public finish(): Uint8Array {
61+
if (this._bufferCompressor) {
62+
this._bufferCompressor.clear();
63+
this._bufferCompressor = undefined;
64+
}
65+
66+
return this._compressor.finish();
67+
}
68+
}
69+
70+
const handlers = new CompressionHandler();
3071

3172
/**
3273
* Handler for worker messages.
3374
*/
34-
export function handleMessage(e: MessageEvent): void {
35-
const method = e.data.method as string;
36-
const id = e.data.id as number;
37-
const data = e.data.arg as string;
75+
export function handleMessage(event: MessageEvent): void {
76+
const data = event.data as {
77+
method: keyof Handlers;
78+
id: number;
79+
arg: string;
80+
};
81+
82+
const { method, id, arg } = data;
3883

39-
// @ts-ignore this syntax is actually fine
40-
if (method in handlers && typeof handlers[method] === 'function') {
84+
if (typeof handlers[method] === 'function') {
4185
try {
42-
// @ts-ignore this syntax is actually fine
43-
const response = handlers[method](data);
44-
// @ts-ignore this syntax is actually fine
86+
const response = handlers[method](arg);
4587
postMessage({
4688
id,
4789
method,
4890
success: true,
4991
response,
5092
});
5193
} catch (err) {
52-
// @ts-ignore this syntax is actually fine
5394
postMessage({
5495
id,
5596
method,
5697
success: false,
57-
response: err.message,
98+
response: (err as Error).message,
5899
});
59100

60101
// eslint-disable-next-line no-console

packages/replay/jest.setup.ts

+41-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
22
import { getCurrentHub } from '@sentry/core';
33
import type { ReplayRecordingData, Transport } from '@sentry/types';
4-
import { TextEncoder } from 'util';
4+
import pako from 'pako';
5+
import { TextDecoder, TextEncoder } from 'util';
56

67
import type { ReplayContainer, Session } from './src/types';
78

89
// eslint-disable-next-line @typescript-eslint/no-explicit-any
910
(global as any).TextEncoder = TextEncoder;
11+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
12+
(global as any).TextDecoder = TextDecoder;
1013

1114
type MockTransport = jest.MockedFunction<Transport['send']>;
1215

@@ -71,6 +74,40 @@ type Call = [
7174
];
7275
type CheckCallForSentReplayResult = { pass: boolean; call: Call | undefined; results: Result[] };
7376

77+
function parseRecordingData(recordingPayload: undefined | string | Uint8Array): string {
78+
if (!recordingPayload) {
79+
return '';
80+
}
81+
82+
if (typeof recordingPayload === 'string') {
83+
return recordingPayload;
84+
}
85+
86+
if (recordingPayload instanceof Uint8Array) {
87+
// We look up the place where the zlib compression header(0x78 0x9c) starts
88+
// As the payload consists of two UInt8Arrays joined together, where the first part is a TextEncoder encoded string,
89+
// and the second part a pako-compressed one
90+
for (let i = 0; i < recordingPayload.length; i++) {
91+
if (recordingPayload[i] === 0x78 && recordingPayload[i + 1] === 0x9c) {
92+
try {
93+
// We found a zlib-compressed payload - let's decompress it
94+
const header = recordingPayload.slice(0, i);
95+
const payload = recordingPayload.slice(i);
96+
// now we return the decompressed payload as JSON
97+
const decompressedPayload = pako.inflate(payload, { to: 'string' });
98+
const decompressedHeader = new TextDecoder().decode(header);
99+
100+
return `${decompressedHeader}${decompressedPayload}`;
101+
} catch (error) {
102+
throw new Error(`Could not parse UInt8Array payload: ${error}`);
103+
}
104+
}
105+
}
106+
}
107+
108+
throw new Error(`Invalid recording payload: ${recordingPayload}`);
109+
}
110+
74111
function checkCallForSentReplay(
75112
call: Call | undefined,
76113
expected?: SentReplayExpected | { sample: SentReplayExpected; inverse: boolean },
@@ -79,8 +116,9 @@ function checkCallForSentReplay(
79116
const envelopeItems = call?.[1] || [[], []];
80117
const [[replayEventHeader, replayEventPayload], [recordingHeader, recordingPayload] = []] = envelopeItems;
81118

82-
// @ts-ignore recordingPayload is always a string in our tests
83-
const [recordingPayloadHeader, recordingData] = recordingPayload?.split('\n') || [];
119+
const recordingStr = parseRecordingData(recordingPayload as unknown as string | Uint8Array);
120+
121+
const [recordingPayloadHeader, recordingData] = recordingStr?.split('\n') || [];
84122

85123
const actualObj: Required<SentReplayExpected> = {
86124
// @ts-ignore Custom envelope

packages/replay/src/constants.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ export const DEFAULT_FLUSH_MIN_DELAY = 5_000;
2626
// was the same as `wait`
2727
export const DEFAULT_FLUSH_MAX_DELAY = 5_500;
2828

29-
/* How long to wait for error checkouts */
30-
export const BUFFER_CHECKOUT_TIME = 60_000;
29+
/* How long to wait for buffer checkouts. */
30+
export const BUFFER_CHECKOUT_TIME = 30_000;
3131

3232
export const RETRY_BASE_INTERVAL = 5000;
3333
export const RETRY_MAX_COUNT = 3;

packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts

+32-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ReplayRecordingData } from '@sentry/types';
1+
import type { ReplayRecordingData, ReplayRecordingMode } from '@sentry/types';
22

33
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
44
import { timestampToMs } from '../util/timestampToMs';
@@ -11,10 +11,12 @@ import { WorkerHandler } from './WorkerHandler';
1111
export class EventBufferCompressionWorker implements EventBuffer {
1212
private _worker: WorkerHandler;
1313
private _earliestTimestamp: number | null;
14+
private _bufferEarliestTimestamp: number | null;
1415

1516
public constructor(worker: Worker) {
1617
this._worker = new WorkerHandler(worker);
1718
this._earliestTimestamp = null;
19+
this._bufferEarliestTimestamp = null;
1820
}
1921

2022
/** @inheritdoc */
@@ -48,6 +50,15 @@ export class EventBufferCompressionWorker implements EventBuffer {
4850
this._earliestTimestamp = timestamp;
4951
}
5052

53+
/*
54+
We also update this in parallel, in case we need it.
55+
At this point we don't really know if this is a buffer recording,
56+
so just always keeping this is the safest solution.
57+
*/
58+
if (!this._bufferEarliestTimestamp || timestamp < this._bufferEarliestTimestamp) {
59+
this._bufferEarliestTimestamp = timestamp;
60+
}
61+
5162
return this._sendEventToWorker(event);
5263
}
5364

@@ -59,10 +70,26 @@ export class EventBufferCompressionWorker implements EventBuffer {
5970
}
6071

6172
/** @inheritdoc */
62-
public clear(): void {
63-
this._earliestTimestamp = null;
73+
public clear(recordingMode: ReplayRecordingMode): void {
74+
if (recordingMode === 'buffer') {
75+
/*
76+
In buffer mode, we want to make sure to always keep the last round of events around.
77+
So when the time comes and we finish the buffer, we can ensure that we have at least one set of events.
78+
Without this change, it can happen that you finish right after the last checkout (=clear),
79+
and thus have no (or very few) events buffered.
80+
81+
Because of this, we keep track of the previous earliest timestamp as well.
82+
When the next clear comes, we set the current earliest timestamp to the previous one.
83+
*/
84+
this._earliestTimestamp = this._bufferEarliestTimestamp;
85+
this._bufferEarliestTimestamp = null;
86+
} else {
87+
this._earliestTimestamp = null;
88+
this._bufferEarliestTimestamp = null;
89+
}
90+
6491
// We do not wait on this, as we assume the order of messages is consistent for the worker
65-
void this._worker.postMessage('clear');
92+
void this._worker.postMessage('clear', recordingMode);
6693
}
6794

6895
/** @inheritdoc */
@@ -84,6 +111,7 @@ export class EventBufferCompressionWorker implements EventBuffer {
84111
const response = await this._worker.postMessage<Uint8Array>('finish');
85112

86113
this._earliestTimestamp = null;
114+
this._bufferEarliestTimestamp = null;
87115

88116
return response;
89117
}

packages/replay/src/eventBuffer/EventBufferProxy.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ReplayRecordingData } from '@sentry/types';
1+
import type { ReplayRecordingData, ReplayRecordingMode } from '@sentry/types';
22
import { logger } from '@sentry/utils';
33

44
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
@@ -36,8 +36,8 @@ export class EventBufferProxy implements EventBuffer {
3636
}
3737

3838
/** @inheritdoc */
39-
public clear(): void {
40-
return this._used.clear();
39+
public clear(recordingMode: ReplayRecordingMode): void {
40+
return this._used.clear(recordingMode);
4141
}
4242

4343
/** @inheritdoc */

packages/replay/src/replay.ts

+3
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ export class ReplayContainer implements ReplayContainerInterface {
167167
// If neither sample rate is > 0, then do nothing - user will need to call one of
168168
// `start()` or `startBuffering` themselves.
169169
if (errorSampleRate <= 0 && sessionSampleRate <= 0) {
170+
__DEBUG_BUILD__ && logger.log('[Replay] No sample rate set, not initializing.');
170171
return;
171172
}
172173

@@ -194,6 +195,8 @@ export class ReplayContainer implements ReplayContainerInterface {
194195
this.recordingMode = 'buffer';
195196
}
196197

198+
__DEBUG_BUILD__ && logger.log(`[Replay] Session initialized in mode ${this.recordingMode}`);
199+
197200
this._initializeRecording();
198201
}
199202

packages/replay/src/types.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ export interface EventBuffer {
449449
/**
450450
* Clear the event buffer.
451451
*/
452-
clear(): void;
452+
clear(recordingMode: ReplayRecordingMode): void;
453453

454454
/**
455455
* Add an event to the event buffer.

0 commit comments

Comments
 (0)