Skip to content

[MongoDB] Fix replication batching #271

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/beige-clouds-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[MongoDB] Fix replication batching
8 changes: 2 additions & 6 deletions modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';

import { MongoManager } from '../replication/MongoManager.js';
import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js';
import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
import * as types from '../types/types.js';
import { escapeRegExp } from '../utils.js';
Expand Down Expand Up @@ -206,10 +206,6 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
return undefined;
}

async getReplicationHead(): Promise<string> {
return createCheckpoint(this.client, this.db);
}

async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
const session = this.client.startSession();
try {
Expand All @@ -224,7 +220,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
// Trigger a change on the changestream.
await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
{
_id: 'checkpoint' as any
_id: STANDALONE_CHECKPOINT_ID as any
},
{
$inc: { i: 1 }
Expand Down
58 changes: 47 additions & 11 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { MongoManager } from './MongoManager.js';
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
import {
constructAfterRecord,
createCheckpoint,
getCacheIdentifier,
getMongoRelation,
STANDALONE_CHECKPOINT_ID
} from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';

export interface ChangeStreamOptions {
Expand Down Expand Up @@ -69,6 +75,8 @@ export class ChangeStream {

private relation_cache = new Map<string | number, storage.SourceTable>();

private checkpointStreamId = new mongo.ObjectId();

constructor(options: ChangeStreamOptions) {
this.storage = options.storage;
this.metrics = options.metrics;
Expand Down Expand Up @@ -247,6 +255,11 @@ export class ChangeStream {
await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, {
changeStreamPreAndPostImages: { enabled: true }
});
} else {
// Clear the collection on startup, to keep it clean.
// We never query this collection directly, and don't want to keep the data around.
// We only use this to get data into the oplog/changestream.
await this.defaultDb.collection(CHECKPOINTS_COLLECTION).deleteMany({});
}
}

Expand Down Expand Up @@ -434,7 +447,7 @@ export class ChangeStream {
await batch.truncate([result.table]);

await this.snapshotTable(batch, result.table);
const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb);
const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);

const [table] = await batch.markSnapshotDone([result.table], no_checkpoint_before_lsn);
return table;
Expand Down Expand Up @@ -601,7 +614,11 @@ export class ChangeStream {
// Always start with a checkpoint.
// This helps us to clear errors when restarting, even if there is
// no data to replicate.
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
let waitForCheckpointLsn: string | null = await createCheckpoint(
this.client,
this.defaultDb,
this.checkpointStreamId
);

let splitDocument: mongo.ChangeStreamDocument | null = null;

Expand Down Expand Up @@ -700,13 +717,9 @@ export class ChangeStream {
}
}

if (
(changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
changeDocument.operationType == 'replace' ||
changeDocument.operationType == 'drop') &&
changeDocument.ns.coll == CHECKPOINTS_COLLECTION
) {
const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined;

if (ns?.coll == CHECKPOINTS_COLLECTION) {
/**
* Dropping the database does not provide an `invalidate` event.
* We typically would receive `drop` events for the collection which we
Expand All @@ -727,6 +740,29 @@ export class ChangeStream {
);
}

if (
!(
changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
changeDocument.operationType == 'replace'
)
) {
continue;
}

// We handle two types of checkpoint events:
// 1. "Standalone" checkpoints, typically write checkpoints. We want to process these
// immediately, regardless of where they were created.
// 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate
// limiting of commits, so we specifically want to exclude checkpoints from other streams.
//
// It may be useful to also throttle commits due to standalone checkpoints in the future.
// However, these typically have a much lower rate than batch checkpoints, so we don't do that for now.

const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId;
if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) {
continue;
}
const { comparable: lsn } = new MongoLSN({
timestamp: changeDocument.clusterTime!,
resume_token: changeDocument._id
Expand All @@ -743,7 +779,7 @@ export class ChangeStream {
changeDocument.operationType == 'delete'
) {
if (waitForCheckpointLsn == null) {
waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId);
}
const rel = getMongoRelation(changeDocument.ns);
const table = await this.getRelation(batch, rel, {
Expand Down
22 changes: 17 additions & 5 deletions modules/module-mongodb/src/replication/MongoRelation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,27 @@ function filterJsonData(data: any, depth = 0): any {
}
}

export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise<string> {
/**
* Id for checkpoints not associated with any specific replication stream.
*
* Use this for write checkpoints, or any other case where we want to process
* the checkpoint immediately, and not wait for batching.
*/
export const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint';

export async function createCheckpoint(
client: mongo.MongoClient,
db: mongo.Db,
id: mongo.ObjectId | string
): Promise<string> {
const session = client.startSession();
try {
// Note: If multiple PowerSync instances are replicating the same source database,
// they'll modify the same checkpoint document. This is fine - it could create
// more replication load than required, but won't break anything.
// We use a unique id per process, and clear documents on startup.
// This is so that we can filter events for our own process only, and ignore
// events from other processes.
await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
{
_id: 'checkpoint' as any
_id: id as any
},
{
$inc: { i: 1 }
Expand Down
4 changes: 2 additions & 2 deletions modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';

import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
import { MongoManager } from '@module/replication/MongoManager.js';
import { createCheckpoint } from '@module/replication/MongoRelation.js';
import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/MongoRelation.js';
import { NormalizedMongoConnectionConfig } from '@module/types/types.js';

import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js';
Expand Down Expand Up @@ -160,7 +160,7 @@ export async function getClientCheckpoint(
options?: { timeout?: number }
): Promise<InternalOpId> {
const start = Date.now();
const lsn = await createCheckpoint(client, db);
const lsn = await createCheckpoint(client, db, STANDALONE_CHECKPOINT_ID);
// This old API needs a persisted checkpoint id.
// Since we don't use LSNs anymore, the only way to get that is to wait.

Expand Down
5 changes: 0 additions & 5 deletions packages/service-core/src/api/RouteAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ export interface RouteAPI {
*/
getReplicationLag(options: ReplicationLagOptions): Promise<number | undefined>;

/**
* Get the current LSN or equivalent replication HEAD position identifier
*/
getReplicationHead(): Promise<string>;

/**
* Get the current LSN or equivalent replication HEAD position identifier.
*
Expand Down