
Commit 889ac46

Fix bulkWrite BSON error (#154)
* Add failing test.
* Update mongo driver.
* Workaround for bulkWrite batching issues.
* Add changeset.
1 parent 4590b38 commit 889ac46

10 files changed: +168 additions, -27 deletions

.changeset/perfect-fireants-warn.md

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+---
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-image': patch
+---
+
+Fix "BSONObj size is invalid" error during replication.

modules/module-mongodb/package.json

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@
   "@powersync/service-jsonbig": "workspace:*",
   "@powersync/service-sync-rules": "workspace:*",
   "@powersync/service-types": "workspace:*",
-  "mongodb": "^6.7.0",
+  "mongodb": "^6.11.0",
   "ts-codec": "^1.2.2",
   "uuid": "^9.0.1",
   "uri-js": "^4.4.1"

modules/module-postgres/test/src/large_batch.test.ts

Lines changed: 65 additions & 0 deletions
@@ -176,6 +176,71 @@ function defineBatchTests(factory: StorageFactory) {
     console.log(`Truncated ${truncateCount} ops in ${truncateDuration}ms ${truncatePerSecond} ops/s. ${used}MB heap`);
   });
 
+  test('large number of bucket_data docs', async () => {
+    // This tests that we don't run into this error:
+    // MongoBulkWriteError: BSONObj size: 16814023 (0x1008FC7) is invalid. Size must be between 0 and 16793600(16MB) First element: insert: "bucket_data"
+    // The test is quite sensitive to internals, since we need to
+    // generate an internal batch that is just below 16MB.
+    //
+    // For the test to work, we need a:
+    // 1. Large number of documents in the batch.
+    // 2. More bucket_data documents than current_data documents,
+    //    otherwise other batch limiting thresholds are hit.
+    // 3. A large document to make sure we get to just below the 16MB
+    //    limit.
+    // 4. Another document to make sure the internal batching overflows
+    //    to a second batch.
+
+    await using context = await WalStreamTestContext.open(factory);
+    await context.updateSyncRules(`bucket_definitions:
+  global:
+    data:
+      # Sync 4x so we get more bucket_data documents
+      - SELECT * FROM test_data
+      - SELECT * FROM test_data
+      - SELECT * FROM test_data
+      - SELECT * FROM test_data
+`);
+    const { pool } = context;
+
+    await pool.query(`CREATE TABLE test_data(id serial primary key, description text)`);
+
+    const numDocs = 499;
+    let description = '';
+    while (description.length < 2650) {
+      description += '.';
+    }
+
+    await pool.query({
+      statement: `INSERT INTO test_data(description) SELECT $2 FROM generate_series(1, $1) i`,
+      params: [
+        { type: 'int4', value: numDocs },
+        { type: 'varchar', value: description }
+      ]
+    });
+
+    let largeDescription = '';
+
+    while (largeDescription.length < 2_768_000) {
+      largeDescription += '.';
+    }
+    await pool.query({
+      statement: 'INSERT INTO test_data(description) VALUES($1)',
+      params: [{ type: 'varchar', value: largeDescription }]
+    });
+    await pool.query({
+      statement: 'INSERT INTO test_data(description) VALUES($1)',
+      params: [{ type: 'varchar', value: 'testingthis' }]
+    });
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    const checkpoint = await context.getCheckpoint({ timeout: 50_000 });
+    const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']);
+    expect(checksum.get('global[]')!.count).toEqual((numDocs + 2) * 4);
+  });
+
   test('resuming initial replication (1)', async () => {
     // Stop early - likely to not include deleted row in first replication attempt.
     await testResumingReplication(2000);
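
A rough back-of-the-envelope check of the data generated in the new test above (an approximation that ignores per-operation metadata): 499 rows of ~2,650 bytes, fanned out into 4 buckets, is about 5.3MB of bucket_data, and the single ~2.77MB row contributes roughly another 11.1MB across the 4 buckets. That puts the raw row data in the region of 16MB, so the driver-level batch lands right around the 16MB BSON ceiling, and the final small insert then forces an overflow into a second batch.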

packages/service-core/package.json

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@
   "jose": "^4.15.1",
   "lodash": "^4.17.21",
   "lru-cache": "^10.2.2",
-  "mongodb": "^6.7.0",
+  "mongodb": "^6.11.0",
   "node-fetch": "^3.3.2",
   "ts-codec": "^1.2.2",
   "uri-js": "^4.4.1",

packages/service-core/src/storage/mongo/MongoCompactor.ts

Lines changed: 2 additions & 1 deletion
@@ -5,6 +5,7 @@ import { PowerSyncMongo } from './db.js';
 import { BucketDataDocument, BucketDataKey } from './models.js';
 import { CompactOptions } from '../BucketStorage.js';
 import { cacheKey } from './OperationBatch.js';
+import { safeBulkWrite } from './util.js';
 
 interface CurrentBucketState {
   /** Bucket name */
@@ -264,7 +265,7 @@ export class MongoCompactor {
   private async flush() {
     if (this.updates.length > 0) {
       logger.info(`Compacting ${this.updates.length} ops`);
-      await this.db.bucket_data.bulkWrite(this.updates, {
+      await safeBulkWrite(this.db.bucket_data, this.updates, {
         // Order is not important.
         // Since checksums are not affected, these operations can happen in any order,
         // and it's fine if the operations are partially applied.

packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts

Lines changed: 5 additions & 2 deletions
@@ -9,6 +9,7 @@ import {
   WriteCheckpointMode
 } from '../WriteCheckpointAPI.js';
 import { PowerSyncMongo } from './db.js';
+import { safeBulkWrite } from './util.js';
 
 export type MongoCheckpointAPIOptions = {
   db: PowerSyncMongo;
@@ -134,7 +135,8 @@ export async function batchCreateCustomWriteCheckpoints(
     return;
   }
 
-  await db.custom_write_checkpoints.bulkWrite(
+  await safeBulkWrite(
+    db.custom_write_checkpoints,
     checkpoints.map((checkpointOptions) => ({
       updateOne: {
         filter: { user_id: checkpointOptions.user_id, sync_rules_id: checkpointOptions.sync_rules_id },
@@ -146,6 +148,7 @@ export async function batchCreateCustomWriteCheckpoints(
         },
         upsert: true
       }
-    }))
+    })),
+    {}
   );
 }

packages/service-core/src/storage/mongo/PersistedBatch.ts

Lines changed: 18 additions & 5 deletions
@@ -16,7 +16,7 @@ import {
   SourceKey,
   ReplicaId
 } from './models.js';
-import { replicaIdToSubkey, serializeLookup } from './util.js';
+import { replicaIdToSubkey, safeBulkWrite, serializeLookup } from './util.js';
 import { logger } from '@powersync/lib-services-framework';
 
 /**
@@ -33,6 +33,13 @@ import { logger } from '@powersync/lib-services-framework';
  */
 const MAX_TRANSACTION_BATCH_SIZE = 30_000_000;
 
+/**
+ * Limit number of documents to write in a single transaction.
+ *
+ * This has an effect on error message size in some cases.
+ */
+const MAX_TRANSACTION_DOC_COUNT = 2_000;
+
 /**
  * Keeps track of bulkwrite operations within a transaction.
  *
@@ -231,26 +238,32 @@ export class PersistedBatch {
   }
 
   shouldFlushTransaction() {
-    return this.currentSize >= MAX_TRANSACTION_BATCH_SIZE;
+    return (
+      this.currentSize >= MAX_TRANSACTION_BATCH_SIZE ||
+      this.bucketData.length >= MAX_TRANSACTION_DOC_COUNT ||
+      this.currentData.length >= MAX_TRANSACTION_DOC_COUNT ||
+      this.bucketParameters.length >= MAX_TRANSACTION_DOC_COUNT
+    );
   }
 
   async flush(db: PowerSyncMongo, session: mongo.ClientSession) {
     if (this.bucketData.length > 0) {
-      await db.bucket_data.bulkWrite(this.bucketData, {
+      // calculate total size
+      await safeBulkWrite(db.bucket_data, this.bucketData, {
         session,
         // inserts only - order doesn't matter
         ordered: false
       });
     }
     if (this.bucketParameters.length > 0) {
-      await db.bucket_parameters.bulkWrite(this.bucketParameters, {
+      await safeBulkWrite(db.bucket_parameters, this.bucketParameters, {
         session,
         // inserts only - order doesn't matter
         ordered: false
       });
     }
     if (this.currentData.length > 0) {
-      await db.current_data.bulkWrite(this.currentData, {
+      await safeBulkWrite(db.current_data, this.currentData, {
         session,
         // may update and delete data within the same batch - order matters
         ordered: true

packages/service-core/src/storage/mongo/util.ts

Lines changed: 45 additions & 0 deletions
@@ -156,3 +156,48 @@ export function isUUID(value: any): value is bson.UUID {
   const uuid = value as bson.UUID;
   return uuid._bsontype == 'Binary' && uuid.sub_type == bson.Binary.SUBTYPE_UUID;
 }
+
+/**
+ * MongoDB bulkWrite internally splits the operations into batches
+ * so that no batch exceeds 16MB. However, there are cases where
+ * the batch size is very close to 16MB, where additional metadata
+ * on the server pushes it over the limit, resulting in this error
+ * from the server:
+ *
+ * > MongoBulkWriteError: BSONObj size: 16814023 (0x1008FC7) is invalid. Size must be between 0 and 16793600(16MB) First element: insert: "bucket_data"
+ *
+ * We work around the issue by doing our own batching, limiting the
+ * batch size to 15MB. This does add additional overhead with
+ * BSON.calculateObjectSize.
+ */
+export async function safeBulkWrite<T extends mongo.Document>(
+  collection: mongo.Collection<T>,
+  operations: mongo.AnyBulkWriteOperation<T>[],
+  options: mongo.BulkWriteOptions
+) {
+  // Must be below 16MB.
+  // We could probably go a little closer, but 15MB is a safe threshold.
+  const BULK_WRITE_LIMIT = 15 * 1024 * 1024;
+
+  let batch: mongo.AnyBulkWriteOperation<T>[] = [];
+  let currentSize = 0;
+  // Estimated overhead per operation, should be smaller in reality.
+  const keySize = 8;
+  for (let op of operations) {
+    const bsonSize =
+      mongo.BSON.calculateObjectSize(op, {
+        checkKeys: false,
+        ignoreUndefined: true
+      } as any) + keySize;
+    if (batch.length > 0 && currentSize + bsonSize > BULK_WRITE_LIMIT) {
+      await collection.bulkWrite(batch, options);
+      currentSize = 0;
+      batch = [];
+    }
+    batch.push(op);
+    currentSize += bsonSize;
+  }
+  if (batch.length > 0) {
+    await collection.bulkWrite(batch, options);
+  }
+}
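
Not part of the diff: a minimal usage sketch of the new helper. The database and collection names below are hypothetical, and a local MongoDB instance is assumed; in this commit the actual callers are PersistedBatch.flush(), MongoCompactor.flush() and batchCreateCustomWriteCheckpoints().

import * as mongo from 'mongodb';
import { safeBulkWrite } from './util.js';

// Illustration only: 'powersync_demo' and 'bucket_data_demo' are made-up names.
async function demo() {
  const client = new mongo.MongoClient('mongodb://localhost:27017');
  const collection = client.db('powersync_demo').collection('bucket_data_demo');

  // Insert-only operations, mirroring how PersistedBatch.flush() calls the helper.
  // ~10k documents of ~4KB each is well over 15MB of BSON in total, so
  // safeBulkWrite splits them across several underlying bulkWrite calls.
  const ops: mongo.AnyBulkWriteOperation<mongo.Document>[] = Array.from({ length: 10_000 }, (_, i) => ({
    insertOne: { document: { n: i, payload: '.'.repeat(4096) } }
  }));

  // The same options object is passed to every underlying bulkWrite call.
  // ordered: false matches the insert-only callers; current_data writes use ordered: true.
  await safeBulkWrite(collection, ops, { ordered: false });

  await client.close();
}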

pnpm-lock.yaml

Lines changed: 22 additions & 16 deletions
Some generated files are not rendered by default.

service/package.json

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@
   "ix": "^5.0.0",
   "jose": "^4.15.1",
   "lru-cache": "^10.0.1",
-  "mongodb": "^6.7.0",
+  "mongodb": "^6.11.0",
   "node-fetch": "^3.3.2",
   "pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87",
   "ts-codec": "^1.2.2",
