Skip to content

Commit c84ddf7

Browse files
committed
remove publishListener concept in MasterService
1 parent 526511d commit c84ddf7

File tree

2 files changed

+31
-40
lines changed

2 files changed

+31
-40
lines changed

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
2424
import org.elasticsearch.Assertions;
25-
import org.elasticsearch.action.ActionListener;
2625
import org.elasticsearch.action.support.PlainActionFuture;
2726
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
2827
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -249,50 +248,43 @@ protected boolean blockingAllowed() {
249248
};
250249
clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));
251250

252-
final ActionListener<Void> publishListener = getPublishListener(clusterChangedEvent, taskOutputs, startTimeNS);
253251
// indefinitely wait for publication to complete
254252
try {
255253
FutureUtils.get(fut);
256-
publishListener.onResponse(null);
254+
onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS);
257255
} catch (Exception e) {
258-
publishListener.onFailure(e);
256+
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e);
259257
}
260258
}
261259

262-
protected ActionListener<Void> getPublishListener(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
263-
return new ActionListener<Void>() {
260+
protected void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
261+
taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());
264262

265-
@Override
266-
public void onResponse(Void ignore) {
267-
taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());
268-
269-
try {
270-
taskOutputs.clusterStatePublished(clusterChangedEvent);
271-
} catch (Exception e) {
272-
logger.error(() -> new ParameterizedMessage(
273-
"exception thrown while notifying executor of new cluster state publication [{}]",
274-
clusterChangedEvent.source()), e);
275-
}
276-
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
277-
logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})",
278-
clusterChangedEvent.source(),
279-
executionTime, clusterChangedEvent.state().version(),
280-
clusterChangedEvent.state().stateUUID());
281-
warnAboutSlowTaskIfNeeded(executionTime, clusterChangedEvent.source());
282-
}
283-
284-
@Override
285-
public void onFailure(Exception exception) {
286-
if (exception instanceof FailedToCommitClusterStateException) {
287-
final long version = clusterChangedEvent.state().version();
288-
logger.warn(() -> new ParameterizedMessage(
289-
"failing [{}]: failed to commit cluster state version [{}]", clusterChangedEvent.source(), version), exception);
290-
taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception);
291-
} else {
292-
handleException(clusterChangedEvent.source(), startTimeNS, clusterChangedEvent.state(), exception);
293-
}
294-
}
295-
};
263+
try {
264+
taskOutputs.clusterStatePublished(clusterChangedEvent);
265+
} catch (Exception e) {
266+
logger.error(() -> new ParameterizedMessage(
267+
"exception thrown while notifying executor of new cluster state publication [{}]",
268+
clusterChangedEvent.source()), e);
269+
}
270+
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
271+
logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})",
272+
clusterChangedEvent.source(),
273+
executionTime, clusterChangedEvent.state().version(),
274+
clusterChangedEvent.state().stateUUID());
275+
warnAboutSlowTaskIfNeeded(executionTime, clusterChangedEvent.source());
276+
}
277+
278+
protected void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS,
279+
Exception exception) {
280+
if (exception instanceof FailedToCommitClusterStateException) {
281+
final long version = clusterChangedEvent.state().version();
282+
logger.warn(() -> new ParameterizedMessage(
283+
"failing [{}]: failed to commit cluster state version [{}]", clusterChangedEvent.source(), version), exception);
284+
taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception);
285+
} else {
286+
handleException(clusterChangedEvent.source(), startTimeNS, clusterChangedEvent.state(), exception);
287+
}
296288
}
297289

298290
private void handleException(String summary, long startTimeNS, ClusterState newClusterState, Exception e) {

server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs task
113113
assert waitForPublish == false;
114114
waitForPublish = true;
115115
final Discovery.AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state());
116-
final ActionListener<Void> publishListener = getPublishListener(clusterChangedEvent, taskOutputs, startTimeNS);
117116
clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() {
118117

119118
private boolean listenerCalled = false;
@@ -125,7 +124,7 @@ public void onResponse(Void aVoid) {
125124
assert waitForPublish;
126125
waitForPublish = false;
127126
try {
128-
publishListener.onResponse(null);
127+
onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS);
129128
} finally {
130129
taskInProgress = false;
131130
scheduleNextTaskIfNecessary();
@@ -139,7 +138,7 @@ public void onFailure(Exception e) {
139138
assert waitForPublish;
140139
waitForPublish = false;
141140
try {
142-
publishListener.onFailure(e);
141+
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e);
143142
} finally {
144143
taskInProgress = false;
145144
scheduleNextTaskIfNecessary();

0 commit comments

Comments
 (0)