Skip to content

Do not log unsuccessful join attempt each time #39756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper {
private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
private final ThreadPool threadPool;
private final TimeValue clusterFormationWarningTimeout;
private final Runnable logLastFailedJoinAttempt;
@Nullable // if no warning is scheduled
private volatile WarningScheduler warningScheduler;

public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
ThreadPool threadPool) {
ThreadPool threadPool, Runnable logLastFailedJoinAttempt) {
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
this.threadPool = threadPool;
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
this.logLastFailedJoinAttempt = logLastFailedJoinAttempt;
}

public boolean isRunning() {
Expand Down Expand Up @@ -94,6 +96,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
if (isActive()) {
logLastFailedJoinAttempt.run();
logger.warn(clusterFormationStateSupplier.get().getDescription());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool());
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
}

private ClusterFormationState getClusterFormationState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -58,6 +60,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand All @@ -83,6 +86,8 @@ public class JoinHelper {

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());

private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Expand Down Expand Up @@ -172,7 +177,55 @@ boolean isJoinPending() {
return pendingOutgoingJoins.isEmpty() == false;
}

void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
// package-private for testing
static class FailedJoinAttempt {
private final DiscoveryNode destination;
private final JoinRequest joinRequest;
private final TransportException exception;
private final long timestamp;

FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) {
this.destination = destination;
this.joinRequest = joinRequest;
this.exception = exception;
this.timestamp = System.nanoTime();
}

void logNow() {
logger.log(getLogLevel(exception),
() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest),
exception);
}

static Level getLogLevel(TransportException e) {
Throwable cause = e.unwrapCause();
if (cause instanceof CoordinationStateRejectedException ||
cause instanceof FailedToCommitClusterStateException ||
cause instanceof NotMasterException) {
return Level.DEBUG;
}
return Level.INFO;
}

void logWarnWithTimestamp() {
logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ms ago, failed to join {} with {}",
TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)),
destination,
joinRequest),
exception);
}
}


void logLastFailedJoinAttempt() {
FailedJoinAttempt attempt = lastFailedJoinAttempt.get();
if (attempt != null) {
attempt.logWarnWithTimestamp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we avoid repeatedly logging the same failed join attempt? I wonder if we should set lastFailedJoinAttempt back to null here (and use AtomicReference)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we need to log lastFailedJoinAttempt each time we log a warn in ClusterFormationHelper, because that way we know that cluster still can not form and failed join attempt still could be the reason for it. Otherwise, next time ClusterFormationHelper logs a warning and no failed join attempt, we don't know if the issue with join is actually resolved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm with @ywelsch on this one. Imagine one join attempt fails with an exception and then the network is disconnected. We will continue to log this join exception, with a new timestamp each time, and I think that's confusing because this exception has nothing to do with the ongoing failure to form a cluster. It's true that we log how many milliseconds ago the exception occurred but I think that could easily be overlooked.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I fixed it here 283fdea

lastFailedJoinAttempt.compareAndSet(attempt, null);
}
}

public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
Expand All @@ -190,12 +243,15 @@ public Empty read(StreamInput in) {
public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
}

@Override
public void handleException(TransportException exp) {
pendingOutgoingJoins.remove(dedupKey);
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
attempt.logNow();
lastFailedJoinAttempt.set(attempt);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ public void testScheduling() {
= new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());

final AtomicLong warningCount = new AtomicLong();
final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong();

final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
() -> {
warningCount.incrementAndGet();
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
},
deterministicTaskQueue.getThreadPool());
deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet());

deterministicTaskQueue.runAllTasks();
assertThat("should not schedule anything yet", warningCount.get(), is(0L));
Expand Down Expand Up @@ -105,8 +106,10 @@ public void testScheduling() {
deterministicTaskQueue.runAllTasksInTimeOrder();

assertThat(warningCount.get(), is(5L));
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));

warningCount.set(0);
logLastFailedJoinAttemptWarningCount.set(0);
clusterFormationFailureHelper.start();
clusterFormationFailureHelper.stop();
clusterFormationFailureHelper.start();
Expand All @@ -127,6 +130,7 @@ public void testScheduling() {
deterministicTaskQueue.runAllTasksInTimeOrder();

assertThat(warningCount.get(), is(5L));
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
}

public void testDescriptionOnMasterIneligibleNodes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.Level;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

Expand All @@ -32,6 +36,7 @@

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;

public class JoinHelperTests extends ESTestCase {

Expand Down Expand Up @@ -107,4 +112,23 @@ public void testJoinDeduplication() {
capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
assertFalse(joinHelper.isJoinPending());
}

public void testFailedJoinAttemptLogLevel() {
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(new TransportException("generic transport exception")), is(Level.INFO));

assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
new RemoteTransportException("remote transport exception with generic cause", new Exception())), is(Level.INFO));

assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
new RemoteTransportException("caused by CoordinationStateRejectedException",
new CoordinationStateRejectedException("test"))), is(Level.DEBUG));

assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
new RemoteTransportException("caused by FailedToCommitClusterStateException",
new FailedToCommitClusterStateException("test"))), is(Level.DEBUG));

assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
new RemoteTransportException("caused by NotMasterException",
new NotMasterException("test"))), is(Level.DEBUG));
}
}