Skip to content

Commit 951c951

Browse files
committed
Add test again.
1 parent 76e4e2d commit 951c951

File tree

1 file changed

+157
-0
lines changed

1 file changed

+157
-0
lines changed
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
import { reduceBucket, storage } from '@powersync/service-core';
2+
import { SqliteJsonValue } from '@powersync/service-sync-rules';
3+
import * as crypto from 'node:crypto';
4+
import * as timers from 'timers/promises';
5+
import { describe, expect, test } from 'vitest';
6+
import { env } from './env.js';
7+
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
8+
import { WalStreamTestContext } from './wal_stream_utils.js';
9+
import { METRICS_HELPER } from '@powersync/service-core-tests';
10+
11+
describe.skipIf(!env.TEST_MONGO_STORAGE)('chunked replication tests - mongodb', { timeout: 120_000 }, function () {
12+
defineBatchTests(INITIALIZED_MONGO_STORAGE_FACTORY);
13+
});
14+
15+
function defineBatchTests(factory: storage.TestStorageFactory) {
16+
// We need to test every supported type, since chunking could be quite sensitive to
17+
// how each specific type is handled.
18+
test('chunked snapshot edge case (int2)', async () => {
19+
await testChunkedSnapshot({
20+
idType: 'int2',
21+
genId: 'i',
22+
lastId: '2000',
23+
moveTo: '0',
24+
moveToJs: 0
25+
});
26+
});
27+
28+
test('chunked snapshot edge case (int4)', async () => {
29+
await testChunkedSnapshot({
30+
idType: 'int4',
31+
genId: 'i',
32+
lastId: '2000',
33+
moveTo: '0',
34+
moveToJs: 0
35+
});
36+
});
37+
38+
test('chunked snapshot edge case (int8)', async () => {
39+
await testChunkedSnapshot({
40+
idType: 'int8',
41+
genId: 'i',
42+
lastId: '2000',
43+
moveTo: '0',
44+
moveToJs: 0
45+
});
46+
});
47+
48+
test('chunked snapshot edge case (text)', async () => {
49+
await testChunkedSnapshot({
50+
idType: 'text',
51+
genId: `to_char(i, 'fm0000')`,
52+
lastId: `'2000'`,
53+
moveTo: `'0000'`,
54+
moveToJs: '0000'
55+
});
56+
});
57+
58+
test('chunked snapshot edge case (varchar)', async () => {
59+
await testChunkedSnapshot({
60+
idType: 'varchar',
61+
genId: `to_char(i, 'fm0000')`,
62+
lastId: `'2000'`,
63+
moveTo: `'0000'`,
64+
moveToJs: '0000'
65+
});
66+
});
67+
68+
test('chunked snapshot edge case (uuid)', async () => {
69+
await testChunkedSnapshot({
70+
idType: 'uuid',
71+
// Generate a uuid by using the first part of a uuid and appending a 4-digit number.
72+
genId: `('00000000-0000-4000-8000-00000000' || to_char(i, 'fm0000')) :: uuid`,
73+
lastId: `'00000000-0000-4000-8000-000000002000'`,
74+
moveTo: `'00000000-0000-4000-8000-000000000000'`,
75+
moveToJs: '00000000-0000-4000-8000-000000000000'
76+
});
77+
});
78+
79+
async function testChunkedSnapshot(options: {
80+
idType: string;
81+
genId: string;
82+
lastId: string;
83+
moveTo: string;
84+
moveToJs: SqliteJsonValue;
85+
}) {
86+
// 1. Start with 2k rows, one row with id = 2000, and a large TOAST value in another column.
87+
// 2. Replicate one batch of rows (id < 2000).
88+
// 3. `UPDATE table SET id = 0 WHERE id = 2000`
89+
// 4. Replicate the rest of the table.
90+
// 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
91+
// 6. We end up with a row that has a missing TOAST column.
92+
93+
await using context = await WalStreamTestContext.open(factory, {
94+
// We need to use a smaller chunk size here, so that we can run a query in between chunks
95+
walStreamOptions: { snapshotChunkSize: 100 }
96+
});
97+
98+
await context.updateSyncRules(`bucket_definitions:
99+
global:
100+
data:
101+
- SELECT * FROM test_data`);
102+
const { pool } = context;
103+
104+
await pool.query(`CREATE TABLE test_data(id ${options.idType} primary key, description text)`);
105+
106+
// 1. Start with 10k rows, one row with id = 10000...
107+
await pool.query({
108+
statement: `INSERT INTO test_data(id, description) SELECT ${options.genId}, 'foo' FROM generate_series(1, 2000) i`
109+
});
110+
111+
// ...and a large TOAST value in another column.
112+
// Toast value, must be > 8kb after compression
113+
const largeDescription = crypto.randomBytes(20_000).toString('hex');
114+
await pool.query({
115+
statement: `UPDATE test_data SET description = $1 WHERE id = ${options.lastId} :: ${options.idType}`,
116+
params: [{ type: 'varchar', value: largeDescription }]
117+
});
118+
119+
// 2. Replicate one batch of rows (id < 2000).
120+
// Our "stopping point" here is not quite deterministic.
121+
const p = context.replicateSnapshot();
122+
123+
const stopAfter = 100;
124+
const startRowCount = (await METRICS_HELPER.getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
125+
126+
while (true) {
127+
const count =
128+
((await METRICS_HELPER.getMetricValueForTests('powersync_rows_replicated_total')) ?? 0) - startRowCount;
129+
130+
if (count >= stopAfter) {
131+
break;
132+
}
133+
await timers.setTimeout(1);
134+
}
135+
136+
// 3. `UPDATE table SET id = 0 WHERE id = 2000`
137+
const rs = await pool.query(
138+
`UPDATE test_data SET id = ${options.moveTo} WHERE id = ${options.lastId} RETURNING id`
139+
);
140+
expect(rs.rows.length).toEqual(1);
141+
142+
// 4. Replicate the rest of the table.
143+
await p;
144+
145+
// 5. Logical replication picks up the UPDATE above, but it is missing the TOAST column.
146+
context.startStreaming();
147+
148+
// 6. If all went well, the "resnapshot" process would take care of this.
149+
const data = await context.getBucketData('global[]', undefined, {});
150+
const reduced = reduceBucket(data);
151+
152+
const movedRow = reduced.find((row) => row.object_id == String(options.moveToJs));
153+
expect(movedRow?.data).toEqual(JSON.stringify({ id: options.moveToJs, description: largeDescription }));
154+
155+
expect(reduced.length).toEqual(2001);
156+
}
157+
}

0 commit comments

Comments
 (0)