Skip to content

[Postgres] Fix slot recovery & improve log output #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chatty-boxes-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-module-postgres': patch
---

Fix replication slot recovery
5 changes: 5 additions & 0 deletions .changeset/serious-rivers-sin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Reduce noise in log output
17 changes: 13 additions & 4 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ export class WalStream {
if (slotExists) {
// This checks that the slot is still valid
const r = await this.checkReplicationSlot();
if (snapshotDone && r.needsNewSlot) {
// We keep the current snapshot, and create a new replication slot
throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);
}
// We can have:
// needsInitialSync: true, needsNewSlot: true -> initial sync from scratch
// needsInitialSync: true, needsNewSlot: false -> resume initial sync
// needsInitialSync: false, needsNewSlot: true -> handled above
// needsInitialSync: false, needsNewSlot: false -> resume streaming replication
return {
needsInitialSync: !snapshotDone,
needsNewSlot: r.needsNewSlot
Expand All @@ -204,7 +213,7 @@ export class WalStream {
/**
* If a replication slot exists, check that it is healthy.
*/
private async checkReplicationSlot(): Promise<InitResult> {
private async checkReplicationSlot(): Promise<{ needsNewSlot: boolean }> {
let last_error = null;
const slotName = this.slot_name;

Expand Down Expand Up @@ -244,7 +253,7 @@ export class WalStream {

// Success
logger.info(`Slot ${slotName} appears healthy`);
return { needsInitialSync: false, needsNewSlot: false };
return { needsNewSlot: false };
} catch (e) {
last_error = e;
logger.warn(`${slotName} Replication slot error`, e);
Expand Down Expand Up @@ -274,9 +283,9 @@ export class WalStream {
// Sample: publication "powersync" does not exist
// Happens when publication deleted or never created.
// Slot must be re-created in this case.
logger.info(`${slotName} does not exist anymore, will create new slot`);
logger.info(`${slotName} is not valid anymore`);

return { needsInitialSync: true, needsNewSlot: true };
return { needsNewSlot: true };
}
// Try again after a pause
await new Promise((resolve) => setTimeout(resolve, 1000));
Expand Down
49 changes: 49 additions & 0 deletions modules/module-postgres/test/src/wal_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { pgwireRows } from '@powersync/service-jpgwire';
import * as crypto from 'crypto';
import { describe, expect, test } from 'vitest';
import { WalStreamTestContext } from './wal_stream_utils.js';
import { MissingReplicationSlotError } from '@module/replication/WalStream.js';

type StorageFactory = () => Promise<BucketStorageFactory>;

Expand Down Expand Up @@ -291,4 +292,52 @@ bucket_definitions:
expect(endRowCount - startRowCount).toEqual(0);
expect(endTxCount - startTxCount).toEqual(1);
});

test('reporting slot issues', async () => {
{
await using context = await WalStreamTestContext.open(factory);
const { pool } = context;
await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT id, description FROM "test_data"`);

await pool.query(
`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)`
);
await pool.query(
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
);
await context.replicateSnapshot();
await context.startStreaming();

const data = await context.getBucketData('global[]');

expect(data).toMatchObject([
putOp('test_data', {
id: '8133cd37-903b-4937-a022-7c8294015a3a',
description: 'test1'
})
]);

expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true });
}

{
await using context = await WalStreamTestContext.open(factory, { doNotClear: true });
const { pool } = context;
await pool.query('DROP PUBLICATION powersync');
await pool.query(`UPDATE test_data SET description = 'updated'`);
await pool.query('CREATE PUBLICATION powersync FOR ALL TABLES');

await context.loadActiveSyncRules();
await expect(async () => {
await context.replicateSnapshot();
}).rejects.toThrowError(MissingReplicationSlotError);

// The error is handled on a higher level, which triggers
// creating a new replication slot.
}
});
}
10 changes: 10 additions & 0 deletions modules/module-postgres/test/src/wal_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ export class WalStreamTestContext implements AsyncDisposable {
return this.storage!;
}

async loadActiveSyncRules() {
const syncRules = await this.factory.getActiveSyncRulesContent();
if (syncRules == null) {
throw new Error(`Active sync rules not available`);
}

this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}

get walStream() {
if (this.storage == null) {
throw new Error('updateSyncRules() first');
Expand Down
11 changes: 8 additions & 3 deletions packages/service-core/src/storage/mongo/MongoBucketBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
super[Symbol.dispose]();
}

private lastWaitingLogThottled = 0;

async commit(lsn: string): Promise<boolean> {
await this.flush();

Expand All @@ -619,9 +621,12 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
return false;
}
if (lsn < this.no_checkpoint_before_lsn) {
logger.info(
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
);
if (Date.now() - this.lastWaitingLogThottled > 5_000) {
logger.info(
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
);
this.lastWaitingLogThottled = Date.now();
}

// Edge case: During initial replication, we have a no_checkpoint_before_lsn set,
// and don't actually commit the snapshot.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,13 @@ export class MongoSyncBucketStorage
while (true) {
try {
await this.clearIteration();

logger.info(`${this.slot_name} Done clearing data`);
return;
} catch (e: unknown) {
if (e instanceof mongo.MongoServerError && e.codeName == 'MaxTimeMSExpired') {
logger.info(
`Clearing took longer than ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, waiting and triggering another iteration.`
`${this.slot_name} Cleared batch of data in ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...`
);
await timers.setTimeout(db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
continue;
Expand Down
Loading