[DPE-2412] Full cluster crash test (#190)
* cluster crash test

* tests now work

* linting

* comments + juju versions

* enrico comments
MiaAltieri authored Aug 28, 2023
1 parent f06a956 commit 0d3a908
Showing 5 changed files with 258 additions and 2 deletions.
@@ -6,7 +6,12 @@
import sys

from pymongo import MongoClient
from pymongo.errors import AutoReconnect, NotPrimaryError, PyMongoError
from pymongo.errors import (
AutoReconnect,
NotPrimaryError,
OperationFailure,
PyMongoError,
)
from pymongo.write_concern import WriteConcern

run = True
@@ -43,6 +48,14 @@ def continous_writes(connection_string: str, starting_number: int):
# reconnect and re-write the previous value. Hence, we `continue` here, without
# incrementing `write_value` as to try to insert this value again.
continue
except OperationFailure as e:
if e.code == 211: # HMAC error
# after cluster comes back up, it needs to resync the HMAC code. Hence, we
# `continue` here, without incrementing `write_value` as to try to insert
# this value again.
continue
else:
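# any other OperationFailure is swallowed without retrying this value,
# mirroring the generic PyMongoError handling below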
pass
except PyMongoError:
# we should not raise this exception but instead increment the write value and move
# on, indicating that there was a failure writing to the database.
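For orientation, the following is a minimal, hypothetical sketch of the continuous-writes loop this hunk modifies. The loop structure, database and collection names, and the insert payload are assumptions inferred from the visible imports and comments, not the charm's actual code.

# Hypothetical sketch only; the real continous_writes() differs in details the diff does not show.
from pymongo import MongoClient
from pymongo.errors import (
    AutoReconnect,
    NotPrimaryError,
    OperationFailure,
    PyMongoError,
)
from pymongo.write_concern import WriteConcern


def continous_writes_sketch(connection_string: str, starting_number: int) -> None:
    client = MongoClient(connection_string)
    collection = client["test_db"].get_collection(
        "test_collection", write_concern=WriteConcern(w="majority")
    )
    write_value = starting_number
    while True:  # the real helper stops via a module-level `run` flag
        try:
            collection.insert_one({"number": write_value})
        except (AutoReconnect, NotPrimaryError):
            # a reconnect / re-election is in progress: retry the same value,
            # so write_value is deliberately not incremented
            continue
        except OperationFailure as e:
            if e.code == 211:  # HMAC error
                # the cluster must resync its HMAC keys after coming back up:
                # retry the same value
                continue
            # any other OperationFailure falls through and is counted as a
            # failed write, like the PyMongoError case below
        except PyMongoError:
            # a failed write: skip this value and move on
            pass
        write_value += 1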
159 changes: 159 additions & 0 deletions tests/integration/ha_tests/helpers.py
@@ -6,12 +6,14 @@
import os
import string
import subprocess
import tarfile
import tempfile
from asyncio import gather
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import kubernetes as kubernetes
import ops
import yaml
from juju.unit import Unit
@@ -730,3 +732,160 @@ async def update_pebble_plans(ops_test: OpsTest, override: Dict[str, str]) -> No
)
ret_code, _, _ = await ops_test.juju(*replan_cmd)
assert ret_code == 0, f"Failed to replan for unit {unit.name}"


async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
"""Verifies that all units of the charm do not have the DB process running."""
app = await get_application_name(ops_test, APP_NAME)

# '/' can affect the results of `pgrep`; to search for processes with '/' it is
# necessary to match against the full command line, i.e. use '-f'
if "/" in process:
pgrep_cmd = ("pgrep", "-f", process)
else:
pgrep_cmd = ("pgrep", "-x", process)

try:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
for unit in ops_test.model.applications[app].units:
_, raw_pid, _ = await ops_test.juju(
"ssh", "--container", MONGODB_CONTAINER_NAME, unit.name, *pgrep_cmd
)

# If something was returned, there is a running process.
if len(raw_pid) > 0:
raise ProcessRunningError
except RetryError:
return False

return True


def modify_pebble_restart_delay(
ops_test: OpsTest,
unit_name: str,
pebble_plan_path: str,
ensure_replan: bool = False,
) -> None:
"""Modify the pebble restart delay of the underlying process.
Args:
ops_test: The ops test framework
unit_name: The name of unit to extend the pebble restart delay for
pebble_plan_path: Path to the file with the modified pebble plan
ensure_replan: Whether to check that the replan command succeeded
"""
kubernetes.config.load_kube_config()
client = kubernetes.client.api.core_v1_api.CoreV1Api()

pod_name = unit_name.replace("/", "-")
container_name = "mongod"
service_name = "mongod"
now = datetime.now().isoformat()

copy_file_into_pod(
client,
ops_test.model.info.name,
pod_name,
container_name,
f"/tmp/pebble_plan_{now}.yml",
pebble_plan_path,
)

add_to_pebble_layer_commands = (
f"/charm/bin/pebble add --combine {service_name} /tmp/pebble_plan_{now}.yml"
)
response = kubernetes.stream.stream(
client.connect_get_namespaced_pod_exec,
pod_name,
ops_test.model.info.name,
container=container_name,
command=add_to_pebble_layer_commands.split(),
stdin=False,
stdout=True,
stderr=True,
tty=False,
_preload_content=False,
)
response.run_forever(timeout=5)
assert (
response.returncode == 0
), f"Failed to add to pebble layer, unit={unit_name}, container={container_name}, service={service_name}"

for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
replan_pebble_layer_commands = "/charm/bin/pebble replan"
response = kubernetes.stream.stream(
client.connect_get_namespaced_pod_exec,
pod_name,
ops_test.model.info.name,
container=container_name,
command=replan_pebble_layer_commands.split(),
stdin=False,
stdout=True,
stderr=True,
tty=False,
_preload_content=False,
)
response.run_forever(timeout=60)
if ensure_replan:
assert (
response.returncode == 0
), f"Failed to replan pebble layer, unit={unit_name}, container={container_name}, service={service_name}"


def copy_file_into_pod(
client: kubernetes.client.api.core_v1_api.CoreV1Api,
namespace: str,
pod_name: str,
container_name: str,
source_path: str,
destination_path: str,
) -> None:
"""Copy file contents into pod.
Args:
client: The kubernetes CoreV1Api client
namespace: The namespace of the pod to copy files to
pod_name: The name of the pod to copy files to
container_name: The name of the pod container to copy files to
source_path: The path inside the container to which the file should be copied
destination_path: The local path of the file to be copied into the pod
"""
try:
exec_command = ["tar", "xvf", "-", "-C", "/"]

api_response = kubernetes.stream.stream(
client.connect_get_namespaced_pod_exec,
pod_name,
namespace,
container=container_name,
command=exec_command,
stdin=True,
stdout=True,
stderr=True,
tty=False,
_preload_content=False,
)

with tempfile.TemporaryFile() as tar_buffer:
with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
tar.add(destination_path, source_path)

tar_buffer.seek(0)
commands = []
commands.append(tar_buffer.read())

while api_response.is_open():
api_response.update(timeout=1)

if commands:
command = commands.pop(0)
api_response.write_stdin(command.decode())
else:
break

api_response.close()
except kubernetes.client.rest.ApiException:
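# any Kubernetes API error while opening or driving the exec stream fails the test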
assert False
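
A hedged usage sketch of this helper follows; the namespace, pod name, and in-pod file name are illustrative (in the test suite, modify_pebble_restart_delay above derives them from ops_test and the unit name):

# Illustrative call only; the values below are assumptions, not taken from this commit.
import kubernetes

kubernetes.config.load_kube_config()
api = kubernetes.client.api.core_v1_api.CoreV1Api()

copy_file_into_pod(
    api,
    namespace="my-test-model",      # the Juju model name
    pod_name="mongodb-k8s-0",       # unit name with "/" replaced by "-"
    container_name="mongod",
    source_path="/tmp/pebble_plan.yml",  # where the file lands inside the container
    destination_path="tests/integration/ha_tests/manifests/extend_pebble_restart_delay.yml",
)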
5 changes: 5 additions & 0 deletions tests/integration/ha_tests/manifests/extend_pebble_restart_delay.yml
@@ -0,0 +1,5 @@
services:
mongod:
override: merge
backoff-delay: 180s
backoff-limit: 180s
5 changes: 5 additions & 0 deletions tests/integration/ha_tests/manifests/restore_pebble_restart_delay.yml
@@ -0,0 +1,5 @@
services:
mongod:
override: merge
backoff-delay: 500ms
backoff-limit: 30s
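
Not part of this commit, but a quick, hedged way to confirm the extended layer was merged is to dump the combined plan from the workload container, reusing the ops_test.juju("ssh", ...) pattern from helpers.py. The unit name below is illustrative, and the snippet would run inside an async test:

# Hypothetical sanity check: the merged pebble plan should show the extended backoff.
ret_code, plan_yaml, _ = await ops_test.juju(
    "ssh", "--container", "mongod", "mongodb-k8s/0", "/charm/bin/pebble", "plan"
)
assert ret_code == 0, "failed to dump the pebble plan"
assert "backoff-delay: 180s" in plan_yaml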
76 changes: 75 additions & 1 deletion tests/integration/ha_tests/test_ha.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import asyncio
import logging
import time
from datetime import datetime, timezone
@@ -16,6 +17,7 @@
MONGODB_CONTAINER_NAME,
TEST_COLLECTION,
TEST_DB,
are_all_db_processes_down,
check_db_stepped_down,
count_primaries,
deploy_and_scale_application,
@@ -34,6 +36,7 @@
insert_record_in_collection,
isolate_instance_from_cluster,
kubectl_delete,
modify_pebble_restart_delay,
mongod_ready,
relate_mongodb_and_application,
remove_instance_isolation,
@@ -49,12 +52,13 @@

logger = logging.getLogger(__name__)

RESTART_DELAY = 60 * 3
MEDIAN_REELECTION_TIME = 12


@pytest_asyncio.fixture
async def continuous_writes(ops_test: OpsTest) -> None:
"""Starts continuous writes to the MySQL cluster for a test and clear the writes at the end."""
"""Starts continuous writes to the MongoDB cluster and clear the writes at the end."""
application_name = await get_application_name(ops_test, "application")

application_unit = ops_test.model.applications[application_name].units[0]
@@ -392,6 +396,76 @@ async def test_restart_db_process(ops_test, continuous_writes, change_logging):
await verify_writes(ops_test)


async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes):
mongodb_application_name = await get_application_name(ops_test, APP_NAME)

# update all units to have a new RESTART_DELAY. Modifying the restart delay to 3 minutes
# should ensure enough time for all replicas to be down at the same time.
for unit in ops_test.model.applications[mongodb_application_name].units:
modify_pebble_restart_delay(
ops_test,
unit.name,
"tests/integration/ha_tests/manifests/extend_pebble_restart_delay.yml",
ensure_replan=True,
)

# kill all units "simultaneously"
await asyncio.gather(
*[
send_signal_to_pod_container_process(
ops_test, unit.name, MONGODB_CONTAINER_NAME, MONGOD_PROCESS_NAME, "SIGKILL"
)
for unit in ops_test.model.applications[mongodb_application_name].units
]
)

# This test verifies that when all replicas are down at the same time, they operate as
# expected once they come back online. This check verifies that we meet the criteria of
# all replicas being down at the same time.
try:
assert await are_all_db_processes_down(
ops_test, MONGOD_PROCESS_NAME
), "Not all units down at the same time."
finally:
for unit in ops_test.model.applications[mongodb_application_name].units:
modify_pebble_restart_delay(
ops_test,
unit.name,
"tests/integration/ha_tests/manifests/restore_pebble_restart_delay.yml",
ensure_replan=True,
)

# sleep for twice the median election time plus the restart delay
time.sleep(MEDIAN_REELECTION_TIME * 2 + RESTART_DELAY)

# verify all units are up and running
for unit in ops_test.model.applications[mongodb_application_name].units:
assert await mongod_ready(ops_test, int(unit.name.split("/")[1]))

# verify new writes are continuing by counting the number of writes before and after a 5 second
# wait
logger.info("Validating writes are continuing to DB")
primary = await get_replica_set_primary(ops_test)
with await get_mongo_client(ops_test, excluded=[primary.name]) as client:
writes = client[TEST_DB][TEST_COLLECTION].count_documents({})
time.sleep(5)
more_writes = client[TEST_DB][TEST_COLLECTION].count_documents({})
assert more_writes > writes, "writes not continuing to DB"

# verify all units are running under the same replset
hostnames = await get_units_hostnames(ops_test)
member_hosts = await fetch_replica_set_members(ops_test)
assert set(member_hosts) == set(hostnames), "all members not running under the same replset"

# verify there is only one primary after the cluster recovers
assert (
await count_primaries(ops_test) == 1
), "there is not exactly one primary in the replica set."

# verify that no writes were missed.
await verify_writes(ops_test)


async def test_network_cut(ops_test: OpsTest, continuous_writes, chaos_mesh):
app = await get_application_name(ops_test, APP_NAME)
