Commit 951b010

[Postgres + MongoDB] Resumable replication (#163)
* Refactor pgwire types.
* Add SnapshotQuery support for postgres again.
* Add test again.
* WIP: Record replication progress for Postgres.
* Avoid promoteBuffers.
* MongoDB: Skip tables already replicated.
* Refactor MongoDB snapshot queries.
* Chunk MongoDB queries by _id.
* Fix query typo.
* Get a resume token instead of clusterTime for initial snapshot.
* Replication logging improvements.
* Define a log prefix on child loggers.
* Make sure the change stream is always closed.
* tryNext(), not hasNext().
* Separate storage for snapshot LSN; fix snapshot resumeToken.
* Improve test.
* Implement re-replication for postgres storage.
* Some test cleanup.
* Keepalive after table snapshot to fix schema tests.
* Test cleanup.
* Add some mongodb replication tests.
* Improve postgres replication abort logic and progress reporting.
* Split out tests.
* Add tests for resuming mongodb snapshots.
* Clear mongo storage if we're resuming replication without an LSN.
* Correctly persist snapshot progress in postgres storage.
* Fix down migrations on first run.
* Fix migration tests.
* Address copilot comments.
* Add changeset.
* Add comments addressing review feedback.
1 parent 08b7aa9 commit 951b010

File tree

58 files changed: +2401 −913 lines changed

.changeset/perfect-ads-sin.md

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+---
+'@powersync/service-module-postgres-storage': minor
+'@powersync/service-module-mongodb-storage': minor
+'@powersync/service-module-postgres': minor
+'@powersync/service-module-mongodb': minor
+'@powersync/service-core': minor
+'@powersync/lib-services-framework': minor
+'@powersync/service-jpgwire': minor
+'@powersync/service-errors': patch
+'@powersync/service-module-mysql': patch
+---
+
+Implement resuming of initial replication snapshots.

libs/lib-services/src/logger/Logger.ts

Lines changed: 16 additions & 2 deletions
@@ -1,8 +1,22 @@
 import winston from 'winston';
 
+const prefixFormat = winston.format((info) => {
+  if (info.prefix) {
+    info.message = `${info.prefix}${info.message}`;
+  }
+  return {
+    ...info,
+    prefix: undefined
+  };
+});
+
 export namespace LogFormat {
-  export const development = winston.format.combine(winston.format.colorize({ level: true }), winston.format.simple());
-  export const production = winston.format.combine(winston.format.timestamp(), winston.format.json());
+  export const development = winston.format.combine(
+    prefixFormat(),
+    winston.format.colorize({ level: true }),
+    winston.format.simple()
+  );
+  export const production = winston.format.combine(prefixFormat(), winston.format.timestamp(), winston.format.json());
 }
 
 export const logger = winston.createLogger();

libs/lib-services/src/migrations/AbstractMigrationAgent.ts

Lines changed: 4 additions & 1 deletion
@@ -70,7 +70,7 @@ export abstract class AbstractMigrationAgent<Generics extends MigrationAgentGene
     try {
       const state = await this.store.load();
 
-      logger.info('Running migrations');
+      logger.info(`Running migrations ${direction}`);
       const logStream = this.execute({
         direction,
         migrations,
@@ -142,6 +142,9 @@ export abstract class AbstractMigrationAgent<Generics extends MigrationAgentGene
       ) {
         index += 1;
       }
+    } else if (params.direction == defs.Direction.Down) {
+      // Down migration with no state - exclude all migrations
+      index = migrations.length;
     }
 
     migrations = migrations.slice(index);
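This branch fixes down migrations on a first run: with no persisted state, `index` stayed 0 and `migrations.slice(0)` would attempt to roll back every migration, including ones that were never applied. Setting `index = migrations.length` yields an empty slice. A toy illustration of the slicing (hypothetical migration names):

const migrations = ['0001-init', '0002-sync-rules', '0003-snapshot-progress'];

// Down direction with no stored state: nothing was applied, nothing to undo.
let index = migrations.length;
console.log(migrations.slice(index)); // []

// Up direction with no stored state: start from the first migration.
index = 0;
console.log(migrations.slice(index)); // all three migrations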

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 1 addition & 0 deletions
@@ -209,6 +209,7 @@ export class MongoBucketStorage
       no_checkpoint_before: null,
       keepalive_op: null,
       snapshot_done: false,
+      snapshot_lsn: undefined,
       state: storage.SyncRuleState.PROCESSING,
       slot_name: slot_name,
       last_checkpoint_ts: null,

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 100 additions & 31 deletions
@@ -7,11 +7,19 @@ import {
   container,
   ErrorCode,
   errors,
-  logger,
+  Logger,
+  logger as defaultLogger,
   ReplicationAssertionError,
   ServiceError
 } from '@powersync/lib-services-framework';
-import { deserializeBson, InternalOpId, SaveOperationTag, storage, utils } from '@powersync/service-core';
+import {
+  BucketStorageMarkRecordUnavailable,
+  deserializeBson,
+  InternalOpId,
+  SaveOperationTag,
+  storage,
+  utils
+} from '@powersync/service-core';
 import * as timers from 'node:timers/promises';
 import { PowerSyncMongo } from './db.js';
 import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';

@@ -46,12 +54,18 @@ export interface MongoBucketBatchOptions {
    * Set to true for initial replication.
    */
   skipExistingRows: boolean;
+
+  markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
+
+  logger?: Logger;
 }
 
 export class MongoBucketBatch
   extends BaseObserver<storage.BucketBatchStorageListener>
   implements storage.BucketStorageBatch
 {
+  private logger: Logger;
+
   private readonly client: mongo.MongoClient;
   public readonly db: PowerSyncMongo;
   public readonly session: mongo.ClientSession;

@@ -65,6 +79,7 @@ export class MongoBucketBatch
 
   private batch: OperationBatch | null = null;
   private write_checkpoint_batch: storage.CustomWriteCheckpointOptions[] = [];
+  private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
 
   /**
    * Last LSN received associated with a checkpoint.

@@ -86,6 +101,7 @@ export class MongoBucketBatch
 
   constructor(options: MongoBucketBatchOptions) {
     super();
+    this.logger = options.logger ?? defaultLogger;
     this.client = options.db.client;
     this.db = options.db;
     this.group_id = options.groupId;

@@ -96,6 +112,7 @@ export class MongoBucketBatch
     this.sync_rules = options.syncRules;
     this.storeCurrentData = options.storeCurrentData;
     this.skipExistingRows = options.skipExistingRows;
+    this.markRecordUnavailable = options.markRecordUnavailable;
     this.batch = new OperationBatch();
 
     this.persisted_op = options.keepaliveOp ?? null;

@@ -232,7 +249,9 @@ export class MongoBucketBatch
       current_data_lookup.set(cacheKey(doc._id.t, doc._id.k), doc);
     }
 
-    let persistedBatch: PersistedBatch | null = new PersistedBatch(this.group_id, transactionSize);
+    let persistedBatch: PersistedBatch | null = new PersistedBatch(this.group_id, transactionSize, {
+      logger: this.logger
+    });
 
     for (let op of b) {
       if (resumeBatch) {

@@ -311,11 +330,18 @@ export class MongoBucketBatch
         // Not an error if we re-apply a transaction
         existing_buckets = [];
         existing_lookups = [];
-        // Log to help with debugging if there was a consistency issue
         if (this.storeCurrentData) {
-          logger.warn(
-            `Cannot find previous record for update on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
-          );
+          if (this.markRecordUnavailable != null) {
+            // This will trigger a "resnapshot" of the record.
+            // This is not relevant if storeCurrentData is false, since we'll get the full row
+            // directly in the replication stream.
+            this.markRecordUnavailable(record);
+          } else {
+            // Log to help with debugging if there was a consistency issue
+            this.logger.warn(
+              `Cannot find previous record for update on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
+            );
+          }
         }
       } else {
         existing_buckets = result.buckets;

@@ -332,8 +358,8 @@ export class MongoBucketBatch
         existing_buckets = [];
         existing_lookups = [];
         // Log to help with debugging if there was a consistency issue
-        if (this.storeCurrentData) {
-          logger.warn(
+        if (this.storeCurrentData && this.markRecordUnavailable == null) {
+          this.logger.warn(
             `Cannot find previous record for delete on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
           );
         }
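Instead of only warning when the previous version of a row is missing, the batch can now hand the record back via `markRecordUnavailable`, letting the replicator queue that row for a targeted re-snapshot. A sketch of a conforming callback, with the parameter shape inferred from the call sites above (the queue itself is hypothetical):

import type { storage } from '@powersync/service-core';

// Hypothetical replicator-side queue of rows needing a re-snapshot.
const resnapshotQueue: storage.SaveOptions[] = [];

// Shape inferred from the call sites above; the exact signature is
// defined by BucketStorageMarkRecordUnavailable in service-core.
const markRecordUnavailable = (record: storage.SaveOptions) => {
  // The stored "before" image is missing, so the full row must be
  // re-read from the source database after this batch is flushed.
  resnapshotQueue.push(record);
};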
@@ -430,7 +456,7 @@ export class MongoBucketBatch
           }
         }
       );
-      logger.error(
+      this.logger.error(
         `Failed to evaluate data query on ${record.sourceTable.qualifiedName}.${record.after?.id}: ${error.error}`
       );
     }

@@ -470,7 +496,7 @@ export class MongoBucketBatch
           }
         }
       );
-      logger.error(
+      this.logger.error(
         `Failed to evaluate parameter query on ${record.sourceTable.qualifiedName}.${after.id}: ${error.error}`
       );
     }

@@ -524,7 +550,7 @@ export class MongoBucketBatch
       if (e instanceof mongo.MongoError && e.hasErrorLabel('TransientTransactionError')) {
         // Likely write conflict caused by concurrent write stream replicating
       } else {
-        logger.warn('Transaction error', e as Error);
+        this.logger.warn('Transaction error', e as Error);
       }
       await timers.setTimeout(Math.random() * 50);
       throw e;

@@ -549,7 +575,7 @@ export class MongoBucketBatch
     await this.withTransaction(async () => {
       flushTry += 1;
       if (flushTry % 10 == 0) {
-        logger.info(`${this.slot_name} ${description} - try ${flushTry}`);
+        this.logger.info(`${description} - try ${flushTry}`);
       }
       if (flushTry > 20 && Date.now() > lastTry) {
         throw new ServiceError(ErrorCode.PSYNC_S1402, 'Max transaction tries exceeded');

@@ -619,13 +645,13 @@ export class MongoBucketBatch
     if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
       // When re-applying transactions, don't create a new checkpoint until
       // we are past the last transaction.
-      logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`);
+      this.logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`);
       // Cannot create a checkpoint yet - return false
       return false;
     }
     if (lsn < this.no_checkpoint_before_lsn) {
       if (Date.now() - this.lastWaitingLogThottled > 5_000) {
-        logger.info(
+        this.logger.info(
           `Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
         );
         this.lastWaitingLogThottled = Date.now();

@@ -677,7 +703,8 @@ export class MongoBucketBatch
         _id: this.group_id
       },
       {
-        $set: update
+        $set: update,
+        $unset: { snapshot_lsn: 1 }
       },
       { session: this.session }
     );

@@ -699,7 +726,7 @@ export class MongoBucketBatch
     if (this.persisted_op != null) {
       // The commit may have been skipped due to "no_checkpoint_before_lsn".
       // Apply it now if relevant
-      logger.info(`Commit due to keepalive at ${lsn} / ${this.persisted_op}`);
+      this.logger.info(`Commit due to keepalive at ${lsn} / ${this.persisted_op}`);
       return await this.commit(lsn);
     }
 

@@ -713,7 +740,8 @@ export class MongoBucketBatch
           snapshot_done: true,
           last_fatal_error: null,
           last_keepalive_ts: new Date()
-        }
+        },
+        $unset: { snapshot_lsn: 1 }
       },
       { session: this.session }
     );

@@ -722,6 +750,22 @@ export class MongoBucketBatch
     return true;
   }
 
+  async setSnapshotLsn(lsn: string): Promise<void> {
+    const update: Partial<SyncRuleDocument> = {
+      snapshot_lsn: lsn
+    };
+
+    await this.db.sync_rules.updateOne(
+      {
+        _id: this.group_id
+      },
+      {
+        $set: update
+      },
+      { session: this.session }
+    );
+  }
+
   async save(record: storage.SaveOptions): Promise<storage.FlushedResult | null> {
     const { after, before, sourceTable, tag } = record;
     for (const event of this.getTableEvents(sourceTable)) {
746790
return null;
747791
}
748792

749-
logger.debug(`Saving ${record.tag}:${record.before?.id}/${record.after?.id}`);
793+
this.logger.debug(`Saving ${record.tag}:${record.before?.id}/${record.after?.id}`);
750794

751795
this.batch ??= new OperationBatch();
752796
this.batch.push(new RecordOperation(record));
@@ -817,7 +861,7 @@ export class MongoBucketBatch
817861
session: session
818862
});
819863
const batch = await cursor.toArray();
820-
const persistedBatch = new PersistedBatch(this.group_id, 0);
864+
const persistedBatch = new PersistedBatch(this.group_id, 0, { logger: this.logger });
821865

822866
for (let value of batch) {
823867
persistedBatch.saveBucketData({
@@ -847,6 +891,37 @@ export class MongoBucketBatch
847891
return last_op!;
848892
}
849893

894+
async updateTableProgress(
895+
table: storage.SourceTable,
896+
progress: Partial<storage.TableSnapshotStatus>
897+
): Promise<storage.SourceTable> {
898+
const copy = table.clone();
899+
const snapshotStatus = {
900+
totalEstimatedCount: progress.totalEstimatedCount ?? copy.snapshotStatus?.totalEstimatedCount ?? 0,
901+
replicatedCount: progress.replicatedCount ?? copy.snapshotStatus?.replicatedCount ?? 0,
902+
lastKey: progress.lastKey ?? copy.snapshotStatus?.lastKey ?? null
903+
};
904+
copy.snapshotStatus = snapshotStatus;
905+
906+
await this.withTransaction(async () => {
907+
await this.db.source_tables.updateOne(
908+
{ _id: table.id },
909+
{
910+
$set: {
911+
snapshot_status: {
912+
last_key: snapshotStatus.lastKey == null ? null : new bson.Binary(snapshotStatus.lastKey),
913+
total_estimated_count: snapshotStatus.totalEstimatedCount,
914+
replicated_count: snapshotStatus.replicatedCount
915+
}
916+
}
917+
},
918+
{ session: this.session }
919+
);
920+
});
921+
922+
return copy;
923+
}
924+
850925
async markSnapshotDone(tables: storage.SourceTable[], no_checkpoint_before_lsn: string) {
851926
const session = this.session;
852927
const ids = tables.map((table) => table.id);
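updateTableProgress persists a per-table resume cursor (`lastKey`) plus row counters, which is what lets a restarted snapshot skip already-copied chunks; `markSnapshotDone` below clears it again. A sketch of how a chunked snapshot loop might drive it (the chunk reader is hypothetical; the progress fields follow `storage.TableSnapshotStatus` as used above):

import type { storage } from '@powersync/service-core';

// Hypothetical reader returning rows ordered by primary key, after `key`.
declare function readChunkAfter(
  key: Uint8Array | null,
  limit: number
): Promise<{ rows: unknown[]; lastKey: Uint8Array | null }>;

async function snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
  // Resume from the last persisted key, or from the start of the table.
  let lastKey = table.snapshotStatus?.lastKey ?? null;
  let replicated = table.snapshotStatus?.replicatedCount ?? 0;

  while (true) {
    const chunk = await readChunkAfter(lastKey, 1000);
    if (chunk.rows.length == 0) break;

    // ... save chunk.rows through batch.save(...) ...

    replicated += chunk.rows.length;
    lastKey = chunk.lastKey;
    // Persist progress so a restart resumes at lastKey instead of row 0.
    table = await batch.updateTableProgress(table, {
      replicatedCount: replicated,
      lastKey
    });
  }
}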
@@ -857,6 +932,9 @@ export class MongoBucketBatch
       {
         $set: {
           snapshot_done: true
+        },
+        $unset: {
+          snapshot_status: 1
         }
       },
       { session }

@@ -880,17 +958,8 @@ export class MongoBucketBatch
       }
     });
     return tables.map((table) => {
-      const copy = new storage.SourceTable(
-        table.id,
-        table.connectionTag,
-        table.objectId,
-        table.schema,
-        table.table,
-        table.replicaIdColumns,
-        table.snapshotComplete
-      );
-      copy.syncData = table.syncData;
-      copy.syncParameters = table.syncParameters;
+      const copy = table.clone();
+      copy.snapshotComplete = true;
      return copy;
    });
  }
