Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose gossip future #8735

Merged
merged 12 commits into from
Oct 25, 2024
Prev Previous commit
Next Next commit
rename logging method
add static creators
  • Loading branch information
tbenr committed Oct 24, 2024
commit 9b37f3e54fb255b2dd544bd26417ad80ab62f5f8
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected SafeFuture<Void> publishMessageWithFeedback(final T message) {
.handle(
(__, err) -> {
if (err != null) {
gossipFailureLogger.logWithSuppression(err, getSlotForMessage.apply(message));
gossipFailureLogger.log(err, getSlotForMessage.apply(message));
} else {
LOG.trace(
"Successfully gossiped message with root {} on {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public AggregateGossipManager(
message -> Optional.of(message.getMessage().getAggregate().getData().getSlot()),
message -> spec.computeEpochAtSlot(message.getMessage().getAggregate().getData().getSlot()),
spec.getNetworkingConfig(),
new GossipFailureLogger(GossipTopicName.BEACON_AGGREGATE_AND_PROOF.toString(), true),
GossipFailureLogger.createSuppressing(
GossipTopicName.BEACON_AGGREGATE_AND_PROOF.toString()),
debugDataDumper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class AttestationGossipManager implements GossipManager {
private final Counter attestationPublishFailureCounter;

private final GossipFailureLogger gossipFailureLogger =
new GossipFailureLogger("attestation", true);
GossipFailureLogger.createSuppressing("attestation");

public AttestationGossipManager(
final MetricsSystem metricsSystem,
Expand Down Expand Up @@ -64,8 +64,7 @@ public void onNewAttestation(final ValidatableAttestation validatableAttestation
attestationPublishSuccessCounter.inc();
},
error -> {
gossipFailureLogger.logWithSuppression(
error, Optional.of(attestation.getData().getSlot()));
gossipFailureLogger.log(error, Optional.of(attestation.getData().getSlot()));
attestationPublishFailureCounter.inc();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public AttesterSlashingGossipManager(
message -> Optional.of(message.getAttestation1().getData().getSlot()),
message -> spec.computeEpochAtSlot(message.getAttestation1().getData().getSlot()),
spec.getNetworkingConfig(),
new GossipFailureLogger(GossipTopicName.ATTESTER_SLASHING.toString(), false),
GossipFailureLogger.createNonSuppressing(GossipTopicName.ATTESTER_SLASHING.toString()),
debugDataDumper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static BlobSidecarGossipManager create(
gossipNetwork,
gossipEncoding,
subnetIdToTopicHandler,
new GossipFailureLogger("blob_sidecar", false));
GossipFailureLogger.createNonSuppressing("blob_sidecar"));
}

private BlobSidecarGossipManager(
Expand Down Expand Up @@ -118,7 +118,7 @@ public SafeFuture<Void> publishBlobSidecar(final BlobSidecar message) {
.handle(
(__, err) -> {
if (err != null) {
gossipFailureLogger.logWithSuppression(err, Optional.of(message.getSlot()));
gossipFailureLogger.log(err, Optional.of(message.getSlot()));
} else {
LOG.trace(
"Successfully gossiped blob sidecar {} on {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public BlockGossipManager(
block -> Optional.of(block.getSlot()),
block -> spec.computeEpochAtSlot(block.getSlot()),
spec.getNetworkingConfig(),
new GossipFailureLogger(GossipTopicName.BEACON_BLOCK.toString(), false),
GossipFailureLogger.createNonSuppressing(GossipTopicName.BEACON_BLOCK.toString()),
debugDataDumper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,20 @@ public class GossipFailureLogger {
private Optional<UInt64> lastErroredSlot;
private Throwable lastRootCause;

public GossipFailureLogger(final String messageType, final boolean shouldSuppress) {
public static GossipFailureLogger createSuppressing(final String messageType) {
return new GossipFailureLogger(messageType, true);
}

public static GossipFailureLogger createNonSuppressing(final String messageType) {
return new GossipFailureLogger(messageType, false);
}

private GossipFailureLogger(final String messageType, final boolean shouldSuppress) {
this.messageType = shouldSuppress ? messageType + "(s)" : messageType;
this.shouldSuppress = shouldSuppress;
}

public synchronized void logWithSuppression(
final Throwable error, final Optional<UInt64> maybeSlot) {
public synchronized void log(final Throwable error, final Optional<UInt64> maybeSlot) {
final Throwable rootCause = Throwables.getRootCause(error);

final boolean suppress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public ProposerSlashingGossipManager(
.getSpec()
.computeEpochAtSlot(message.getHeader1().getMessage().getSlot()),
networkingConfig,
new GossipFailureLogger(GossipTopicName.PROPOSER_SLASHING.toString(), false),
GossipFailureLogger.createNonSuppressing(GossipTopicName.PROPOSER_SLASHING.toString()),
debugDataDumper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public SignedBlsToExecutionChangeGossipManager(
// of the topic they arrived on (ie disable fork checking at this level)
message -> forkInfo.getFork().getEpoch(),
networkingConfig,
new GossipFailureLogger(GossipTopicName.BLS_TO_EXECUTION_CHANGE.toString(), false),
GossipFailureLogger.createSuppressing(GossipTopicName.BLS_TO_EXECUTION_CHANGE.toString()),
debugDataDumper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public SignedContributionAndProofGossipManager(
.getSpec()
.computeEpochAtSlot(message.getMessage().getContribution().getSlot()),
networkingConfig,
new GossipFailureLogger(
GossipTopicName.SYNC_COMMITTEE_CONTRIBUTION_AND_PROOF.toString(), true),
GossipFailureLogger.createSuppressing(
GossipTopicName.SYNC_COMMITTEE_CONTRIBUTION_AND_PROOF.toString()),
debugDataDumper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SyncCommitteeMessageGossipManager implements GossipManager {
private final Counter publishFailureCounter;

private final GossipFailureLogger gossipFailureLogger =
new GossipFailureLogger("sync_committee_message", true);
GossipFailureLogger.createSuppressing("sync_committee_message");

public SyncCommitteeMessageGossipManager(
final MetricsSystem metricsSystem,
Expand Down Expand Up @@ -109,7 +109,7 @@ private void publish(final SyncCommitteeMessage message, final int subnetId) {
publishSuccessCounter.inc();
},
error -> {
gossipFailureLogger.logWithSuppression(error, Optional.of(message.getSlot()));
gossipFailureLogger.log(error, Optional.of(message.getSlot()));
publishFailureCounter.inc();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public VoluntaryExitGossipManager(
exit -> Optional.empty(),
exit -> exit.getMessage().getEpoch(),
networkingConfig,
new GossipFailureLogger(GossipTopicName.VOLUNTARY_EXIT.toString(), false),
GossipFailureLogger.createNonSuppressing(GossipTopicName.VOLUNTARY_EXIT.toString()),
debugDataDumper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ protected TestGossipManager(
message -> Optional.of(UInt64.ZERO),
message -> UInt64.ZERO,
networkingConfig,
new GossipFailureLogger(TOPIC_NAME.toString(), true),
GossipFailureLogger.createSuppressing(TOPIC_NAME.toString()),
DebugDataDumper.NOOP);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

package tech.pegasys.teku.networking.eth2.gossip;

import static tech.pegasys.teku.networking.eth2.gossip.GossipFailureLogger.createNonSuppressing;
import static tech.pegasys.teku.networking.eth2.gossip.GossipFailureLogger.createSuppressing;

import io.libp2p.core.SemiDuplexNoOutboundStreamException;
import io.libp2p.pubsub.MessageAlreadySeenException;
import io.libp2p.pubsub.NoPeersForOutboundMessageException;
Expand All @@ -31,13 +34,13 @@ class GossipFailureLoggerTest {
public static final NoPeersForOutboundMessageException NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION =
new NoPeersForOutboundMessageException("no peers");

private final GossipFailureLogger logger = new GossipFailureLogger("thingy", true);
private final GossipFailureLogger loggerNoSuppression = new GossipFailureLogger("thingy", false);
private final GossipFailureLogger loggerSuppressing = createSuppressing("thingy");
private final GossipFailureLogger loggerNoSuppression = createNonSuppressing("thingy");

@Test
void shouldLogAlreadySeenErrorsAtDebugLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", new MessageAlreadySeenException("Dupe")), SLOT);
logCaptor.assertDebugLog(ALREADY_SEEN_MESSAGE);
}
Expand All @@ -46,7 +49,7 @@ void shouldLogAlreadySeenErrorsAtDebugLevel() {
@Test
void shouldLogFirstNoPeersErrorsAtWarningLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.assertWarnLog(noPeersMessage(SLOT, true));
}
Expand All @@ -55,19 +58,19 @@ void shouldLogFirstNoPeersErrorsAtWarningLevel() {
@Test
void shouldLogFirstNoActiveStreamErrorsAtWarningLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(new RuntimeException("Foo", NO_ACTIVE_STREAM_EXCEPTION), SLOT);
loggerSuppressing.log(new RuntimeException("Foo", NO_ACTIVE_STREAM_EXCEPTION), SLOT);
logCaptor.assertWarnLog(noActiveStreamMessage(SLOT, true));
}
}

@Test
void shouldLogRepeatedNoPeersErrorsAtDebugLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.clearLogs();

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.assertDebugLog(noPeersMessage(SLOT, true));
}
Expand All @@ -76,11 +79,11 @@ void shouldLogRepeatedNoPeersErrorsAtDebugLevel() {
@Test
void shouldLogRepeatedNoPeersErrorsWhenNoSuppression() {
tbenr marked this conversation as resolved.
Show resolved Hide resolved
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
loggerNoSuppression.logWithSuppression(
loggerNoSuppression.log(
new RuntimeException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.clearLogs();

loggerNoSuppression.logWithSuppression(
loggerNoSuppression.log(
new IllegalStateException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.assertWarnLog(noPeersMessage(SLOT, false));
}
Expand All @@ -89,11 +92,11 @@ void shouldLogRepeatedNoPeersErrorsWhenNoSuppression() {
@Test
void shouldLogNoPeersErrorsWithDifferentSlotsAtWarnLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.assertWarnLog(noPeersMessage(SLOT, true));

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION),
Optional.of(UInt64.valueOf(2)));
logCaptor.assertWarnLog(noPeersMessage(Optional.of(UInt64.valueOf(2)), true));
Expand All @@ -103,14 +106,14 @@ void shouldLogNoPeersErrorsWithDifferentSlotsAtWarnLevel() {
@Test
void shouldLogNoPeersErrorsAtWarnLevelWhenSeparatedByADifferentException() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.assertWarnLog(noPeersMessage(SLOT, true));
logCaptor.clearLogs();

logger.logWithSuppression(new MessageAlreadySeenException("Dupe"), SLOT);
loggerSuppressing.log(new MessageAlreadySeenException("Dupe"), SLOT);

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), SLOT);
logCaptor.assertWarnLog(noPeersMessage(SLOT, true));
}
Expand All @@ -119,20 +122,18 @@ void shouldLogNoPeersErrorsAtWarnLevelWhenSeparatedByADifferentException() {
@Test
void shouldLogFirstGenericErrorAtErrorLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
loggerSuppressing.log(new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
logCaptor.assertErrorLog(genericError(SLOT, true));
}
}

@Test
void shouldLogRepeatedGenericErrorsAtDebugLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
loggerSuppressing.log(new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
logCaptor.clearLogs();

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", new IllegalStateException("goes the dynamite")), SLOT);
logCaptor.assertDebugLog(genericError(SLOT, true));
}
Expand All @@ -141,12 +142,11 @@ void shouldLogRepeatedGenericErrorsAtDebugLevel() {
@Test
void shouldLogMultipleGenericErrorsWithDifferentCausesAtErrorLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
loggerSuppressing.log(new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
logCaptor.assertErrorLog(genericError(SLOT, true));
logCaptor.clearLogs();

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", new IllegalArgumentException("goes the dynamite")),
SLOT);
logCaptor.assertErrorLog(genericError(SLOT, true));
Expand All @@ -156,11 +156,11 @@ void shouldLogMultipleGenericErrorsWithDifferentCausesAtErrorLevel() {
@Test
void shouldLogGenericErrorsWithoutSuppression() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", new IllegalStateException("Boom")), Optional.empty());
logCaptor.clearLogs();

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", new IllegalStateException("goes the dynamite")),
Optional.empty());
logCaptor.assertErrorLog(genericError(Optional.empty(), true));
Expand All @@ -170,11 +170,11 @@ void shouldLogGenericErrorsWithoutSuppression() {
@Test
void shouldLogNoPeersErrorsAtWarnLevelWithoutSuppression() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION), Optional.empty());
logCaptor.clearLogs();

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", NO_PEERS_FOR_OUTBOUND_MESSAGE_EXCEPTION),
Optional.empty());
logCaptor.assertWarnLog(noPeersMessage(Optional.empty(), true));
Expand All @@ -184,11 +184,11 @@ void shouldLogNoPeersErrorsAtWarnLevelWithoutSuppression() {
@Test
void shouldLogNoActiveStreamErrorsWithoutSuppression() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
loggerSuppressing.log(
new RuntimeException("Foo", NO_ACTIVE_STREAM_EXCEPTION), Optional.empty());
logCaptor.clearLogs();

logger.logWithSuppression(
loggerSuppressing.log(
new IllegalStateException("Foo", NO_ACTIVE_STREAM_EXCEPTION), Optional.empty());
logCaptor.assertWarnLog(noActiveStreamMessage(Optional.empty(), true));
}
Expand Down