[MySQL] Automatic schema change handling #287

Open · wants to merge 48 commits into base: main

48 commits
e5db119
Removed zongji type mappings which are now provided by the Zongji pac…
Rentacookie May 21, 2025
d03360e
Moved most of the binlog event handling logic to a separate BinlogLis…
Rentacookie May 21, 2025
924ecd8
Updated the BinLogStream to use the new BinLogListener
Rentacookie May 21, 2025
404dcde
Renamed BinlogListener to BinLogListener
Rentacookie May 21, 2025
b1b8c30
Added changeset
Rentacookie May 21, 2025
16c1235
Merge branch 'main' into mysql-binlog-backpressure-handling
Rentacookie May 21, 2025
126f9b3
Simplified BinLogListener stopping mechanism
Rentacookie May 22, 2025
a03260f
Corrected BinLogListener name.
Rentacookie May 26, 2025
e147318
Supply port for binlog listener connections.
Rentacookie May 27, 2025
999a8dc
Only set up binlog heartbeat once the listener is fully started up.
Rentacookie May 27, 2025
782b43c
Merge branch 'main' into mysql-binlog-backpressure-handling
Rentacookie May 27, 2025
07201e8
Updated changeset
Rentacookie May 27, 2025
079a2f5
Changed binlog backpressure mechanism to be based on processing queue…
Rentacookie May 29, 2025
59afb33
Merge branch 'main' into mysql-binlog-backpressure-handling
Rentacookie May 29, 2025
286ba16
Changed binlog backpressure mechanism to be based on processing queue…
Rentacookie May 29, 2025
9a00b8b
Added optional columns field to SourceEntityDescriptor
Rentacookie Jun 4, 2025
3aebffd
Cleanup unused imports
Rentacookie Jun 4, 2025
bf481c8
Ensure column values are preserved when available Report 0 storage me…
Rentacookie Jun 5, 2025
b673609
Added basic schema change handling for MySQL
Rentacookie Jun 5, 2025
f707e2b
Revert columns field addition to SourceEntityDescriptor
Rentacookie Jun 18, 2025
74adb22
Added schema change handling for the MySQL binlog replication.
Rentacookie Jun 18, 2025
1270939
Include powersync core version in metrics metadata
Rentacookie Jun 18, 2025
7500bed
Code analysis cleanup
Rentacookie Jun 18, 2025
add2590
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jun 18, 2025
54e6a9d
Merge conflicts
Rentacookie Jun 18, 2025
a5582b1
Fixed parser import
Rentacookie Jun 18, 2025
d34f8fa
Fixed mysql->sqlite rows parsing that would filter out columns with n…
Rentacookie Jun 25, 2025
fa327ae
Cleaned up SchemaChange handling in BinLogListener
Rentacookie Jun 25, 2025
1d1e945
Added schema change tests
Rentacookie Jun 25, 2025
dd0119a
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jun 25, 2025
ce8cb9c
Change binlog event receive log message to debug
Rentacookie Jun 25, 2025
2411f21
Revert and fix mysql->sqlite row conversion for null value columns
Rentacookie Jun 25, 2025
cd8ef3e
Added conditional skip of mysql schema test for syntax that does not …
Rentacookie Jun 25, 2025
18e0865
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jun 26, 2025
3adea04
Fixed version checking for mysql 5.7 incompatible test
Rentacookie Jun 26, 2025
79bd14e
Fix skip test on mysql 5.7 schema change
Rentacookie Jun 26, 2025
81b437f
Reverted mysql dev docker compose
Rentacookie Jun 26, 2025
b8e631b
Moved schema change handling to processing queue
Rentacookie Jun 30, 2025
ac96801
Fixed bug where multiple zongji listeners could be started if multipl…
Rentacookie Jul 1, 2025
301345c
Extended node-sql-parser type definitions
Rentacookie Jul 9, 2025
3898db7
- Simplified schema change types
Rentacookie Jul 9, 2025
a339ec8
Removed unused constant
Rentacookie Jul 9, 2025
f472dd6
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jul 9, 2025
5df001b
Skip unsupported schema test for MySQL 5.7
Rentacookie Jul 9, 2025
57fcfec
Added error handling for zongji emitted schema errors
Rentacookie Jul 10, 2025
e624081
Added changeset
Rentacookie Jul 11, 2025
462e08d
Typo fixes from pr feedback
Rentacookie Jul 11, 2025
e9a6569
Merge branch 'main' into feat/mysql-schema-change-handling
Rentacookie Jul 11, 2025
21 changes: 21 additions & 0 deletions .changeset/wet-berries-enjoy.md
@@ -0,0 +1,21 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
'@powersync/service-sync-rules': minor
---

MySQL:
- Added schema change handling
- Except for some edge cases, the following schema changes are now handled automatically:
- Creation, renaming, dropping and truncation of tables.
- Creation and dropping of unique indexes and primary keys.
- Adding, modifying, dropping and renaming of table columns.
- If a schema change cannot be handled automatically, a warning with details is logged.
- Mismatches in table schema from the Zongji binlog listener are now handled more gracefully.
- Replication of wildcard tables is now supported.
- Improved logging for binlog event processing.
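To make the schema change handling concrete: the PR adds `node-sql-parser` as a dependency, which suggests DDL statements arriving as binlog QUERY events are parsed and classified before being applied. The sketch below is illustrative only — it uses a simplified keyword-based classifier rather than the real parser, and the `SchemaChangeType`/`classifySchemaChange` names are assumptions, not the PR's actual API.

```typescript
// Simplified sketch of classifying DDL statements from binlog QUERY events.
// The real implementation uses node-sql-parser; this keyword matcher only
// illustrates the categories the changeset says are handled automatically.
type SchemaChangeType =
  | 'CREATE_TABLE'
  | 'RENAME_TABLE'
  | 'DROP_TABLE'
  | 'TRUNCATE_TABLE'
  | 'ALTER_TABLE'
  | 'UNKNOWN';

function classifySchemaChange(sql: string): SchemaChangeType {
  const normalized = sql.trim().toUpperCase();
  if (normalized.startsWith('CREATE TABLE')) return 'CREATE_TABLE';
  if (normalized.startsWith('RENAME TABLE')) return 'RENAME_TABLE';
  if (normalized.startsWith('DROP TABLE')) return 'DROP_TABLE';
  if (normalized.startsWith('TRUNCATE')) return 'TRUNCATE_TABLE';
  if (normalized.startsWith('ALTER TABLE')) return 'ALTER_TABLE';
  // Unrecognized DDL falls through; per the changeset, such cases
  // produce a logged warning rather than automatic handling.
  return 'UNKNOWN';
}

console.log(classifySchemaChange('ALTER TABLE users ADD COLUMN age INT'));
```

A real classifier also needs the affected table names (to match wildcard patterns in sync rules), which is where a full SQL parser earns its keep over string matching.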
@@ -162,9 +162,9 @@ export class MongoSyncBucketStorage
async resolveTable(options: storage.ResolveTableOptions): Promise<storage.ResolveTableResult> {
const { group_id, connection_id, connection_tag, entity_descriptor } = options;

const { schema, name: table, objectId, replicationColumns } = entity_descriptor;
const { schema, name, objectId, replicaIdColumns } = entity_descriptor;

const columns = replicationColumns.map((column) => ({
const normalizedReplicaIdColumns = replicaIdColumns.map((column) => ({
name: column.name,
type: column.type,
type_oid: column.typeId
@@ -176,8 +176,8 @@ export class MongoSyncBucketStorage
group_id: group_id,
connection_id: connection_id,
schema_name: schema,
table_name: table,
replica_id_columns2: columns
table_name: name,
replica_id_columns2: normalizedReplicaIdColumns
};
if (objectId != null) {
filter.relation_id = objectId;
@@ -190,24 +190,24 @@ export class MongoSyncBucketStorage
connection_id: connection_id,
relation_id: objectId,
schema_name: schema,
table_name: table,
table_name: name,
replica_id_columns: null,
replica_id_columns2: columns,
replica_id_columns2: normalizedReplicaIdColumns,
snapshot_done: false,
snapshot_status: undefined
};

await col.insertOne(doc, { session });
}
const sourceTable = new storage.SourceTable(
doc._id,
connection_tag,
objectId,
schema,
table,
replicationColumns,
doc.snapshot_done ?? true
);
const sourceTable = new storage.SourceTable({
id: doc._id,
connectionTag: connection_tag,
objectId: objectId,
schema: schema,
name: name,
replicaIdColumns: replicaIdColumns,
snapshotComplete: doc.snapshot_done ?? true
});
sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable);
sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable);
@@ -222,7 +222,7 @@ export class MongoSyncBucketStorage

let dropTables: storage.SourceTable[] = [];
// Detect tables that are either renamed, or have different replica_id_columns
let truncateFilter = [{ schema_name: schema, table_name: table }] as any[];
let truncateFilter = [{ schema_name: schema, table_name: name }] as any[];
if (objectId != null) {
// Only detect renames if the source uses relation ids.
truncateFilter.push({ relation_id: objectId });
@@ -240,15 +240,16 @@ export class MongoSyncBucketStorage
.toArray();
dropTables = truncate.map(
(doc) =>
new storage.SourceTable(
doc._id,
connection_tag,
doc.relation_id,
doc.schema_name,
doc.table_name,
doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
doc.snapshot_done ?? true
)
new storage.SourceTable({
id: doc._id,
connectionTag: connection_tag,
objectId: doc.relation_id,
schema: doc.schema_name,
name: doc.table_name,
replicaIdColumns:
doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
snapshotComplete: doc.snapshot_done ?? true
})
);

result = {
@@ -567,7 +568,6 @@ export class MongoSyncBucketStorage
`${this.slot_name} Cleared batch of data in ${lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...`
);
await timers.setTimeout(lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
continue;
} else {
throw e;
}
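A pattern running through this diff is the `SourceTable` constructor changing from positional arguments to a single named-options object. The sketch below is a minimal illustration of that refactor, not the real class: the option field names are taken from the diff, but the class body and types are assumptions.

```typescript
// Minimal sketch of the positional-to-named-options refactor seen in the diff.
// Field names mirror the call sites above; everything else is illustrative.
interface SourceTableOptions {
  id: number | string;
  connectionTag: string;
  objectId: string | number | undefined;
  schema: string;
  name: string;
  replicaIdColumns: { name: string; type?: string; typeOid?: number }[];
  snapshotComplete: boolean;
}

class SourceTable {
  readonly id: number | string;
  readonly connectionTag: string;
  readonly objectId: string | number | undefined;
  readonly schema: string;
  readonly name: string;
  readonly replicaIdColumns: { name: string; type?: string; typeOid?: number }[];
  readonly snapshotComplete: boolean;

  constructor(options: SourceTableOptions) {
    this.id = options.id;
    this.connectionTag = options.connectionTag;
    this.objectId = options.objectId;
    this.schema = options.schema;
    this.name = options.name;
    this.replicaIdColumns = options.replicaIdColumns;
    this.snapshotComplete = options.snapshotComplete;
  }
}

// With seven positional arguments, adjacent string parameters like
// schema and name were easy to swap silently; named options make
// each call site self-documenting.
const table = new SourceTable({
  id: 0,
  connectionTag: 'default',
  objectId: 'users',
  schema: 'mydb',
  name: 'users',
  replicaIdColumns: [{ name: 'id' }],
  snapshotComplete: true
});
console.log(table.name);
```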
38 changes: 19 additions & 19 deletions modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
@@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';

import { MongoManager } from '../replication/MongoManager.js';
import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
import { constructAfterRecord, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
import * as types from '../types/types.js';
import { escapeRegExp } from '../utils.js';
@@ -137,15 +137,15 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
if (tablePattern.isWildcard) {
patternResult.tables = [];
for (let collection of collections) {
const sourceTable = new SourceTable(
0,
this.connectionTag,
collection.name,
schema,
collection.name,
[],
true
);
const sourceTable = new SourceTable({
id: 0,
connectionTag: this.connectionTag,
objectId: collection.name,
schema: schema,
name: collection.name,
replicaIdColumns: [],
snapshotComplete: true
});
let errors: service_types.ReplicationError[] = [];
if (collection.type == 'view') {
errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` });
@@ -164,15 +164,15 @@
});
}
} else {
const sourceTable = new SourceTable(
0,
this.connectionTag,
tablePattern.name,
schema,
tablePattern.name,
[],
true
);
const sourceTable = new SourceTable({
id: 0,
connectionTag: this.connectionTag,
objectId: tablePattern.name,
schema: schema,
name: tablePattern.name,
replicaIdColumns: [],
snapshotComplete: true
});

const syncData = sqlSyncRules.tableSyncsData(sourceTable);
const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable);
4 changes: 2 additions & 2 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -215,7 +215,7 @@ export class ChangeStream {

async estimatedCountNumber(table: storage.SourceTable): Promise<number> {
const db = this.client.db(table.schema);
return await db.collection(table.table).estimatedDocumentCount();
return await db.collection(table.name).estimatedDocumentCount();
}

private async getSnapshotLsn(): Promise<string> {
@@ -432,7 +432,7 @@
const totalEstimatedCount = await this.estimatedCountNumber(table);
let at = table.snapshotStatus?.replicatedCount ?? 0;
const db = this.client.db(table.schema);
const collection = db.collection(table.table);
const collection = db.collection(table.name);
await using query = new ChunkedSnapshotQuery({
collection,
key: table.snapshotStatus?.lastKey,
4 changes: 2 additions & 2 deletions modules/module-mongodb/src/replication/MongoRelation.ts
@@ -13,7 +13,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
schema: source.db,
// Not relevant for MongoDB - we use db + coll name as the identifier
objectId: undefined,
replicationColumns: [{ name: '_id' }]
replicaIdColumns: [{ name: '_id' }]
} satisfies storage.SourceEntityDescriptor;
}

@@ -22,7 +22,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
*/
export function getCacheIdentifier(source: storage.SourceEntityDescriptor | storage.SourceTable): string {
if (source instanceof storage.SourceTable) {
return `${source.schema}.${source.table}`;
return `${source.schema}.${source.name}`;
}
return `${source.schema}.${source.name}`;
}
3 changes: 2 additions & 1 deletion modules/module-mysql/package.json
@@ -33,9 +33,10 @@
"@powersync/service-sync-rules": "workspace:*",
"@powersync/service-types": "workspace:*",
"@powersync/service-jsonbig": "workspace:*",
"@powersync/mysql-zongji": "0.2.0",
"@powersync/mysql-zongji": "^0.4.0",
"async": "^3.2.4",
"mysql2": "^3.11.0",
"node-sql-parser": "^5.3.9",
"semver": "^7.5.4",
"ts-codec": "^1.3.0",
"uri-js": "^4.4.1",
14 changes: 11 additions & 3 deletions modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
@@ -208,7 +208,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
idColumnsResult = await common.getReplicationIdentityColumns({
connection: connection,
schema,
table_name: tableName
tableName: tableName
});
} catch (ex) {
idColumnsError = { level: 'fatal', message: ex.message };
@@ -217,7 +217,15 @@
}

const idColumns = idColumnsResult?.columns ?? [];
const sourceTable = new storage.SourceTable(0, this.config.tag, tableName, schema, tableName, idColumns, true);
const sourceTable = new storage.SourceTable({
id: 0,
connectionTag: this.config.tag,
objectId: tableName,
schema: schema,
name: tableName,
replicaIdColumns: idColumns,
snapshotComplete: true
});
const syncData = syncRules.tableSyncsData(sourceTable);
const syncParameters = syncRules.tableSyncsParameters(sourceTable);

@@ -232,7 +240,7 @@
let selectError: service_types.ReplicationError | null = null;
try {
await this.retriedQuery({
query: `SELECT * FROM ${sourceTable.table} LIMIT 1`
query: `SELECT * FROM ${sourceTable.name} LIMIT 1`
});
} catch (e) {
selectError = { level: 'fatal', message: e.message };
3 changes: 1 addition & 2 deletions modules/module-mysql/src/common/common-index.ts
@@ -1,6 +1,5 @@
export * from './check-source-configuration.js';
export * from './get-replication-columns.js';
export * from './get-tables-from-pattern.js';
export * from './schema-utils.js';
export * from './mysql-to-sqlite.js';
export * from './read-executed-gtid.js';
export * from './ReplicatedGTID.js';
44 changes: 0 additions & 44 deletions modules/module-mysql/src/common/get-tables-from-pattern.ts

This file was deleted.

3 changes: 3 additions & 0 deletions modules/module-mysql/src/common/mysql-to-sqlite.ts
@@ -183,6 +183,9 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colum
result[key] = row[key];
break;
}
} else {
// If the value is null, we just set it to null
result[key] = null;
Rentacookie (Contributor, Author) commented:
@rkistner I am not 100% sure this is the correct place to fix it, but I found that without this, columns with null values would not be added to the SQLite row, and were therefore excluded from storage.
One of the schema change tests, which adds a column, exposed this problem.

}
}
return sync_rules.toSyncRulesRow(result);
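The null-preservation behavior discussed in the comment above can be illustrated in isolation. This is not the real `toSQLiteRow` (which also converts MySQL types per column): it is a stripped-down sketch showing only why the added `else` branch matters.

```typescript
// Illustrative sketch of the null-preservation fix: a column whose value is
// null must still appear in the converted row, otherwise it is silently
// dropped and never reaches storage. Not the real toSQLiteRow.
function toRowSketch(row: Record<string, any>): Record<string, any> {
  const result: Record<string, any> = {};
  for (const key of Object.keys(row)) {
    if (row[key] != null) {
      // The real code performs MySQL-to-SQLite type conversion here.
      result[key] = row[key];
    } else {
      // Without this branch, null-valued columns were omitted entirely,
      // which surfaced when a schema change test added a new column.
      result[key] = null;
    }
  }
  return result;
}

const converted = toRowSketch({ id: 1, new_column: null });
console.log(Object.keys(converted));
```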