-
Notifications
You must be signed in to change notification settings - Fork 25.3k
[Zen2] Introduce ElectionScheduler #32709
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
DaveCTurner
wants to merge
41
commits into
elastic:zen2
from
DaveCTurner:2018-08-08-election-scheduler
Closed
Changes from all commits
Commits
Show all changes
41 commits
Select commit
Hold shift + click to select a range
82fcbb1
Introduce ElectionScheduler
DaveCTurner cee906d
Imports
DaveCTurner 4734d7a
Make it restartable and improve the tests
DaveCTurner b0d9a8d
Lifecycle stuff duplicates the presence/absence of currentScheduler, …
DaveCTurner 15ac140
Merge branch 'zen2' into 2018-08-08-election-scheduler
DaveCTurner 6cbca69
On reflection, these settings do not need to be dynamic, which simpli…
DaveCTurner a42c4f3
Final fields need no mutex
DaveCTurner b1b2dcc
Rename settings and add separate backoff parameter
DaveCTurner 3e59fe4
Just validate in ctor
DaveCTurner da19702
Less indenting
DaveCTurner 1d1d2b9
Log other parameters when scheduling election
DaveCTurner b3f429e
... under mutex
DaveCTurner 82ae493
All election activity is off the happy path so promoting it to debug
DaveCTurner f664744
Missing param
DaveCTurner 56a089c
Move mutex
DaveCTurner 7cbd40c
Rename mutex
DaveCTurner 5637da3
Introduce PreVoteRequest
DaveCTurner 5cccf2c
Introduce PreVoteResponse
DaveCTurner 665e7a9
ElectionScheduler now receives a TransportService
DaveCTurner 94d261b
WIP add pre-voting
DaveCTurner 0c96aff
Fix up ElectionSchedulerTests
DaveCTurner 439daa0
No need for braces
DaveCTurner a77cf41
Move currentDelayMillis inside current scheduler and remove mutexes
DaveCTurner 2e6af7b
Start work on better test suite
DaveCTurner eb6c0b8
Improve, and test, scheduling
DaveCTurner d252d29
Add tests of prevoting
DaveCTurner b1285e3
Imports
DaveCTurner fe17921
Imports
DaveCTurner 896d456
Logger usage
DaveCTurner 61fd397
Introduce initial grace period before first election attempt
DaveCTurner 7542117
Separate PreVoteCollector out as a top-level class
DaveCTurner 1036ece
Simplify ElectionSchedulerTests
DaveCTurner 2f936d9
Simplify PreVoteCollectorTests
DaveCTurner 7eaa959
Pass broadcast nodes into PVC.start()
DaveCTurner c5fd779
Just schedule a runnable
DaveCTurner 78c3079
Simplify start/stop state
DaveCTurner a6080b3
Pass in a ClusterState rather than overriding isElectionQuorum
DaveCTurner 6f05620
Pass in max-term consumer rather than overriding method
DaveCTurner 40bcc97
PreVoteCollector now manages its rounds
DaveCTurner 9aa9586
Add PreVoteRequest handler
DaveCTurner 85573b0
Add tests of request handling
DaveCTurner File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
199 changes: 199 additions & 0 deletions
199
server/src/main/java/org/elasticsearch/cluster/coordination/ElectionScheduler.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.cluster.coordination; | ||
|
||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.elasticsearch.common.SuppressForbidden; | ||
import org.elasticsearch.common.component.AbstractComponent; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Setting.Property; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.threadpool.ThreadPool.Names; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.util.Random; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public class ElectionScheduler extends AbstractComponent { | ||
|
||
/* | ||
* It's provably impossible to guarantee that any leader election algorithm ever elects a leader, but they generally work (with | ||
* probability that approaches 1 over time) as long as elections occur sufficiently infrequently, compared to the time it takes to send | ||
* a message to another node and receive a response back. We do not know the round-trip latency here, but we can approximate it by | ||
* attempting elections randomly at reasonably high frequency and backing off (linearly) until one of them succeeds. We also place an | ||
* upper bound on the backoff so that if elections are failing due to a network partition that lasts for a long time then when the | ||
* partition heals there is an election attempt reasonably quickly. | ||
*/ | ||
|
||
// bounds on the time between election attempts | ||
private static final String ELECTION_MIN_TIMEOUT_SETTING_KEY = "cluster.election.min_timeout"; | ||
private static final String ELECTION_BACK_OFF_TIME_SETTING_KEY = "cluster.election.back_off_time"; | ||
private static final String ELECTION_MAX_TIMEOUT_SETTING_KEY = "cluster.election.max_timeout"; | ||
|
||
public static final Setting<TimeValue> ELECTION_MIN_TIMEOUT_SETTING = Setting.timeSetting(ELECTION_MIN_TIMEOUT_SETTING_KEY, | ||
TimeValue.timeValueMillis(100), TimeValue.timeValueMillis(1), Property.NodeScope); | ||
|
||
public static final Setting<TimeValue> ELECTION_BACK_OFF_TIME_SETTING = Setting.timeSetting(ELECTION_BACK_OFF_TIME_SETTING_KEY, | ||
TimeValue.timeValueMillis(100), TimeValue.timeValueMillis(1), Property.NodeScope); | ||
|
||
public static final Setting<TimeValue> ELECTION_MAX_TIMEOUT_SETTING = Setting.timeSetting(ELECTION_MAX_TIMEOUT_SETTING_KEY, | ||
TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(200), Property.NodeScope); | ||
|
||
static String validationExceptionMessage(final String electionMinTimeout, final String electionMaxTimeout) { | ||
return new ParameterizedMessage( | ||
"Invalid election retry timeouts: [{}] is [{}] and [{}] is [{}], but [{}] should be at least 100ms longer than [{}]", | ||
ELECTION_MIN_TIMEOUT_SETTING_KEY, electionMinTimeout, | ||
ELECTION_MAX_TIMEOUT_SETTING_KEY, electionMaxTimeout, | ||
ELECTION_MAX_TIMEOUT_SETTING_KEY, ELECTION_MIN_TIMEOUT_SETTING_KEY).getFormattedMessage(); | ||
} | ||
|
||
private final TimeValue minTimeout; | ||
private final TimeValue backoffTime; | ||
private final TimeValue maxTimeout; | ||
private final Random random; | ||
private final TransportService transportService; | ||
private final AtomicLong idSupplier = new AtomicLong(); | ||
|
||
private volatile Object currentScheduler; // only care about its identity | ||
|
||
ElectionScheduler(Settings settings, Random random, TransportService transportService) { | ||
super(settings); | ||
|
||
this.random = random; | ||
this.transportService = transportService; | ||
|
||
minTimeout = ELECTION_MIN_TIMEOUT_SETTING.get(settings); | ||
backoffTime = ELECTION_BACK_OFF_TIME_SETTING.get(settings); | ||
maxTimeout = ELECTION_MAX_TIMEOUT_SETTING.get(settings); | ||
|
||
if (maxTimeout.millis() < minTimeout.millis() + 100) { | ||
throw new IllegalArgumentException(validationExceptionMessage(minTimeout.toString(), maxTimeout.toString())); | ||
} | ||
} | ||
|
||
/** | ||
* Start the process to schedule repeated election attempts. | ||
* @param gracePeriod An initial period to wait before attempting the first election. | ||
* @param scheduledRunnable The action to run each time an election should be attempted. | ||
*/ | ||
public void start(TimeValue gracePeriod, Runnable scheduledRunnable) { | ||
final ActiveScheduler currentScheduler; | ||
assert this.currentScheduler == null; | ||
this.currentScheduler = currentScheduler = new ActiveScheduler(); | ||
currentScheduler.scheduleNextElection(gracePeriod, scheduledRunnable); | ||
} | ||
|
||
public void stop() { | ||
assert currentScheduler != null : currentScheduler; | ||
logger.debug("stopping {}", currentScheduler); | ||
currentScheduler = null; | ||
} | ||
|
||
@SuppressForbidden(reason = "Argument to Math.abs() is definitely not Long.MIN_VALUE") | ||
private static long nonNegative(long n) { | ||
return n == Long.MIN_VALUE ? 0 : Math.abs(n); | ||
} | ||
|
||
/** | ||
* @param upperBound exclusive upper bound | ||
*/ | ||
private long randomPositiveLongLessThan(long upperBound) { | ||
assert 1 < upperBound : upperBound; | ||
return nonNegative(random.nextLong()) % (upperBound - 1) + 1; | ||
} | ||
|
||
private long backOffCurrentMaxDelay(long currentMaxDelayMillis) { | ||
return Math.min(maxTimeout.getMillis(), currentMaxDelayMillis + backoffTime.getMillis()); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "ElectionScheduler{" + | ||
"minTimeout=" + minTimeout + | ||
", maxTimeout=" + maxTimeout + | ||
", backoffTime=" + backoffTime + | ||
", currentScheduler=" + currentScheduler + | ||
'}'; | ||
} | ||
|
||
private class ActiveScheduler { | ||
private AtomicLong currentMaxDelayMillis = new AtomicLong(minTimeout.millis()); | ||
private final long schedulerId = idSupplier.incrementAndGet(); | ||
|
||
boolean isRunning() { | ||
return this == currentScheduler; | ||
} | ||
|
||
void scheduleNextElection(final TimeValue gracePeriod, final Runnable scheduledRunnable) { | ||
final long delay; | ||
if (isRunning() == false) { | ||
logger.debug("{} not scheduling election", this); | ||
return; | ||
} | ||
|
||
long maxDelayMillis = this.currentMaxDelayMillis.getAndUpdate(ElectionScheduler.this::backOffCurrentMaxDelay); | ||
delay = randomPositiveLongLessThan(maxDelayMillis + 1) + gracePeriod.millis(); | ||
logger.debug("{} scheduling election with delay [{}ms] (grace={}, min={}, backoff={}, current={}ms, max={})", | ||
this, delay, gracePeriod, minTimeout, backoffTime, maxDelayMillis, maxTimeout); | ||
|
||
transportService.getThreadPool().schedule(TimeValue.timeValueMillis(delay), Names.GENERIC, new AbstractRunnable() { | ||
@Override | ||
public void onFailure(Exception e) { | ||
logger.debug("unexpected exception in wakeup", e); | ||
assert false : e; | ||
} | ||
|
||
@Override | ||
protected void doRun() { | ||
if (isRunning() == false) { | ||
logger.debug("{} not starting election", ActiveScheduler.this); | ||
return; | ||
} | ||
logger.debug("{} starting pre-voting", ActiveScheduler.this); | ||
scheduledRunnable.run(); | ||
} | ||
|
||
@Override | ||
public void onAfter() { | ||
scheduleNextElection(TimeValue.ZERO, scheduledRunnable); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "scheduleNextElection[" + ActiveScheduler.this + "]"; | ||
} | ||
|
||
@Override | ||
public boolean isForceExecution() { | ||
// There are very few of these scheduled, and they back off, but it's important that they're not rejected as | ||
// this could prevent a cluster from ever forming. | ||
return true; | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "ActiveScheduler[" + schedulerId + ", currentMaxDelayMillis=" + currentMaxDelayMillis + "]"; | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have a setting for the linear backoff factor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I've added such a setting.