
Commit 793a5b7

Move input conversion to source stream
1 parent 92bed80 commit 793a5b7

16 files changed: 103 additions & 55 deletions

.changeset/dry-pots-leave.md

Lines changed: 9 additions & 0 deletions
```diff
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-module-mysql': patch
+'@powersync/service-sync-rules': patch
+---
+
+Correctly handle custom types in primary keys.
```
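The one-line summary is terse; what the diffs below suggest is that source streams previously saved raw input rows (which can still contain custom value wrappers, e.g. for a Postgres domain type), and `applyRowContext` only ran later in storage, after replica IDs had already been derived from the unconverted values. This commit moves that conversion into each source stream. A minimal sketch of the new flow, assuming a parsed `SqlSyncRules` instance (the function name and wiring are illustrative; `applyRowContext` and the row types appear in the diffs):

```ts
import { SqlSyncRules, SqliteInputRow, SqliteRow } from '@powersync/service-sync-rules';

// Convert once, where the row enters the pipeline. Downstream code
// (replica-id hashing, bucket evaluation, storage) then only sees plain
// SQLite values, so a custom type in a primary key is handled consistently.
function convertInputRow(syncRules: SqlSyncRules, row: SqliteInputRow): SqliteRow {
  // <never>: snapshot/insert rows carry no toasted (undefined) values.
  return syncRules.applyRowContext<never>(row);
}
```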

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 1 addition & 2 deletions
```diff
@@ -319,8 +319,7 @@ export class MongoBucketBatch
     const record = operation.record;
     const beforeId = operation.beforeId;
     const afterId = operation.afterId;
-    let sourceAfter = record.after;
-    let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
+    let after = record.after;
     const sourceTable = record.sourceTable;
 
     let existing_buckets: CurrentBucket[] = [];
```

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 8 additions & 3 deletions
```diff
@@ -481,7 +481,7 @@ export class ChangeStream {
       // Pre-fetch next batch, so that we can read and write concurrently
       nextChunkPromise = query.nextChunk();
       for (let document of docBatch) {
-        const record = constructAfterRecord(document);
+        const record = this.constructAfterRecord(document);
 
         // This auto-flushes when the batch reaches its size limit
         await batch.save({
@@ -619,6 +619,11 @@
     return result.table;
   }
 
+  private constructAfterRecord(document: mongo.Document): SqliteRow {
+    const inputRow = constructAfterRecord(document);
+    return this.sync_rules.applyRowContext<never>(inputRow);
+  }
+
   async writeChange(
     batch: storage.BucketStorageBatch,
     table: storage.SourceTable,
@@ -631,7 +636,7 @@
 
     this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
     if (change.operationType == 'insert') {
-      const baseRecord = constructAfterRecord(change.fullDocument);
+      const baseRecord = this.constructAfterRecord(change.fullDocument);
       return await batch.save({
         tag: SaveOperationTag.INSERT,
         sourceTable: table,
@@ -650,7 +655,7 @@
         beforeReplicaId: change.documentKey._id
       });
     }
-    const after = constructAfterRecord(change.fullDocument!);
+    const after = this.constructAfterRecord(change.fullDocument!);
     return await batch.save({
       tag: SaveOperationTag.UPDATE,
       sourceTable: table,
```
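Note the naming: the new private method shadows the module-level `constructAfterRecord` helper imported from `MongoRelation.ts`, so call sites only change from `constructAfterRecord(doc)` to `this.constructAfterRecord(doc)`. A stripped-down sketch of the wrap-and-shadow pattern (class scaffolding and import path assumed):

```ts
import { mongo } from '@powersync/lib-service-mongodb';
import { SqlSyncRules, SqliteRow } from '@powersync/service-sync-rules';
import { constructAfterRecord } from './MongoRelation.js'; // module-level helper (path assumed)

class StreamSketch {
  constructor(private sync_rules: SqlSyncRules) {}

  // Same name as the import; inside the body, the bare identifier still
  // resolves to the module-level helper, while callers use `this.`.
  private constructAfterRecord(document: mongo.Document): SqliteRow {
    return this.sync_rules.applyRowContext<never>(constructAfterRecord(document));
  }
}
```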

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 1 addition & 4 deletions
```diff
@@ -1,16 +1,13 @@
 import { mongo } from '@powersync/lib-service-mongodb';
 import { storage } from '@powersync/service-core';
-import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
+import { JsonContainer } from '@powersync/service-jsonbig';
 import {
   CompatibilityContext,
   CustomArray,
   CustomObject,
   CustomSqliteValue,
-  DatabaseInputValue,
   SqliteInputRow,
   SqliteInputValue,
-  SqliteRow,
-  SqliteValue,
   DateTimeValue
 } from '@powersync/service-sync-rules';
 
```

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 10 additions & 5 deletions
```diff
@@ -329,7 +329,7 @@ export class BinLogStream {
       throw new ReplicationAssertionError(`No 'fields' event emitted`);
     }
 
-    const record = common.toSQLiteRow(row, columns!);
+    const record = this.toSQLiteRow(row, columns!);
     await batch.save({
       tag: storage.SaveOperationTag.INSERT,
       sourceTable: table,
@@ -596,14 +596,19 @@
     return null;
   }
 
+  private toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
+    const inputRecord = common.toSQLiteRow(row, columns);
+    return this.syncRules.applyRowContext<never>(inputRecord);
+  }
+
   private async writeChange(
     batch: storage.BucketStorageBatch,
     payload: WriteChangePayload
   ): Promise<storage.FlushedResult | null> {
     switch (payload.type) {
       case storage.SaveOperationTag.INSERT:
         this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-        const record = common.toSQLiteRow(payload.row, payload.columns);
+        const record = this.toSQLiteRow(payload.row, payload.columns);
         return await batch.save({
           tag: storage.SaveOperationTag.INSERT,
           sourceTable: payload.sourceTable,
@@ -617,9 +622,9 @@
         // The previous row may be null if the replica id columns are unchanged.
         // It's fine to treat that the same as an insert.
         const beforeUpdated = payload.previous_row
-          ? common.toSQLiteRow(payload.previous_row, payload.columns)
+          ? this.toSQLiteRow(payload.previous_row, payload.columns)
           : undefined;
-        const after = common.toSQLiteRow(payload.row, payload.columns);
+        const after = this.toSQLiteRow(payload.row, payload.columns);
 
         return await batch.save({
           tag: storage.SaveOperationTag.UPDATE,
@@ -634,7 +639,7 @@
 
       case storage.SaveOperationTag.DELETE:
         this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-        const beforeDeleted = common.toSQLiteRow(payload.row, payload.columns);
+        const beforeDeleted = this.toSQLiteRow(payload.row, payload.columns);
 
         return await batch.save({
           tag: storage.SaveOperationTag.DELETE,
```
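The MySQL stream gets the same treatment as MongoDB: one private wrapper, applied on every save path (snapshot insert, update before/after, delete). For intuition, roughly what the conversion does to a wrapper value; the `DateTimeValue` constructor shape below is an assumption for illustration, not taken from the library:

```ts
import { DateTimeValue, SqlSyncRules, SqliteInputRow, SqliteRow } from '@powersync/service-sync-rules';

declare const syncRules: SqlSyncRules; // stand-in for the stream's parsed sync rules

// Input rows may still hold custom value objects (DateTimeValue, CustomObject, ...).
const input: SqliteInputRow = {
  id: 't1',
  created_at: new DateTimeValue('2024-01-01 00:00:00') // assumed constructor shape
};

// applyRowContext resolves wrappers to plain SQLite values (string, number,
// bigint, Uint8Array or null) under the sync rules' compatibility context.
const output: SqliteRow = syncRules.applyRowContext<never>(input);
```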

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 1 addition & 2 deletions
```diff
@@ -687,8 +687,7 @@ export class PostgresBucketBatch
     // We store bytea colums for source keys
     const beforeId = operation.beforeId;
     const afterId = operation.afterId;
-    let sourceAfter = record.after;
-    let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
+    let after = record.after;
     const sourceTable = record.sourceTable;
 
     let existingBuckets: CurrentBucket[] = [];
```

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 23 additions & 5 deletions
```diff
@@ -25,8 +25,11 @@ import {
   CompatibilityContext,
   DatabaseInputRow,
   SqliteInputRow,
+  SqliteInputValue,
+  SqliteRow,
   SqlSyncRules,
   TablePattern,
+  ToastableSqliteRow,
   toSyncRulesRow
 } from '@powersync/service-sync-rules';
 
@@ -635,7 +638,8 @@ WHERE oid = $1::regclass`,
       hasRemainingData = true;
     }
 
-    for (const record of WalStream.getQueryData(rows)) {
+    for (const inputRecord of WalStream.getQueryData(rows)) {
+      const record = this.syncRulesRecord(inputRecord);
       // This auto-flushes when the batch reaches its size limit
       await batch.save({
         tag: storage.SaveOperationTag.INSERT,
@@ -787,6 +791,20 @@ WHERE oid = $1::regclass`,
     return table;
   }
 
+  private syncRulesRecord(row: SqliteInputRow): SqliteRow;
+  private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined;
+
+  private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined {
+    if (row == null) {
+      return undefined;
+    }
+    return this.sync_rules.applyRowContext<never>(row);
+  }
+
+  private toastableSyncRulesRecord(row: ToastableSqliteRow<SqliteInputValue>): ToastableSqliteRow {
+    return this.sync_rules.applyRowContext(row);
+  }
+
   async writeChange(
     batch: storage.BucketStorageBatch,
     msg: pgwire.PgoutputMessage
@@ -803,7 +821,7 @@ WHERE oid = $1::regclass`,
 
     if (msg.tag == 'insert') {
      this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-      const baseRecord = this.connections.types.constructAfterRecord(msg);
+      const baseRecord = this.syncRulesRecord(this.connections.types.constructAfterRecord(msg));
       return await batch.save({
         tag: storage.SaveOperationTag.INSERT,
         sourceTable: table,
@@ -816,8 +834,8 @@ WHERE oid = $1::regclass`,
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
       // "before" may be null if the replica id columns are unchanged
       // It's fine to treat that the same as an insert.
-      const before = this.connections.types.constructBeforeRecord(msg);
-      const after = this.connections.types.constructAfterRecord(msg);
+      const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg));
+      const after = this.toastableSyncRulesRecord(this.connections.types.constructAfterRecord(msg));
       return await batch.save({
         tag: storage.SaveOperationTag.UPDATE,
         sourceTable: table,
@@ -828,7 +846,7 @@ WHERE oid = $1::regclass`,
       });
     } else if (msg.tag == 'delete') {
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-      const before = this.connections.types.constructBeforeRecord(msg)!;
+      const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg)!);
 
       return await batch.save({
         tag: storage.SaveOperationTag.DELETE,
```
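The two `syncRulesRecord` overloads keep call sites clean: paths that always have a row get a non-optional `SqliteRow` back, while delete/update paths that may pass `undefined` get `SqliteRow | undefined` without `!` assertions. The separate `toastableSyncRulesRecord` exists because a Postgres update may omit unchanged TOAST columns, so its "after" row is a `ToastableSqliteRow` whose `undefined` entries must survive conversion. A free-standing version of the overload trick (types as in the hunk; `sync_rules` is a stand-in for the stream's field):

```ts
import { SqlSyncRules, SqliteInputRow, SqliteRow } from '@powersync/service-sync-rules';

declare const sync_rules: SqlSyncRules; // stand-in for the stream's field

function syncRulesRecord(row: SqliteInputRow): SqliteRow;
function syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined;
function syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined {
  // One implementation; the overloads above preserve optionality for callers.
  return row == null ? undefined : sync_rules.applyRowContext<never>(row);
}

declare const present: SqliteInputRow;
declare const maybe: SqliteInputRow | undefined;

const a: SqliteRow = syncRulesRecord(present); // first overload: non-optional result
const b: SqliteRow | undefined = syncRulesRecord(maybe); // second overload
```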

modules/module-postgres/test/src/wal_stream.test.ts

Lines changed: 24 additions & 0 deletions
```diff
@@ -348,4 +348,28 @@ config:
     const data = await context.getBucketData('1#stream|0[]');
     expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '{"foo":1,"bar":2}' })]);
   });
+
+  test('custom types in primary key', async () => {
+    await using context = await WalStreamTestContext.open(factory);
+
+    await context.updateSyncRules(`
+      streams:
+        stream:
+          query: SELECT id, * FROM "test_data"
+
+      config:
+        edition: 2
+    `);
+
+    const { pool } = context;
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE DOMAIN test_id AS TEXT;`);
+    await pool.query(`CREATE TABLE test_data(id test_id primary key);`);
+
+    await context.initializeReplication();
+    await pool.query(`INSERT INTO test_data(id) VALUES ('t1')`);
+
+    const data = await context.getBucketData('1#stream|0[]');
+    expect(data).toMatchObject([putOp('test_data', { id: 't1' })]);
+  });
 }
```

packages/service-core/src/storage/BucketStorageBatch.ts

Lines changed: 4 additions & 10 deletions
```diff
@@ -1,11 +1,5 @@
 import { ObserverClient } from '@powersync/lib-services-framework';
-import {
-  EvaluatedParameters,
-  EvaluatedRow,
-  SqliteInputRow,
-  SqliteRow,
-  ToastableSqliteRow
-} from '@powersync/service-sync-rules';
+import { EvaluatedParameters, EvaluatedRow, SqliteRow, ToastableSqliteRow } from '@powersync/service-sync-rules';
 import { BSON } from 'bson';
 import { ReplicationEventPayload } from './ReplicationEventPayload.js';
 import { SourceTable, TableSnapshotStatus } from './SourceTable.js';
@@ -138,7 +132,7 @@ export interface SaveInsert
   sourceTable: SourceTable;
   before?: undefined;
   beforeReplicaId?: undefined;
-  after: SqliteInputRow;
+  after: SqliteRow;
   afterReplicaId: ReplicaId;
 }
 
@@ -149,7 +143,7 @@
   /**
    * This is only present when the id has changed, and will only contain replica identity columns.
    */
-  before?: SqliteInputRow;
+  before?: SqliteRow;
   beforeReplicaId?: ReplicaId;
 
   /**
@@ -164,7 +158,7 @@
 export interface SaveDelete {
   tag: SaveOperationTag.DELETE;
   sourceTable: SourceTable;
-  before?: SqliteInputRow;
+  before?: SqliteRow;
   beforeReplicaId: ReplicaId;
   after?: undefined;
   afterReplicaId?: undefined;
```
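This is the contract change that enforces the move: the save operations now take fully-converted `SqliteRow` values, so a stream that forgets to apply the row context should fail to type-check rather than hash unconverted values. Illustrative only, on the assumption that `SqliteInputRow` is a wider type than `SqliteRow`:

```ts
import { SqliteInputRow, SqliteRow } from '@powersync/service-sync-rules';

declare const rawInput: SqliteInputRow; // may still contain custom value wrappers
declare const converted: SqliteRow;

const ok: SqliteRow = converted;
// const bad: SqliteRow = rawInput;
// ^ expected compile error: input rows admit wrapper values, so they
//   should not be assignable to the fully-converted SqliteRow.
```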

packages/service-core/src/util/utils.ts

Lines changed: 4 additions & 4 deletions
```diff
@@ -182,7 +182,7 @@ export function uuidForRowBson(row: sync_rules.SqliteRow): bson.UUID {
   return new bson.UUID(uuid.v5(repr, ID_NAMESPACE, buffer));
 }
 
-export function hasToastedValues(row: sync_rules.ToastableSqliteRow) {
+export function hasToastedValues<V>(row: sync_rules.ToastableSqliteRow<V>) {
   for (let key in row) {
     if (typeof row[key] == 'undefined') {
       return true;
@@ -196,10 +196,10 @@
  *
  * If we don't store data, we assume we always have a complete row.
  */
-export function isCompleteRow(
+export function isCompleteRow<V>(
   storeData: boolean,
-  row: sync_rules.ToastableSqliteRow
-): row is sync_rules.SqliteInputRow {
+  row: sync_rules.ToastableSqliteRow<V>
+): row is sync_rules.SqliteRow<V> {
   if (!storeData) {
     // Assume the row is complete - no need to check
     return true;
```
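Making these helpers generic over the value type `V` lets one guard serve rows both before and after conversion: `ToastableSqliteRow<V>` admits `undefined` (toasted) entries, and a `true` result narrows to `SqliteRow<V>` with none remaining. A hypothetical call site:

```ts
import * as sync_rules from '@powersync/service-sync-rules';
import { isCompleteRow } from './utils.js'; // the file patched above (path assumed)

// Works for any value type V, e.g. SqliteInputValue before conversion
// or plain SqliteValue after.
function handleRow<V>(storeData: boolean, row: sync_rules.ToastableSqliteRow<V>) {
  if (isCompleteRow(storeData, row)) {
    // Narrowed to sync_rules.SqliteRow<V>: no toasted (undefined) values remain.
  }
}
```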
