@@ -139,12 +139,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
139
139
140
140
private final LeaderRetrievalService webMonitorRetrievalService ;
141
141
142
- private final LeaderRetrievalService dispatcherRetrievalService ;
143
-
144
142
private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever ();
145
143
146
- private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever ();
147
-
148
144
/** ExecutorService to run operations that can be retried on exceptions. */
149
145
private ScheduledExecutorService retryExecutorService ;
150
146
@@ -193,14 +189,12 @@ public RestClusterClient(
193
189
} else {
194
190
this .webMonitorRetrievalService = webMonitorRetrievalService ;
195
191
}
196
- this .dispatcherRetrievalService = highAvailabilityServices .getDispatcherLeaderRetriever ();
197
192
this .retryExecutorService = Executors .newSingleThreadScheduledExecutor (new ExecutorThreadFactory ("Flink-RestClusterClient-Retry" ));
198
193
startLeaderRetrievers ();
199
194
}
200
195
201
196
private void startLeaderRetrievers () throws Exception {
202
197
this .webMonitorRetrievalService .start (webMonitorLeaderRetriever );
203
- this .dispatcherRetrievalService .start (dispatcherLeaderRetriever );
204
198
}
205
199
206
200
@ Override
@@ -216,12 +210,6 @@ public void shutdown() {
216
210
log .error ("An error occurred during stopping the webMonitorRetrievalService" , e );
217
211
}
218
212
219
- try {
220
- dispatcherRetrievalService .stop ();
221
- } catch (Exception e ) {
222
- log .error ("An error occurred during stopping the dispatcherLeaderRetriever" , e );
223
- }
224
-
225
213
try {
226
214
// we only call this for legacy reasons to shutdown components that are started in the ClusterClient constructor
227
215
super .shutdown ();
0 commit comments