Skip to content

Commit 1907356

Browse files
authored
Minor fixes (#279)
* Fix replication lag logging for postgres replication.
* Cleanly stop clearing storage when the process terminates.
* Add changeset.
1 parent f9e8673 commit 1907356

File tree

11 files changed

+56
-25
lines changed

11 files changed

+56
-25
lines changed

.changeset/long-dolphins-judge.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-module-postgres': patch
5+
'@powersync/service-module-mongodb': patch
6+
'@powersync/service-core': patch
7+
'@powersync/service-module-mysql': patch
8+
---
9+
10+
Cleanly interrupt clearing of storage when the process is stopped/restarted.

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ export class MongoBucketBatch
129129
return this.last_checkpoint_lsn;
130130
}
131131

132-
async flush(options?: storage.BucketBatchCommitOptions): Promise<storage.FlushedResult | null> {
132+
async flush(options?: storage.BatchBucketFlushOptions): Promise<storage.FlushedResult | null> {
133133
let result: storage.FlushedResult | null = null;
134134
// One flush may be split over multiple transactions.
135135
// Each flushInner() is one transaction.
@@ -142,7 +142,7 @@ export class MongoBucketBatch
142142
return result;
143143
}
144144

145-
private async flushInner(options?: storage.BucketBatchCommitOptions): Promise<storage.FlushedResult | null> {
145+
private async flushInner(options?: storage.BatchBucketFlushOptions): Promise<storage.FlushedResult | null> {
146146
const batch = this.batch;
147147
if (batch == null) {
148148
return null;

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
BaseObserver,
55
ErrorCode,
66
logger,
7+
ReplicationAbortedError,
78
ServiceAssertionError,
89
ServiceError
910
} from '@powersync/lib-services-framework';
@@ -504,7 +505,7 @@ export class MongoSyncBucketStorage
504505
async terminate(options?: storage.TerminateOptions) {
505506
// Default is to clear the storage except when explicitly requested not to.
506507
if (!options || options?.clearStorage) {
507-
await this.clear();
508+
await this.clear(options);
508509
}
509510
await this.db.sync_rules.updateOne(
510511
{
@@ -547,8 +548,11 @@ export class MongoSyncBucketStorage
547548
};
548549
}
549550

550-
async clear(): Promise<void> {
551+
async clear(options?: storage.ClearStorageOptions): Promise<void> {
551552
while (true) {
553+
if (options?.signal?.aborted) {
554+
throw new ReplicationAbortedError('Aborted clearing data');
555+
}
552556
try {
553557
await this.clearIteration();
554558

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ export class ChangeStream {
670670
if (result.needsInitialSync) {
671671
if (result.snapshotLsn == null) {
672672
// Snapshot LSN is not present, so we need to start replication from scratch.
673-
await this.storage.clear();
673+
await this.storage.clear({ signal: this.abort_signal });
674674
}
675675
await this.initialReplication(result.snapshotLsn);
676676
}

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ AND table_type = 'BASE TABLE';`,
288288
* and starts again from scratch.
289289
*/
290290
async startInitialReplication() {
291-
await this.storage.clear();
291+
await this.storage.clear({ signal: this.abortSignal });
292292
// Replication will be performed in a single transaction on this connection
293293
const connection = await this.connections.getStreamingConnection();
294294
const promiseConnection = (connection as mysql.Connection).promise();

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ export class PostgresSyncRulesStorage
567567

568568
async terminate(options?: storage.TerminateOptions) {
569569
if (!options || options?.clearStorage) {
570-
await this.clear();
570+
await this.clear(options);
571571
}
572572
await this.db.sql`
573573
UPDATE sync_rules
@@ -606,7 +606,8 @@ export class PostgresSyncRulesStorage
606606
};
607607
}
608608

609-
async clear(): Promise<void> {
609+
async clear(options?: storage.ClearStorageOptions): Promise<void> {
610+
// TODO: Cleanly abort the cleanup when the provided signal is aborted.
610611
await this.db.sql`
611612
UPDATE sync_rules
612613
SET

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ WHERE oid = $1::regclass`,
431431
// In those cases, we have to start replication from scratch.
432432
// If there is an existing healthy slot, we can skip this and continue
433433
// initial replication where we left off.
434-
await this.storage.clear();
434+
await this.storage.clear({ signal: this.abort_signal });
435435

436436
await db.query({
437437
statement: 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1',
@@ -948,7 +948,7 @@ WHERE oid = $1::regclass`,
948948
skipKeepalive = false;
949949
// flush() must be before the resnapshot check - that is
950950
// typically what reports the resnapshot records.
951-
await batch.flush();
951+
await batch.flush({ oldestUncommittedChange: this.oldestUncommittedChange });
952952
// This _must_ be checked after the flush(), and before
953953
// commit() or ack(). We never persist the resnapshot list,
954954
// so we have to process it before marking our progress.

modules/module-postgres/src/replication/WalStreamReplicationJob.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { container, logger } from '@powersync/lib-services-framework';
1+
import { container, logger, ReplicationAbortedError } from '@powersync/lib-services-framework';
22
import { PgManager } from './PgManager.js';
33
import { MissingReplicationSlotError, sendKeepAlive, WalStream } from './WalStream.js';
44

@@ -104,6 +104,10 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
104104
this.lastStream = stream;
105105
await stream.replicate();
106106
} catch (e) {
107+
if (this.isStopped && e instanceof ReplicationAbortedError) {
108+
// Ignore aborted errors
109+
return;
110+
}
107111
this.logger.error(`Replication error`, e);
108112
if (e.cause != null) {
109113
// Example:

packages/service-core/src/replication/AbstractReplicator.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
4848
*/
4949
private activeReplicationJob: T | undefined = undefined;
5050

51-
private stopped = false;
52-
5351
// First ping is only after 5 minutes, not when starting
5452
private lastPing = hrtime.bigint();
5553

54+
private abortController: AbortController | undefined;
55+
5656
protected constructor(private options: AbstractReplicatorOptions) {
5757
this.logger = logger.child({ name: `Replicator:${options.id}` });
5858
}
@@ -85,7 +85,12 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
8585
return this.options.metricsEngine;
8686
}
8787

88+
protected get stopped() {
89+
return this.abortController?.signal.aborted;
90+
}
91+
8892
public async start(): Promise<void> {
93+
this.abortController = new AbortController();
8994
this.runLoop().catch((e) => {
9095
this.logger.error('Data source fatal replication error', e);
9196
container.reporter.captureException(e);
@@ -107,7 +112,7 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
107112
}
108113

109114
public async stop(): Promise<void> {
110-
this.stopped = true;
115+
this.abortController?.abort();
111116
let promises: Promise<void>[] = [];
112117
for (const job of this.replicationJobs.values()) {
113118
promises.push(job.stop());
@@ -241,6 +246,7 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
241246
const stopped = await this.storage.getStoppedSyncRules();
242247
for (let syncRules of stopped) {
243248
try {
249+
// TODO: Do this in the "background", allowing the periodic refresh to continue
244250
const syncRuleStorage = this.storage.getInstance(syncRules, { skipLifecycleHooks: true });
245251
await this.terminateSyncRules(syncRuleStorage);
246252
} catch (e) {
@@ -256,7 +262,7 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst
256262
protected async terminateSyncRules(syncRuleStorage: storage.SyncRulesBucketStorage) {
257263
this.logger.info(`Terminating sync rules: ${syncRuleStorage.group_id}...`);
258264
await this.cleanUp(syncRuleStorage);
259-
await syncRuleStorage.terminate();
265+
await syncRuleStorage.terminate({ signal: this.abortController?.signal, clearStorage: true });
260266
this.logger.info(`Successfully terminated sync rules: ${syncRuleStorage.group_id}`);
261267
}
262268

packages/service-core/src/storage/BucketStorageBatch.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export interface BucketStorageBatch extends ObserverClient<BucketBatchStorageLis
3939
*
4040
* @returns null if there are no changes to flush.
4141
*/
42-
flush(): Promise<FlushedResult | null>;
42+
flush(options?: BatchBucketFlushOptions): Promise<FlushedResult | null>;
4343

4444
/**
4545
* Flush and commit any saved ops. This creates a new checkpoint by default.
@@ -161,13 +161,7 @@ export interface FlushedResult {
161161
flushed_op: InternalOpId;
162162
}
163163

164-
export interface BucketBatchCommitOptions {
165-
/**
166-
* Creates a new checkpoint even if there were no persisted operations.
167-
* Defaults to true.
168-
*/
169-
createEmptyCheckpoints?: boolean;
170-
164+
export interface BatchBucketFlushOptions {
171165
/**
172166
* The timestamp of the first change in this batch, according to the source database.
173167
*
@@ -176,4 +170,12 @@ export interface BucketBatchCommitOptions {
176170
oldestUncommittedChange?: Date | null;
177171
}
178172

173+
export interface BucketBatchCommitOptions extends BatchBucketFlushOptions {
174+
/**
175+
* Creates a new checkpoint even if there were no persisted operations.
176+
* Defaults to true.
177+
*/
178+
createEmptyCheckpoints?: boolean;
179+
}
180+
179181
export type ResolvedBucketBatchCommitOptions = Required<BucketBatchCommitOptions>;

0 commit comments

Comments (0)