Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions smoketests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,15 @@ def fingerprint(self):
def new_identity(self):
new_identity(self.__class__.config_path)

def subscribe(self, *queries, n, confirmed = False):
def subscribe(self, *queries, n, confirmed = False, database = None):
self._check_published()
assert isinstance(n, int)

args = [
SPACETIME_BIN,
"--config-path", str(self.config_path),
"subscribe", self.database_identity,
"subscribe",
database if database is not None else self.database_identity,
"-t", "600",
"-n", str(n),
"--print-initial-update",
Expand Down
79 changes: 50 additions & 29 deletions smoketests/tests/replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def test_quorum_loss(self):
self.call("send_message", "terminal")


class EnableReplication(ReplicationTest):
class EnableReplicationTest(ReplicationTest):
AUTOPUBLISH = False

def __init__(self, *args, **kwargs):
Expand All @@ -423,32 +423,41 @@ def run_counter(self, id, n = 100):
self.expected_counter_rows.append({"id": id, "value": n})
self.assertEqual(self.collect_counter_rows(), self.expected_counter_rows)

def test_enable_replication(self):
"""Tests enabling and disabling replication"""
def subscribe_to_enable_replication_events(self):
id = self.cluster.get_db_id()
return self.subscribe(
f"select * from staged_enable_replication_event where database_id={id}",
n = 2,
database = "spacetime-control"
)

def assert_bootstrap_complete(self, sub):
events = sub()
self.assertEqual(
events[-1]['staged_enable_replication_event']['inserts'][0]['message'],
'bootstrap complete',
)

def enable_replication(self, database_name):
sub = self.subscribe_to_enable_replication_events()
self.call_control("enable_replication", {"Name": database_name}, 3)
self.assert_bootstrap_complete(sub)

class EnableReplicationUnsuspended(EnableReplicationTest):
def test_enable_replication_fails_if_not_suspended(self):
"""Tests that the database to enable replication on must be suspended"""

self.add_me_as_admin()
name = random_string()
n = 100

self.publish_module(name, num_replicas = 1)
self.cluster.wait_for_leader_change(None)

# start un-replicated
self.run_counter(1, n)
# enable replication
self.call_control("enable_replication", {"Name": name}, 3)
self.run_counter(2, n)
# disable replication
self.call_control("disable_replication", {"Name": name })
self.run_counter(3, n)
# enable it one more time
self.call_control("enable_replication", {"Name": name}, 3)
self.run_counter(4, n)
with self.assertRaises(Exception):
self.call_control("enable_replication", {"Name": name}, 3)


class EnableReplicationSuspended(ReplicationTest):
AUTOPUBLISH = False

class EnableReplicationSuspended(EnableReplicationTest):
def test_enable_replication_on_suspended_database(self):
"""Tests that we can enable replication on a suspended database"""

Expand All @@ -459,14 +468,12 @@ def test_enable_replication_on_suspended_database(self):
self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health(1)

id = self.cluster.get_db_id()

self.call_control("suspend_database", {"Name": name})
# Database is now unreachable.
with self.assertRaises(Exception):
self.call("send_message", "hi")

self.call_control("enable_replication", {"Name": name}, 3)
self.enable_replication(name)
# Still unreachable until we call unsuspend.
with self.assertRaises(Exception):
self.call("send_message", "hi")
Expand All @@ -475,11 +482,25 @@ def test_enable_replication_on_suspended_database(self):
self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health(2)

# We can't direcly observe that there are indeed three replicas running,
# so as a sanity check inspect the event log.
rows = self.cluster.read_controldb(
f"select message from staged_enable_replication_event where database_id={id}")
self.assertEqual(rows, [
{'message': '"bootstrap requested"'},
{'message': '"bootstrap complete"'},
])
class EnableDisableReplication(EnableReplicationTest):
def test_enable_disable_replication(self):
"""Tests that we can enable then disable replication"""

self.add_me_as_admin()
name = random_string()

self.publish_module(name, num_replicas = 1)
self.cluster.wait_for_leader_change(None)

# suspend first
self.call_control("suspend_database", {"Name": name})
# enable replication and wait for it to complete
self.enable_replication(name)
# unsuspend
self.call_control("unsuspend_database", {"Name": name})

self.cluster.wait_for_leader_change(None)
self.run_counter(1, 100)

self.call_control("disable_replication", {"Name": name})
self.run_counter(2, 100)
Loading