Skip to content

Commit

Permalink
Merge pull request #22619 from bashtanov/test_recovery_after_catastro…
Browse files Browse the repository at this point in the history
…phic_failure-fix-shutdown

tests/availability: fix resuming of suspended nodes
  • Loading branch information
piyushredpanda authored Aug 1, 2024
2 parents 35cedc8 + 7be18a9 commit 2b7f1e4
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 32 deletions.
5 changes: 3 additions & 2 deletions tests/rptest/services/failure_injector.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,11 @@ def _heal_all(self):
if spec.type != FailureSpec.FAILURE_ISOLATE
}

def _contunue_all(self):
def _continue_all(self):
self.redpanda.logger.info(f"continuing execution on all nodes")
for n in self.redpanda.nodes:
self._continue(n)
if self.redpanda.check_node(n):
self._continue(n)
self._in_flight = {
spec
for spec in self._in_flight
Expand Down
32 changes: 14 additions & 18 deletions tests/rptest/tests/availability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ def test_availability_when_one_node_failed(self):
self.start_producer(1, throughput=10000)
self.start_consumer(1)
self.await_startup()
# start failure injector with default parameters
self.start_finjector()

self.validate_records()
# run failure injector loop with default parameters
with self.finj_thread():
self.validate_records()

@cluster(num_nodes=5, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_recovery_after_catastrophic_failure(self):
Expand Down Expand Up @@ -90,17 +89,14 @@ def test_recovery_after_catastrophic_failure(self):
self.start_consumer(1)
self.await_startup()

# inject permanent random failure
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
random.choice(self.redpanda.nodes[0:1]))

self.inject_failure(f_spec)

# inject transient failure on other node
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
self.redpanda.nodes[2],
length=2.0 if self.scale.local else 15.0)

self.inject_failure(f_spec)

self.validate_records()
with self.finj_manual() as finj:
# inject permanent random failure
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
random.choice(self.redpanda.nodes[0:1]))
finj(f_spec)
# inject transient failure on other node
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
self.redpanda.nodes[2],
length=2.0 if self.scale.local else 15.0)
finj(f_spec)
self.validate_records()
61 changes: 49 additions & 12 deletions tests/rptest/tests/e2e_finjector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from contextlib import contextmanager
import random
import time
import threading
Expand All @@ -33,7 +34,8 @@ def const_delay(delay_seconds=10):
class EndToEndFinjectorTest(EndToEndTest):
def __init__(self, test_context):
super(EndToEndFinjectorTest, self).__init__(test_context=test_context)
self.enable_failures = True
self.enable_manual = False
self.enable_loop = False
self.scale = Scale(test_context)
self.finjector_thread = None
self.failure_length_provider = scale_dependent_length(self.scale)
Expand All @@ -56,11 +58,49 @@ def configure_finjector(self,
if delay_provider:
self.failure_delay_provier = delay_provider

def start_finjector(self):
self.finjector_thread = threading.Thread(
target=self._failure_injector_loop, args=())
self.finjector_thread.daemon = True
self.finjector_thread.start()
@contextmanager
def finj_thread(self):
    """
    Get a context manager that holds the test in looped failure injection
    mode: a background thread running `_failure_injector_loop` is started
    on entry and lives for the duration of the `with` block.

    On exit the loop flag is cleared, the thread is joined and `_cleanup()`
    is called, so recoverable failures such as suspended process or network
    issues will be repaired on exit.

    :return: void (nothing is bound by `with ... as`)
    :raises AssertionError: if manual or looped injection is already active
    """
    # Check the precondition *before* entering the try block: when another
    # mode is already active, the finally clause below must not clear its
    # flag, join its thread or run cleanup on its behalf.
    assert not self.enable_manual and not self.enable_loop, \
        "failure injection mode is already active"
    self.enable_loop = True
    try:
        self.finjector_thread = threading.Thread(
            target=self._failure_injector_loop, args=())
        self.finjector_thread.start()
        yield
    finally:
        # the injector loop polls this flag, so clearing it lets the
        # thread finish and makes the join below return
        self.enable_loop = False
        if self.finjector_thread:
            self.finjector_thread.join()
        self._cleanup()

@contextmanager
def finj_manual(self):
    """
    Get a context manager that holds the test in manual failure injection
    mode. Recoverable failures such as suspended process or network issues
    will be repaired on exit (via `_cleanup()`). Caller is supposed to make
    inject_failure() calls inside the `with` statement, either directly or
    through the yielded callable.

    :return: a callable with a single failure spec argument
    :raises AssertionError: if manual or looped injection is already active
    """
    # Check the precondition *before* entering the try block: a rejected
    # nested entry must not reset the flag or run cleanup while the outer
    # context is still in progress.
    assert not self.enable_manual and not self.enable_loop, \
        "failure injection mode is already active"
    self.enable_manual = True

    # named `inject` rather than `callable` to avoid shadowing the builtin
    def inject(spec):
        return self.inject_failure(spec)

    try:
        yield inject
    finally:
        self.enable_manual = False
        self._cleanup()

def random_failure_spec(self):
f_type = random.choice(self.allowed_failures)
Expand All @@ -70,6 +110,7 @@ def random_failure_spec(self):
return FailureSpec(node=node, type=f_type, length=length)

def inject_failure(self, spec):
    """
    Inject the failure described by `spec` using a failure injector built
    for this test's redpanda service.

    Only permitted while one of the injection modes is enabled, i.e. when
    `enable_manual` or `enable_loop` is set.

    :param spec: failure spec handed to the injector's inject_failure()
    """
    assert self.enable_manual or self.enable_loop
    make_failure_injector(self.redpanda).inject_failure(spec)

Expand All @@ -80,8 +121,7 @@ def _next_failure(self):
return self.random_failure_spec()

def _failure_injector_loop(self):

while self.enable_failures:
while self.enable_loop:
f_injector = make_failure_injector(self.redpanda)
f_injector.inject_failure(self._next_failure())

Expand All @@ -90,9 +130,6 @@ def _failure_injector_loop(self):
f"waiting {delay} seconds before next failure")
time.sleep(delay)

def teardown(self):
self.enable_failures = False
if self.finjector_thread:
self.finjector_thread.join()
def _cleanup(self):
    """
    Undo outstanding injected failures: call `_heal_all()` and then
    `_continue_all()`, each on a freshly built failure injector.
    """
    healer = make_failure_injector(self.redpanda)
    healer._heal_all()
    resumer = make_failure_injector(self.redpanda)
    resumer._continue_all()

0 comments on commit 2b7f1e4

Please sign in to comment.