3434import java .util .concurrent .atomic .AtomicInteger ;
3535import java .util .concurrent .atomic .AtomicLong ;
3636import java .util .concurrent .atomic .AtomicReference ;
37+ import java .util .function .ToLongFunction ;
3738import org .junit .jupiter .api .AfterEach ;
39+ import org .junit .jupiter .api .BeforeEach ;
3840import org .junit .jupiter .api .Test ;
41+ import org .junit .jupiter .api .TestInfo ;
3942import org .junit .jupiter .api .extension .ExtendWith ;
4043import org .slf4j .Logger ;
4144import org .slf4j .LoggerFactory ;
@@ -45,6 +48,7 @@ public class FailureTest {
4548
4649 private static final Logger LOGGER = LoggerFactory .getLogger (FailureTest .class );
4750
51+ static String testMethod ;
4852 TestUtils .ClientFactory cf ;
4953 String stream ;
5054 ExecutorService executorService ;
@@ -57,6 +61,11 @@ static void wait(Duration duration) {
5761 }
5862 }
5963
64+ @ BeforeEach
65+ void init (TestInfo info ) {
66+ testMethod = info .getTestMethod ().get ().getName ();
67+ }
68+
6069 @ AfterEach
6170 void tearDown () {
6271 if (executorService != null ) {
@@ -142,9 +151,9 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
142151 waitAtMost (
143152 Duration .ofSeconds (10 ),
144153 () -> {
145- LOGGER . info ("Getting metadata for {}" , stream );
154+ log ("Getting metadata for {}" , stream );
146155 Client .StreamMetadata m = publisher .metadata (stream ).get (stream );
147- LOGGER . info ("Metadata for {} (expecting 2 replicas): {}" , stream , m );
156+ log ("Metadata for {} (expecting 2 replicas): {}" , stream , m );
148157 return m .getReplicas ().size () == 2 ;
149158 });
150159
@@ -195,6 +204,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
195204 Map <Long , Message > published = new ConcurrentHashMap <>();
196205 Set <Message > confirmed = ConcurrentHashMap .newKeySet ();
197206
207+ // match confirmed messages to published messages
198208 Client .PublishConfirmListener publishConfirmListener =
199209 (publisherId , publishingId ) -> {
200210 Message confirmedMessage ;
@@ -212,18 +222,22 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
212222 AtomicReference <Client > publisher = new AtomicReference <>();
213223 CountDownLatch reconnectionLatch = new CountDownLatch (1 );
214224 AtomicReference <Client .ShutdownListener > shutdownListenerReference = new AtomicReference <>();
225+ // shutdown listener reconnects to node 2 to locate the node the stream leader is on
226+ // it then re-creates a publisher connected to this node
215227 Client .ShutdownListener shutdownListener =
216228 shutdownContext -> {
217229 if (shutdownContext .getShutdownReason ()
218230 == Client .ShutdownContext .ShutdownReason .UNKNOWN ) {
231+ log ("Connection got closed, reconnecting" );
219232 // avoid long-running task in the IO thread
220233 executorService .submit (
221234 () -> {
222235 connected .set (false );
223236 AtomicReference <Client > locator = new AtomicReference <>();
224237 try {
238+ log ("Reconnecting to node 2" );
225239 waitAtMost (
226- Duration .ofSeconds (5 ),
240+ Duration .ofSeconds (20 ),
227241 () -> {
228242 try {
229243 locator .set (
@@ -233,14 +247,35 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
233247 return false ;
234248 }
235249 });
250+ log ("Reconnected to node 2, looking up new stream leader" );
236251 waitAtMost (
237- Duration .ofSeconds (5 ),
252+ Duration .ofSeconds (20 ),
238253 () -> {
239254 Client .StreamMetadata m = locator .get ().metadata (stream ).get (stream );
240255 return m .getLeader () != null
241256 && m .getLeader ().getPort () != streamPortNode1 ();
242257 });
258+ log ("New stream leader is on another node than node 1" );
243259 } catch (Throwable e ) {
260+ log ("Error while trying to connect to new stream leader" );
261+ if (locator .get () == null ) {
262+ log ("Could not reconnect" );
263+ } else {
264+ try {
265+ Client .StreamMetadata m = locator .get ().metadata (stream ).get (stream );
266+ if (m .getLeader () == null ) {
267+ log ("The stream has no leader" );
268+ } else {
269+ log (
270+ "The stream is on node with port {} (node 1 = {}, node 2 = {})" ,
271+ m .getLeader ().getPort (),
272+ streamPortNode1 (),
273+ streamPortNode2 ());
274+ }
275+ } catch (Exception ex ) {
276+ log ("Error while checking failure: {}" , ex .getMessage ());
277+ }
278+ }
244279 reconnectionLatch .countDown ();
245280 return ;
246281 }
@@ -278,6 +313,9 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
278313
279314 AtomicBoolean keepPublishing = new AtomicBoolean (true );
280315
316+ AtomicLong publishSequence = new AtomicLong (0 );
317+ ToLongFunction <Object > publishSequenceFunction = value -> publishSequence .getAndIncrement ();
318+
281319 executorService .submit (
282320 () -> {
283321 while (keepPublishing .get ()) {
@@ -295,7 +333,11 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
295333 .build ();
296334 try {
297335 long publishingId =
298- publisher .get ().publish ((byte ) 1 , Collections .singletonList (message )).get (0 );
336+ publisher
337+ .get ()
338+ .publish (
339+ (byte ) 1 , Collections .singletonList (message ), publishSequenceFunction )
340+ .get (0 );
299341 published .put (publishingId , message );
300342 } catch (Exception e ) {
301343 // keep going
@@ -314,6 +356,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
314356 int confirmedCount = confirmed .size ();
315357
316358 try {
359+ // stop the first node (this is where the stream leader is)
317360 Host .rabbitmqctl ("stop_app" );
318361
319362 assertThat (reconnectionLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
@@ -324,6 +367,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
324367 } finally {
325368 Host .rabbitmqctl ("start_app" );
326369 }
370+ // making sure we published a few messages and got the confirmations
327371 assertThat (confirmed ).hasSizeGreaterThan (confirmedCount );
328372 confirmedCount = confirmed .size ();
329373
@@ -339,6 +383,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
339383 // let's publish for a bit of time
340384 Thread .sleep (2000 );
341385
386+ // making sure we published messages and got the confirmations
342387 assertThat (confirmed ).hasSizeGreaterThan (confirmedCount );
343388
344389 keepPublishing .set (false );
@@ -640,4 +685,8 @@ void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSam
640685 Host .killStreamLeaderProcess (stream );
641686 waitUntil (() -> metadataNotifications .get () == 2 );
642687 }
688+
689+ private static void log (String format , Object ... args ) {
690+ LOGGER .info ("[" + testMethod + "] " + format , args );
691+ }
643692}
0 commit comments