Skip to content

Commit abf8599

Browse files
committed
Implement cursor offsets rather than intervals
Instead of calculating an even interval based upon the length of the buffer and the batchtime, instead pass along movement offsets as part of the message, and replay these offsets as part of the dispensing. This ensures you get a more accurate representation of cursor movements.
1 parent f900c7d commit abf8599

File tree

6 files changed

+76
-63
lines changed

6 files changed

+76
-63
lines changed

src/CursorBatching.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { CURSOR_UPDATE } from './Cursors.js';
44
import type { CursorUpdate } from './types.js';
55
import type { CursorsOptions } from './types.js';
66

7-
type OutgoingBuffer = Pick<CursorUpdate, 'position' | 'data'>[];
7+
type BufferData = { cursor: Pick<CursorUpdate, 'position' | 'data'>; offset: number };
8+
type OutgoingBuffer = BufferData[];
89

910
export default class CursorBatching {
1011
outgoingBuffers: OutgoingBuffer = [];
@@ -20,15 +21,32 @@ export default class CursorBatching {
2021
// Set to `true` if there is more than one user listening to cursors
2122
shouldSend: boolean = false;
2223

24+
// Used for tracking offsets in the buffer
25+
bufferStartTimestamp: number = 0;
26+
2327
constructor(readonly outboundBatchInterval: CursorsOptions['outboundBatchInterval']) {
2428
this.batchTime = outboundBatchInterval;
2529
}
2630

2731
pushCursorPosition(channel: Types.RealtimeChannelPromise, cursor: Pick<CursorUpdate, 'position' | 'data'>) {
2832
// Ignore the cursor update if there is no one listening
2933
if (!this.shouldSend) return;
34+
35+
// Assume there is a better way to get timestamp
36+
const timestamp = new Date().getTime();
37+
38+
let offset: number;
39+
// First update in the buffer is always 0
40+
if (this.outgoingBuffers.length === 0) {
41+
offset = 0;
42+
this.bufferStartTimestamp = timestamp;
43+
} else {
44+
// Add the offset compared to the first update in the buffer
45+
offset = timestamp - this.bufferStartTimestamp;
46+
}
47+
3048
this.hasMovement = true;
31-
this.pushToBuffer(cursor);
49+
this.pushToBuffer({ cursor, offset });
3250
this.publishFromBuffer(channel, CURSOR_UPDATE);
3351
}
3452

@@ -40,18 +58,18 @@ export default class CursorBatching {
4058
this.batchTime = batchTime;
4159
}
4260

43-
private pushToBuffer(value: Pick<CursorUpdate, 'position' | 'data'>) {
61+
private pushToBuffer(value: BufferData) {
4462
this.outgoingBuffers.push(value);
4563
}
4664

47-
private async publishFromBuffer(channel, eventName: string) {
65+
private async publishFromBuffer(channel: Types.RealtimeChannelPromise, eventName: string) {
4866
if (!this.isRunning) {
4967
this.isRunning = true;
5068
await this.batchToChannel(channel, eventName);
5169
}
5270
}
5371

54-
private async batchToChannel(channel, eventName: string) {
72+
private async batchToChannel(channel: Types.RealtimeChannelPromise, eventName: string) {
5573
if (!this.hasMovement) {
5674
this.isRunning = false;
5775
return;
@@ -65,3 +83,5 @@ export default class CursorBatching {
6583
this.isRunning = true;
6684
}
6785
}
86+
87+
export { type BufferData };

src/CursorDispensing.ts

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,36 @@
1-
import { clamp } from './utilities/math.js';
2-
1+
import { type BufferData } from './CursorBatching.js';
32
import { type CursorUpdate } from './types.js';
43
import { type RealtimeMessage } from './utilities/types.js';
54

65
export default class CursorDispensing {
76
private buffer: Record<string, CursorUpdate[]> = {};
87
private handlerRunning: boolean = false;
9-
private timerIds: ReturnType<typeof setTimeout>[] = [];
108
private emitCursorUpdate: (update: CursorUpdate) => void;
11-
private getCurrentBatchTime: () => number;
129

13-
constructor(emitCursorUpdate, getCurrentBatchTime) {
10+
constructor(emitCursorUpdate: (update: CursorUpdate) => void) {
1411
this.emitCursorUpdate = emitCursorUpdate;
15-
this.getCurrentBatchTime = getCurrentBatchTime;
1612
}
1713

18-
emitFromBatch(batchDispenseInterval: number) {
14+
emitFromBatch() {
1915
if (!this.bufferHaveData()) {
2016
this.handlerRunning = false;
2117
return;
2218
}
2319

2420
this.handlerRunning = true;
2521

26-
const processBuffer = () => {
27-
for (let connectionId in this.buffer) {
28-
const buffer = this.buffer[connectionId];
29-
const update = buffer.shift();
30-
31-
if (!update) continue;
32-
this.emitCursorUpdate(update);
33-
}
34-
35-
if (this.bufferHaveData()) {
36-
this.emitFromBatch(this.calculateDispenseInterval());
37-
} else {
38-
this.handlerRunning = false;
39-
}
22+
for (let connectionId in this.buffer) {
23+
const buffer = this.buffer[connectionId];
24+
const update = buffer.shift();
4025

41-
this.timerIds.shift();
42-
};
26+
if (!update) continue;
27+
setTimeout(() => this.emitCursorUpdate(update), update.offset);
28+
}
4329

44-
if (typeof document !== 'undefined' && document.visibilityState === 'hidden') {
45-
this.timerIds.forEach((id) => clearTimeout(id));
46-
this.timerIds = [];
47-
processBuffer();
30+
if (this.bufferHaveData()) {
31+
this.emitFromBatch();
4832
} else {
49-
this.timerIds.push(setTimeout(processBuffer, batchDispenseInterval));
33+
this.handlerRunning = false;
5034
}
5135
}
5236

@@ -58,22 +42,16 @@ export default class CursorDispensing {
5842
);
5943
}
6044

61-
calculateDispenseInterval(): number {
62-
const bufferLengths = Object.entries(this.buffer).map(([, v]) => v.length);
63-
const highest = bufferLengths.sort()[bufferLengths.length - 1];
64-
const finalOutboundBatchInterval = this.getCurrentBatchTime();
65-
return Math.floor(clamp(finalOutboundBatchInterval / highest, 1, 1000 / 15));
66-
}
67-
6845
processBatch(message: RealtimeMessage) {
6946
const updates = message.data || [];
7047

71-
updates.forEach((update) => {
48+
updates.forEach((update: BufferData) => {
7249
const enhancedMsg = {
7350
clientId: message.clientId,
7451
connectionId: message.connectionId,
75-
position: update.position,
76-
data: update.data,
52+
position: update.cursor.position,
53+
offset: update.offset,
54+
data: update.cursor.data,
7755
};
7856

7957
if (this.buffer[enhancedMsg.connectionId]) {
@@ -84,7 +62,7 @@ export default class CursorDispensing {
8462
});
8563

8664
if (!this.handlerRunning && this.bufferHaveData()) {
87-
this.emitFromBatch(this.calculateDispenseInterval());
65+
this.emitFromBatch();
8866
}
8967
}
9068
}

src/CursorHistory.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ export default class CursorHistory {
1616
private messageToUpdate(
1717
connectionId: string,
1818
clientId: string,
19-
update: Pick<CursorUpdate, 'position' | 'data'>,
19+
update: Pick<CursorUpdate, 'position' | 'data' | 'offset'>,
2020
): CursorUpdate {
2121
return {
2222
clientId,
2323
connectionId,
2424
position: update.position,
2525
data: update.data,
26+
offset: update.offset,
2627
};
2728
}
2829

src/Cursors.test.ts

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ describe('Cursors', () => {
6666
it<CursorsTestContext>('emits a cursorsUpdate event', ({ space, dispensing, batching, fakeMessageStub }) => {
6767
const fakeMessage = {
6868
...fakeMessageStub,
69-
data: [{ position: { x: 1, y: 1 } }, { position: { x: 1, y: 2 }, data: { color: 'red' } }],
69+
data: [
70+
{ cursor: { position: { x: 1, y: 1 } } },
71+
{ cursor: { position: { x: 1, y: 2 }, data: { color: 'red' } } },
72+
],
7073
};
7174

7275
const spy = vitest.fn();
@@ -150,16 +153,16 @@ describe('Cursors', () => {
150153

151154
it<CursorsTestContext>('creates an outgoingBuffer for a new cursor movement', ({ batching, channel }) => {
152155
batching.pushCursorPosition(channel, { position: { x: 1, y: 1 }, data: {} });
153-
expect(batching.outgoingBuffers).toEqual([{ position: { x: 1, y: 1 }, data: {} }]);
156+
expect(batching.outgoingBuffers).toEqual([{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }]);
154157
});
155158

156159
it<CursorsTestContext>('adds cursor data to an existing buffer', ({ batching, channel }) => {
157160
batching.pushCursorPosition(channel, { position: { x: 1, y: 1 }, data: {} });
158-
expect(batching.outgoingBuffers).toEqual([{ position: { x: 1, y: 1 }, data: {} }]);
161+
expect(batching.outgoingBuffers).toEqual([{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }]);
159162
batching.pushCursorPosition(channel, { position: { x: 2, y: 2 }, data: {} });
160163
expect(batching.outgoingBuffers).toEqual([
161-
{ position: { x: 1, y: 1 }, data: {} },
162-
{ position: { x: 2, y: 2 }, data: {} },
164+
{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 },
165+
{ cursor: { position: { x: 2, y: 2 }, data: {} }, offset: 0 },
163166
]);
164167
});
165168

@@ -194,15 +197,17 @@ describe('Cursors', () => {
194197

195198
it<CursorsTestContext>('should publish the cursor buffer', async ({ batching, channel }) => {
196199
batching.hasMovement = true;
197-
batching.outgoingBuffers = [{ position: { x: 1, y: 1 }, data: {} }];
200+
batching.outgoingBuffers = [{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }];
198201
const spy = vi.spyOn(channel, 'publish');
199202
await batching['batchToChannel'](channel, CURSOR_UPDATE);
200-
expect(spy).toHaveBeenCalledWith(CURSOR_UPDATE, [{ position: { x: 1, y: 1 }, data: {} }]);
203+
expect(spy).toHaveBeenCalledWith(CURSOR_UPDATE, [
204+
{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 },
205+
]);
201206
});
202207

203208
it<CursorsTestContext>('should clear the buffer', async ({ batching, channel }) => {
204209
batching.hasMovement = true;
205-
batching.outgoingBuffers = [{ position: { x: 1, y: 1 }, data: {} }];
210+
batching.outgoingBuffers = [{ cursor: { position: { x: 1, y: 1 }, data: {} }, offset: 0 }];
206211
await batching['batchToChannel'](channel, CURSOR_UPDATE);
207212
expect(batching.outgoingBuffers).toEqual([]);
208213
});
@@ -240,7 +245,7 @@ describe('Cursors', () => {
240245

241246
const fakeMessage = {
242247
...fakeMessageStub,
243-
data: [{ position: { x: 1, y: 1 } }],
248+
data: [{ cursor: { position: { x: 1, y: 1 } } }],
244249
};
245250

246251
dispensing['handlerRunning'] = true;
@@ -253,7 +258,7 @@ describe('Cursors', () => {
253258

254259
const fakeMessage = {
255260
...fakeMessageStub,
256-
data: [{ position: { x: 1, y: 1 } }],
261+
data: [{ cursor: { position: { x: 1, y: 1 } } }],
257262
};
258263

259264
dispensing.processBatch(fakeMessage);
@@ -267,9 +272,9 @@ describe('Cursors', () => {
267272
const fakeMessage = {
268273
...fakeMessageStub,
269274
data: [
270-
{ position: { x: 1, y: 1 } },
271-
{ position: { x: 2, y: 3 }, data: { color: 'blue' } },
272-
{ position: { x: 5, y: 4 } },
275+
{ cursor: { position: { x: 1, y: 1 } }, offset: 0 },
276+
{ cursor: { position: { x: 2, y: 3 }, data: { color: 'blue' } }, offset: 1 },
277+
{ cursor: { position: { x: 5, y: 4 } }, offset: 2 },
273278
],
274279
};
275280

@@ -279,18 +284,21 @@ describe('Cursors', () => {
279284
{
280285
position: { x: 1, y: 1 },
281286
data: undefined,
287+
offset: 0,
282288
clientId: 'clientId',
283289
connectionId: 'connectionId',
284290
},
285291
{
286292
position: { x: 2, y: 3 },
287293
data: { color: 'blue' },
294+
offset: 1,
288295
clientId: 'clientId',
289296
connectionId: 'connectionId',
290297
},
291298
{
292299
position: { x: 5, y: 4 },
293300
data: undefined,
301+
offset: 2,
294302
clientId: 'clientId',
295303
connectionId: 'connectionId',
296304
},
@@ -304,9 +312,9 @@ describe('Cursors', () => {
304312
const fakeMessage = {
305313
...fakeMessageStub,
306314
data: [
307-
{ position: { x: 1, y: 1 } },
308-
{ position: { x: 2, y: 3 }, data: { color: 'blue' } },
309-
{ position: { x: 5, y: 4 } },
315+
{ cursor: { position: { x: 1, y: 1 } }, offset: 0 },
316+
{ cursor: { position: { x: 2, y: 3 }, data: { color: 'blue' } }, offset: 1 },
317+
{ cursor: { position: { x: 5, y: 4 } }, offset: 2 },
310318
],
311319
};
312320

@@ -360,6 +368,7 @@ describe('Cursors', () => {
360368
x: 2,
361369
y: 3,
362370
},
371+
offset: 0,
363372
},
364373
connectionId2: {
365374
connectionId: 'connectionId2',
@@ -369,6 +378,7 @@ describe('Cursors', () => {
369378
x: 25,
370379
y: 44,
371380
},
381+
offset: 0,
372382
},
373383
connectionId3: {
374384
connectionId: 'connectionId3',
@@ -378,6 +388,7 @@ describe('Cursors', () => {
378388
x: 225,
379389
y: 244,
380390
},
391+
offset: 0,
381392
},
382393
};
383394
});
@@ -497,6 +508,7 @@ describe('Cursors', () => {
497508
x: 2,
498509
y: 3,
499510
},
511+
offset: 0,
500512
},
501513
};
502514

@@ -525,6 +537,7 @@ describe('Cursors', () => {
525537
y: 44,
526538
},
527539
data: undefined,
540+
offset: 0,
528541
},
529542
connectionId3: {
530543
connectionId: 'connectionId3',
@@ -534,6 +547,7 @@ describe('Cursors', () => {
534547
y: 244,
535548
},
536549
data: undefined,
550+
offset: 0,
537551
},
538552
});
539553
});

src/Cursors.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ export default class Cursors extends EventEmitter<CursorsEventMap> {
5050
this.cursorBatching = new CursorBatching(this.options.outboundBatchInterval);
5151

5252
const emitCursorUpdate = (update: CursorUpdate): void => this.emit('cursorsUpdate', update);
53-
const getCurrentBatchTime: () => number = () => this.cursorBatching.batchTime;
54-
this.cursorDispensing = new CursorDispensing(emitCursorUpdate, getCurrentBatchTime);
53+
this.cursorDispensing = new CursorDispensing(emitCursorUpdate);
5554
}
5655

5756
/**

src/types.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export interface CursorUpdate {
1717
connectionId: string;
1818
position: CursorPosition;
1919
data?: CursorData;
20+
offset?: number;
2021
}
2122

2223
export interface SpaceOptions {

0 commit comments

Comments
 (0)