Create a new branch off main that:
- Removes all Level dependencies (the React Native blocker)
- Simplifies to SQLite-only storage (sql.js for browser, op-sqlite for RN)
- Implements cursor-based sync using monotonically increasing sequence numbers
- Adds peer tracking for safe compaction (no data loss for slow peers)
- Client-driven compaction with server hints when documents exceed size threshold
The main branch uses y-leveldb + abstract-level + browser-level for persistence. These Level dependencies have native bindings that break React Native compatibility.
- Replace Level-based storage with direct SQLite storage
- Use
Y.encodeStateAsUpdate()/Y.applyUpdate()to persist Y.Doc state as blobs - Use cursor-based sync (monotonically increasing version numbers) instead of
_creationTime - Keep state vector sync for recovery/cold start scenarios
From Convex docs:
"Currently the 'check if there are new changes' uses the previous
_creationTimeas the cursor. Unfortunately, the previous query could have fetched results before an in-progress mutation commits, which could have started prior to what the latest document's_creationTimeshows, meaning the next check will miss the commit."
Using a monotonically increasing version number (assigned at commit time) ensures no updates are missed.
| Task | Description |
|---|---|
| 1.1 | Create cursor branch off main |
| 1.2 | Remove dependencies from package.json: y-leveldb, abstract-level, browser-level |
| 1.3 | Delete src/client/persistence/sqlite-level.ts (abstract-level wrapper) |
| 1.4 | Delete src/client/persistence/indexeddb.ts (replaced by SQLite) |
| 1.5 | Keep src/client/persistence/memory.ts for testing |
-- Full Y.Doc state (compacted)
CREATE TABLE snapshots (
collection TEXT PRIMARY KEY,
data BLOB NOT NULL, -- Y.encodeStateAsUpdate(ydoc)
state_vector BLOB, -- Y.encodeStateVector(ydoc) for quick sync
version INTEGER DEFAULT 0 -- Latest version included in snapshot
);
-- Incremental updates (before compaction)
CREATE TABLE updates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
collection TEXT NOT NULL,
data BLOB NOT NULL, -- Individual Y.Doc update
version INTEGER NOT NULL -- Monotonically increasing
);
-- Key-value store for metadata (cursor, etc.)
CREATE TABLE kv (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);| Task | Description |
|---|---|
| 2.1 | Rewrite sqlite.ts with new schema (no y-leveldb) |
| 2.2 | Implement loadSnapshot() - load snapshot + pending updates, apply to Y.Doc |
| 2.3 | Implement saveUpdate() - store incremental Y.Doc update |
| 2.4 | Implement compact() - Y.mergeUpdates([snapshot, ...updates]), replace snapshot, delete updates |
| 2.5 | Store state_vector in snapshot for quick recovery sync |
| 2.6 | Keep SqlJsAdapter and OPSqliteAdapter (already work) |
// Full state as a single blob (for snapshots)
const snapshot = Y.encodeStateAsUpdate(ydoc); // Uint8Array
Y.applyUpdate(ydoc, snapshot); // Restore
// Merge multiple updates into one (compaction)
const compacted = Y.mergeUpdates([update1, update2, ...]);
// State vector for sync
const stateVector = Y.encodeStateVector(ydoc);
const diff = Y.encodeStateAsUpdate(ydoc, remoteStateVector);// src/component/schema.ts
documents: defineTable({
collection: v.string(),
documentId: v.string(),
crdtBytes: v.bytes(),
version: v.number(), // Monotonically increasing (auto-incremented)
})
.index("by_collection_version", ["collection", "version"]) // NEW: cursor queries
.index("by_collection_document", ["collection", "documentId"])// In mutations (insert, update, delete)
const latest = await ctx.db
.query("documents")
.withIndex("by_collection_version", q => q.eq("collection", args.collection))
.order("desc")
.first();
const nextVersion = (latest?.version ?? 0) + 1;
await ctx.db.insert("documents", {
collection: args.collection,
documentId: args.documentId,
crdtBytes: args.crdtBytes,
version: nextVersion,
});// Cursor-based streaming: "give me everything after version X"
const deltas = await ctx.db
.query("documents")
.withIndex("by_collection_version", q =>
q.eq("collection", args.collection).gt("version", args.cursor)
)
.take(limit);
// Return new cursor = max version from results
const newCursor = deltas.length > 0
? Math.max(...deltas.map(d => d.version))
: args.cursor;| Task | Description |
|---|---|
| 3.1 | Update component schema - add indexes, peer tracking table |
| 3.2 | Update mutations to auto-increment seq: query max seq + 1 |
| 3.3 | Update stream query to use .gt("seq", cursor) |
| 3.4 | Add ack mutation for peer acknowledgment |
| 3.5 | Add compact mutation with peer-aware safety |
| 3.6 | Update disparity detection to use seq-based logic |
The main branch's compaction is unsafe - it deletes old deltas based purely on size threshold, which can cause data loss for slow/offline peers that haven't synced yet.
The Loro branch tracks which sequence number each peer has synced to, ensuring we only delete deltas that ALL active peers have already received.
// src/component/schema.ts
export default defineSchema({
// Delta log (append-only until compaction)
documents: defineTable({
collection: v.string(),
documentId: v.string(),
seq: v.number(), // Monotonically increasing sequence number
bytes: v.bytes(), // CRDT update bytes
})
.index("by_collection", ["collection"])
.index("by_document_seq", ["collection", "documentId", "seq"])
.index("by_seq", ["collection", "seq"]),
// Compacted snapshots (one per document)
snapshots: defineTable({
collection: v.string(),
documentId: v.string(),
bytes: v.bytes(), // Y.encodeStateAsUpdate() result
stateVector: v.bytes(), // Y.encodeStateVector() for recovery
snapshotSeq: v.number(), // Seq at time of snapshot
createdAt: v.number(),
}).index("by_document", ["collection", "documentId"]),
// Peer tracking for safe compaction
peers: defineTable({
collection: v.string(),
peerId: v.string(), // Unique client identifier
lastSyncedSeq: v.number(), // Last seq this peer acknowledged
lastSeenAt: v.number(), // Timestamp for activity timeout
})
.index("by_collection", ["collection"])
.index("by_collection_peer", ["collection", "peerId"]),
});After receiving deltas, client acknowledges the highest seq it has processed:
// Client calls this after applying deltas
export const ack = mutation({
args: {
collection: v.string(),
peerId: v.string(),
syncedSeq: v.number(),
},
handler: async (ctx, args) => {
const existing = await ctx.db
.query("peers")
.withIndex("by_collection_peer", q =>
q.eq("collection", args.collection).eq("peerId", args.peerId)
)
.first();
if (existing) {
await ctx.db.patch(existing._id, {
lastSyncedSeq: Math.max(existing.lastSyncedSeq, args.syncedSeq),
lastSeenAt: Date.now(),
});
} else {
await ctx.db.insert("peers", {
collection: args.collection,
peerId: args.peerId,
lastSyncedSeq: args.syncedSeq,
lastSeenAt: Date.now(),
});
}
},
});Compaction only deletes deltas that all active peers have synced:
export const compact = mutation({
args: {
collection: v.string(),
documentId: v.string(),
bytes: v.bytes(), // Compacted snapshot from client
stateVector: v.bytes(), // State vector for recovery sync
peerTimeout: v.number(), // e.g., 5 minutes - peers older are considered inactive
},
handler: async (ctx, args) => {
const now = Date.now();
const peerCutoff = now - args.peerTimeout;
// Get all deltas for this document
const deltas = await ctx.db
.query("documents")
.withIndex("by_document_seq", q =>
q.eq("collection", args.collection).eq("documentId", args.documentId)
)
.collect();
// Find active peers (seen within timeout)
const activePeers = await ctx.db
.query("peers")
.withIndex("by_collection", q => q.eq("collection", args.collection))
.filter(q => q.gt(q.field("lastSeenAt"), peerCutoff))
.collect();
// Find the MINIMUM synced seq across all active peers
// This is the highest seq we can safely delete up to
const minSyncedSeq = activePeers.length > 0
? Math.min(...activePeers.map(p => p.lastSyncedSeq))
: Infinity; // No active peers = safe to delete all
// Store new snapshot
const existingSnapshot = await ctx.db
.query("snapshots")
.withIndex("by_document", q =>
q.eq("collection", args.collection).eq("documentId", args.documentId)
)
.first();
if (existingSnapshot) {
await ctx.db.delete(existingSnapshot._id);
}
const snapshotSeq = deltas.length > 0
? Math.max(...deltas.map(d => d.seq))
: 0;
await ctx.db.insert("snapshots", {
collection: args.collection,
documentId: args.documentId,
bytes: args.bytes,
stateVector: args.stateVector,
snapshotSeq,
createdAt: now,
});
// Only delete deltas that ALL active peers have synced
let removed = 0;
for (const delta of deltas) {
if (delta.seq < minSyncedSeq) {
await ctx.db.delete(delta._id);
removed++;
}
}
return { success: true, removed, retained: deltas.length - removed };
},
});Server tells client when a document needs compaction:
export const stream = query({
args: {
collection: v.string(),
cursor: v.number(),
limit: v.optional(v.number()),
sizeThreshold: v.optional(v.number()), // e.g., 5MB
},
returns: v.object({
changes: v.array(v.object({
documentId: v.string(),
seq: v.number(),
bytes: v.bytes(),
operationType: v.string(),
})),
cursor: v.number(),
hasMore: v.boolean(),
compact: v.optional(v.string()), // documentId that needs compaction
}),
handler: async (ctx, args) => {
// ... fetch deltas ...
// Check if any document exceeds size threshold
let compact: string | undefined;
const allDocs = await ctx.db
.query("documents")
.withIndex("by_collection", q => q.eq("collection", args.collection))
.collect();
const sizeByDocument = new Map<string, number>();
for (const doc of allDocs) {
const current = sizeByDocument.get(doc.documentId) ?? 0;
sizeByDocument.set(doc.documentId, current + doc.bytes.byteLength);
}
for (const [docId, size] of sizeByDocument) {
if (size > (args.sizeThreshold ?? 5_000_000)) {
compact = docId;
break;
}
}
return { changes, cursor: newCursor, hasMore, compact };
},
});// In collection.ts sync loop
const result = await client.query(api.stream, { collection, cursor });
// Apply changes...
// Acknowledge sync
await client.mutation(api.ack, {
collection,
peerId: clientId,
syncedSeq: result.cursor,
});
// Handle compaction hint
if (result.compact) {
const snapshot = Y.encodeStateAsUpdate(ydoc);
const stateVector = Y.encodeStateVector(ydoc);
await client.mutation(api.compact, {
collection,
documentId: result.compact,
bytes: snapshot,
stateVector,
peerTimeout: 5 * 60 * 1000, // 5 minutes
});
}| Task | Description |
|---|---|
| 4.1 | Add peers table to component schema |
| 4.2 | Implement ack mutation for peer acknowledgment |
| 4.3 | Implement compact mutation with peer-aware safety |
| 4.4 | Update stream to return compact hint when threshold exceeded |
| 4.5 | Add client-side compaction trigger in collection.ts |
| 4.6 | Store stateVector in snapshots for recovery sync |
| Task | Description |
|---|---|
| 5.1 | Rename Checkpoint service to CursorService (stores seq number) |
| 5.2 | Update collection.ts to use cursor for streaming |
| 5.3 | Add peer acknowledgment after receiving deltas |
| 5.4 | Add compaction trigger when server sends compact hint |
| 5.5 | Generate unique peerId per client (persisted in SQLite kv) |
| 5.6 | Update types in src/shared/types.ts |
// src/client/services/cursor.ts
export type Cursor = number; // Just a sequence number
export class CursorService {
loadCursor(collection: string): Promise<Cursor>; // Returns 0 if not found
saveCursor(collection: string, cursor: Cursor): Promise<void>;
clearCursor(collection: string): Promise<void>;
}// Simplified sync loop in collection.ts
async function syncLoop(collection: string) {
const cursor = await cursorService.loadCursor(collection);
const peerId = await getPeerId(); // From kv store, or generate new UUID
// Subscribe to stream
client.onUpdate(api.stream, { collection, cursor }, async (result) => {
// Apply changes to Y.Doc
for (const change of result.changes) {
Y.applyUpdate(ydoc, new Uint8Array(change.bytes));
}
// Save new cursor
await cursorService.saveCursor(collection, result.cursor);
// Acknowledge to server (for safe compaction)
await client.mutation(api.ack, {
collection,
peerId,
syncedSeq: result.cursor,
});
// Handle compaction hint
if (result.compact) {
await triggerCompaction(collection, result.compact);
}
});
}
async function triggerCompaction(collection: string, documentId: string) {
const snapshot = Y.encodeStateAsUpdate(ydoc);
const stateVector = Y.encodeStateVector(ydoc);
await client.mutation(api.compact, {
collection,
documentId,
bytes: snapshot,
stateVector,
peerTimeout: 5 * 60 * 1000, // 5 minutes
});
}| Task | Description |
|---|---|
| 6.1 | Update src/server/storage.ts - createStreamQuery uses seq cursor |
| 6.2 | Add createAckMutation for peer acknowledgment |
| 6.3 | Add createCompactMutation for safe compaction |
| 6.4 | Update stream response to include compact hint |
// Before (timestamp-based)
{
changes: [...],
checkpoint: { lastModified: number },
hasMore: boolean,
}
// After (cursor-based with compaction hint)
{
changes: [...],
cursor: number, // Sequence number
hasMore: boolean,
compact?: string, // documentId that needs compaction
}| Task | Description |
|---|---|
| 7.1 | Update src/client/persistence/index.ts - remove indexeddb, export only sqlite + memory |
| 7.2 | Update src/client/index.ts exports |
| 7.3 | Update package.json - remove level dependencies |
| 7.4 | Update tsdown.config.ts if needed |
// src/client/persistence/index.ts
export { sqlitePersistence, type SqliteAdapter } from "./sqlite.js";
export { memoryPersistence } from "./memory.js";
export { SqlJsAdapter } from "./adapters/sqljs.js";
export { OPSqliteAdapter } from "./adapters/opsqlite.js";| Task | Description |
|---|---|
| 8.1 | Update illustrations to use new SQLite-only persistence |
| 8.2 | Verify build works for both browser and React Native targets |
| 8.3 | Test compaction with Y.mergeUpdates() |
| 8.4 | Test peer tracking - verify slow peers don't lose data |
| 8.5 | Test disparity recovery - client behind compacted deltas |
src/client/persistence/sqlite-level.tssrc/client/persistence/indexeddb.ts
src/client/persistence/sqlite.ts(new schema, direct Y.Doc storage)src/client/persistence/index.ts(simplified exports)src/client/services/checkpoint.ts→ rename tocursor.tssrc/component/schema.ts(seq index, peers table, snapshots with stateVector)src/component/public.ts(seq-based queries, ack, compact mutations)src/server/storage.ts(seq-based stream, ack, compact)src/client/collection.ts(cursor sync, peer ack, compaction trigger)
src/client/persistence/memory.tssrc/client/persistence/adapters/sqljs.tssrc/client/persistence/adapters/opsqlite.tssrc/client/services/reconciliation.ts
{
"y-leveldb": "remove",
"abstract-level": "remove",
"browser-level": "remove"
}-
Cursor-based streaming (real-time updates)
- Client: "Give me all changes after seq X"
- Server: Returns changes + new cursor + optional compaction hint
- Efficient for ongoing sync
-
Peer acknowledgment (safe compaction)
- Client: "I've synced up to seq Y"
- Server: Tracks per-peer sync state
- Enables safe delta deletion
-
State vector recovery (cold start / disparity)
- Client: Sends
Y.encodeStateVector(ydoc) - Server: Computes diff using
Y.diffUpdate() - Efficient for catching up after long offline
- Client: Sends
- Cursor ensures we never miss server events (solves
_creationTimeordering problem) - Peer tracking ensures slow/offline peers don't lose data during compaction
- State Vector ensures efficient CRDT reconciliation (no duplicate data transfer)
Together they minimize data loss, support offline-first, and optimize bandwidth.
1. Stream response includes `compact: documentId` when size threshold exceeded
2. Client receives hint, creates snapshot: Y.encodeStateAsUpdate(ydoc)
3. Client sends compact mutation with snapshot + stateVector
4. Server checks active peers' lastSyncedSeq
5. Server only deletes deltas where seq < min(peer.lastSyncedSeq)
6. Slow peers can still recover via snapshots + remaining deltas
- Create
cursorbranch offmain - Remove Level dependencies from package.json
- Delete sqlite-level.ts and indexeddb.ts
- Rewrite sqlite.ts with direct Y.Doc storage
- Add snapshots, updates, kv tables
- Implement loadSnapshot, saveUpdate, compact
- Add
by_seqindex to documents table - Add
peerstable for peer tracking - Update snapshots table with stateVector field
- Implement
ackmutation - Implement
compactmutation with peer-aware safety - Update
streamto return compaction hints - Auto-increment seq in mutations
- Rename Checkpoint → CursorService
- Add peer acknowledgment to sync loop
- Add compaction trigger on server hint
- Generate and persist peerId
- Update createStreamQuery for seq-based cursor
- Add createAckMutation
- Add createCompactMutation
- Update persistence/index.ts exports
- Update client/index.ts exports
- Update illustrations
- Test browser build (sql.js)
- Test React Native compatibility
- Test peer tracking (slow peer doesn't lose data)
- Test disparity recovery (client behind compacted deltas)