
feat: compress replay data #1436

Merged · 3 commits · Sep 25, 2024
213 changes: 211 additions & 2 deletions src/__tests__/extensions/replay/sessionrecording.test.ts
@@ -89,11 +89,60 @@ const createIncrementalSnapshot = (event = {}): incrementalSnapshotEvent => ({
...event,
})

const createCustomSnapshot = (event = {}): customEvent => ({
const createIncrementalMouseEvent = () => {
return createIncrementalSnapshot({
data: {
source: 2,
positions: [
{
id: 1,
x: 100,
y: 200,
timeOffset: 100,
},
],
},
})
}

const createIncrementalMutationEvent = () => {
const mutationData = {
texts: [],
attributes: [],
removes: [],
adds: [],
isAttachIframe: true,
}
return createIncrementalSnapshot({
data: {
source: 0,
...mutationData,
},
})
}

const createIncrementalStyleSheetEvent = () => {
return createIncrementalSnapshot({
data: {
// doesn't need to be a valid style sheet event
source: 8,
id: 1,
styleId: 1,
removes: [],
adds: [],
replace: 'something',
replaceSync: 'something',
},
})
}

const createCustomSnapshot = (event = {}, payload = {}): customEvent => ({
type: EventType.Custom,
data: {
tag: 'custom',
payload: {},
payload: {
...payload,
},
},
...event,
})
@@ -1937,4 +1986,164 @@ describe('SessionRecording', () => {
expect((sessionRecording as any)['_tryAddCustomEvent']).not.toHaveBeenCalled()
})
})

describe('when compression is active', () => {
const captureOptions = {
_batchKey: 'recordings',
_noTruncate: true,
_url: 'https://test.com/s/',
skip_client_rate_limiting: true,
}

beforeEach(() => {
posthog.config.session_recording.compress_events = true
sessionRecording.afterDecideResponse(makeDecideResponse({ sessionRecording: { endpoint: '/s/' } }))
sessionRecording.startIfEnabledOrStop()
})

it('compresses full snapshot data', () => {
_emit(createFullSnapshot())
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalledWith(
'$snapshot',
{
$snapshot_data: [
{
data: expect.any(String),
cv: '2024-10',
type: 2,
},
],
$session_id: sessionId,
$snapshot_bytes: expect.any(Number),
$window_id: 'windowId',
},
captureOptions
)
})

it('compresses incremental snapshot mutation data', () => {
_emit(createIncrementalMutationEvent())
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalledWith(
'$snapshot',
{
$snapshot_data: [
{
cv: '2024-10',
data: {
adds: expect.any(String),
texts: expect.any(String),
removes: expect.any(String),
attributes: expect.any(String),
isAttachIframe: true,
source: 0,
},
type: 3,
},
],
$session_id: sessionId,
$snapshot_bytes: expect.any(Number),
$window_id: 'windowId',
},
captureOptions
)
})

it('compresses incremental snapshot style data', () => {
_emit(createIncrementalStyleSheetEvent())
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalledWith(
'$snapshot',
{
$snapshot_data: [
{
data: {
adds: expect.any(String),
id: 1,
removes: expect.any(String),
replace: 'something',
replaceSync: 'something',
source: 8,
styleId: 1,
},
cv: '2024-10',
type: 3,
},
],
$session_id: sessionId,
$snapshot_bytes: expect.any(Number),
$window_id: 'windowId',
},
captureOptions
)
})

it('does not compress incremental snapshot non full data', () => {
const mouseEvent = createIncrementalMouseEvent()
_emit(mouseEvent)
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalledWith(
'$snapshot',
{
$snapshot_data: [mouseEvent],
$session_id: sessionId,
$snapshot_bytes: 86,
$window_id: 'windowId',
},
captureOptions
)
})

it('does not compress custom events', () => {
_emit(createCustomSnapshot(undefined, { tag: 'wat' }))
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalledWith(
'$snapshot',
{
$snapshot_data: [
{
data: {
payload: { tag: 'wat' },
tag: 'custom',
},
type: 5,
},
],
$session_id: sessionId,
$snapshot_bytes: 58,
$window_id: 'windowId',
},
captureOptions
)
})

it('does not compress meta events', () => {
_emit(createMetaSnapshot())
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalledWith(
'$snapshot',
{
$snapshot_data: [
{
type: META_EVENT_TYPE,
data: {
href: 'https://has-to-be-present-or-invalid.com',
},
},
],
$session_id: sessionId,
$snapshot_bytes: 69,
$window_id: 'windowId',
},
captureOptions
)
})
})
})
99 changes: 96 additions & 3 deletions src/extensions/replay/sessionrecording.ts
@@ -25,6 +25,7 @@ import { assignableWindow, document, window } from '../../utils/globals'
import { buildNetworkRequestOptions } from './config'
import { isLocalhost } from '../../utils/request-utils'
import { MutationRateLimiter } from './mutation-rate-limiter'
import { gzipSync, strFromU8, strToU8 } from 'fflate'

const BASE_ENDPOINT = '/s/'

@@ -88,6 +89,95 @@ const newQueuedEvent = (rrwebMethod: () => void): QueuedRRWebEvent => ({

const LOGGER_PREFIX = '[SessionRecording]'

type compressedFullSnapshotEvent = {
type: EventType.FullSnapshot
data: string
}

type compressedIncrementalSnapshotEvent = {
type: EventType.IncrementalSnapshot
data: {
source: IncrementalSource
texts: string
attributes: string
removes: string
adds: string
}
}

type compressedIncrementalStyleSnapshotEvent = {
type: EventType.IncrementalSnapshot
data: {
source: IncrementalSource.StyleSheetRule
id?: number
styleId?: number
replace?: string
replaceSync?: string
adds: string
removes: string
}
}

export type compressedEvent =
| compressedIncrementalStyleSnapshotEvent
| compressedFullSnapshotEvent
| compressedIncrementalSnapshotEvent
export type compressedEventWithTime = compressedEvent & {
timestamp: number
delay?: number
// marker for compression version
cv: '2024-10'
}

function gzipToString(data: unknown): string {
return strFromU8(gzipSync(strToU8(JSON.stringify(data))), true)
}
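
Since gzipToString emits a latin1 string of gzipped JSON, anything that needs the original payload back (for example playback tooling or debugging) would reverse those steps with fflate's gunzipSync. A minimal consumer-side sketch — the helper name is hypothetical and this is not part of the PR:

```ts
import { gunzipSync, strFromU8, strToU8 } from 'fflate'

// Reverse of gzipToString: latin1 string -> gzip bytes -> UTF-8 JSON -> object
function gzipFromString<T = unknown>(compressed: string): T {
    return JSON.parse(strFromU8(gunzipSync(strToU8(compressed, true)))) as T
}

// e.g. const adds = gzipFromString<unknown[]>(incrementalEvent.data.adds)
```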
Comment on lines +132 to +134

@richard-better (Contributor) commented on Sep 25, 2024:

Have you considered streaming, for at least the full snapshots? This seems memory intensive to do the Object->String->Array->Gzip Array->String transformation synchronously

(Though I imagine it would need a bigger refactor to the network code)

Collaborator replied:

Streaming in the browser? 🤔 certainly not a thing I've ever heard of?

Contributor replied:

It's new-ish (at least for sending, reading the response as a stream has been available for a while): https://developer.chrome.com/docs/capabilities/web-apis/fetch-streaming-requests#using_with_writable_streams

Member (Author) replied:

we'd not... that's interesting.
certainly we've talked about having a fast path for full snapshots at least since if all we capture are full snapshots we can play something back
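
For context, a rough sketch of the streaming approach discussed in this thread, assuming a Chromium-based browser with CompressionStream and streaming request bodies; the function and endpoint handling are hypothetical, and nothing like this is implemented in the PR:

```ts
// Sketch only: gzip the snapshot as the request body is read, instead of building
// the whole compressed string in memory first.
// (JSON.stringify still materialises the uncompressed string; only the gzip output is streamed.)
async function sendSnapshotStreaming(url: string, snapshot: unknown): Promise<Response> {
    const body = new Blob([JSON.stringify(snapshot)])
        .stream()
        .pipeThrough(new CompressionStream('gzip'))

    return fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json', 'Content-Encoding': 'gzip' },
        body,
        // Streaming request bodies must be half-duplex; 'duplex' is not yet in every
        // TypeScript DOM typing, hence the cast.
        duplex: 'half',
    } as RequestInit)
}
```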


// rrweb's packer takes an event and returns a string, or the reverse on unpack,
// but we want to be able to inspect metadata during ingestion, and don't want to compress the entire event,
// so we have a custom packer that only compresses part of some events
function compressEvent(event: eventWithTime, ph: PostHog): eventWithTime | compressedEventWithTime {
try {
if (event.type === EventType.FullSnapshot) {
return {
...event,
data: gzipToString(event.data),
cv: '2024-10',
}
}
if (event.type === EventType.IncrementalSnapshot && event.data.source === IncrementalSource.Mutation) {
return {
...event,
cv: '2024-10',
data: {
...event.data,
Collaborator commented:

Not sure what gains this really gives.
Why not pull out from the data any counters you need and then just compress the entire object? In theory it would never need to be decompressed during ingestion and could be written straight to the S3 file, for example, then decompressed at read time.
Or at least it would be a simple single decompression of the whole payload rather than multiple ones?

Member (Author) replied:

so... we know that when compressed the data fits into kafka but when uncompressed it doesn't always (and we reject it unnecessarily)

when I looked at pulling the metadata out (and it's fair I don't have this anywhere but in my head)

i at least had to change the SDK, django capture, blobby and maybe playback depending on how it was implemented

and we'd still have multiple compress/decompress steps since we're doing that at network boundaries anyway

this way i can test the impact with a relatively small change

@pauldambra (Member, Author) replied on Sep 25, 2024:

> what gains this really gives

it should mean that the data that is currently not making it into kafka from rust or django capture does make it in
without having to change anything else
so i can probe if that's (the last|a) piece of the puzzle for playback issues

Member (Author) replied:

#BiasForImpact :p

texts: gzipToString(event.data.texts),
attributes: gzipToString(event.data.attributes),
removes: gzipToString(event.data.removes),
adds: gzipToString(event.data.adds),
},
}
}
if (event.type === EventType.IncrementalSnapshot && event.data.source === IncrementalSource.StyleSheetRule) {
return {
...event,
cv: '2024-10',
data: {
...event.data,
adds: gzipToString(event.data.adds),
removes: gzipToString(event.data.removes),
},
}
}
} catch (e: unknown) {
logger.error(LOGGER_PREFIX + ' could not compress event', e)
ph.captureException((e as Error) || 'e was not an error', {
attempted_event_type: event?.type || 'no event type',
})
}
return event
}
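
For comparison with the review thread above, a rough sketch of the "compress the entire object" alternative, reusing gzipToString from this diff; the function is hypothetical and is not what the PR implements:

```ts
// Sketch only: keep routing metadata (type, timestamp) readable for ingestion and gzip the
// whole rrweb data payload once, so it could stay compressed until read time.
function compressWholeEventData(
    event: eventWithTime
): { type: EventType; timestamp: number; cv: '2024-10'; data: string } {
    const { type, timestamp, data } = event
    return { type, timestamp, cv: '2024-10', data: gzipToString(data) }
}
```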

export class SessionRecording {
private _endpoint: string
private flushBufferTimer?: any
@@ -795,11 +885,10 @@

// TODO: Re-add ensureMaxMessageSize once we are confident in it
const event = truncateLargeConsoleLogs(throttledEvent)
const size = estimateSize(event)

this._updateWindowAndSessionIds(event)

// When in an idle state we keep recording, but don't capture the events
// When in an idle state we keep recording, but don't capture the events,
// but we allow custom events even when idle
if (this.isIdle && event.type !== EventType.Custom) {
return
@@ -817,9 +906,13 @@
}
}

const eventToSend = this.instance.config.session_recording.compress_events
? compressEvent(event, this.instance)
: event
const size = estimateSize(eventToSend)
const properties = {
$snapshot_bytes: size,
$snapshot_data: event,
$snapshot_data: eventToSend,
$session_id: this.sessionId,
$window_id: this.windowId,
}
2 changes: 2 additions & 0 deletions src/types.ts
@@ -277,6 +277,8 @@ export interface SessionRecordingOptions {
recordBody?: boolean
// ADVANCED: while a user is active we take a full snapshot of the browser every interval. For very few sites, playback performance might be better with a different interval. Set to 0 to disable
full_snapshot_interval_millis?: number
// PREVIEW: whether to compress part of the events before sending them to the server; this is a preview feature and may change without notice
compress_events?: boolean
}

export type SessionIdChangedCallback = (
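
For reference, enabling the preview flag from an application would look roughly like this; the project key is a placeholder, and the option name is taken from the diff above:

```ts
import posthog from 'posthog-js'

posthog.init('<project-api-key>', {
    session_recording: {
        // PREVIEW: gzip parts of full snapshot, mutation, and stylesheet events client-side
        compress_events: true,
    },
})
```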