Zen2: Deterministic MasterService #32493
Pinging @elastic/es-distributed
I started to review this but did not get very far so only have superficial comments here. To be continued...
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.discovery;
Think I'd prefer this to be in `org.elasticsearch.cluster.coordination`.
fixed in 40d7c95
 * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether
 * they updated their own cluster state or not.
 *
 * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be
I think "throw" here now means "pass to `publishListener::onFailure`".
fixed in 40d7c95
 * Publish all the changes to the cluster from the master (can be called just by the master). The publish
 * process should apply this state to the master as well!
 *
 * The publishListener allows to wait for the publication to go through.
"go through" meaning complete/fail/timeout?
fixed in 40d7c95
interface AckListener {
    /**
     * Should be called when the discovery layer has committed the clusters state (i.e. even if this publication fails,
s/discovery/coordination/ ?
fixed in 40d7c95
    void onCommit(TimeValue commitTime);

    /**
     * Should be called whenever the discovery layer receives confirmation from a node that it has successfully applied
s/discovery/coordination/ ?
fixed in 40d7c95
The change to `ZenDiscovery` doesn't look right. Also some other minor comments.
@@ -1006,8 +1007,8 @@ public String toString() {
            UNKNOWN_VERSION_ADDED),
    TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class,
            org.elasticsearch.indices.TypeMissingException::new, 137, UNKNOWN_VERSION_ADDED),
    FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class,
            org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED),
    FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(FailedToCommitClusterStateException.class,
Almost all of these registrations use the fully-qualified class name (except `CoordinationStateRejectedException`, oops) so it looks like this should too.
right, also fixed for `CoordinationStateRejectedException` in d3c5a3d
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.discovery;
I think this should be in `org.elasticsearch.cluster.coordination`.
fixed in d3c5a3d
@@ -385,14 +387,6 @@ public void onNewClusterStateFailed(Exception e) {
    return;
This, and the containing `synchronized` block, don't look right. They throw `FailedToCommitClusterStateException` rather than passing it to the `publishListener`. Also, previously they returned early without blocking on the publication, but the equivalent flow now would be to call `publishListener.onResponse(null)` early somehow.
fixed in 506608c
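A minimal sketch of the distinction raised in this comment: a commit failure is reported through the listener instead of being thrown at the caller. The `PublishListener` interface, the `PublishSketch` class and its `publish` method are hypothetical stand-ins written for illustration; they are not the Elasticsearch `ActionListener` or `ZenDiscovery` code, and the exception class is a local placeholder that only borrows the name used in the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a publish listener; not the Elasticsearch ActionListener API.
interface PublishListener<T> {
    void onResponse(T result);
    void onFailure(Exception e);
}

// Local placeholder that only borrows the exception name used in the PR.
class FailedToCommitClusterStateException extends RuntimeException {
    FailedToCommitClusterStateException(String msg) {
        super(msg);
    }
}

class PublishSketch {
    // Failures are handed to the listener; this method never throws them itself.
    static void publish(boolean stillMaster, PublishListener<Void> publishListener) {
        if (!stillMaster) {
            publishListener.onFailure(new FailedToCommitClusterStateException("no longer master"));
            return;
        }
        // ... the actual publication work would happen here ...
        publishListener.onResponse(null);
    }

    // Demo helper: records which callback the listener received.
    static String outcome(boolean stillMaster) {
        AtomicReference<String> seen = new AtomicReference<>("nothing");
        publish(stillMaster, new PublishListener<Void>() {
            @Override
            public void onResponse(Void v) {
                seen.set("response");
            }

            @Override
            public void onFailure(Exception e) {
                seen.set("failure: " + e.getMessage());
            }
        });
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(outcome(true));
        System.out.println(outcome(false));
    }
}
```

With this shape, a caller that wants to return early without waiting on the publication can invoke `onResponse(null)` itself, which is the flow the comment above describes.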
@@ -140,9 +144,29 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th
    DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterSettings);
    Environment environment = TestEnvironment.newEnvironment(settings);
    Transport transport = mock(Transport.class); // it's not used
    nextMasterTaskToRun = new AtomicReference<>();
    FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("fake-master", nextMasterTaskToRun::set);
Does this need any kind of assertion that `nextMasterTaskToRun` isn't already set?
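One way such an assertion could look, as a sketch only: replace the plain `nextMasterTaskToRun::set` with a setter that refuses to overwrite a pending task. The `NextTaskSlot` class and its method names are hypothetical and do not appear in the PR; only the field name is borrowed from the diff.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical wrapper around the single-slot task holder used in the test harness.
class NextTaskSlot {
    private final AtomicReference<Runnable> nextMasterTaskToRun = new AtomicReference<>();

    // Accept a task only if the slot is currently empty; fail loudly otherwise.
    void set(Runnable task) {
        if (!nextMasterTaskToRun.compareAndSet(null, task)) {
            throw new AssertionError("nextMasterTaskToRun is already set");
        }
    }

    // Take the pending task (or null if there is none) and clear the slot.
    Runnable take() {
        return nextMasterTaskToRun.getAndSet(null);
    }
}
```

A harness built this way would pass `slot::set` where the diff passes `nextMasterTaskToRun::set`, so a second task submitted before the first has been taken fails the test instead of silently replacing it.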
LGTM after 506608c, but needs another reviewer.
I left some minor comments. The main change LGTM. The only concern I had, as discussed with @ywelsch, is that the integration of `FakeThreadPoolMasterService` with `ClusterStateChanges` is a bit clunky. It is my understanding that `FakeThreadPoolMasterService` is a very useful testing component for other parts of the work, but in that case I'd rather not use it (as is) with `ClusterStateChanges`.
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) throws Exception {
    CompletableFuture<Void> fut = new CompletableFuture<>();
    clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() {
You can use `ActionListener#wrap` to make this slightly more compact. Also, I presume you consciously chose a `CompletableFuture` over `PlainActionFuture`?
The reason I did not choose `PlainActionFuture` was because it asserts that we're not blocking on the MasterServiceUpdateThread (which this future deliberately does). Unfortunately, `CompletableFuture` has other problems (#32512 (comment)), so I've gone back to `PlainActionFuture`, but added a hook that allows disabling some of the assertion checks; see 526511d.
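To illustrate the compaction the reviewer suggests, here is a small self-contained sketch of a `wrap`-style factory that builds a listener from two lambdas instead of an anonymous class. `SimpleListener` and `WrapSketch` are hypothetical names defined here for illustration; this mimics the idea behind `ActionListener#wrap` but is not the Elasticsearch class, and it uses `CompletableFuture` only because that is what the diff under review used at this point.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical two-callback listener, defined locally for this sketch.
interface SimpleListener<T> {
    void onResponse(T response);
    void onFailure(Exception e);

    // Build a listener from two handlers rather than an anonymous class.
    static <R> SimpleListener<R> wrap(Consumer<R> responseHandler, Consumer<Exception> failureHandler) {
        return new SimpleListener<R>() {
            @Override
            public void onResponse(R response) {
                responseHandler.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                failureHandler.accept(e);
            }
        };
    }
}

class WrapSketch {
    // Bridge a future to a listener in one line using method references.
    static SimpleListener<Void> listenerFor(CompletableFuture<Void> fut) {
        return SimpleListener.wrap(fut::complete, fut::completeExceptionally);
    }
}
```

The caller then hands `WrapSketch.listenerFor(fut)` to the publisher and waits on `fut`, which keeps the bridging code to a single expression.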
    }
}, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));

final ActionListener<Void> publishListener = getPublishListener(clusterChangedEvent, taskOutputs, startTimeNS);
Why do we need this extra listener construct? At the moment it's activated fully sequentially. Wouldn't it be simpler to just process the result of the future inline?
The extra listener construct is not needed. I've changed this in c84ddf7
@@ -291,6 +339,10 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas
        return newClusterState;
    }

    public Builder incrementVersion(ClusterState clusterState) {
this can be protected
fixed in 526511d
@@ -371,8 +372,11 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
    public void testClosedIndexOnReroute() throws InterruptedException {
        final String index = "test";
        // no replicas in oder to skip the replication part
        setState(clusterService, new ClusterStateChanges(xContentRegistry(), threadPool).closeIndices(state(index, true,
            ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index)));
        ClusterStateChanges clusterStateChanges = new ClusterStateChanges(xContentRegistry(), threadPool);
Wondering - why is this change needed?
The reason was that we made `ClusterStateChanges` more realistic now by introducing `MasterService` to it. This test was using a cluster state where the local node was not the master. As `ClusterStateChanges` now uses the proper `MasterService`, it simply rejected the cluster state update to close the indices.
@Override
public void onResponse(Void aVoid) {
    assertThat(countDownLatch.getCount(), is(1L));
maybe synchronize this method?
fixed in b9d407f
Ok, I've reverted this in e4bd482
I left a couple of nits but the extra changes still LGTM.
@@ -801,7 +802,7 @@ public void testIds() {
    ids.put(137, org.elasticsearch.indices.TypeMissingException.class);
    ids.put(138, null);
    ids.put(139, null);
    ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class);
    ids.put(140, FailedToCommitClusterStateException.class);
I think this should still be fully-qualified.
urgs... fixed in a0030c0
final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
    @Override
    protected boolean blockingAllowed() {
        // allow this one to block on the MasterServiceUpdateThread
This comment would be unnecessary if we wrote something like:
return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) || super.blockingAllowed();
great idea. fixed in a0030c0
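The hook discussed here can be sketched roughly as follows. This is an illustration under assumptions, not the `PlainActionFuture` implementation: `GuardedFuture`, `MasterFriendlyFuture` and `BlockingDemo` are hypothetical classes, the base-class policy is invented for the demo, and the value of `MASTER_UPDATE_THREAD_NAME` is a placeholder. Only the overriding expression follows the reviewer's suggestion above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical future-like class with an overridable blocking policy.
class GuardedFuture {
    // Placeholder value for this sketch; the real constant lives in the PR's code.
    static final String MASTER_UPDATE_THREAD_NAME = "master-update";

    // Invented base policy for the demo: never block on the master update thread.
    protected boolean blockingAllowed() {
        return !Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME);
    }
}

// Subclass that opts in to blocking on the master update thread, as suggested in the review.
class MasterFriendlyFuture extends GuardedFuture {
    @Override
    protected boolean blockingAllowed() {
        return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) || super.blockingAllowed();
    }
}

class BlockingDemo {
    // Evaluate the policy on a thread with the given name and return the result.
    static boolean allowedOn(String threadName, GuardedFuture fut) {
        AtomicBoolean result = new AtomicBoolean();
        Thread t = new Thread(() -> result.set(fut.blockingAllowed()), threadName);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return result.get();
    }
}
```

Writing the override as an explicit thread-name check makes the intent readable from the code itself, which is why the explanatory comment becomes unnecessary.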
Increases testability of MasterService and the discovery layer. Changes: