Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.sync_rules.updateMany(
{ storage_version: { $exists: false } },
{ $set: { storage_version: storage.LEGACY_STORAGE_VERSION } }
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.sync_rules.updateMany(
{ storage_version: storage.LEGACY_STORAGE_VERSION },
{ $unset: { storage_version: 1 } }
);
} finally {
await db.client.close();
}
};
18 changes: 15 additions & 3 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import * as lib_mongo from '@powersync/lib-service-mongodb';
import { mongo } from '@powersync/lib-service-mongodb';

import { PowerSyncMongo } from './implementation/db.js';
import { SyncRuleDocument } from './implementation/models.js';
import { CURRENT_STORAGE_VERSION, STORAGE_VERSION_CONFIG, SyncRuleDocument } from './implementation/models.js';
import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './implementation/MongoSyncBucketStorage.js';
import { generateSlotName } from '../utils/util.js';
import { MongoChecksumOptions } from './implementation/MongoChecksums.js';

export interface MongoBucketStorageOptions {
checksumOptions?: Omit<MongoChecksumOptions, 'storageConfig'>;
}

export class MongoBucketStorage
extends BaseObserver<storage.BucketStorageFactoryListener>
Expand All @@ -32,7 +37,7 @@ export class MongoBucketStorage
options: {
slot_name_prefix: string;
},
private internalOptions?: MongoSyncBucketStorageOptions
private internalOptions?: MongoBucketStorageOptions
) {
super();
this.client = db.client;
Expand All @@ -50,10 +55,15 @@ export class MongoBucketStorage
if ((typeof id as any) == 'bigint') {
id = Number(id);
}
const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name, undefined, this.internalOptions);
const storageConfig = (syncRules as MongoPersistedSyncRulesContent).getStorageConfig();
const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name, undefined, {
...this.internalOptions,
storageConfig
});
if (!options?.skipLifecycleHooks) {
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
}

storage.registerListener({
batchStarted: (batch) => {
batch.registerListener({
Expand Down Expand Up @@ -205,8 +215,10 @@ export class MongoBucketStorage
const id = Number(id_doc!.op_id);
const slot_name = generateSlotName(this.slot_name_prefix, id);

const storageVersion = options.storageVersion ?? CURRENT_STORAGE_VERSION;
const doc: SyncRuleDocument = {
_id: id,
storage_version: storageVersion,
content: options.content,
last_checkpoint: null,
last_checkpoint_lsn: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
PartialOrFullChecksum
} from '@powersync/service-core';
import { PowerSyncMongo } from './db.js';
import { StorageConfig } from './models.js';

/**
* Checksum calculation options, primarily for tests.
Expand All @@ -27,6 +28,8 @@ export interface MongoChecksumOptions {
* Limit on the number of documents to calculate a checksum on at a time.
*/
operationBatchLimit?: number;

storageConfig: StorageConfig;
}

const DEFAULT_BUCKET_BATCH_LIMIT = 200;
Expand All @@ -43,12 +46,15 @@ const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
*/
export class MongoChecksums {
private _cache: ChecksumCache | undefined;
private readonly storageConfig: StorageConfig;

constructor(
private db: PowerSyncMongo,
private group_id: number,
private options?: MongoChecksumOptions
) {}
private options: MongoChecksumOptions
) {
this.storageConfig = options.storageConfig;
}

/**
* Lazy-instantiated cache.
Expand Down Expand Up @@ -222,6 +228,11 @@ export class MongoChecksums {
});
}

// Historically, checksum may be stored as 'int' or 'double'.
// More recently, this should be a 'long'.
// $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
const checksumLong = this.storageConfig.longChecksums ? '$checksum' : { $toLong: '$checksum' };

// Aggregate over a max of `batchLimit` operations at a time.
// Let's say we have 3 buckets (A, B, C), each with 10 operations, and our batch limit is 12.
// Then we'll do three batches:
Expand All @@ -245,10 +256,7 @@ export class MongoChecksums {
{
$group: {
_id: '$_id.b',
// Historically, checksum may be stored as 'int' or 'double'.
// More recently, this should be a 'long'.
// $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
checksum_total: { $sum: { $toLong: '$checksum' } },
checksum_total: { $sum: checksumLong },
count: { $sum: 1 },
has_clear_op: {
$max: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
import { SyncConfigWithErrors, HydratedSyncRules, versionedHydrationState } from '@powersync/service-sync-rules';
import {
CompatibilityOption,
HydratedSyncRules,
SyncConfigWithErrors,
versionedHydrationState
} from '@powersync/service-sync-rules';

import { storage } from '@powersync/service-core';
import { DEFAULT_HYDRATION_STATE, HydrationState } from '@powersync/service-sync-rules/src/HydrationState.js';
import { StorageConfig } from './models.js';

export class MongoPersistedSyncRules implements storage.PersistedSyncRules {
public readonly slot_name: string;
public readonly hydrationState: HydrationState;

constructor(
public readonly id: number,
public readonly sync_rules: SyncConfigWithErrors,
public readonly checkpoint_lsn: string | null,
slot_name: string | null
slot_name: string | null,
public readonly storageConfig: StorageConfig
) {
this.slot_name = slot_name ?? `powersync_${id}`;

if (
storageConfig.versionedBuckets ||
this.sync_rules.config.compatibility.isEnabled(CompatibilityOption.versionedBucketIds)
) {
// For new sync config versions (using the new storage version), we always enable versioned bucket names.
// For older versions, this depends on the compatibility option.
this.hydrationState = versionedHydrationState(this.id);
} else {
this.hydrationState = DEFAULT_HYDRATION_STATE;
}
}

hydratedSyncRules(): HydratedSyncRules {
return this.sync_rules.config.hydrate({ hydrationState: versionedHydrationState(this.id) });
return this.sync_rules.config.hydrate({ hydrationState: this.hydrationState });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { SqlSyncRules } from '@powersync/service-sync-rules';
import { MongoPersistedSyncRules } from './MongoPersistedSyncRules.js';
import { MongoSyncRulesLock } from './MongoSyncRulesLock.js';
import { PowerSyncMongo } from './db.js';
import { SyncRuleDocument } from './models.js';
import { LEGACY_STORAGE_VERSION, STORAGE_VERSION_CONFIG, SyncRuleDocument } from './models.js';
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';

export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRulesContent {
public readonly slot_name: string;
Expand All @@ -17,6 +18,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
public readonly last_keepalive_ts: Date | null;
public readonly last_checkpoint_ts: Date | null;
public readonly active: boolean;
public readonly storage_version: number;

public current_lock: MongoSyncRulesLock | null = null;

Expand All @@ -34,14 +36,32 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
this.last_checkpoint_ts = doc.last_checkpoint_ts;
this.last_keepalive_ts = doc.last_keepalive_ts;
this.active = doc.state == 'ACTIVE';
this.storage_version = doc.storage_version ?? LEGACY_STORAGE_VERSION;
}

/**
* Load the storage config.
*
* This may throw if the persisted storage version is not supported.
*/
getStorageConfig() {
const storageConfig = STORAGE_VERSION_CONFIG[this.storage_version];
if (storageConfig == null) {
throw new ServiceError(
ErrorCode.PSYNC_S1403,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the flow be if a user did downgrade the service and this has been reached?

Would we always recommend performing a sync rules change when downgrading the service? Otherwise, it seems like a downgrade would essentially take-down the instance for both replication and api services (if I understand this correctly)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct - a downgrade would take down the instance. I feel that's better than attempting to continue, which could result in obscure errors or even silent consistency issues.

I added a section in the PR description on the available downgrade options.

`Unsupported storage version ${this.storage_version} for sync rules ${this.id}`
);
}
return storageConfig;
}

parsed(options: storage.ParseSyncRulesOptions) {
return new MongoPersistedSyncRules(
this.id,
SqlSyncRules.fromYaml(this.sync_rules_content, options),
this.last_checkpoint_lsn,
this.slot_name
this.slot_name,
this.getStorageConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ import * as timers from 'timers/promises';
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js';
import {
BucketDataDocument,
BucketDataKey,
BucketStateDocument,
SourceKey,
SourceTableDocument,
StorageConfig
} from './models.js';
import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js';
import { MongoCompactor } from './MongoCompactor.js';
import { MongoParameterCompactor } from './MongoParameterCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';

export interface MongoSyncBucketStorageOptions {
checksumOptions?: MongoChecksumOptions;
checksumOptions?: Omit<MongoChecksumOptions, 'storageConfig'>;
storageConfig: StorageConfig;
}

/**
Expand Down Expand Up @@ -69,12 +77,15 @@ export class MongoSyncBucketStorage
public readonly group_id: number,
private readonly sync_rules: storage.PersistedSyncRulesContent,
public readonly slot_name: string,
writeCheckpointMode?: storage.WriteCheckpointMode,
options?: MongoSyncBucketStorageOptions
writeCheckpointMode: storage.WriteCheckpointMode | undefined,
options: MongoSyncBucketStorageOptions
) {
super();
this.db = factory.db;
this.checksums = new MongoChecksums(this.db, this.group_id, options?.checksumOptions);
this.checksums = new MongoChecksums(this.db, this.group_id, {
...options.checksumOptions,
storageConfig: options?.storageConfig
});
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
db: this.db,
mode: writeCheckpointMode ?? storage.WriteCheckpointMode.MANAGED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,41 @@ export interface SyncRuleDocument {
id: string;
expires_at: Date;
} | null;

storage_version?: number;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially thought this might not need to be optional, due to it being set in the migrations. But I assume we can't really guarantee that all migrations have actually been executed in some circumstances - like self-hosted environments, or is there another reason for declaring it as optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially this was from before having a migration. Now, it's mostly a case of we're not guaranteed that the migration has run, and having the fallback is simple to implement.

}

export interface StorageConfig {
/**
* When true, bucket_data.checksum is guaranteed to be persisted as a Long.
*
* When false, it could also have been persisted as an Int32 or Double, in which case it must be converted to
* a Long before summing.
*/
longChecksums: boolean;

/**
* Whether versioned bucket names are automatically enabled.
*
* If this is false, bucket names may still be versioned depending on the sync config.
*/
versionedBuckets: boolean;
}

export const LEGACY_STORAGE_VERSION = 1;
export const CURRENT_STORAGE_VERSION = 2;

export const STORAGE_VERSION_CONFIG: Record<number, StorageConfig | undefined> = {
1: {
longChecksums: false,
versionedBuckets: false
},
2: {
longChecksums: true,
versionedBuckets: false
}
};

export interface CheckpointEventDocument {
_id: bson.ObjectId;
}
Expand Down
7 changes: 3 additions & 4 deletions modules/module-mongodb-storage/src/utils/test-utils.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { PowerSyncMongo } from '../storage/implementation/db.js';
import { TestStorageOptions } from '@powersync/service-core';
import { MongoBucketStorage, MongoBucketStorageOptions } from '../storage/MongoBucketStorage.js';
import { MongoReportStorage } from '../storage/MongoReportStorage.js';
import { MongoBucketStorage } from '../storage/MongoBucketStorage.js';
import { MongoSyncBucketStorageOptions } from '../storage/implementation/MongoSyncBucketStorage.js';
import { PowerSyncMongo } from '../storage/implementation/db.js';

export type MongoTestStorageOptions = {
url: string;
isCI: boolean;
internalOptions?: MongoSyncBucketStorageOptions;
internalOptions?: MongoBucketStorageOptions;
};

export function mongoTestStorageFactoryGenerator(factoryOptions: MongoTestStorageOptions) {
Expand Down
10 changes: 5 additions & 5 deletions modules/module-mongodb-storage/test/src/storage_sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ describe('sync - mongodb', () => {
// Test syncing a batch of data that is small in count,
// but large enough in size to be split over multiple returned chunks.
// Similar to the above test, but splits over 1MB chunks.
const sync_rules = test_utils.testRules(
`
await using factory = await INITIALIZED_MONGO_STORAGE_FACTORY();
const syncRules = await factory.updateSyncRules({
content: `
bucket_definitions:
global:
data:
- SELECT id, description FROM "%"
`
);
await using factory = await INITIALIZED_MONGO_STORAGE_FACTORY();
const bucketStorage = factory.getInstance(sync_rules);
});
const bucketStorage = factory.getInstance(syncRules);

const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
const sourceTable = TEST_TABLE;
Expand Down
Loading
Loading