Skip to content

Commit

Permalink
Log more info in cluster test
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Dec 19, 2024
1 parent 979526b commit 2c219c6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ private void assignProducersToNewManagers(
List<BrokerWrapper> candidates = brokerAndCandidates.v2();
String key = keyForNode(broker);
LOGGER.debug(
"Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
"Assigning {} producer(s) and consumer tracker(s) to {} (stream '{}')", trackers.size(), key, stream);
trackers.forEach(tracker -> maybeRecoverAgent(broker, candidates, tracker));
})
.exceptionally(
Expand Down
23 changes: 21 additions & 2 deletions src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import io.netty.channel.EventLoopGroup;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -251,6 +254,11 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
LOGGER.info("Environment information:");
System.out.println(TestUtils.jsonPrettyPrint(environment.toString()));

LOGGER.info("Producer information:");
producers.forEach(p -> {
LOGGER.info("Producer to '{}' (last exception: '{}')", p.stream(), p.lastException);
});

LOGGER.info("Closing producers");
producers.forEach(
p -> {
Expand Down Expand Up @@ -293,6 +301,8 @@ private static class ProducerState implements AutoCloseable {
final AtomicBoolean stopped = new AtomicBoolean(false);
final AtomicInteger acceptedCount = new AtomicInteger();
final AtomicReference<Runnable> postConfirmed = new AtomicReference<>(() -> {});
final AtomicReference<Throwable> lastException = new AtomicReference<>();
final AtomicReference<Instant> lastExceptionInstant = new AtomicReference<>();

private ProducerState(String stream, boolean dynamicBatch, Environment environment) {
this.stream = stream;
Expand All @@ -317,8 +327,9 @@ void start() {
this.limiter.acquire(1);
this.producer.send(
producer.messageBuilder().addData(BODY).build(), confirmationHandler);
} catch (Exception e) {

} catch (Throwable e) {
this.lastException.set(e);
this.lastExceptionInstant.set(Instant.now());
}
}
});
Expand All @@ -342,6 +353,14 @@ String stream() {
return this.stream;
}

String lastException() {
if (this.lastException.get() == null) {
return "no exception";
} else {
return this.lastException.get().getMessage() + " at " + DateTimeFormatter.ISO_LOCAL_TIME.format(lastExceptionInstant.get());
}
}

@Override
public void close() {
stopped.set(true);
Expand Down

0 comments on commit 2c219c6

Please sign in to comment.