Skip to content

Commit b5b932f

Browse files
committed
Implement cursor offsets rather than intervals
Instead of calculating an even interval based upon the length of the buffer and the batchtime, instead pass along movement offsets as part of the message, and replay these offsets as part of the dispensing. This ensures you get a more accurate representation of cursor movements. The first offset is also 0, which removes the slight delay that was present in the original implementation, since the calculated interval was always the same. Since we are immediately invoking the all the data in the buffer to create the timeouts, we can simplify the code to always just use whatever data is present in the buffer as the gate to whether we should call `emitFromBatch()`. This means we can remove the logic related to `handlerRunning`. Due to the way we are immediately processing the buffer, it means we cannot test in the same way we did previously, so tests have been amended to accomodate this.
1 parent dc8a420 commit b5b932f

File tree

4 files changed

+118
-135
lines changed

4 files changed

+118
-135
lines changed

src/CursorBatching.ts

+23-7
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import { CURSOR_UPDATE } from './CursorConstants.js';
44
import type { CursorUpdate } from './types.js';
55
import type { CursorsOptions } from './types.js';
66

7-
type OutgoingBuffer = Pick<CursorUpdate, 'position' | 'data'>[];
7+
type OutgoingBuffer = { cursor: Pick<CursorUpdate, 'position' | 'data'>; offset: number };
88

99
export default class CursorBatching {
10-
outgoingBuffers: OutgoingBuffer = [];
10+
outgoingBuffer: OutgoingBuffer[] = [];
1111

1212
batchTime: number;
1313

@@ -20,15 +20,31 @@ export default class CursorBatching {
2020
// Set to `true` if there is more than one user listening to cursors
2121
shouldSend: boolean = false;
2222

23+
// Used for tracking offsets in the buffer
24+
bufferStartTimestamp: number = 0;
25+
2326
constructor(readonly outboundBatchInterval: CursorsOptions['outboundBatchInterval']) {
2427
this.batchTime = outboundBatchInterval;
2528
}
2629

2730
pushCursorPosition(channel: Types.RealtimeChannelPromise, cursor: Pick<CursorUpdate, 'position' | 'data'>) {
2831
// Ignore the cursor update if there is no one listening
2932
if (!this.shouldSend) return;
33+
34+
const timestamp = new Date().getTime();
35+
36+
let offset: number;
37+
// First update in the buffer is always 0
38+
if (this.outgoingBuffer.length === 0) {
39+
offset = 0;
40+
this.bufferStartTimestamp = timestamp;
41+
} else {
42+
// Add the offset compared to the first update in the buffer
43+
offset = timestamp - this.bufferStartTimestamp;
44+
}
45+
3046
this.hasMovement = true;
31-
this.pushToBuffer(cursor);
47+
this.pushToBuffer({ cursor, offset });
3248
this.publishFromBuffer(channel, CURSOR_UPDATE);
3349
}
3450

@@ -40,8 +56,8 @@ export default class CursorBatching {
4056
this.batchTime = batchTime;
4157
}
4258

43-
private pushToBuffer(value: Pick<CursorUpdate, 'position' | 'data'>) {
44-
this.outgoingBuffers.push(value);
59+
private pushToBuffer(value: OutgoingBuffer) {
60+
this.outgoingBuffer.push(value);
4561
}
4662

4763
private async publishFromBuffer(channel: Types.RealtimeChannelPromise, eventName: string) {
@@ -57,10 +73,10 @@ export default class CursorBatching {
5773
return;
5874
}
5975
// Must be copied here to avoid a race condition where the buffer is cleared before the publish happens
60-
const bufferCopy = [...this.outgoingBuffers];
76+
const bufferCopy = [...this.outgoingBuffer];
6177
channel.publish(eventName, bufferCopy);
6278
setTimeout(() => this.batchToChannel(channel, eventName), this.batchTime);
63-
this.outgoingBuffers = [];
79+
this.outgoingBuffer = [];
6480
this.hasMovement = false;
6581
this.isRunning = true;
6682
}

src/CursorDispensing.ts

+28-53
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,26 @@
1-
import { clamp } from './utilities/math.js';
2-
31
import { type CursorUpdate } from './types.js';
42
import { type RealtimeMessage } from './utilities/types.js';
53

64
export default class CursorDispensing {
7-
private buffer: Record<string, CursorUpdate[]> = {};
8-
private handlerRunning: boolean = false;
9-
private timerIds: ReturnType<typeof setTimeout>[] = [];
10-
11-
constructor(private emitCursorUpdate: (update: CursorUpdate) => void, private getCurrentBatchTime: () => number) {}
12-
13-
emitFromBatch(batchDispenseInterval: number) {
14-
if (!this.bufferHaveData()) {
15-
this.handlerRunning = false;
16-
return;
17-
}
18-
19-
this.handlerRunning = true;
5+
private buffer: Record<string, { cursor: CursorUpdate; offset: number }[]> = {};
206

21-
const processBuffer = () => {
22-
for (let connectionId in this.buffer) {
23-
const buffer = this.buffer[connectionId];
24-
const update = buffer.shift();
7+
constructor(private emitCursorUpdate: (update: CursorUpdate) => void) {}
258

26-
if (!update) continue;
27-
this.emitCursorUpdate(update);
28-
}
9+
setEmitCursorUpdate(update: CursorUpdate) {
10+
this.emitCursorUpdate(update);
11+
}
2912

30-
if (this.bufferHaveData()) {
31-
this.emitFromBatch(this.calculateDispenseInterval());
32-
} else {
33-
this.handlerRunning = false;
34-
}
13+
emitFromBatch() {
14+
for (let connectionId in this.buffer) {
15+
const buffer = this.buffer[connectionId];
16+
const update = buffer.shift();
3517

36-
this.timerIds.shift();
37-
};
18+
if (!update) continue;
19+
setTimeout(() => this.setEmitCursorUpdate(update.cursor), update.offset);
20+
}
3821

39-
if (typeof document !== 'undefined' && document.visibilityState === 'hidden') {
40-
this.timerIds.forEach((id) => clearTimeout(id));
41-
this.timerIds = [];
42-
processBuffer();
43-
} else {
44-
this.timerIds.push(setTimeout(processBuffer, batchDispenseInterval));
22+
if (this.bufferHaveData()) {
23+
this.emitFromBatch();
4524
}
4625
}
4726

@@ -53,33 +32,29 @@ export default class CursorDispensing {
5332
);
5433
}
5534

56-
calculateDispenseInterval(): number {
57-
const bufferLengths = Object.entries(this.buffer).map(([, v]) => v.length);
58-
const highest = bufferLengths.sort()[bufferLengths.length - 1];
59-
const finalOutboundBatchInterval = this.getCurrentBatchTime();
60-
return Math.floor(clamp(finalOutboundBatchInterval / highest, 1, 1000 / 15));
61-
}
62-
6335
processBatch(message: RealtimeMessage) {
64-
const updates: CursorUpdate[] = message.data || [];
36+
const updates: { cursor: CursorUpdate; offset: number }[] = message.data || [];
6537

66-
updates.forEach((update) => {
38+
updates.forEach((update: { cursor: CursorUpdate; offset: number }) => {
6739
const enhancedMsg = {
68-
clientId: message.clientId,
69-
connectionId: message.connectionId,
70-
position: update.position,
71-
data: update.data,
40+
cursor: {
41+
clientId: message.clientId,
42+
connectionId: message.connectionId,
43+
position: update.cursor.position,
44+
data: update.cursor.data,
45+
},
46+
offset: update.offset,
7247
};
7348

74-
if (this.buffer[enhancedMsg.connectionId]) {
75-
this.buffer[enhancedMsg.connectionId].push(enhancedMsg);
49+
if (this.buffer[enhancedMsg.cursor.connectionId]) {
50+
this.buffer[enhancedMsg.cursor.connectionId].push(enhancedMsg);
7651
} else {
77-
this.buffer[enhancedMsg.connectionId] = [enhancedMsg];
52+
this.buffer[enhancedMsg.cursor.connectionId] = [enhancedMsg];
7853
}
7954
});
8055

81-
if (!this.handlerRunning && this.bufferHaveData()) {
82-
this.emitFromBatch(this.calculateDispenseInterval());
56+
if (this.bufferHaveData()) {
57+
this.emitFromBatch();
8358
}
8459
}
8560
}

src/Cursors.test.ts

+66-73
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ describe('Cursors', () => {
6767
it<CursorsTestContext>('emits a cursorsUpdate event', ({ space, dispensing, batching, fakeMessageStub }) => {
6868
const fakeMessage = {
6969
...fakeMessageStub,
70-
data: [{ position: { x: 1, y: 1 } }, { position: { x: 1, y: 2 }, data: { color: 'red' } }],
70+
data: [
71+
{ cursor: { position: { x: 1, y: 1 } } },
72+
{ cursor: { position: { x: 1, y: 2 }, data: { color: 'red' } } },
73+
],
7174
};
7275

7376
const spy = vitest.fn();
@@ -151,16 +154,19 @@ describe('Cursors', () => {
151154

152155
it<CursorsTestContext>('creates an outgoingBuffer for a new cursor movement', ({ batching, channel }) => {
153156
batching.pushCursorPosition(channel, { position: { x: 1, y: 1 }, data: {} });
154-
expect(batching.outgoingBuffers).toEqual([{ position: { x: 1, y: 1 }, data: {} }]);
157+
expect(batching.outgoingBuffer).toEqual([{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }]);
155158
});
156159

157160
it<CursorsTestContext>('adds cursor data to an existing buffer', ({ batching, channel }) => {
161+
vi.useFakeTimers();
158162
batching.pushCursorPosition(channel, { position: { x: 1, y: 1 }, data: {} });
159-
expect(batching.outgoingBuffers).toEqual([{ position: { x: 1, y: 1 }, data: {} }]);
163+
expect(batching.outgoingBuffer).toEqual([{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }]);
164+
165+
vi.advanceTimersByTime(10);
160166
batching.pushCursorPosition(channel, { position: { x: 2, y: 2 }, data: {} });
161-
expect(batching.outgoingBuffers).toEqual([
162-
{ position: { x: 1, y: 1 }, data: {} },
163-
{ position: { x: 2, y: 2 }, data: {} },
167+
expect(batching.outgoingBuffer).toEqual([
168+
{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 },
169+
{ cursor: { position: { x: 2, y: 2 }, data: {} }, offset: 10 },
164170
]);
165171
});
166172

@@ -195,17 +201,19 @@ describe('Cursors', () => {
195201

196202
it<CursorsTestContext>('should publish the cursor buffer', async ({ batching, channel }) => {
197203
batching.hasMovement = true;
198-
batching.outgoingBuffers = [{ position: { x: 1, y: 1 }, data: {} }];
204+
batching.outgoingBuffer = [{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }];
199205
const spy = vi.spyOn(channel, 'publish');
200206
await batching['batchToChannel'](channel, CURSOR_UPDATE);
201-
expect(spy).toHaveBeenCalledWith(CURSOR_UPDATE, [{ position: { x: 1, y: 1 }, data: {} }]);
207+
expect(spy).toHaveBeenCalledWith(CURSOR_UPDATE, [
208+
{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 },
209+
]);
202210
});
203211

204212
it<CursorsTestContext>('should clear the buffer', async ({ batching, channel }) => {
205213
batching.hasMovement = true;
206-
batching.outgoingBuffers = [{ position: { x: 1, y: 1 }, data: {} }];
214+
batching.outgoingBuffer = [{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }];
207215
await batching['batchToChannel'](channel, CURSOR_UPDATE);
208-
expect(batching.outgoingBuffers).toEqual([]);
216+
expect(batching.outgoingBuffer).toEqual([]);
209217
});
210218

211219
it<CursorsTestContext>('should set hasMovements to false', async ({ batching, channel }) => {
@@ -233,28 +241,12 @@ describe('Cursors', () => {
233241
expect(spy).not.toHaveBeenCalled();
234242
});
235243

236-
it<CursorsTestContext>('does not call emitFromBatch if the loop is already running', async ({
237-
dispensing,
238-
fakeMessageStub,
239-
}) => {
240-
const spy = vi.spyOn(dispensing, 'emitFromBatch');
241-
242-
const fakeMessage = {
243-
...fakeMessageStub,
244-
data: [{ position: { x: 1, y: 1 } }],
245-
};
246-
247-
dispensing['handlerRunning'] = true;
248-
dispensing.processBatch(fakeMessage);
249-
expect(spy).not.toHaveBeenCalled();
250-
});
251-
252244
it<CursorsTestContext>('call emitFromBatch if there are updates', async ({ dispensing, fakeMessageStub }) => {
253245
const spy = vi.spyOn(dispensing, 'emitFromBatch');
254246

255247
const fakeMessage = {
256248
...fakeMessageStub,
257-
data: [{ position: { x: 1, y: 1 } }],
249+
data: [{ cursor: { position: { x: 1, y: 1 } } }],
258250
};
259251

260252
dispensing.processBatch(fakeMessage);
@@ -268,70 +260,70 @@ describe('Cursors', () => {
268260
const fakeMessage = {
269261
...fakeMessageStub,
270262
data: [
271-
{ position: { x: 1, y: 1 } },
272-
{ position: { x: 2, y: 3 }, data: { color: 'blue' } },
273-
{ position: { x: 5, y: 4 } },
263+
{ cursor: { position: { x: 1, y: 1 } }, offset: 10 },
264+
{ cursor: { position: { x: 2, y: 3 }, data: { color: 'blue' } }, offset: 20 },
265+
{ cursor: { position: { x: 5, y: 4 } }, offset: 30 },
274266
],
275267
};
268+
vi.useFakeTimers();
276269

270+
const spy = vi.spyOn(dispensing, 'setEmitCursorUpdate');
277271
dispensing.processBatch(fakeMessage);
278-
expect(dispensing['buffer']).toEqual({
279-
connectionId: [
280-
{
281-
position: { x: 1, y: 1 },
282-
data: undefined,
283-
clientId: 'clientId',
284-
connectionId: 'connectionId',
285-
},
286-
{
287-
position: { x: 2, y: 3 },
288-
data: { color: 'blue' },
289-
clientId: 'clientId',
290-
connectionId: 'connectionId',
291-
},
292-
{
293-
position: { x: 5, y: 4 },
294-
data: undefined,
295-
clientId: 'clientId',
296-
connectionId: 'connectionId',
297-
},
298-
],
272+
273+
vi.advanceTimersByTime(10);
274+
expect(spy).toHaveBeenCalledWith({
275+
position: { x: 1, y: 1 },
276+
data: undefined,
277+
clientId: 'clientId',
278+
connectionId: 'connectionId',
279+
});
280+
281+
vi.advanceTimersByTime(10);
282+
expect(spy).toHaveBeenCalledWith({
283+
position: { x: 2, y: 3 },
284+
data: { color: 'blue' },
285+
clientId: 'clientId',
286+
connectionId: 'connectionId',
299287
});
288+
289+
vi.advanceTimersByTime(10);
290+
expect(spy).toHaveBeenCalledWith({
291+
position: { x: 5, y: 4 },
292+
data: undefined,
293+
clientId: 'clientId',
294+
connectionId: 'connectionId',
295+
});
296+
expect(spy).toHaveBeenCalledTimes(3);
300297
});
301298

302-
it<CursorsTestContext>('runs until the batch is empty', async ({ dispensing, batching, fakeMessageStub }) => {
299+
it<CursorsTestContext>('runs until the batch is empty', async ({ dispensing, fakeMessageStub }) => {
303300
vi.useFakeTimers();
304301

305302
const fakeMessage = {
306303
...fakeMessageStub,
307304
data: [
308-
{ position: { x: 1, y: 1 } },
309-
{ position: { x: 2, y: 3 }, data: { color: 'blue' } },
310-
{ position: { x: 5, y: 4 } },
305+
{ cursor: { position: { x: 1, y: 1 } }, offset: 10 },
306+
{ cursor: { position: { x: 2, y: 3 }, data: { color: 'blue' } }, offset: 20 },
307+
{ cursor: { position: { x: 5, y: 4 } }, offset: 30 },
311308
],
312309
};
313310

314-
expect(dispensing['handlerRunning']).toBe(false);
311+
const spy = vi.spyOn(dispensing, 'setEmitCursorUpdate');
312+
315313
expect(dispensing.bufferHaveData()).toBe(false);
316314

317315
dispensing.processBatch(fakeMessage);
318-
expect(dispensing['buffer']['connectionId']).toHaveLength(3);
319-
expect(dispensing['handlerRunning']).toBe(true);
320-
expect(dispensing.bufferHaveData()).toBe(true);
321-
vi.advanceTimersByTime(batching.batchTime / 2);
322-
323-
expect(dispensing['buffer']['connectionId']).toHaveLength(2);
324-
expect(dispensing['handlerRunning']).toBe(true);
325-
expect(dispensing.bufferHaveData()).toBe(true);
326-
327-
vi.advanceTimersByTime(batching.batchTime / 2);
328-
expect(dispensing['buffer']['connectionId']).toHaveLength(1);
329-
expect(dispensing['handlerRunning']).toBe(true);
330-
expect(dispensing.bufferHaveData()).toBe(true);
331-
332-
vi.advanceTimersByTime(batching.batchTime);
333-
expect(dispensing['buffer']['connectionId']).toHaveLength(0);
334-
expect(dispensing['handlerRunning']).toBe(false);
316+
expect(spy).toHaveBeenCalledTimes(0);
317+
318+
vi.advanceTimersByTime(10);
319+
expect(spy).toHaveBeenCalledTimes(1);
320+
321+
vi.advanceTimersByTime(10);
322+
expect(spy).toHaveBeenCalledTimes(2);
323+
324+
vi.advanceTimersByTime(10);
325+
expect(spy).toHaveBeenCalledTimes(3);
326+
335327
expect(dispensing.bufferHaveData()).toBe(false);
336328

337329
vi.useRealTimers();
@@ -498,6 +490,7 @@ describe('Cursors', () => {
498490
x: 2,
499491
y: 3,
500492
},
493+
offset: 0,
501494
},
502495
};
503496

0 commit comments

Comments
 (0)