Skip to content

Raw tables #654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .changeset/bright-snakes-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
'@powersync/common': minor
'@powersync/node': minor
'@powersync/web': minor
'@powersync/react-native': minor
---

Add experimental support for raw tables, giving you full control over the table structure to sync into.
While PowerSync manages tables as JSON views by default, raw tables have to be created by the application
developer. Also, the upsert and delete statements for raw tables needs to be specified in the app schema:

```JavaScript
const customSchema = new Schema({});
customSchema.withRawTables({
lists: {
put: {
sql: 'INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)',
// put statements can use `Id` and extracted columns to bind parameters.
params: ['Id', { Column: 'name' }]
},
delete: {
sql: 'DELETE FROM lists WHERE id = ?',
// delete statements can only use the id (but a CTE querying existing rows by id could
// be used as a workaround).
params: ['Id']
}
}
});

const powersync = // open powersync database;
await powersync.execute('CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);');

// Ready to sync into your custom table at this point
```

The main benefit of raw tables is better query performance (since SQLite doesn't have to
extract rows from JSON) and more control (allowing the use of e.g. column and table constraints).
5 changes: 4 additions & 1 deletion packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Connects to stream of events from the PowerSync instance.
*/
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
return this.connectionManager.connect(connector, options);
const resolvedOptions = options ?? {};
resolvedOptions.serializedSchema = this.schema.toJSON();

return this.connectionManager.connect(connector, resolvedOptions);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/client/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
await this.syncDisposer?.();
}

async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
async connect(connector: PowerSyncBackendConnector, options: PowerSyncConnectionOptions) {
// Keep track if there were pending operations before this call
const hadPendingOptions = !!this.pendingConnectionOptions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ export interface BaseConnectionOptions {
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
*/
params?: Record<string, StreamingSyncRequestParameterType>;

/**
* The serialized schema - mainly used to forward information about raw tables to the sync client.
*/
serializedSchema?: any;
}

/** @internal */
Expand Down Expand Up @@ -208,7 +213,8 @@ export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptio
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
clientImplementation: DEFAULT_SYNC_CLIENT_IMPLEMENTATION,
fetchStrategy: FetchStrategy.Buffered,
params: {}
params: {},
serializedSchema: undefined
};

// The priority we assume when we receive checkpoint lines where no priority is set.
Expand Down Expand Up @@ -1019,12 +1025,12 @@ The next upload iteration will be delayed.`);
}

try {
await control(
PowerSyncControlCommand.START,
JSON.stringify({
parameters: resolvedOptions.params
})
);
const options: any = { parameters: resolvedOptions.params };
if (resolvedOptions.serializedSchema) {
options.schema = resolvedOptions.serializedSchema;
}

await control(PowerSyncControlCommand.START, JSON.stringify(options));

this.notifyCompletedUploads = () => {
controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED });
Expand Down
63 changes: 63 additions & 0 deletions packages/common/src/db/schema/RawTable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* A pending variant of a {@link RawTable} that doesn't have a name (because it would be inferred when creating the
* schema).
*/
export type RawTableType = {
/**
* The statement to run when PowerSync detects that a row needs to be inserted or updated.
*/
put: PendingStatement;
/**
* The statement to run when PowerSync detects that a row needs to be deleted.
*/
delete: PendingStatement;
};

/**
* A parameter to use as part of {@link PendingStatement}.
*
* For delete statements, only the `"Id"` value is supported - the sync client will replace it with the id of the row to
* be synced.
*
* For insert and replace operations, the values of columns in the table are available as parameters through
* `{Column: 'name'}`.
*/
export type PendingStatementParameter = 'Id' | { Column: string };

/**
* A statement that the PowerSync client should use to insert or delete data into a table managed by the user.
*/
export type PendingStatement = {
sql: string;
params: PendingStatementParameter[];
};

/**
* Instructs PowerSync to sync data into a "raw" table.
*
* Since raw tables are not backed by JSON, running complex queries on them may be more efficient. Further, they allow
* using client-side table and column constraints.
*
* Note that raw tables are only supported when using the new `SyncClientImplementation.rust` sync client.
*
* @experimental Please note that this feature is experimental at the moment, and not covered by PowerSync semver or
* stability guarantees.
*/
export class RawTable implements RawTableType {
/**
* The name of the table.
*
* This does not have to match the actual table name in the schema - {@link put} and {@link delete} are free to use
* another table. Instead, this name is used by the sync client to recognize that operations on this table (as it
* appears in the source / backend database) are to be handled specially.
*/
name: string;
put: PendingStatement;
delete: PendingStatement;

constructor(name: string, type: RawTableType) {
this.name = name;
this.put = type.put;
this.delete = type.delete;
}
}
23 changes: 22 additions & 1 deletion packages/common/src/db/schema/Schema.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { RawTable, RawTableType } from './RawTable.js';
import { RowType, Table } from './Table.js';

type SchemaType = Record<string, Table<any>>;
Expand All @@ -16,6 +17,7 @@ export class Schema<S extends SchemaType = SchemaType> {
readonly types: SchemaTableType<S>;
readonly props: S;
readonly tables: Table[];
readonly rawTables: RawTable[];

constructor(tables: Table[] | S) {
if (Array.isArray(tables)) {
Expand All @@ -36,6 +38,24 @@ export class Schema<S extends SchemaType = SchemaType> {
this.props = tables as S;
this.tables = this.convertToClassicTables(this.props);
}

this.rawTables = [];
}

/**
* Adds raw tables to this schema. Raw tables are identified by their name, but entirely managed by the application
* developer instead of automatically by PowerSync.
* Since raw tables are not backed by JSON, running complex queries on them may be more efficient. Further, they allow
* using client-side table and column constraints.
* Note that raw tables are only supported when using the new `SyncClientImplementation.rust` sync client.
*
* @param tables An object of (table name, raw table definition) entries.
* @experimental Note that the raw tables API is still experimental and may change in the future.
*/
withRawTables(tables: Record<string, RawTableType>) {
for (const [name, rawTableDefinition] of Object.entries(tables)) {
this.rawTables.push(new RawTable(name, rawTableDefinition));
}
}

validate() {
Expand All @@ -47,7 +67,8 @@ export class Schema<S extends SchemaType = SchemaType> {
toJSON() {
return {
// This is required because "name" field is not present in TableV2
tables: this.tables.map((t) => t.toJSON())
tables: this.tables.map((t) => t.toJSON()),
raw_tables: this.rawTables
};
}

Expand Down
25 changes: 25 additions & 0 deletions packages/common/tests/db/schema/Schema.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ describe('Schema', () => {
content: column.text
})
});
schema.withRawTables({
lists: {
put: {
sql: 'SELECT 1',
params: [{ Column: 'foo' }]
},
delete: {
sql: 'SELECT 2',
params: ['Id']
}
}
});

const json = schema.toJSON();

Expand Down Expand Up @@ -115,6 +127,19 @@ describe('Schema', () => {
],
indexes: []
}
],
raw_tables: [
{
name: 'lists',
delete: {
sql: 'SELECT 2',
params: ['Id']
},
put: {
sql: 'SELECT 1',
params: [{ Column: 'foo' }]
}
}
]
});
});
Expand Down
80 changes: 80 additions & 0 deletions packages/node/tests/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
OplogEntryJSON,
PowerSyncConnectionOptions,
ProgressWithOperations,
Schema,
SyncClientImplementation,
SyncStreamConnectionMethod
} from '@powersync/common';
Expand Down Expand Up @@ -638,6 +639,85 @@ function defineSyncTests(impl: SyncClientImplementation) {
expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy();
await another.waitForFirstSync({ priority: 0 });
});

if (impl == SyncClientImplementation.RUST) {
mockSyncServiceTest('raw tables', async ({ syncService }) => {
const customSchema = new Schema({});
customSchema.withRawTables({
lists: {
put: {
sql: 'INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)',
params: ['Id', { Column: 'name' }]
},
delete: {
sql: 'DELETE FROM lists WHERE id = ?',
params: ['Id']
}
}
});

const powersync = await syncService.createDatabase({ schema: customSchema });
await powersync.execute('CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);');

const query = powersync.watchWithAsyncGenerator('SELECT * FROM lists')[Symbol.asyncIterator]();
expect((await query.next()).value.rows._array).toStrictEqual([]);

powersync.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
last_op_id: '1',
buckets: [bucket('a', 1)]
}
});
syncService.pushLine({
data: {
bucket: 'a',
data: [
{
checksum: 0,
op_id: '1',
op: 'PUT',
object_id: 'my_list',
object_type: 'lists',
data: '{"name": "custom list"}'
}
]
}
});
syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } });
await powersync.waitForFirstSync();

expect((await query.next()).value.rows._array).toStrictEqual([{ id: 'my_list', name: 'custom list' }]);

syncService.pushLine({
checkpoint: {
last_op_id: '2',
buckets: [bucket('a', 2)]
}
});
await vi.waitFor(() => powersync.currentStatus.dataFlowStatus.downloading == true);
syncService.pushLine({
data: {
bucket: 'a',
data: [
{
checksum: 0,
op_id: '2',
op: 'REMOVE',
object_id: 'my_list',
object_type: 'lists'
}
]
}
});
syncService.pushLine({ checkpoint_complete: { last_op_id: '2' } });
await vi.waitFor(() => powersync.currentStatus.dataFlowStatus.downloading == false);

expect((await query.next()).value.rows._array).toStrictEqual([]);
});
}
}

function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {
Expand Down
10 changes: 5 additions & 5 deletions packages/node/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
column,
NodePowerSyncDatabaseOptions,
PowerSyncBackendConnector,
PowerSyncConnectionOptions,
PowerSyncCredentials,
PowerSyncDatabase,
Schema,
Expand Down Expand Up @@ -56,12 +55,12 @@ async function createDatabase(
options: Partial<NodePowerSyncDatabaseOptions> = {}
): Promise<PowerSyncDatabase> {
const database = new PowerSyncDatabase({
...options,
schema: AppSchema,
database: {
dbFilename: 'test.db',
dbLocation: tmpdir
}
},
...options
});
await database.init();
return database;
Expand Down Expand Up @@ -128,8 +127,9 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
}
};

const newConnection = async () => {
const newConnection = async (options?: Partial<NodePowerSyncDatabaseOptions>) => {
const db = await createDatabase(tmpdir, {
...options,
remoteOptions: {
fetchImplementation: inMemoryFetch
}
Expand All @@ -156,7 +156,7 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
export interface MockSyncService {
pushLine: (line: StreamingSyncLine) => void;
connectedListeners: any[];
createDatabase: () => Promise<PowerSyncDatabase>;
createDatabase: (options?: Partial<NodePowerSyncDatabaseOptions>) => Promise<PowerSyncDatabase>;
}

export class TestConnector implements PowerSyncBackendConnector {
Expand Down
Loading