Fix bulkWrite BSON error #154

Merged
merged 4 commits on Dec 9, 2024
8 changes: 8 additions & 0 deletions .changeset/perfect-fireants-warn.md
@@ -0,0 +1,8 @@
---
'@powersync/service-module-postgres': patch
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix "BSONObj size is invalid" error during replication.
2 changes: 1 addition & 1 deletion modules/module-mongodb/package.json
@@ -33,7 +33,7 @@
"@powersync/service-jsonbig": "workspace:*",
"@powersync/service-sync-rules": "workspace:*",
"@powersync/service-types": "workspace:*",
"mongodb": "^6.7.0",
"mongodb": "^6.11.0",
"ts-codec": "^1.2.2",
"uuid": "^9.0.1",
"uri-js": "^4.4.1"
65 changes: 65 additions & 0 deletions modules/module-postgres/test/src/large_batch.test.ts
@@ -176,6 +176,71 @@ function defineBatchTests(factory: StorageFactory) {
console.log(`Truncated ${truncateCount} ops in ${truncateDuration}ms ${truncatePerSecond} ops/s. ${used}MB heap`);
});

test('large number of bucket_data docs', async () => {
// This tests that we don't run into this error:
// MongoBulkWriteError: BSONObj size: 16814023 (0x1008FC7) is invalid. Size must be between 0 and 16793600(16MB) First element: insert: "bucket_data"
// The test is quite sensitive to internals, since we need to
// generate an internal batch that is just below 16MB.
//
// For the test to work, we need:
// 1. A large number of documents in the batch.
// 2. More bucket_data documents than current_data documents,
// otherwise other batch limiting thresholds are hit.
// 3. A large document to make sure we get to just below the 16MB
// limit.
// 4. Another document to make sure the internal batching overflows
// to a second batch.

await using context = await WalStreamTestContext.open(factory);
await context.updateSyncRules(`bucket_definitions:
global:
data:
# Sync 4x so we get more bucket_data documents
- SELECT * FROM test_data
- SELECT * FROM test_data
- SELECT * FROM test_data
- SELECT * FROM test_data
`);
const { pool } = context;

await pool.query(`CREATE TABLE test_data(id serial primary key, description text)`);

const numDocs = 499;
let description = '';
while (description.length < 2650) {
description += '.';
}

await pool.query({
statement: `INSERT INTO test_data(description) SELECT $2 FROM generate_series(1, $1) i`,
params: [
{ type: 'int4', value: numDocs },
{ type: 'varchar', value: description }
]
});

let largeDescription = '';

while (largeDescription.length < 2_768_000) {
largeDescription += '.';
}
await pool.query({
statement: 'INSERT INTO test_data(description) VALUES($1)',
params: [{ type: 'varchar', value: largeDescription }]
});
await pool.query({
statement: 'INSERT INTO test_data(description) VALUES($1)',
params: [{ type: 'varchar', value: 'testingthis' }]
});
await context.replicateSnapshot();

context.startStreaming();

const checkpoint = await context.getCheckpoint({ timeout: 50_000 });
const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']);
expect(checksum.get('global[]')!.count).toEqual((numDocs + 2) * 4);
});

test('resuming initial replication (1)', async () => {
// Stop early - likely to not include deleted row in first replication attempt.
await testResumingReplication(2000);
2 changes: 1 addition & 1 deletion packages/service-core/package.json
@@ -37,7 +37,7 @@
"jose": "^4.15.1",
"lodash": "^4.17.21",
"lru-cache": "^10.2.2",
"mongodb": "^6.7.0",
"mongodb": "^6.11.0",
"node-fetch": "^3.3.2",
"ts-codec": "^1.2.2",
"uri-js": "^4.4.1",
3 changes: 2 additions & 1 deletion packages/service-core/src/storage/mongo/MongoCompactor.ts
@@ -5,6 +5,7 @@ import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey } from './models.js';
import { CompactOptions } from '../BucketStorage.js';
import { cacheKey } from './OperationBatch.js';
import { safeBulkWrite } from './util.js';

interface CurrentBucketState {
/** Bucket name */
@@ -264,7 +265,7 @@
private async flush() {
if (this.updates.length > 0) {
logger.info(`Compacting ${this.updates.length} ops`);
await this.db.bucket_data.bulkWrite(this.updates, {
await safeBulkWrite(this.db.bucket_data, this.updates, {
// Order is not important.
// Since checksums are not affected, these operations can happen in any order,
// and it's fine if the operations are partially applied.
@@ -9,6 +9,7 @@ import {
WriteCheckpointMode
} from '../WriteCheckpointAPI.js';
import { PowerSyncMongo } from './db.js';
import { safeBulkWrite } from './util.js';

export type MongoCheckpointAPIOptions = {
db: PowerSyncMongo;
@@ -134,7 +135,8 @@ export async function batchCreateCustomWriteCheckpoints(
return;
}

await db.custom_write_checkpoints.bulkWrite(
await safeBulkWrite(
db.custom_write_checkpoints,
checkpoints.map((checkpointOptions) => ({
updateOne: {
filter: { user_id: checkpointOptions.user_id, sync_rules_id: checkpointOptions.sync_rules_id },
@@ -146,6 +148,7 @@
},
upsert: true
}
}))
})),
{}
);
}
23 changes: 18 additions & 5 deletions packages/service-core/src/storage/mongo/PersistedBatch.ts
@@ -16,7 +16,7 @@ import {
SourceKey,
ReplicaId
} from './models.js';
import { replicaIdToSubkey, serializeLookup } from './util.js';
import { replicaIdToSubkey, safeBulkWrite, serializeLookup } from './util.js';
import { logger } from '@powersync/lib-services-framework';

/**
@@ -33,6 +33,13 @@ import { logger } from '@powersync/lib-services-framework';
*/
const MAX_TRANSACTION_BATCH_SIZE = 30_000_000;

/**
* Limit number of documents to write in a single transaction.
*
* This has an effect on error message size in some cases.
*/
const MAX_TRANSACTION_DOC_COUNT = 2_000;

/**
* Keeps track of bulkwrite operations within a transaction.
*
@@ -231,26 +238,32 @@ export class PersistedBatch {
}

shouldFlushTransaction() {
return this.currentSize >= MAX_TRANSACTION_BATCH_SIZE;
return (
this.currentSize >= MAX_TRANSACTION_BATCH_SIZE ||
this.bucketData.length >= MAX_TRANSACTION_DOC_COUNT ||
this.currentData.length >= MAX_TRANSACTION_DOC_COUNT ||
this.bucketParameters.length >= MAX_TRANSACTION_DOC_COUNT
);
}

async flush(db: PowerSyncMongo, session: mongo.ClientSession) {
if (this.bucketData.length > 0) {
await db.bucket_data.bulkWrite(this.bucketData, {
// calculate total size
await safeBulkWrite(db.bucket_data, this.bucketData, {
session,
// inserts only - order doesn't matter
ordered: false
});
}
if (this.bucketParameters.length > 0) {
await db.bucket_parameters.bulkWrite(this.bucketParameters, {
await safeBulkWrite(db.bucket_parameters, this.bucketParameters, {
session,
// inserts only - order doesn't matter
ordered: false
});
}
if (this.currentData.length > 0) {
await db.current_data.bulkWrite(this.currentData, {
await safeBulkWrite(db.current_data, this.currentData, {
session,
// may update and delete data within the same batch - order matters
ordered: true
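
To make the new thresholds concrete, a rough consumer sketch follows. Only shouldFlushTransaction and flush(db, session) come from PersistedBatch itself; the persistRecords wrapper and appendOperations callback are assumptions for illustration:

```ts
import * as mongo from 'mongodb';
import { PersistedBatch } from './PersistedBatch.js';
import { PowerSyncMongo } from './db.js';

// Illustrative only: append operations, then flush mid-transaction whenever
// either the 30MB size threshold or the new 2000-document threshold is hit,
// so no single bulkWrite payload grows unbounded.
async function persistRecords(
  db: PowerSyncMongo,
  session: mongo.ClientSession,
  batch: PersistedBatch,
  appendOperations: (batch: PersistedBatch) => void
) {
  appendOperations(batch);
  if (batch.shouldFlushTransaction()) {
    await batch.flush(db, session);
  }
}
```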
45 changes: 45 additions & 0 deletions packages/service-core/src/storage/mongo/util.ts
@@ -156,3 +156,48 @@ export function isUUID(value: any): value is bson.UUID {
const uuid = value as bson.UUID;
return uuid._bsontype == 'Binary' && uuid.sub_type == bson.Binary.SUBTYPE_UUID;
}

/**
* MongoDB bulkWrite internally splits the operations into batches
* so that no batch exceeds 16MB. However, there are cases where
* the batch size is very close to 16MB, where additional metadata
* on the server pushes it over the limit, resulting in this error
* from the server:
*
* > MongoBulkWriteError: BSONObj size: 16814023 (0x1008FC7) is invalid. Size must be between 0 and 16793600(16MB) First element: insert: "bucket_data"
*
* We work around the issue by doing our own batching, limiting the
* batch size to 15MB. This does add additional overhead with
* BSON.calculateObjectSize.
*/
export async function safeBulkWrite<T extends mongo.Document>(
collection: mongo.Collection<T>,
operations: mongo.AnyBulkWriteOperation<T>[],
options: mongo.BulkWriteOptions
) {
// Must be below 16MB.
// We could probably go a little closer, but 15MB is a safe threshold.
const BULK_WRITE_LIMIT = 15 * 1024 * 1024;

let batch: mongo.AnyBulkWriteOperation<T>[] = [];
let currentSize = 0;
// Estimated overhead per operation, should be smaller in reality.
const keySize = 8;
for (let op of operations) {
const bsonSize =
mongo.BSON.calculateObjectSize(op, {
checkKeys: false,
ignoreUndefined: true
} as any) + keySize;
if (batch.length > 0 && currentSize + bsonSize > BULK_WRITE_LIMIT) {
await collection.bulkWrite(batch, options);
currentSize = 0;
batch = [];
}
batch.push(op);
currentSize += bsonSize;
}
if (batch.length > 0) {
await collection.bulkWrite(batch, options);
}
}
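
As a usage sketch, callers keep the same call shape as collection.bulkWrite and simply route through the wrapper, which pre-splits the operations into sub-batches of roughly 15MB of BSON before handing them to the driver. The flushBucketData helper below is illustrative, not part of the PR:

```ts
import * as mongo from 'mongodb';
import { safeBulkWrite } from './util.js';

// Illustrative caller: same operations array and options as a direct
// bulkWrite, but safeBulkWrite splits the operations so that no single
// write command approaches the server's 16MB BSONObj limit.
async function flushBucketData(
  collection: mongo.Collection<mongo.Document>,
  ops: mongo.AnyBulkWriteOperation<mongo.Document>[],
  session: mongo.ClientSession
) {
  if (ops.length === 0) return;
  await safeBulkWrite(collection, ops, {
    session,
    // inserts only in this example, so order does not matter
    ordered: false
  });
}
```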
38 changes: 22 additions & 16 deletions pnpm-lock.yaml

(generated lockfile; diff not rendered)

2 changes: 1 addition & 1 deletion service/package.json
@@ -34,7 +34,7 @@
"ix": "^5.0.0",
"jose": "^4.15.1",
"lru-cache": "^10.0.1",
"mongodb": "^6.7.0",
"mongodb": "^6.11.0",
"node-fetch": "^3.3.2",
"pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87",
"ts-codec": "^1.2.2",