[MySQL] Automatic schema change handling #287

Open: wants to merge 48 commits into main.
Conversation

@Rentacookie Rentacookie commented Jun 25, 2025

Currently, when syncing from a MySQL database, schema changes are ignored. This is problematic when those changes affect tables that are in the defined sync rules. The workaround for that has been to redeploy the sync rules, thereby triggering a full re-sync.

This change set adds functionality to automatically handle schema changes affecting tables in the sync rules by listening for and parsing DDL query binlog events.

The basic mechanism for this works as follows:

  1. Listen for query binlog events.
  2. Parse these events to see if they affect the schema of tables in the sync rules.
  3. Pause the binlog listener and, if required, update the stored bucket data based on the schema change.
  4. Restart the binlog listener.

Parsing of the query SQL is accomplished by using the node-sql-parser package. This does introduce a limitation in that if the library can't parse the query, we cannot easily interpret the intended schema change either. If this is the case, some best effort pattern matching is done to check if the query is a schema change that affects one of the replicated tables and a warning is logged.
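The best-effort fallback described here could be sketched as follows. This is an illustrative function, not the PR's actual parser-utils implementation; the regexes are assumptions about what "pattern matching" might look like:

```typescript
// Hypothetical best-effort fallback: when node-sql-parser cannot parse a
// query, pattern-match it to see whether it looks like a DDL statement
// touching one of the replicated tables, so that a warning can be logged.
const DDL_PATTERN = /^\s*(ALTER\s+TABLE|DROP\s+TABLE|RENAME\s+TABLE|TRUNCATE\s+TABLE|CREATE\s+(UNIQUE\s+)?INDEX)\b/i;

function maybeSchemaChangeForTables(query: string, replicatedTables: string[]): boolean {
  if (!DDL_PATTERN.test(query)) {
    return false;
  }
  return replicatedTables.some((table) => {
    // Escape regex metacharacters, then match the bare, backtick-quoted,
    // or schema-qualified table name anywhere in the statement.
    const escaped = table.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    return new RegExp(`(^|[\\s.\`])${escaped}([\\s.\`(;]|$)`, 'i').test(query);
  });
}
```

If this returns true for an unparseable query, the listener can log a warning that a possible schema change was skipped.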

The following schema changes are detectable:

  • Create, rename, drop table
  • Truncate table
  • Create / drop unique indexes and primary keys
  • Add, modify, drop, rename columns

With the exception of Create Table, all of these schema events are detected by parsing the DDL statements received in the binlog query events. Create Table changes are instead picked up when row events are received for the new table.

…kage directly

Added check for tablemap events
…tener class.

Introduced a mechanism to limit the maximum size of the binlog processing queue, thus also limiting memory usage.
This maximum processing queue size is configurable
# Conflicts:
#	modules/module-mysql/package.json
Cleaned up BinLogStream logs a bit
Simplified BinLogListener stopping mechanism
Added a few more defensive stopped checks to the binlog listener
… memory usage rather than number of events.

Introduced a maximum timeout that the binlog processing queue can be paused before auto-resuming. This is to prevent the replication connection timing out.
Made SourceTable implement SourceEntityDescriptor interface
…trics instead of ignoring them. SourceTable.

Moved MySQL table detail retrieval logic to utility function.
# Conflicts:
#	modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
#	modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
#	modules/module-mongodb/src/replication/ChangeStream.ts
#	modules/module-mysql/package.json
#	modules/module-mysql/src/replication/BinLogStream.ts
#	modules/module-mysql/src/replication/zongji/BinLogListener.ts
#	modules/module-mysql/src/replication/zongji/zongji-utils.ts
#	modules/module-mysql/test/src/BinLogListener.test.ts
#	modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
#	modules/module-postgres/src/replication/WalStream.ts
#	packages/service-core/src/storage/SourceTable.ts
#	pnpm-lock.yaml
Improved binlog table filtering
Added extended type definitions for node-sql-parser package
Cleaned up MySQL tests in general and added a few new test utils
@Rentacookie Rentacookie changed the title Feat/mysql schema change handling [MySQL] Automatic schema change handling Jun 26, 2025
import { ColumnDescriptor } from './SourceEntity.js';
import { ColumnDescriptor, SourceEntityDescriptor } from './SourceEntity.js';

export interface SourceTableOptions {
@Rentacookie (author) commented:

This is more of a code style change than anything else; we tend to create these encapsulating interfaces rather than having functions/constructors with a lot of parameters.

/**
* The columns that are used to uniquely identify a record in the source entity.
*/
replicaIdColumns: ColumnDescriptor[];
@Rentacookie (author) commented:

The original name here was not quite right and could be confusing. Reverted to the name primarily used throughout the code base.


export interface TableSnapshotStatus {
totalEstimatedCount: number;
replicatedCount: number;
lastKey: Uint8Array | null;
}

export class SourceTable {
export class SourceTable implements SourceEntityDescriptor {
@Rentacookie (author) commented:

This paves the way for some potential future cleanup, but also helps consolidate the naming of fields.

@@ -61,7 +63,8 @@ export function createOpenTelemetryMetricsFactory(context: ServiceContext): Metr
const meterProvider = new MeterProvider({
resource: new Resource(
{
['service']: 'PowerSync'
['service']: 'PowerSync',
['service.version']: pkg.version
@Rentacookie (author) commented:

This is a rather useful tag to include with our metrics.

Updated to released zongji listener version
@@ -183,6 +183,9 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colum
result[key] = row[key];
break;
}
} else {
// If the value is null, we just set it to null
result[key] = null;
@Rentacookie (author) commented:

@rkistner I am not 100% sure this is the correct place to fix it, but I found that without this, columns with null values would not be added to the SQLiteRow and consequently never reached storage.
A schema change test that adds a column exposed that this might be a problem.
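A minimal sketch of the behaviour being fixed. This is a simplified stand-in for service-core's toSQLiteRow, with per-column type conversion elided:

```typescript
// Simplified stand-in: columns whose value is null must still be written to
// the output row; otherwise they are silently dropped before storage.
function toSQLiteRowSketch(row: Record<string, any>): Record<string, any> {
  const result: Record<string, any> = {};
  for (const key of Object.keys(row)) {
    if (row[key] != null) {
      result[key] = row[key]; // the real code converts per column type here
    } else {
      // Without this branch, null columns never reach the stored row.
      result[key] = null;
    }
  }
  return result;
}
```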

RENAME_TABLE = 'rename_table',
DROP_TABLE = 'drop_table',
TRUNCATE_TABLE = 'truncate_table',
MODIFY_COLUMN = 'modify_column',
@Rentacookie (author) commented:

Technically we could combine most of these column changes into one type, but keeping them separate opens the door to some potential optimisations when handling the schema change.

// Set a heartbeat interval for the Zongji replication connection
// Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown
// The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket.
await new Promise((resolve, reject) => {
@Rentacookie (author) commented:

Whilst testing I discovered that trying to set up the heartbeat once the listener is already active does not work: the listener already holds a lock on the connection by then.
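The ordering constraint can be sketched as follows. The connection interface and function are hypothetical, but `@master_heartbeat_period` is the standard MySQL replication heartbeat variable (specified in nanoseconds):

```typescript
// Illustrative ordering sketch: configure the MySQL heartbeat on the
// replication connection *before* the binlog listener starts, because the
// listener takes a lock on the connection once active.
interface ReplicationConnection {
  query(sql: string): Promise<void>;
}

async function startWithHeartbeat(
  connection: ReplicationConnection,
  startListener: () => void,
  heartbeatPeriodSeconds: number
): Promise<void> {
  // 1. Set the heartbeat while we still have exclusive use of the connection.
  await connection.query(
    `SET @master_heartbeat_period = ${heartbeatPeriodSeconds * 1e9}`
  );
  // 2. Only then hand the connection over to the binlog listener.
  startListener();
}
```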

}
}

private get isStopped(): boolean {
return this.zongji.stopped;
public async replicateUntilStopped(): Promise<void> {
@Rentacookie (author) commented:

This was required after decoupling the zongji stop/start from this binlog listener wrapper stop start.

Internally the binlog listener can stop and start Zongji when required, without affecting the externally visible overall stop/start state of this wrapper.

* @param query
*/
private toSchemaChanges(query: string): SchemaChange[] {
const ast = this.sqlParser.astify(query, { database: 'MySQL' });
A reviewer commented:

How closely does this parser follow the MySQL syntax? And what happens if it runs into syntax it doesn't support?

One example of a past issue:
taozhi8833998/node-sql-parser#2232

To what extent would it be feasible to use the tablemap events instead of or in addition to this to detect changes?

@rkistner commented, Jun 26, 2025:

Here's an example that crashes the process with a nasty error (thanks to LLMs for generating some nasty test cases):

CREATE TABLE sales (
    sale_date DATE,
    amount DECIMAL(10,2)
) PARTITION BY RANGE (YEAR(sale_date)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

The crash happens regardless of whether the table is referenced in sync rules or not, and you cannot recover by restarting - it will immediately crash at the same point again.

A reviewer commented:

Another example:

CREATE TABLE invisible_column_example (
  id INT,
  created_at DATE INVISIBLE
)

The point is not these specific examples though, just that it would be extremely difficult to cover all cases.

@Rentacookie (author) commented:

I see what you mean. 😧

Unfortunately the tablemap events are really only usable for detecting some column modifications. Dropping, truncating and renaming tables do not result in any tablemap events. I did initially try a joint approach with tablemap + query parsing, but in the end, since you have to parse all DDL queries anyway, it felt simpler to settle on a single source for schema changes.

So that left the query parsing as the only real source of those schema changes. But as you say, if we can't parse the DDL queries our hands are rather tied. In those cases we probably have to end up skipping that specific schema change 😬 Maybe we list that as a known issue and add a warning to the logs when it happens?
That hopefully doesn't happen too often, though, considering that node-sql-parser is rather prolifically used and maintained. 🤞

I am open to any other approaches though.

Comment on lines 230 to 233
// We have to handle schema change events before handling more binlog events,
// This avoids a bunch of possible race conditions
if (zongji_utils.eventIsQuery(evt)) {
await this.processQueryEvent(evt);
A reviewer commented:

What specific race conditions are covered here?

It feels like this has the possibility to introduce more race conditions, since the order is not preserved between schema changes (applied immediately here) and data modifications (queued).

Furthermore, zongji doesn't wait for these promises to complete AFAIK, so processing here can cause concurrent processing of events if multiple events come in shortly after each other, and uncaught rejections if the processing fails.

A reviewer commented:

Ok, reading more code I see it works like this:

  1. Schema change detected (synchronously up to that point).
  2. Stop the listener.
  3. Wait until queue is empty.
  4. Process the schema change.

So that would avoid the race conditions. I'm just wondering if it could be simpler to do this as part of the normal queue mechanism?

@Rentacookie (author) replied, Jun 26, 2025:

I did try the normal queuing mechanism first, but the problem I ran into was that after handling the schema change we update the current binlog position, so that when the listener is restarted we process events from that point onwards. There might already be a bunch of events in the queue, though, so if the listener is restarted with the post-schema-change binlog position, a bunch of duplicate events end up on the processing queue.

So that could be handled by processing all the events in the queue after the schema change, causing the binlog position to update, and only then restarting the listener. But since the processing queue only has a single worker, you run into a deadlock: you can't process any new tasks until the current schema change task has been completely finished and removed from the queue.

The schema change task can't technically be truly completed until the listener is restarted, but maybe I can work around that with a callback 🤔 I'll check it out since I much preferred the processing queue only approach.

@Rentacookie (author) commented:

Good news: I was able to slot the schema handling back into the processing queue, by using a callback to restart the listener once the queue had emptied.
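The queue-with-callback approach described above could be sketched like this (illustrative names; a single-worker queue whose drain callback restarts the listener, so events are never duplicated and the worker never waits on itself):

```typescript
// Illustrative single-worker serial queue: schema change tasks drain through
// the queue like any other task, and an onEmpty callback fires once the queue
// has emptied, at which point the listener can safely be restarted from the
// updated binlog position.
type Task = () => Promise<void>;

class SerialQueue {
  private tasks: Task[] = [];
  private running = false;
  onEmpty?: () => void;

  enqueue(task: Task): void {
    this.tasks.push(task);
    void this.run();
  }

  private async run(): Promise<void> {
    if (this.running) return;
    this.running = true;
    while (this.tasks.length > 0) {
      const task = this.tasks.shift()!;
      await task();
    }
    this.running = false;
    // Queue has drained: safe to restart the listener without re-queueing
    // duplicate events.
    this.onEmpty?.();
  }
}
```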

Catch parsing errors, and log an error if the DDL query might apply to one of the tables in the sync rules.
…e schema change events were in the processing queue

Added small timeout to test to prevent rare race condition
Added util functions to identify the different types of DDL statements
- Added more detections of constraint changes
- Removed detection of create table statements since they can be detected and reacted to when row events are received for new tables
- Added multiple extra test cases
@Rentacookie Rentacookie marked this pull request as ready for review July 9, 2025 11:22
}

export interface BinLogListenerOptions {
connectionManager: MySQLConnectionManager;
eventHandler: BinLogEventHandler;
includedTables: string[];
// Filter for tables to include in the replication
tableFilter: (tableName: string) => boolean;
@Rentacookie (author) commented:

Adding this functionality fixed the issue where wild card tables could not be replicated
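A tableFilter predicate along these lines could support simple wildcard patterns. The '*'-suffix syntax below is an assumption for illustration, not necessarily the sync-rules pattern syntax:

```typescript
// Illustrative tableFilter factory: exact names match literally, and a
// trailing '*' (assumed wildcard syntax) matches any table name prefix.
function buildTableFilter(patterns: string[]): (tableName: string) => boolean {
  const matchers = patterns.map((pattern) => {
    if (pattern.endsWith('*')) {
      const prefix = pattern.slice(0, -1);
      return (name: string) => name.startsWith(prefix);
    }
    return (name: string) => name === pattern;
  });
  return (tableName: string) => matchers.some((matches) => matches(tableName));
}
```

Passing a predicate instead of a fixed includedTables list lets the binlog listener decide per table name at event time, which is what makes wildcard tables replicable.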

@stevensJourney stevensJourney requested a review from Copilot July 11, 2025 07:08
@Copilot (Copilot AI) left a comment:

Pull Request Overview

Adds automatic handling of schema changes (DDL events) in MySQL replication and standardizes the use of a name property instead of table across sync-rules and storage modules.

  • Renamed SourceTableInterface.table to .name everywhere in sync-rules and service-core
  • Introduced BinLogListener logic to parse DDL statements and emit schema change events
  • Updated all storage and API layers (Postgres, MySQL, MongoDB) to use table.name and qualifiedName

Reviewed Changes

Copilot reviewed 41 out of 42 changed files in this pull request and generated 3 comments.

Summary per file:

  • packages/sync-rules/src/…/SourceTableInterface.ts: Renamed table to name in the interface
  • packages/service-core/src/storage/SourceTable.ts: Refactored SourceTable to take an options object
  • modules/module-mysql/src/replication/zongji/BinLogListener.ts: Added DDL parsing, schema change handling, restart logic
  • modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts: Updated mapping to use table.name
  • modules/module-mysql/test/src/BinLogStream.test.ts: Fixed missing await and test descriptions
  • modules/module-mysql/src/utils/parser-utils.ts: Introduced DDL-matching helper
  • packages/service-core/src/metrics/open-telemetry/util.ts: Added service.version and JSON import of package.json
Files not reviewed (1)
  • pnpm-lock.yaml: Language not supported
Comments suppressed due to low confidence (4)

packages/service-core/src/metrics/open-telemetry/util.ts:66

  • Use the semantic key 'service.name' (per OpenTelemetry conventions) instead of 'service' for the service identifier.
        ['service']: 'PowerSync',

modules/module-mysql/test/src/BinLogStream.test.ts:16

  • [nitpick] Typo in the test suite title: it should read 'BinLogStream tests' to match the class name.
describe('BigLogStream tests', () => {

packages/service-core/src/storage/SourceTable.ts:55

  • [nitpick] The hasReplicaIdentity getter was removed; verify that no downstream code relies on it or reintroduce it if still used.
   */

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts:467

  • Confirm that the table object has a name property; if it still uses table.table, update the mapping to use table.table or rename the source field accordingly.
        name: table.name,

@@ -43,7 +42,7 @@ export class SqlEventDescriptor {
const matchingQuery = this.sourceQueries.find((q) => q.applies(options.sourceTable));
if (!matchingQuery) {
return {
errors: [{ error: `No marching source query found for table ${options.sourceTable.table}` }]
errors: [{ error: `No marching source query found for table ${options.sourceTable.name}` }]
Copilot AI commented, Jul 11, 2025:

Fix the typo in the error message: change 'No marching source query' to 'No matching source query'.

Suggested change
errors: [{ error: `No marching source query found for table ${options.sourceTable.name}` }]
errors: [{ error: `No matching source query found for table ${options.sourceTable.name}` }]

