Add powersync_replication_lag_seconds metric #272

Merged · 21 commits · Jun 3, 2025
12 changes: 12 additions & 0 deletions .changeset/good-years-marry.md
@@ -0,0 +1,12 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
'@powersync/service-types': minor
---

Add powersync_replication_lag_seconds metric
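
For context on what this changeset enables: below is a minimal sketch of how a `powersync_replication_lag_seconds` gauge could be exposed, assuming an OpenTelemetry-style metrics API (`@opentelemetry/api`) and the `getReplicationLagMillis()` provider added later in this PR. The actual registration inside the service's `MetricsEngine` may differ.

```ts
import { metrics, type ObservableResult } from '@opentelemetry/api';

// Hypothetical provider interface; in this PR the replication job exposes
// getReplicationLagMillis(), returning undefined until lag can be computed.
interface LagProvider {
  getReplicationLagMillis(): Promise<number | undefined>;
}

export function registerReplicationLagMetric(job: LagProvider) {
  const meter = metrics.getMeter('powersync');
  const gauge = meter.createObservableGauge('powersync_replication_lag_seconds', {
    description: 'Elapsed time since the oldest change not yet committed to storage',
    unit: 's'
  });
  gauge.addCallback(async (result: ObservableResult) => {
    const lagMs = await job.getReplicationLagMillis();
    if (lagMs != null) {
      // Report whole seconds, matching the metric name.
      result.observe(Math.round(lagMs / 1000));
    }
  });
}
```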
@@ -112,12 +112,12 @@ export class MongoBucketBatch
return this.last_checkpoint_lsn;
}

async flush(): Promise<storage.FlushedResult | null> {
async flush(options?: storage.BucketBatchCommitOptions): Promise<storage.FlushedResult | null> {
let result: storage.FlushedResult | null = null;
// One flush may be split over multiple transactions.
// Each flushInner() is one transaction.
while (this.batch != null) {
let r = await this.flushInner();
let r = await this.flushInner(options);
if (r) {
result = r;
}
@@ -127,7 +127,7 @@ export class MongoBucketBatch
return result;
}

private async flushInner(): Promise<storage.FlushedResult | null> {
private async flushInner(options?: storage.BucketBatchCommitOptions): Promise<storage.FlushedResult | null> {
const batch = this.batch;
if (batch == null) {
return null;
@@ -137,7 +137,7 @@
let resumeBatch: OperationBatch | null = null;

await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => {
resumeBatch = await this.replicateBatch(session, batch, opSeq);
resumeBatch = await this.replicateBatch(session, batch, opSeq, options);

last_op = opSeq.last();
});
@@ -157,7 +157,8 @@
private async replicateBatch(
session: mongo.ClientSession,
batch: OperationBatch,
op_seq: MongoIdSequence
op_seq: MongoIdSequence,
options?: storage.BucketBatchCommitOptions
): Promise<OperationBatch | null> {
let sizes: Map<string, number> | undefined = undefined;
if (this.storeCurrentData && !this.skipExistingRows) {
@@ -253,7 +254,7 @@
if (persistedBatch!.shouldFlushTransaction()) {
// Transaction is getting big.
// Flush, and resume in a new transaction.
await persistedBatch!.flush(this.db, this.session);
await persistedBatch!.flush(this.db, this.session, options);
persistedBatch = null;
// Computing our current progress is a little tricky here, since
// we're stopping in the middle of a batch.
@@ -264,7 +265,7 @@

if (persistedBatch) {
transactionSize = persistedBatch.currentSize;
await persistedBatch.flush(this.db, this.session);
await persistedBatch.flush(this.db, this.session, options);
}
}

@@ -613,12 +614,13 @@ export class MongoBucketBatch
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<boolean> {
const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options };

await this.flush();
await this.flush(options);

if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
// When re-applying transactions, don't create a new checkpoint until
// we are past the last transaction.
logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`);
// Cannot create a checkpoint yet - return false
return false;
}
if (lsn < this.no_checkpoint_before_lsn) {
Expand Down Expand Up @@ -647,11 +649,13 @@ export class MongoBucketBatch
{ session: this.session }
);

// Cannot create a checkpoint yet - return false
return false;
}

if (!createEmptyCheckpoints && this.persisted_op == null) {
return false;
// Nothing to commit - also return true
return true;
}

const now = new Date();
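
A note on the `options` parameter threaded through `flush()`, `flushInner()`, `replicateBatch()` and `PersistedBatch.flush()` above: the shape below is a sketch inferred from how the options are consumed in this diff, not copied from the `@powersync/service-core` source.

```ts
// Sketch only: inferred from usage in this PR.
export interface BucketBatchCommitOptions {
  /** Whether to create a checkpoint even when no data was persisted. */
  createEmptyCheckpoints?: boolean;
  /**
   * Time of the oldest uncommitted change, according to the source database.
   * Used to compute and log replication lag when flushing.
   */
  oldestUncommittedChange?: Date | null;
}

export const DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS: BucketBatchCommitOptions = {
  createEmptyCheckpoints: true
};
```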
@@ -15,6 +15,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
public readonly last_fatal_error: string | null;
public readonly last_keepalive_ts: Date | null;
public readonly last_checkpoint_ts: Date | null;
public readonly active: boolean;

public current_lock: MongoSyncRulesLock | null = null;

@@ -30,6 +31,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
this.last_fatal_error = doc.last_fatal_error;
this.last_checkpoint_ts = doc.last_checkpoint_ts;
this.last_keepalive_ts = doc.last_keepalive_ts;
this.active = doc.state == 'ACTIVE';
}

parsed(options: storage.ParseSyncRulesOptions) {
@@ -270,23 +270,27 @@ export class PersistedBatch {
);
}

async flush(db: PowerSyncMongo, session: mongo.ClientSession) {
async flush(db: PowerSyncMongo, session: mongo.ClientSession, options?: storage.BucketBatchCommitOptions) {
const startAt = performance.now();
let flushedSomething = false;
if (this.bucketData.length > 0) {
flushedSomething = true;
await db.bucket_data.bulkWrite(this.bucketData, {
session,
// inserts only - order doesn't matter
ordered: false
});
}
if (this.bucketParameters.length > 0) {
flushedSomething = true;
await db.bucket_parameters.bulkWrite(this.bucketParameters, {
session,
// inserts only - order doesn't matter
ordered: false
});
}
if (this.currentData.length > 0) {
flushedSomething = true;
await db.current_data.bulkWrite(this.currentData, {
session,
// may update and delete data within the same batch - order matters
@@ -295,19 +299,51 @@
}

if (this.bucketStates.size > 0) {
flushedSomething = true;
await db.bucket_state.bulkWrite(this.getBucketStateUpdates(), {
session,
// Per-bucket operation - order doesn't matter
ordered: false
});
}

const duration = performance.now() - startAt;
logger.info(
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
this.currentData.length
} updates, ${Math.round(this.currentSize / 1024)}kb in ${duration.toFixed(0)}ms. Last op_id: ${this.debugLastOpId}`
);
if (flushedSomething) {
const duration = Math.round(performance.now() - startAt);
if (options?.oldestUncommittedChange != null) {
const replicationLag = Math.round((Date.now() - options.oldestUncommittedChange.getTime()) / 1000);

logger.info(
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
this.currentData.length
} updates, ${Math.round(this.currentSize / 1024)}kb in ${duration}ms. Last op_id: ${this.debugLastOpId}. Replication lag: ${replicationLag}s`,
{
flushed: {
duration: duration,
size: this.currentSize,
bucket_data_count: this.bucketData.length,
parameter_data_count: this.bucketParameters.length,
current_data_count: this.currentData.length,
replication_lag_seconds: replicationLag
}
}
);
} else {
logger.info(
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
this.currentData.length
} updates, ${Math.round(this.currentSize / 1024)}kb in ${duration}ms. Last op_id: ${this.debugLastOpId}`,
{
flushed: {
duration: duration,
size: this.currentSize,
bucket_data_count: this.bucketData.length,
parameter_data_count: this.bucketParameters.length,
current_data_count: this.currentData.length
}
}
);
}
}

this.bucketData = [];
this.bucketParameters = [];
2 changes: 1 addition & 1 deletion modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
@@ -200,7 +200,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
return result;
}

async getReplicationLag(options: api.ReplicationLagOptions): Promise<number | undefined> {
async getReplicationLagBytes(options: api.ReplicationLagOptions): Promise<number | undefined> {
// There is no fast way to get replication lag in bytes in MongoDB.
// We can get replication lag in seconds, but need a different API for that.
return undefined;
61 changes: 54 additions & 7 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -6,6 +6,7 @@ import {
logger,
ReplicationAbortedError,
ReplicationAssertionError,
ServiceAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
@@ -22,7 +23,7 @@ import {
getMongoRelation,
STANDALONE_CHECKPOINT_ID
} from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js';

export interface ChangeStreamOptions {
connections: MongoManager;
@@ -75,6 +76,17 @@ export class ChangeStream {

private relation_cache = new Map<string | number, storage.SourceTable>();

/**
* Time of the oldest uncommitted change, according to the source db.
* This is used to determine the replication lag.
*/
private oldestUncommittedChange: Date | null = null;
/**
* Keep track of whether we have done a commit or keepalive yet.
* We can only compute replication lag if isStartingReplication == false, or oldestUncommittedChange is present.
*/
private isStartingReplication = true;

private checkpointStreamId = new mongo.ObjectId();

constructor(options: ChangeStreamOptions) {
@@ -553,10 +565,18 @@
async (batch) => {
const { lastCheckpointLsn } = batch;
const lastLsn = lastCheckpointLsn ? MongoLSN.fromSerialized(lastCheckpointLsn) : null;
const startAfter = lastLsn?.timestamp;
const resumeAfter = lastLsn?.resumeToken;

logger.info(`${this.logPrefix} Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);
if (lastLsn == null) {
throw new ServiceAssertionError(`No resume timestamp found`);
}
const startAfter = lastLsn.timestamp;
const resumeAfter = lastLsn.resumeToken;
// It is normal for this to be a minute or two old when there is a low volume
// of ChangeStream events.
const tokenAgeSeconds = Math.round((Date.now() - timestampToDate(startAfter).getTime()) / 1000);

logger.info(
`${this.logPrefix} Resume streaming at ${startAfter.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`
);

const filters = this.getSourceNamespaceFilters();

@@ -655,10 +675,16 @@
// We add an additional check for waitForCheckpointLsn == null, to make sure we're not
// doing a keepalive in the middle of a transaction.
if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) {
const { comparable: lsn } = MongoLSN.fromResumeToken(stream.resumeToken);
const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(stream.resumeToken);
await batch.keepalive(lsn);
await touch();
lastEmptyResume = performance.now();
// Log the token update. This helps as a general "replication is still active" message in the logs.
// This token would typically be around 10s behind.
logger.info(
`${this.logPrefix} Idle change stream. Persisted resumeToken for ${timestampToDate(timestamp).toISOString()}`
);
this.isStartingReplication = false;
}
continue;
}
@@ -771,7 +797,12 @@
if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
waitForCheckpointLsn = null;
}
await batch.commit(lsn);
const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange });

if (didCommit) {
this.oldestUncommittedChange = null;
this.isStartingReplication = false;
}
} else if (
changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
@@ -790,6 +821,9 @@
snapshot: true
});
if (table.syncAny) {
if (this.oldestUncommittedChange == null && changeDocument.clusterTime != null) {
this.oldestUncommittedChange = timestampToDate(changeDocument.clusterTime);
}
await this.writeChange(batch, table, changeDocument);
}
} else if (changeDocument.operationType == 'drop') {
@@ -825,6 +859,19 @@
}
);
}

async getReplicationLagMillis(): Promise<number | undefined> {
if (this.oldestUncommittedChange == null) {
if (this.isStartingReplication) {
// We don't have anything to compute replication lag with yet.
return undefined;
} else {
// We don't have any uncommitted changes, so replication is up-to-date.
return 0;
}
}
return Date.now() - this.oldestUncommittedChange.getTime();
}
}

async function touch() {
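
`timestampToDate`, imported from `./replication-utils.js` above, is not shown in this diff. A plausible implementation, assuming the standard BSON `Timestamp` layout where the high 32 bits hold seconds since the Unix epoch:

```ts
import { Timestamp } from 'bson';

// The high 32 bits of a BSON Timestamp are seconds since the Unix epoch;
// the low 32 bits order operations within the same second.
export function timestampToDate(timestamp: Timestamp): Date {
  return new Date(timestamp.getHighBits() * 1000);
}
```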
@@ -1,4 +1,3 @@
import { isMongoServerError } from '@powersync/lib-service-mongodb';
import { container } from '@powersync/lib-services-framework';
import { replication } from '@powersync/service-core';

@@ -11,18 +10,19 @@ export interface ChangeStreamReplicationJobOptions extends replication.AbstractR

export class ChangeStreamReplicationJob extends replication.AbstractReplicationJob {
private connectionFactory: ConnectionManagerFactory;
private lastStream: ChangeStream | null = null;

constructor(options: ChangeStreamReplicationJobOptions) {
super(options);
this.connectionFactory = options.connectionFactory;
}

async cleanUp(): Promise<void> {
// TODO: Implement?
// Nothing needed here
}

async keepAlive() {
// TODO: Implement?
// Nothing needed here
}

private get slotName() {
@@ -74,6 +74,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
metrics: this.options.metrics,
connections: connectionManager
});
this.lastStream = stream;
await stream.replicate();
} catch (e) {
if (this.abortController.signal.aborted) {
@@ -98,4 +99,8 @@
await connectionManager.end();
}
}

async getReplicationLagMillis(): Promise<number | undefined> {
return this.lastStream?.getReplicationLagMillis();
}
}
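
`getReplicationLagMillis()` distinguishes three states: `undefined` while replication is still starting (no baseline yet), `0` when everything seen so far has been committed, and a positive value while changes are pending. A hypothetical consumer aggregating across jobs, e.g. in a replicator that owns several of them, might look like this; the function name and ownership model are assumptions, not part of this PR.

```ts
// Hypothetical aggregation: report the worst (largest) lag, in whole
// seconds, across all active replication jobs.
async function collectLagSeconds(jobs: ChangeStreamReplicationJob[]): Promise<number | undefined> {
  let maxLagMs: number | undefined = undefined;
  for (const job of jobs) {
    const lagMs = await job.getReplicationLagMillis();
    if (lagMs != null && (maxLagMs == null || lagMs > maxLagMs)) {
      maxLagMs = lagMs;
    }
  }
  return maxLagMs == null ? undefined : Math.round(maxLagMs / 1000);
}
```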