Skip to content

Fix an issue with logical replication test and simplify the test #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ dev_dependencies:
pedantic: ^1.0.0
test: ^1.3.0
coverage: any
docker_process: ^1.3.0
docker_process: ^1.3.1
7 changes: 7 additions & 0 deletions test/docker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ void usePostgresDocker() {
pgUser: 'dart',
pgPassword: 'dart',
cleanup: true,
// These are necessary for logical replication tests and
// they won't have an effect on other tests.
configurations: [
'wal_level=logical',
'max_replication_slots=5',
'max_wal_senders=5',
],
);
});

Expand Down
91 changes: 32 additions & 59 deletions test/logical_replication_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import 'package:test/scaffolding.dart';
import 'docker.dart';

void main() {
// Running these tests on the CI will fail since the the `SetUpAll` function
// alter systems configuration (i.e. wal_level, max_replication_slots, max_wal_senders)
// which requires reloading the database changes. The only known possible way to do that
// now is to restart the docker container which apparently won't work on the CI
// Running these tests on the CI will not work as there is no way to pass
// image arguments to the `services.postgres` container of github actions.
//
// TODO: enable replication configuration before spinning up the container
// i.e. pre-set wal_level, max_replication_slots, max_wal_senders in
// `postgresql.conf` file
// TODO: Find a solution to spin up a postgres container where one can enable
// the replication configuration before spinning up the container either
// by passing image arguments or restarting the container after altering
// the database configurations (required for replication configs).
if (Platform.environment.containsKey('GITHUB_ACTION')) {
test('NO LOGICAL REPLICATION TESTS ARE RUNNING.', () {
// no-op
Expand All @@ -40,7 +39,7 @@ void main() {
final logicalDecodingPlugin = 'pgoutput';
final replicationMode = ReplicationMode.logical;
// use this for listening to messages
var replicationConn = PostgreSQLConnection(
final replicationConn = PostgreSQLConnection(
_host,
_port,
_database,
Expand All @@ -58,51 +57,34 @@ void main() {
password: _password,
);

setUpAll(() async {
await replicationConn.open();

// Setup the database for replication
await replicationConn.execute('ALTER SYSTEM SET wal_level = logical;');
await replicationConn
.execute('ALTER SYSTEM SET max_replication_slots = 5;');
await replicationConn.execute('ALTER SYSTEM SET max_wal_senders=5;');

/// 'ALTER SYSTEM' statement requires restarting the database
/// An easy way is to restart the docker container
await replicationConn.close();

// This is a temp work around until a better way is found
// Adding this to `docker.dart` can be problmatic since it should not be
// used in tests that run on the CI.
// TODO: remove this once an alternative method is found
await Process.run('docker', ['restart', 'postgres-dart-test']);

// it is necessary re-construct the object as calling `conn.open()` won't work
replicationConn = PostgreSQLConnection(
_host,
_port,
_database,
username: _username,
password: _password,
replicationMode: replicationMode,
);
// this table is for insert, update, and delete tests.
final changesTable = 'temp_changes_table';
// this will be used for testing truncation
// must be created before hand to add in publication
final truncateTable = 'temp_truncate_table';

// wait for a second then open the connections.
await Future.delayed(Duration(seconds: 1));
setUpAll(() async {
await replicationConn.open();
await changesConn.open();

// create a temp table for testing
await replicationConn.execute('create table if not exists temp'
'(id int GENERATED ALWAYS AS IDENTITY, value text,'
// create testing tables
// note: primary keys are necessary for replication to work and they are
// used as an identity replica (to allow update & delete) on tables
// that are part of a publication.
await replicationConn.execute('create table $changesTable '
'(id int GENERATED ALWAYS AS IDENTITY, value text, '
'PRIMARY KEY (id));');
await replicationConn.execute('create table $truncateTable '
'(id int GENERATED ALWAYS AS IDENTITY, value text, '
'PRIMARY KEY (id));');

// create publication
final publicationName = 'test_publication';
await replicationConn
.execute('DROP PUBLICATION IF EXISTS $publicationName;');
await replicationConn
.execute('CREATE PUBLICATION $publicationName FOR ALL TABLES;');
await replicationConn.execute(
'CREATE PUBLICATION $publicationName FOR TABLE $changesTable, $truncateTable;',
);

final sysInfoRes = await replicationConn.query('IDENTIFY_SYSTEM;',
useSimpleQueryProtocol: true);
Expand All @@ -114,7 +96,7 @@ void main() {

// `TEMPORARY` will remove the slot after the connection is closed/dropped
await replicationConn.execute(
'CREATE_REPLICATION_SLOT $slotName TEMPORARY LOGICAL'
'CREATE_REPLICATION_SLOT $slotName TEMPORARY LOGICAL '
'$logicalDecodingPlugin NOEXPORT_SNAPSHOT',
);

Expand Down Expand Up @@ -156,7 +138,7 @@ void main() {
// don't await here otherwise what's after won't be executed.
final future = controller.addStream(stream);
await changesConn
.execute("insert into temp (value) values ('test');");
.execute("insert into $changesTable (value) values ('test');");
await future;
await controller.close();
},
Expand All @@ -175,7 +157,7 @@ void main() {
test('- Receive UpdateMessage after update statement', () async {
// insert data to be updated
await changesConn
.execute("insert into temp (value) values ('update_test');");
.execute("insert into $changesTable (value) values ('update_test');");
// wait to avoid capturing INSERT
await Future.delayed(Duration(seconds: 3));
final stream = replicationConn.messages
Expand All @@ -192,7 +174,7 @@ void main() {
// don't await here otherwise what's after won't be executed.
final future = controller.addStream(stream);
await changesConn.execute(
"update temp set value = 'updated_test_value'"
"update $changesTable set value = 'updated_test_value'"
"where value = 'update_test';",
);
await future;
Expand All @@ -212,7 +194,7 @@ void main() {
test('- Receive DeleteMessage after delete statement', () async {
// insert data to be delete
await changesConn
.execute("insert into temp (value) values ('update_test');");
.execute("insert into $changesTable (value) values ('update_test');");
// wait to avoid capturing INSERT
await Future.delayed(Duration(seconds: 3));
final stream = replicationConn.messages
Expand All @@ -229,7 +211,7 @@ void main() {
// don't await here otherwise what's after won't be executed.
final future = controller.addStream(stream);
await changesConn.execute(
"delete from temp where value = 'update_test';",
"delete from $changesTable where value = 'update_test';",
);
await future;
await controller.close();
Expand All @@ -247,15 +229,6 @@ void main() {

// BeginMessage -> TruncateMessage -> CommitMessage
test('- Receive TruncateMessage after delete statement', () async {
final tableName = 'temp_truncate';
// create table to be truncated
await changesConn.execute('''
create table if not exists $tableName (
id int GENERATED ALWAYS AS IDENTITY,
value text,
PRIMARY KEY (id)
);
''');
// wait to for a second
await Future.delayed(Duration(seconds: 1));
final stream = replicationConn.messages
Expand All @@ -272,7 +245,7 @@ create table if not exists $tableName (
// don't await here otherwise what's after won't be executed.
final future = controller.addStream(stream);
await changesConn.execute(
'truncate table $tableName;',
'truncate table $truncateTable;',
);
await future;
await controller.close();
Expand Down