Skip to content

Commit e3a9343

Browse files
authored
[Postgres] Fix slot recovery & improve log output (#162)
* Fix replication slot recovery. * Reduce log noise for "Waiting until ... before creating checkpoint". * Improved logs for clearing data. * Remove .only. * Add changesets.
1 parent b3e28f7 commit e3a9343

File tree

7 files changed

+93
-8
lines changed

7 files changed

+93
-8
lines changed

.changeset/chatty-boxes-repeat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
---
4+
5+
Fix replication slot recovery

.changeset/serious-rivers-sin.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-core': patch
3+
---
4+
5+
Reduce noise in log output

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,15 @@ export class WalStream {
192192
if (slotExists) {
193193
// This checks that the slot is still valid
194194
const r = await this.checkReplicationSlot();
195+
if (snapshotDone && r.needsNewSlot) {
196+
// We keep the current snapshot, and create a new replication slot
197+
throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);
198+
}
199+
// We can have:
200+
// needsInitialSync: true, needsNewSlot: true -> initial sync from scratch
201+
// needsInitialSync: true, needsNewSlot: false -> resume initial sync
202+
// needsInitialSync: false, needsNewSlot: true -> handled above
203+
// needsInitialSync: false, needsNewSlot: false -> resume streaming replication
195204
return {
196205
needsInitialSync: !snapshotDone,
197206
needsNewSlot: r.needsNewSlot
@@ -204,7 +213,7 @@ export class WalStream {
204213
/**
205214
* If a replication slot exists, check that it is healthy.
206215
*/
207-
private async checkReplicationSlot(): Promise<InitResult> {
216+
private async checkReplicationSlot(): Promise<{ needsNewSlot: boolean }> {
208217
let last_error = null;
209218
const slotName = this.slot_name;
210219

@@ -244,7 +253,7 @@ export class WalStream {
244253

245254
// Success
246255
logger.info(`Slot ${slotName} appears healthy`);
247-
return { needsInitialSync: false, needsNewSlot: false };
256+
return { needsNewSlot: false };
248257
} catch (e) {
249258
last_error = e;
250259
logger.warn(`${slotName} Replication slot error`, e);
@@ -274,9 +283,9 @@ export class WalStream {
274283
// Sample: publication "powersync" does not exist
275284
// Happens when publication deleted or never created.
276285
// Slot must be re-created in this case.
277-
logger.info(`${slotName} does not exist anymore, will create new slot`);
286+
logger.info(`${slotName} is not valid anymore`);
278287

279-
return { needsInitialSync: true, needsNewSlot: true };
288+
return { needsNewSlot: true };
280289
}
281290
// Try again after a pause
282291
await new Promise((resolve) => setTimeout(resolve, 1000));

modules/module-postgres/test/src/wal_stream.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { pgwireRows } from '@powersync/service-jpgwire';
55
import * as crypto from 'crypto';
66
import { describe, expect, test } from 'vitest';
77
import { WalStreamTestContext } from './wal_stream_utils.js';
8+
import { MissingReplicationSlotError } from '@module/replication/WalStream.js';
89

910
type StorageFactory = () => Promise<BucketStorageFactory>;
1011

@@ -291,4 +292,52 @@ bucket_definitions:
291292
expect(endRowCount - startRowCount).toEqual(0);
292293
expect(endTxCount - startTxCount).toEqual(1);
293294
});
295+
296+
test('reporting slot issues', async () => {
297+
{
298+
await using context = await WalStreamTestContext.open(factory);
299+
const { pool } = context;
300+
await context.updateSyncRules(`
301+
bucket_definitions:
302+
global:
303+
data:
304+
- SELECT id, description FROM "test_data"`);
305+
306+
await pool.query(
307+
`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)`
308+
);
309+
await pool.query(
310+
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
311+
);
312+
await context.replicateSnapshot();
313+
await context.startStreaming();
314+
315+
const data = await context.getBucketData('global[]');
316+
317+
expect(data).toMatchObject([
318+
putOp('test_data', {
319+
id: '8133cd37-903b-4937-a022-7c8294015a3a',
320+
description: 'test1'
321+
})
322+
]);
323+
324+
expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true });
325+
}
326+
327+
{
328+
await using context = await WalStreamTestContext.open(factory, { doNotClear: true });
329+
const { pool } = context;
330+
await pool.query('DROP PUBLICATION powersync');
331+
await pool.query(`UPDATE test_data SET description = 'updated'`);
332+
await pool.query('CREATE PUBLICATION powersync FOR ALL TABLES');
333+
334+
await context.loadActiveSyncRules();
335+
await expect(async () => {
336+
await context.replicateSnapshot();
337+
}).rejects.toThrowError(MissingReplicationSlotError);
338+
339+
// The error is handled on a higher level, which triggers
340+
// creating a new replication slot.
341+
}
342+
});
294343
}

modules/module-postgres/test/src/wal_stream_utils.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ export class WalStreamTestContext implements AsyncDisposable {
7777
return this.storage!;
7878
}
7979

80+
async loadActiveSyncRules() {
81+
const syncRules = await this.factory.getActiveSyncRulesContent();
82+
if (syncRules == null) {
83+
throw new Error(`Active sync rules not available`);
84+
}
85+
86+
this.storage = this.factory.getInstance(syncRules);
87+
return this.storage!;
88+
}
89+
8090
get walStream() {
8191
if (this.storage == null) {
8292
throw new Error('updateSyncRules() first');

packages/service-core/src/storage/mongo/MongoBucketBatch.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,8 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
609609
super[Symbol.dispose]();
610610
}
611611

612+
private lastWaitingLogThottled = 0;
613+
612614
async commit(lsn: string): Promise<boolean> {
613615
await this.flush();
614616

@@ -619,9 +621,12 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
619621
return false;
620622
}
621623
if (lsn < this.no_checkpoint_before_lsn) {
622-
logger.info(
623-
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
624-
);
624+
if (Date.now() - this.lastWaitingLogThottled > 5_000) {
625+
logger.info(
626+
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
627+
);
628+
this.lastWaitingLogThottled = Date.now();
629+
}
625630

626631
// Edge case: During initial replication, we have a no_checkpoint_before_lsn set,
627632
// and don't actually commit the snapshot.

packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,11 +522,13 @@ export class MongoSyncBucketStorage
522522
while (true) {
523523
try {
524524
await this.clearIteration();
525+
526+
logger.info(`${this.slot_name} Done clearing data`);
525527
return;
526528
} catch (e: unknown) {
527529
if (e instanceof mongo.MongoServerError && e.codeName == 'MaxTimeMSExpired') {
528530
logger.info(
529-
`Clearing took longer than ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, waiting and triggering another iteration.`
531+
`${this.slot_name} Cleared batch of data in ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...`
530532
);
531533
await timers.setTimeout(db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
532534
continue;

0 commit comments

Comments
 (0)