
Commit f1ff6db

Test all supported types for chunking.
1 parent 3cb3248 commit f1ff6db

File tree

2 files changed: +160 -74 lines changed

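For context: the new test drives the initial snapshot in chunks (walStreamOptions: { snapshotChunkSize: 100 }) and changes the primary key of a not-yet-snapshotted row in between chunks. Below is a minimal sketch of the chunking pattern this exercises, assuming keyset pagination on the primary key; chunkedSnapshotQueries is an illustrative name, not part of the PowerSync codebase, and the real replication queries may differ:

// Hypothetical sketch only: each chunk resumes after the last primary key seen.
function chunkedSnapshotQueries(chunkSize: number) {
  const firstChunk = `SELECT * FROM test_data ORDER BY id LIMIT ${chunkSize}`;
  const nextChunk = `SELECT * FROM test_data WHERE id > $1 ORDER BY id LIMIT ${chunkSize}`;
  return { firstChunk, nextChunk };
}

Because the resume condition compares primary key values, the chunk boundary value has to order and round-trip correctly for every supported key type, which is why int2, int4, int8, text, varchar and uuid each get their own test below.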
Lines changed: 157 additions & 0 deletions
@@ -0,0 +1,157 @@
import { MONGO_STORAGE_FACTORY, StorageFactory } from '@core-tests/util.js';
import { Metrics, reduceBucket } from '@powersync/service-core';
import { SqliteJsonValue } from '@powersync/service-sync-rules';
import * as crypto from 'node:crypto';
import * as timers from 'timers/promises';
import { describe, expect, test } from 'vitest';
import { WalStreamTestContext } from './wal_stream_utils.js';

describe('batch replication tests - mongodb', { timeout: 30_000 }, function () {
  // These are slow but consistent tests.
  // Not run on every test run, but they do run on CI and when manually debugging issues.
  defineBatchTests(MONGO_STORAGE_FACTORY);
});

function defineBatchTests(factory: StorageFactory) {
  // We need to test every supported type, since chunking could be quite sensitive to
  // how each specific type is handled.
  test('chunked snapshot edge case (int2)', async () => {
    await testChunkedSnapshot({
      idType: 'int2',
      genId: 'i',
      lastId: '2000',
      moveTo: '0',
      moveToJs: 0
    });
  });

  test('chunked snapshot edge case (int4)', async () => {
    await testChunkedSnapshot({
      idType: 'int4',
      genId: 'i',
      lastId: '2000',
      moveTo: '0',
      moveToJs: 0
    });
  });

  test('chunked snapshot edge case (int8)', async () => {
    await testChunkedSnapshot({
      idType: 'int8',
      genId: 'i',
      lastId: '2000',
      moveTo: '0',
      moveToJs: 0
    });
  });

  test('chunked snapshot edge case (text)', async () => {
    await testChunkedSnapshot({
      idType: 'text',
      genId: `to_char(i, 'fm0000')`,
      lastId: `'2000'`,
      moveTo: `'0000'`,
      moveToJs: '0000'
    });
  });

  test('chunked snapshot edge case (varchar)', async () => {
    await testChunkedSnapshot({
      idType: 'varchar',
      genId: `to_char(i, 'fm0000')`,
      lastId: `'2000'`,
      moveTo: `'0000'`,
      moveToJs: '0000'
    });
  });

  test('chunked snapshot edge case (uuid)', async () => {
    await testChunkedSnapshot({
      idType: 'uuid',
      // Generate a uuid by using the first part of a uuid and appending a 4-digit number.
      genId: `('00000000-0000-4000-8000-00000000' || to_char(i, 'fm0000')) :: uuid`,
      lastId: `'00000000-0000-4000-8000-000000002000'`,
      moveTo: `'00000000-0000-4000-8000-000000000000'`,
      moveToJs: '00000000-0000-4000-8000-000000000000'
    });
  });

  async function testChunkedSnapshot(options: {
    idType: string;
    genId: string;
    lastId: string;
    moveTo: string;
    moveToJs: SqliteJsonValue;
  }) {
    // 1. Start with 2k rows, one row with id = 2000, and a large TOAST value in another column.
    // 2. Replicate one batch of rows (id < 2000).
    // 3. `UPDATE table SET id = 0 WHERE id = 2000`
    // 4. Replicate the rest of the table.
    // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
    // 6. We end up with a row that has a missing TOAST column.
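    // Why the TOAST value goes missing: Postgres logical replication omits unchanged TOASTed
    // column values from UPDATE messages (unless the column changed or REPLICA IDENTITY FULL
    // is used), and because the row's key moves from lastId (not yet snapshotted) to moveTo
    // (behind the snapshot's resume point), the chunked snapshot never reads the full row
    // either. The "resnapshot" process is expected to recover it, which is what this verifies.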

    await using context = await WalStreamTestContext.open(factory, {
      // We need to use a smaller chunk size here, so that we can run a query in between chunks.
      walStreamOptions: { snapshotChunkSize: 100 }
    });

    await context.updateSyncRules(`bucket_definitions:
  global:
    data:
      - SELECT * FROM test_data`);
    const { pool } = context;

    await pool.query(`CREATE TABLE test_data(id ${options.idType} primary key, description text)`);

    // 1. Start with 2k rows, one row with id = 2000 (lastId)...
    await pool.query({
      statement: `INSERT INTO test_data(id, description) SELECT ${options.genId}, 'foo' FROM generate_series(1, 2000) i`
    });

    // ...and a large TOAST value in another column.
    // The TOAST value must be > 8kb after compression.
    const largeDescription = crypto.randomBytes(20_000).toString('hex');
    await pool.query({
      statement: `UPDATE test_data SET description = $1 WHERE id = ${options.lastId} :: ${options.idType}`,
      params: [{ type: 'varchar', value: largeDescription }]
    });

    // 2. Replicate one batch of rows (id < 2000).
    // Our "stopping point" here is not quite deterministic.
    const p = context.replicateSnapshot();

    const stopAfter = 100;
    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;

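    // Wait until at least stopAfter rows (one full chunk, since snapshotChunkSize = 100) have
    // been replicated, so the UPDATE below runs in between snapshot chunks.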
    while (true) {
      const count =
        ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount;

      if (count >= stopAfter) {
        break;
      }
      await timers.setTimeout(1);
    }

    // 3. Move the row: `UPDATE test_data SET id = ${moveTo} WHERE id = ${lastId}`.
    const rs = await pool.query(
      `UPDATE test_data SET id = ${options.moveTo} WHERE id = ${options.lastId} RETURNING id`
    );
    expect(rs.rows.length).toEqual(1);

    // 4. Replicate the rest of the table.
    await p;

    // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
    context.startStreaming();

    // 6. If all went well, the "resnapshot" process would take care of this.
    const data = await context.getBucketData('global[]', undefined, {});
    const reduced = reduceBucket(data);

    const movedRow = reduced.find((row) => row.object_id == String(options.moveToJs));
    expect(movedRow?.data).toEqual(JSON.stringify({ id: options.moveToJs, description: largeDescription }));

    expect(reduced.length).toEqual(2001);
  }
}
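
The helper is parameterized so each key type only needs a small options block: idType is the primary key column type, genId is the SQL expression generating 2000 distinct keys from generate_series, lastId and moveTo are SQL literals for the row that is moved mid-snapshot, and moveToJs is that moved-to key as it is expected to appear in the synced row data. A further case would just be another such block; for example (illustrative only, not part of this commit, and assuming the type is supported as a primary key and serializes to a JSON number):

test('chunked snapshot edge case (numeric)', async () => {
  await testChunkedSnapshot({
    idType: 'numeric',
    genId: 'i',
    lastId: '2000',
    moveTo: '0',
    moveToJs: 0
  });
});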

modules/module-postgres/test/src/large_batch.test.ts

Lines changed: 3 additions & 74 deletions
@@ -1,12 +1,11 @@
-import { MONGO_STORAGE_FACTORY, StorageFactory, StorageOptions } from '@core-tests/util.js';
+import { MONGO_STORAGE_FACTORY, StorageFactory } from '@core-tests/util.js';
+import { Metrics } from '@powersync/service-core';
+import * as timers from 'timers/promises';
 import { describe, expect, test } from 'vitest';
 import { populateData } from '../../dist/utils/populate_test_data.js';
 import { env } from './env.js';
 import { TEST_CONNECTION_OPTIONS } from './util.js';
 import { WalStreamTestContext } from './wal_stream_utils.js';
-import * as timers from 'timers/promises';
-import { Metrics, reduceBucket } from '@powersync/service-core';
-import * as crypto from 'node:crypto';

 describe('batch replication tests - mongodb', { timeout: 120_000 }, function () {
   // These are slow but consistent tests.
@@ -369,76 +368,6 @@ function defineBatchTests(factory: StorageFactory) {
     expect(data.length).toEqual(11002 + deletedRowOps.length);
   }

-  test('chunked snapshot edge case', async () => {
-    // 1. Start with 10k rows, one row with id = 10000, and a large TOAST value in another column.
-    // 2. Replicate one batch of rows (id < 10000).
-    // 3. `UPDATE table SET id = 0 WHERE id = 10000`
-    // 4. Replicate the rest of the table.
-    // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
-    // 6. We end up with a row that has a missing TOAST column.
-
-    await using context = await WalStreamTestContext.open(factory, {
-      // We need to use a smaller chunk size here, so that we can run a query in between chunks
-      walStreamOptions: { snapshotChunkSize: 100 }
-    });
-
-    await context.updateSyncRules(`bucket_definitions:
-  global:
-    data:
-      - SELECT * FROM test_data`);
-    const { pool } = context;
-
-    await pool.query(`CREATE TABLE test_data(id integer primary key, description text)`);
-
-    // 1. Start with 10k rows, one row with id = 10000...
-    await pool.query({
-      statement: `INSERT INTO test_data(id, description) SELECT i, 'foo' FROM generate_series(1, 10000) i`
-    });
-
-    // ...and a large TOAST value in another column.
-    // Toast value, must be > 8kb after compression
-    const largeDescription = crypto.randomBytes(20_000).toString('hex');
-    await pool.query({
-      statement: 'UPDATE test_data SET description = $1 WHERE id = 10000',
-      params: [{ type: 'varchar', value: largeDescription }]
-    });
-
-    // 2. Replicate one batch of rows (id < 10000).
-    // Our "stopping point" here is not quite deterministic.
-    const p = context.replicateSnapshot();
-
-    const stopAfter = 1_000;
-    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-
-    while (true) {
-      const count =
-        ((await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount;
-
-      if (count >= stopAfter) {
-        break;
-      }
-      await timers.setTimeout(1);
-    }
-
-    // 3. `UPDATE table SET id = 0 WHERE id = 10000`
-    await pool.query('UPDATE test_data SET id = 0 WHERE id = 10000');
-
-    // 4. Replicate the rest of the table.
-    await p;
-
-    // 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
-    context.startStreaming();
-
-    // 6. If all went well, the "resnapshot" process would take care of this.
-    const data = await context.getBucketData('global[]', undefined, {});
-    const reduced = reduceBucket(data);
-
-    const movedRow = reduced.find((row) => row.object_id === '0');
-    expect(movedRow?.data).toEqual(`{"id":0,"description":"${largeDescription}"}`);
-
-    expect(reduced.length).toEqual(10_001);
-  });
-
   function printMemoryUsage() {
     const memoryUsage = process.memoryUsage();