Skip to content

[Zen2] Introduce ElectionScheduler #32709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
82fcbb1
Introduce ElectionScheduler
DaveCTurner Aug 8, 2018
cee906d
Imports
DaveCTurner Aug 8, 2018
4734d7a
Make it restartable and improve the tests
DaveCTurner Aug 8, 2018
b0d9a8d
Lifecycle stuff duplicates the presence/absence of currentScheduler, …
DaveCTurner Aug 8, 2018
15ac140
Merge branch 'zen2' into 2018-08-08-election-scheduler
DaveCTurner Aug 8, 2018
6cbca69
On reflection, these settings do not need to be dynamic, which simpli…
DaveCTurner Aug 8, 2018
a42c4f3
Final fields need no mutex
DaveCTurner Aug 9, 2018
b1b2dcc
Rename settings and add separate backoff parameter
DaveCTurner Aug 9, 2018
3e59fe4
Just validate in ctor
DaveCTurner Aug 9, 2018
da19702
Less indenting
DaveCTurner Aug 9, 2018
1d1d2b9
Log other parameters when scheduling election
DaveCTurner Aug 9, 2018
b3f429e
... under mutex
DaveCTurner Aug 9, 2018
82ae493
All election activity is off the happy path so promoting it to debug
DaveCTurner Aug 9, 2018
f664744
Missing param
DaveCTurner Aug 9, 2018
56a089c
Move mutex
DaveCTurner Aug 9, 2018
7cbd40c
Rename mutex
DaveCTurner Aug 9, 2018
5637da3
Introduce PreVoteRequest
DaveCTurner Aug 9, 2018
5cccf2c
Introduce PreVoteResponse
DaveCTurner Aug 9, 2018
665e7a9
ElectionScheduler now receives a TransportService
DaveCTurner Aug 9, 2018
94d261b
WIP add pre-voting
DaveCTurner Aug 9, 2018
0c96aff
Fix up ElectionSchedulerTests
DaveCTurner Aug 9, 2018
439daa0
No need for braces
DaveCTurner Aug 9, 2018
a77cf41
Move currentDelayMillis inside current scheduler and remove mutexes
DaveCTurner Aug 13, 2018
2e6af7b
Start work on better test suite
DaveCTurner Aug 13, 2018
eb6c0b8
Improve, and test, scheduling
DaveCTurner Aug 13, 2018
d252d29
Add tests of prevoting
DaveCTurner Aug 13, 2018
b1285e3
Imports
DaveCTurner Aug 13, 2018
fe17921
Imports
DaveCTurner Aug 13, 2018
896d456
Logger usage
DaveCTurner Aug 13, 2018
61fd397
Introduce initial grace period before first election attempt
DaveCTurner Aug 13, 2018
7542117
Separate PreVoteCollector out as a top-level class
DaveCTurner Aug 14, 2018
1036ece
Simplify ElectionSchedulerTests
DaveCTurner Aug 14, 2018
2f936d9
Simplify PreVoteCollectorTests
DaveCTurner Aug 14, 2018
7eaa959
Pass broadcast nodes into PVC.start()
DaveCTurner Aug 14, 2018
c5fd779
Just schedule a runnable
DaveCTurner Aug 14, 2018
78c3079
Simplify start/stop state
DaveCTurner Aug 14, 2018
a6080b3
Pass in a ClusterState rather than overriding isElectionQuorum
DaveCTurner Aug 14, 2018
6f05620
Pass in max-term consumer rather than overriding method
DaveCTurner Aug 14, 2018
40bcc97
PreVoteCollector now manages its rounds
DaveCTurner Aug 14, 2018
9aa9586
Add PreVoteRequest handler
DaveCTurner Aug 14, 2018
85573b0
Add tests of request handling
DaveCTurner Aug 14, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ public boolean electionWon() {
}

/**
 * Reports whether the given votes form a quorum in both the last-committed and the last-accepted configuration.
 *
 * NOTE(review): this span appears to come from a rendered diff, so the first return statement below is presumably the
 * removed line and the second one its replacement. Confirm against the real file before relying on either.
 *
 * @param votes the votes collected so far
 * @return {@code true} if the votes are a quorum in both configurations
 */
public boolean isElectionQuorum(VoteCollection votes) {
return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(getLastAcceptedConfiguration());
return isElectionQuorum(votes, getLastAcceptedState());
}

/**
 * Reports whether the given votes form a quorum in both the last-committed and the last-accepted configuration of the
 * supplied cluster state. The last-accepted configuration is only consulted if the last-committed one is satisfied.
 *
 * @param votes             the votes collected so far
 * @param lastAcceptedState the cluster state whose two configurations are checked
 * @return {@code true} if the votes are a quorum in both configurations
 */
static boolean isElectionQuorum(VoteCollection votes, ClusterState lastAcceptedState) {
    if (votes.isQuorum(lastAcceptedState.getLastCommittedConfiguration()) == false) {
        return false;
    }
    return votes.isQuorum(lastAcceptedState.getLastAcceptedConfiguration());
}

public boolean isPublishQuorum(VoteCollection votes) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

public class ElectionScheduler extends AbstractComponent {

/*
* It's provably impossible to guarantee that any leader election algorithm ever elects a leader, but they generally work (with
* probability that approaches 1 over time) as long as elections occur sufficiently infrequently, compared to the time it takes to send
* a message to another node and receive a response back. We do not know the round-trip latency here, but we can approximate it by
* attempting elections randomly at reasonably high frequency and backing off (linearly) until one of them succeeds. We also place an
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a setting for the linear backoff factor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've added such a setting.

* upper bound on the backoff so that if elections are failing due to a network partition that lasts for a long time then when the
* partition heals there is an election attempt reasonably quickly.
*/

// bounds on the time between election attempts
private static final String ELECTION_MIN_TIMEOUT_SETTING_KEY = "cluster.election.min_timeout";
private static final String ELECTION_BACK_OFF_TIME_SETTING_KEY = "cluster.election.back_off_time";
private static final String ELECTION_MAX_TIMEOUT_SETTING_KEY = "cluster.election.max_timeout";

/**
 * Initial value of the upper bound on the random delay before an election attempt.
 * Presumably defaults to 100ms with a lower limit of 1ms (argument order of Setting.timeSetting; confirm).
 */
public static final Setting<TimeValue> ELECTION_MIN_TIMEOUT_SETTING = Setting.timeSetting(ELECTION_MIN_TIMEOUT_SETTING_KEY,
TimeValue.timeValueMillis(100), TimeValue.timeValueMillis(1), Property.NodeScope);

/**
 * Amount added to the upper bound on the random delay each time another election attempt is scheduled.
 * Presumably defaults to 100ms with a lower limit of 1ms (argument order of Setting.timeSetting; confirm).
 */
public static final Setting<TimeValue> ELECTION_BACK_OFF_TIME_SETTING = Setting.timeSetting(ELECTION_BACK_OFF_TIME_SETTING_KEY,
TimeValue.timeValueMillis(100), TimeValue.timeValueMillis(1), Property.NodeScope);

/**
 * Cap on the upper bound on the random delay; the constructor rejects values less than 100ms above the minimum timeout.
 * Presumably defaults to 10s with a lower limit of 200ms (argument order of Setting.timeSetting; confirm).
 */
public static final Setting<TimeValue> ELECTION_MAX_TIMEOUT_SETTING = Setting.timeSetting(ELECTION_MAX_TIMEOUT_SETTING_KEY,
TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(200), Property.NodeScope);

/**
 * Builds the error text used when the configured minimum and maximum election timeouts are too close together.
 *
 * @param electionMinTimeout textual form of the configured minimum timeout
 * @param electionMaxTimeout textual form of the configured maximum timeout
 * @return the formatted message naming both setting keys and both values
 */
static String validationExceptionMessage(final String electionMinTimeout, final String electionMaxTimeout) {
    final ParameterizedMessage message = new ParameterizedMessage(
        "Invalid election retry timeouts: [{}] is [{}] and [{}] is [{}], but [{}] should be at least 100ms longer than [{}]",
        ELECTION_MIN_TIMEOUT_SETTING_KEY, electionMinTimeout,
        ELECTION_MAX_TIMEOUT_SETTING_KEY, electionMaxTimeout,
        ELECTION_MAX_TIMEOUT_SETTING_KEY, ELECTION_MIN_TIMEOUT_SETTING_KEY);
    return message.getFormattedMessage();
}

// Starting value for each ActiveScheduler's upper bound on the random delay.
private final TimeValue minTimeout;
// Increment applied to that upper bound after each scheduling step.
private final TimeValue backoffTime;
// Largest value the upper bound is allowed to reach.
private final TimeValue maxTimeout;
// Source of randomness for choosing each delay.
private final Random random;
// Used to reach the thread pool on which election attempts are scheduled.
private final TransportService transportService;
// Hands out a distinct id to every ActiveScheduler, used in its toString().
private final AtomicLong idSupplier = new AtomicLong();

// The scheduler created by the most recent start(), or null once stop() has been called.
private volatile Object currentScheduler; // only care about its identity

ElectionScheduler(Settings settings, Random random, TransportService transportService) {
super(settings);

this.random = random;
this.transportService = transportService;

minTimeout = ELECTION_MIN_TIMEOUT_SETTING.get(settings);
backoffTime = ELECTION_BACK_OFF_TIME_SETTING.get(settings);
maxTimeout = ELECTION_MAX_TIMEOUT_SETTING.get(settings);

if (maxTimeout.millis() < minTimeout.millis() + 100) {
throw new IllegalArgumentException(validationExceptionMessage(minTimeout.toString(), maxTimeout.toString()));
}
}

/**
 * Begins scheduling repeated election attempts. Must only be called while no scheduler is active.
 *
 * @param gracePeriod       extra time to wait, on top of the random delay, before the first attempt
 * @param scheduledRunnable the action to run each time an election should be attempted
 */
public void start(TimeValue gracePeriod, Runnable scheduledRunnable) {
    assert this.currentScheduler == null;
    final ActiveScheduler freshScheduler = new ActiveScheduler();
    // Publish the new scheduler before scheduling, so that its own isRunning() check passes.
    this.currentScheduler = freshScheduler;
    freshScheduler.scheduleNextElection(gracePeriod, scheduledRunnable);
}

/**
 * Stops scheduling election attempts by clearing the current scheduler. Tasks that were already scheduled notice
 * that their scheduler is no longer current and do nothing when they run. Must only be called after
 * {@link #start}.
 */
public void stop() {
    // The detail message used to be the field itself, which is always null when this assertion fires.
    assert currentScheduler != null : "stop() called while no election scheduler is running";
    logger.debug("stopping {}", currentScheduler);
    currentScheduler = null;
}

/**
 * Maps any long to a non-negative long: the absolute value, except that {@code Long.MIN_VALUE} (whose absolute value
 * is not representable) maps to zero.
 */
@SuppressForbidden(reason = "Argument to Math.abs() is definitely not Long.MIN_VALUE")
private static long nonNegative(long n) {
    if (n == Long.MIN_VALUE) {
        return 0;
    }
    return Math.abs(n);
}

/**
 * Draws a random value from the range {@code [1, upperBound)}.
 *
 * @param upperBound exclusive upper bound; must be greater than 1
 * @return a strictly positive value that is strictly less than {@code upperBound}
 */
private long randomPositiveLongLessThan(long upperBound) {
    assert 1 < upperBound : upperBound;
    final long nonNegativeSample = nonNegative(random.nextLong());
    // There are (upperBound - 1) permitted results: 1, 2, ..., upperBound - 1.
    final long permittedResults = upperBound - 1;
    return nonNegativeSample % permittedResults + 1;
}

/**
 * Computes the next upper bound on the random delay: the current bound plus the back-off time, capped at the
 * maximum timeout.
 *
 * @param currentMaxDelayMillis the current upper bound, in milliseconds
 * @return the upper bound to use next, in milliseconds
 */
private long backOffCurrentMaxDelay(long currentMaxDelayMillis) {
    final long backedOffMillis = currentMaxDelayMillis + backoffTime.getMillis();
    final long ceilingMillis = maxTimeout.getMillis();
    return Math.min(ceilingMillis, backedOffMillis);
}

/**
 * Describes this scheduler by its configured timeouts and its currently active scheduler, if any.
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("ElectionScheduler{");
    description.append("minTimeout=").append(minTimeout);
    description.append(", maxTimeout=").append(maxTimeout);
    description.append(", backoffTime=").append(backoffTime);
    description.append(", currentScheduler=").append(currentScheduler);
    description.append('}');
    return description.toString();
}

/**
 * A single run of the election schedule. An instance counts as running only while it is the enclosing scheduler's
 * {@code currentScheduler}; once it is replaced or cleared, its pending tasks do nothing and it stops rescheduling.
 */
private class ActiveScheduler {
    // Upper bound on the next random delay; starts at the minimum timeout and is backed off on each scheduling step.
    // The reference is never reassigned, so it is final.
    private final AtomicLong currentMaxDelayMillis = new AtomicLong(minTimeout.millis());
    // Distinguishes this instance from earlier ones in log output.
    private final long schedulerId = idSupplier.incrementAndGet();

    /**
     * @return whether this instance is still the enclosing scheduler's current scheduler
     */
    boolean isRunning() {
        return this == currentScheduler;
    }

    /**
     * Schedules the next election attempt after a random delay, unless this scheduler is no longer running.
     *
     * @param gracePeriod       extra time added to the random delay
     * @param scheduledRunnable the action to run when the delay elapses
     */
    void scheduleNextElection(final TimeValue gracePeriod, final Runnable scheduledRunnable) {
        if (isRunning() == false) {
            logger.debug("{} not scheduling election", this);
            return;
        }

        // Use the current bound for this attempt and store the backed-off bound for the next one.
        final long maxDelayMillis = currentMaxDelayMillis.getAndUpdate(ElectionScheduler.this::backOffCurrentMaxDelay);
        // The random part is in [1, maxDelayMillis]; the grace period is added on top.
        final long delay = randomPositiveLongLessThan(maxDelayMillis + 1) + gracePeriod.millis();
        logger.debug("{} scheduling election with delay [{}ms] (grace={}, min={}, backoff={}, current={}ms, max={})",
            this, delay, gracePeriod, minTimeout, backoffTime, maxDelayMillis, maxTimeout);

        transportService.getThreadPool().schedule(TimeValue.timeValueMillis(delay), Names.GENERIC, new AbstractRunnable() {
            @Override
            public void onFailure(Exception e) {
                logger.debug("unexpected exception in wakeup", e);
                assert false : e;
            }

            @Override
            protected void doRun() {
                // The scheduler may have been stopped while this task was waiting.
                if (isRunning() == false) {
                    logger.debug("{} not starting election", ActiveScheduler.this);
                    return;
                }
                logger.debug("{} starting pre-voting", ActiveScheduler.this);
                scheduledRunnable.run();
            }

            @Override
            public void onAfter() {
                // Presumably invoked by AbstractRunnable once the run has finished; only the first attempt gets a
                // grace period, so later ones are rescheduled with zero.
                scheduleNextElection(TimeValue.ZERO, scheduledRunnable);
            }

            @Override
            public String toString() {
                return "scheduleNextElection[" + ActiveScheduler.this + "]";
            }

            @Override
            public boolean isForceExecution() {
                // There are very few of these scheduled, and they back off, but it's important that they're not rejected as
                // this could prevent a cluster from ever forming.
                return true;
            }
        });
    }

    @Override
    public String toString() {
        return "ActiveScheduler[" + schedulerId + ", currentMaxDelayMillis=" + currentMaxDelayMillis + "]";
    }
}
}
Loading