Skip to content

Fix cursor set race condition #339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions src/CursorBatching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export type OutgoingBuffer = { cursor: Pick<CursorUpdate, 'position' | 'data'>;

export default class CursorBatching {
outgoingBuffer: OutgoingBuffer[] = [];
pendingBuffer: OutgoingBuffer[] = [];

batchTime: number;

Expand All @@ -28,28 +29,39 @@ export default class CursorBatching {
}

pushCursorPosition(channel: RealtimeChannel, cursor: Pick<CursorUpdate, 'position' | 'data'>) {
// Ignore the cursor update if there is no one listening
if (!this.shouldSend) return;

const timestamp = new Date().getTime();

let offset: number;
// First update in the buffer is always 0
if (this.outgoingBuffer.length === 0) {
if (this.outgoingBuffer.length === 0 && this.pendingBuffer.length === 0) {
offset = 0;
this.bufferStartTimestamp = timestamp;
} else {
// Add the offset compared to the first update in the buffer
offset = timestamp - this.bufferStartTimestamp;
}

const bufferItem = { cursor, offset };

if (!this.shouldSend) {
// Queue cursor positions when channel is not ready (no one listening yet)
this.pushToPendingBuffer(bufferItem);
return;
}

this.hasMovement = true;
this.pushToBuffer({ cursor, offset });
this.pushToBuffer(bufferItem);
this.publishFromBuffer(channel, CURSOR_UPDATE);
}

setShouldSend(shouldSend: boolean) {
const wasSending = this.shouldSend;
this.shouldSend = shouldSend;

// If we just became ready to send and have pending cursor positions, process them
if (!wasSending && this.shouldSend && this.pendingBuffer.length > 0) {
this.processPendingBuffer();
}
}

setBatchTime(batchTime: number) {
Expand All @@ -60,6 +72,35 @@ export default class CursorBatching {
this.outgoingBuffer.push(value);
}

private pushToPendingBuffer(value: OutgoingBuffer) {
this.pendingBuffer.push(value);
}

private processPendingBuffer() {
// Move all pending cursor positions to outgoing buffer
for (const item of this.pendingBuffer) {
this.pushToBuffer(item);
}

// Clear pending buffer
this.pendingBuffer = [];

// Start publishing if we have cursor movements
if (this.outgoingBuffer.length > 0) {
this.hasMovement = true;
// Note: We need the channel to publish, but since setShouldSend doesn't have it,
// we'll need to trigger this from the caller that has access to the channel
}
}

// Method to manually trigger publishing when pending items are processed
triggerPublishFromPending(channel: RealtimeChannel) {
if (this.outgoingBuffer.length > 0) {
this.hasMovement = true;
this.publishFromBuffer(channel, CURSOR_UPDATE);
}
}

private async publishFromBuffer(channel: RealtimeChannel, eventName: string) {
if (!this.isRunning) {
this.isRunning = true;
Expand Down
141 changes: 141 additions & 0 deletions src/CursorQueueing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { it, describe, expect, vi, beforeEach } from 'vitest';
import { Realtime, RealtimeClient, RealtimeChannel } from 'ably';
import Space from './Space.js';
import CursorBatching from './CursorBatching.js';
import { CURSOR_UPDATE } from './CursorConstants.js';

interface CursorQueueingTestContext {
client: RealtimeClient;
space: Space;
channel: RealtimeChannel;
batching: CursorBatching;
}

vi.mock('ably');

describe('Cursor Queuing Bug Fix', () => {
beforeEach<CursorQueueingTestContext>((context) => {
const client = new Realtime({});
// Mock the connection object that Space expects
(client as any).connection = { id: 'test-connection-id' };

context.client = client;
context.space = new Space('test', client);

// Set up cursor channel by subscribing
context.space.cursors.subscribe('update', () => {});
context.channel = context.space.cursors.channel!;
context.batching = context.space.cursors['cursorBatching'];

// Mock channel methods
vi.spyOn(context.channel, 'publish');
});

it<CursorQueueingTestContext>('BUG FIX: cursor positions set before channel ready should be queued and sent when ready', async ({
space,
batching,
channel,
}) => {
// Mock the self member (required for cursor.set())
vi.spyOn(space.members, 'getSelf').mockResolvedValue({
connectionId: 'test-connection',
clientId: 'test-client',
isConnected: true,
profileData: {},
location: null,
lastEvent: { name: 'enter', timestamp: 1 },
});

// Get the publish spy
const publishSpy = vi.spyOn(channel, 'publish');

// Start with shouldSend false (channel not ready)
batching.setShouldSend(false);

// Client sets cursor position before channel is ready
await space.cursors.set({ position: { x: 100, y: 200 }, data: { color: 'blue' } });

// Position should NOT be published immediately
expect(publishSpy).not.toHaveBeenCalled();

// Verify position is in pending buffer
expect(batching.pendingBuffer.length).toBe(1);
expect(batching.pendingBuffer[0].cursor.position).toEqual({ x: 100, y: 200 });

// Simulate channel becoming ready
batching.setShouldSend(true);

// Trigger publish of pending items
batching.triggerPublishFromPending(channel);

// The queued cursor position should now be published
expect(publishSpy).toHaveBeenCalledWith(CURSOR_UPDATE, [
expect.objectContaining({
cursor: { position: { x: 100, y: 200 }, data: { color: 'blue' } },
}),
]);

// Pending buffer should be cleared
expect(batching.pendingBuffer.length).toBe(0);
});

it<CursorQueueingTestContext>('multiple pending cursor positions are preserved and sent in order', async ({
batching,
channel,
}) => {
const publishSpy = vi.spyOn(channel, 'publish');

// Start with shouldSend false
batching.setShouldSend(false);

// Add multiple cursor positions to pending buffer
batching.pushCursorPosition(channel, { position: { x: 10, y: 20 }, data: { color: 'red' } });
batching.pushCursorPosition(channel, { position: { x: 30, y: 40 }, data: { color: 'green' } });
batching.pushCursorPosition(channel, { position: { x: 50, y: 60 }, data: { color: 'blue' } });

// Verify all positions are queued
expect(batching.pendingBuffer.length).toBe(3);
expect(publishSpy).not.toHaveBeenCalled();

// Set shouldSend to true (this should process pending items)
batching.setShouldSend(true);

// Trigger publish of pending items
batching.triggerPublishFromPending(channel);

// All pending items should be moved to outgoing buffer and published
expect(batching.pendingBuffer.length).toBe(0);
expect(publishSpy).toHaveBeenCalled();
});

it<CursorQueueingTestContext>('cursor positions set after shouldSend is true are published immediately', async ({
batching,
channel,
}) => {
const publishSpy = vi.spyOn(channel, 'publish');

// Start with shouldSend true
batching.setShouldSend(true);

// Add cursor position (should be published immediately)
batching.pushCursorPosition(channel, { position: { x: 100, y: 200 }, data: { color: 'yellow' } });

// Should be published immediately, not queued
expect(batching.pendingBuffer.length).toBe(0);
expect(publishSpy).toHaveBeenCalled();
});

it<CursorQueueingTestContext>('setShouldSend(true) processes existing pending items', ({ batching, channel }) => {
// Add items to pending buffer while shouldSend is false
batching.setShouldSend(false);
batching.pushCursorPosition(channel, { position: { x: 1, y: 2 }, data: {} });
batching.pushCursorPosition(channel, { position: { x: 3, y: 4 }, data: {} });

expect(batching.pendingBuffer.length).toBe(2);

// Setting shouldSend to true should process pending items
batching.setShouldSend(true);

expect(batching.pendingBuffer.length).toBe(0);
});
});
3 changes: 3 additions & 0 deletions src/Cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ export default class Cursors extends EventEmitter<CursorsEventMap> {
* E.g. multiply the configured outboundBatchInterval by groups of 100 members instead of the total number of members.
*/
this.cursorBatching.setBatchTime(Math.ceil(cursorsMembers.length / 100) * this.options.outboundBatchInterval);

// Trigger publishing of any pending cursor positions now that channel is ready
this.cursorBatching.triggerPublishFromPending(channel);
}

private isUnsubscribed() {
Expand Down
19 changes: 14 additions & 5 deletions test/integration/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ describe(
// 2. one of its `get*()` methods is called
// 3. its `subscribe` or `unsubscribe` method is called
//
// This seems to mean that a client that sends cursor updates but does not listen for them will drop the first update passed to `cursors.set()`.
//
// So, to work around this, here I perform a "sacrificial" call to `performerSpace.cursors.set()`, the idea of which is to put `performerSpace.cursors.set()` into a state in which it will not drop the updates passed in subsequent calls.
// UPDATE: This race condition bug has been fixed. Early cursor positions are now queued and sent when the channel becomes ready.
// However, we'll keep this "sacrificial" call to ensure the test works correctly with the fix.
await performerSpace.cursors.set({ position: { x: 0, y: 0 } });
});

Expand All @@ -203,7 +202,8 @@ describe(
const observedCursorEventsData: CursorsEventMap['update'][] = [];
const cursorsListener = (data: CursorsEventMap['update']) => {
observedCursorEventsData.push(data);
if (observedCursorEventsData.length === 4) {
// Now expecting 5 updates: 1 sacrificial + 4 intended (since the race condition bug is fixed)
if (observedCursorEventsData.length === 5) {
observerSpace.cursors.unsubscribe(cursorsListener);
resolve(observedCursorEventsData);
}
Expand Down Expand Up @@ -232,8 +232,17 @@ describe(

// Note that we check that the order in which we recieve the cursor updates matches that in which they were passed to `set()`
const observedCursorEventsData = await cursorUpdatesPromise;

// First cursor should be the sacrificial one from scenario 2.1 (now preserved due to bug fix)
expect(observedCursorEventsData[0]).toMatchObject({
clientId: performerClientId,
position: { x: 0, y: 0 },
// Note: no data field expected for the sacrificial cursor
});

// Remaining 4 cursors should match the intended cursors
for (const [index, setCursor] of cursorsToSet.entries()) {
expect(observedCursorEventsData[index]).toMatchObject({ clientId: performerClientId, ...setCursor });
expect(observedCursorEventsData[index + 1]).toMatchObject({ clientId: performerClientId, ...setCursor });
}
});
});
Expand Down
Loading