Skip to content

Use cancel instead of timeout verb for early-terminating publications #37670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ void becomeCandidate(String method) {

if (mode != Mode.CANDIDATE) {
mode = Mode.CANDIDATE;
cancelActivePublication();
cancelActivePublication("become candidate: " + method);
joinAccumulator.close(mode);
joinAccumulator = joinHelper.new CandidateJoinAccumulator();

Expand Down Expand Up @@ -527,7 +527,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
cancelActivePublication();
cancelActivePublication("become follower: " + method);
preVoteCollector.update(getPreVoteResponse(), leaderNode);

if (restartLeaderChecker) {
Expand Down Expand Up @@ -935,7 +935,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
@Override
public void run() {
synchronized (mutex) {
publication.onTimeout();
publication.cancel("timed out after " + publishTimeout);
}
}

Expand Down Expand Up @@ -991,10 +991,10 @@ public void onFailure(Exception e) {
};
}

private void cancelActivePublication() {
private void cancelActivePublication(String reason) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (currentPublication.isPresent()) {
currentPublication.get().onTimeout();
currentPublication.get().cancel(reason);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class Publication {

private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed
private boolean isCompleted; // set when publication is completed
private boolean timedOut; // set when publication timed out
private boolean cancelled; // set when publication is cancelled

public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) {
this.publishRequest = publishRequest;
Expand All @@ -71,17 +71,17 @@ public void start(Set<DiscoveryNode> faultyNodes) {
publicationTargets.forEach(PublicationTarget::sendPublishRequest);
}

public void onTimeout() {
public void cancel(String reason) {
if (isCompleted) {
return;
}

assert timedOut == false;
timedOut = true;
assert cancelled == false;
cancelled = true;
if (applyCommitRequest.isPresent() == false) {
logger.debug("onTimeout: [{}] timed out before committing", this);
logger.debug("cancel: [{}] cancelled before committing (reason: {})", this, reason);
// fail all current publications
final Exception e = new ElasticsearchException("publication timed out before committing");
final Exception e = new ElasticsearchException("publication cancelled before committing: " + reason);
publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e));
}
onPossibleCompletion();
Expand All @@ -101,7 +101,7 @@ private void onPossibleCompletion() {
return;
}

if (timedOut == false) {
if (cancelled == false) {
for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) {
return;
Expand All @@ -125,8 +125,8 @@ private void onPossibleCompletion() {
}

// For assertions only: verify that this invariant holds
private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() {
if (timedOut == false) {
private boolean publicationCompletedIffAllTargetsInactiveOrCancelled() {
if (cancelled == false) {
for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) {
return isCompleted == false;
Expand Down Expand Up @@ -222,7 +222,7 @@ void sendPublishRequest() {
state = PublicationTargetState.SENT_PUBLISH_REQUEST;
Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
// TODO Can this ^ fail with an exception? Target should be failed if so.
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

void handlePublishResponse(PublishResponse publishResponse) {
Expand All @@ -245,7 +245,7 @@ void sendApplyCommit() {
state = PublicationTargetState.SENT_APPLY_COMMIT;
assert applyCommitRequest.isPresent();
Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler());
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

void setAppliedCommit() {
Expand Down Expand Up @@ -300,7 +300,7 @@ private class PublishResponseHandler implements ActionListener<PublishWithJoinRe
public void onResponse(PublishWithJoinResponse response) {
if (isFailed()) {
logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode);
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
return;
}

Expand All @@ -319,7 +319,7 @@ public void onResponse(PublishWithJoinResponse response) {
state = PublicationTargetState.WAITING_FOR_QUORUM;
handlePublishResponse(response.getPublishResponse());

assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

@Override
Expand All @@ -330,7 +330,7 @@ public void onFailure(Exception e) {
assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause());
onPossibleCommitFailure();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

}
Expand All @@ -346,7 +346,7 @@ public void onResponse(TransportResponse.Empty ignored) {
}
setAppliedCommit();
onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

@Override
Expand All @@ -357,7 +357,7 @@ public void onFailure(Exception e) {
assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause());
onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (e.getKey().equals(n2)) {
if (timeOut) {
publication.onTimeout();
publication.cancel("timed out");
} else {
e.getValue().onFailure(new TransportException(new Exception("dummy failure")));
}
Expand Down Expand Up @@ -407,7 +407,7 @@ public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedEx
}
});

publication.onTimeout();
publication.cancel("timed out");
assertTrue(publication.completed);
assertTrue(publication.committed);
assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS));
Expand Down