Skip to content

WIP: Raw tables #300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
import 'package:meta/meta.dart';

Expand Down Expand Up @@ -83,8 +84,12 @@ class PowerSyncDatabaseImpl
DefaultSqliteOpenFactory factory =
// ignore: deprecated_member_use_from_same_package
PowerSyncOpenFactory(path: path, sqliteSetup: sqliteSetup);
return PowerSyncDatabaseImpl.withFactory(factory,
schema: schema, maxReaders: maxReaders, logger: logger);
return PowerSyncDatabaseImpl.withFactory(
factory,
schema: schema,
maxReaders: maxReaders,
logger: logger,
);
}

/// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory].
Expand All @@ -96,22 +101,29 @@ class PowerSyncDatabaseImpl
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
factory PowerSyncDatabaseImpl.withFactory(
DefaultSqliteOpenFactory openFactory,
{required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger}) {
DefaultSqliteOpenFactory openFactory, {
required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger,
}) {
final db = SqliteDatabase.withFactory(openFactory, maxReaders: maxReaders);
return PowerSyncDatabaseImpl.withDatabase(
schema: schema, database: db, logger: logger);
schema: schema,
database: db,
logger: logger,
);
}

/// Open a PowerSyncDatabase on an existing [SqliteDatabase].
///
/// Migrations are run on the database when this constructor is called.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.s
PowerSyncDatabaseImpl.withDatabase(
{required this.schema, required this.database, Logger? logger}) {
PowerSyncDatabaseImpl.withDatabase({
required this.schema,
required this.database,
Logger? logger,
}) {
this.logger = logger ?? autoLogger;
isInitialized = baseInit();
}
Expand Down Expand Up @@ -247,6 +259,7 @@ class PowerSyncDatabaseImpl
options,
crudMutex.shared,
syncMutex.shared,
jsonEncode(schema),
),
debugName: 'Sync ${database.openFactory.path}',
onError: receiveUnhandledErrors.sendPort,
Expand Down Expand Up @@ -290,13 +303,15 @@ class _PowerSyncDatabaseIsolateArgs {
final ResolvedSyncOptions options;
final SerializedMutex crudMutex;
final SerializedMutex syncMutex;
final String schemaJson;

_PowerSyncDatabaseIsolateArgs(
this.sPort,
this.dbRef,
this.options,
this.crudMutex,
this.syncMutex,
this.schemaJson,
);
}

Expand Down Expand Up @@ -392,6 +407,7 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
final storage = BucketStorage(connection);
final sync = StreamingSyncImplementation(
adapter: storage,
schemaJson: args.schemaJson,
connector: InternalConnector(
getCredentialsCached: getCredentialsCached,
prefetchCredentials: prefetchCredentials,
Expand Down
44 changes: 26 additions & 18 deletions packages/powersync_core/lib/src/database/powersync_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ abstract class PowerSyncDatabase
/// A maximum of [maxReaders] concurrent read transactions are allowed.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
factory PowerSyncDatabase(
{required Schema schema,
required String path,
Logger? logger,
@Deprecated("Use [PowerSyncDatabase.withFactory] instead.")
// ignore: deprecated_member_use_from_same_package
SqliteConnectionSetup? sqliteSetup}) {
factory PowerSyncDatabase({
required Schema schema,
required String path,
Logger? logger,
@Deprecated("Use [PowerSyncDatabase.withFactory] instead.")
// ignore: deprecated_member_use_from_same_package
SqliteConnectionSetup? sqliteSetup,
}) {
return PowerSyncDatabaseImpl(
schema: schema,
path: path,
logger: logger,
// ignore: deprecated_member_use_from_same_package
sqliteSetup: sqliteSetup);
schema: schema,
path: path,
logger: logger,
// ignore: deprecated_member_use_from_same_package
sqliteSetup: sqliteSetup,
);
}

/// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory].
Expand All @@ -55,12 +57,18 @@ abstract class PowerSyncDatabase
/// Subclass [PowerSyncOpenFactory] to add custom logic to this process.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
factory PowerSyncDatabase.withFactory(DefaultSqliteOpenFactory openFactory,
{required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger}) {
return PowerSyncDatabaseImpl.withFactory(openFactory,
schema: schema, maxReaders: maxReaders, logger: logger);
factory PowerSyncDatabase.withFactory(
DefaultSqliteOpenFactory openFactory, {
required Schema schema,
int maxReaders = SqliteDatabase.defaultMaxReaders,
Logger? logger,
}) {
return PowerSyncDatabaseImpl.withFactory(
openFactory,
schema: schema,
maxReaders: maxReaders,
logger: logger,
);
}

/// Open a PowerSyncDatabase on an existing [SqliteDatabase].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ class PowerSyncDatabaseImpl
/// Migrations are run on the database when this constructor is called.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.s
factory PowerSyncDatabaseImpl.withDatabase(
{required Schema schema,
required SqliteDatabase database,
Logger? logger}) {
factory PowerSyncDatabaseImpl.withDatabase({
required Schema schema,
required SqliteDatabase database,
Logger? logger,
}) {
throw UnimplementedError();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'package:meta/meta.dart';
import 'package:http/browser_client.dart';
import 'package:logging/logging.dart';
Expand Down Expand Up @@ -75,8 +76,12 @@ class PowerSyncDatabaseImpl
SqliteConnectionSetup? sqliteSetup}) {
// ignore: deprecated_member_use_from_same_package
DefaultSqliteOpenFactory factory = PowerSyncOpenFactory(path: path);
return PowerSyncDatabaseImpl.withFactory(factory,
maxReaders: maxReaders, logger: logger, schema: schema);
return PowerSyncDatabaseImpl.withFactory(
factory,
maxReaders: maxReaders,
logger: logger,
schema: schema,
);
}

/// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory].
Expand All @@ -94,16 +99,22 @@ class PowerSyncDatabaseImpl
Logger? logger}) {
final db = SqliteDatabase.withFactory(openFactory, maxReaders: 1);
return PowerSyncDatabaseImpl.withDatabase(
schema: schema, logger: logger, database: db);
schema: schema,
logger: logger,
database: db,
);
}

/// Open a PowerSyncDatabase on an existing [SqliteDatabase].
///
/// Migrations are run on the database when this constructor is called.
///
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.
PowerSyncDatabaseImpl.withDatabase(
{required this.schema, required this.database, Logger? logger}) {
PowerSyncDatabaseImpl.withDatabase({
required this.schema,
required this.database,
Logger? logger,
}) {
if (logger != null) {
this.logger = logger;
} else {
Expand Down Expand Up @@ -141,6 +152,7 @@ class PowerSyncDatabaseImpl

sync = StreamingSyncImplementation(
adapter: storage,
schemaJson: jsonEncode(schema),
connector: InternalConnector.wrap(connector, this),
crudUpdateTriggerStream: crudStream,
options: options,
Expand Down
63 changes: 61 additions & 2 deletions packages/powersync_core/lib/src/schema.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import 'schema_logic.dart';
class Schema {
/// List of tables in the schema.
final List<Table> tables;
final List<RawTable> rawTables;

const Schema(this.tables);
const Schema(this.tables, {this.rawTables = const []});

Map<String, dynamic> toJson() => {'tables': tables};
Map<String, dynamic> toJson() => {'raw_tables': rawTables, 'tables': tables};

void validate() {
Set<String> tableNames = {};
Expand Down Expand Up @@ -315,6 +316,64 @@ class Column {
Map<String, dynamic> toJson() => {'name': name, 'type': type.sqlite};
}

final class RawTable {
final String name;
final PendingStatement put;
final PendingStatement delete;

const RawTable({
required this.name,
required this.put,
required this.delete,
});

Map<String, dynamic> toJson() => {
'name': name,
'put': put,
'delete': delete,
};
}

final class PendingStatement {
final String sql;
final List<PendingStatementValue> params;

PendingStatement({required this.sql, required this.params});

Map<String, dynamic> toJson() => {
'sql': sql,
'params': params,
};
}

sealed class PendingStatementValue {
factory PendingStatementValue.id() = _PendingStmtValueId;
factory PendingStatementValue.column(String column) = _PendingStmtValueColumn;

dynamic toJson();
}

class _PendingStmtValueColumn implements PendingStatementValue {
final String column;
const _PendingStmtValueColumn(this.column);

@override
dynamic toJson() {
return {
'Column': column,
};
}
}

class _PendingStmtValueId implements PendingStatementValue {
const _PendingStmtValueId();

@override
dynamic toJson() {
return 'Id';
}
}

/// Type of column.
enum ColumnType {
/// TEXT column.
Expand Down
9 changes: 8 additions & 1 deletion packages/powersync_core/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ abstract interface class StreamingSync {

@internal
class StreamingSyncImplementation implements StreamingSync {
final String schemaJson;
final BucketStorage adapter;
final InternalConnector connector;
final ResolvedSyncOptions options;
Expand Down Expand Up @@ -62,6 +63,7 @@ class StreamingSyncImplementation implements StreamingSync {
String? clientId;

StreamingSyncImplementation({
required this.schemaJson,
required this.adapter,
required this.connector,
required this.crudUpdateTriggerStream,
Expand Down Expand Up @@ -596,7 +598,12 @@ final class _ActiveRustStreamingIteration {
Future<void> syncIteration() async {
try {
await _control(
'start', convert.json.encode({'parameters': sync.options.params}));
'start',
convert.json.encode({
'parameters': sync.options.params,
'schema': convert.json.decode(sync.schemaJson),
}),
);
assert(_completedStream.isCompleted, 'Should have started streaming');
await _completedStream.future;
} finally {
Expand Down
5 changes: 4 additions & 1 deletion packages/powersync_core/lib/src/web/sync_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class SyncWorkerHandle implements StreamingSync {
@override
Future<void> streamingSync() async {
await _channel.startSynchronization(
database.database.openFactory.path, ResolvedSyncOptions(options));
database.database.openFactory.path,
ResolvedSyncOptions(options),
database.schema,
);
}
}
Loading