-
Notifications
You must be signed in to change notification settings - Fork 212
[Part 7]: Introducing the Task Executor #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5aacd70
de625a6
ebc355c
ad6308f
750cddc
11b8705
2aeeb1c
46fd281
fbec8bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| /* | ||
| * Copyright 2022 Netflix, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.mantisrx.common.utils; | ||
|
|
||
| import io.mantisrx.shaded.com.google.common.base.Preconditions; | ||
| import io.mantisrx.shaded.com.google.common.util.concurrent.Service; | ||
| import io.mantisrx.shaded.com.google.common.util.concurrent.Service.Listener; | ||
| import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.Executor; | ||
|
|
||
| public class Services { | ||
|
|
||
| /** | ||
| * Equivalent of service.startAsync().awaitRunning() except that this provides a future that completes when | ||
| * the service reaches the RUNNING state. if the service on the other hand fails to start, then the future is completed | ||
| * exceptionally with the Throwable cause. | ||
| * | ||
| * @param service service that needs to be started | ||
| * @param executor executor to be used for notifying the caller. | ||
| * @return a future | ||
| */ | ||
| public static CompletableFuture<Void> startAsync(Service service, Executor executor) { | ||
| Preconditions.checkArgument(service.state() == State.NEW, "Assumes the service has not been started yet"); | ||
| final CompletableFuture<Void> result = new CompletableFuture<>(); | ||
| service.addListener(new Listener() { | ||
| @Override | ||
| public void running() { | ||
| result.complete(null); | ||
| } | ||
|
|
||
| @Override | ||
| public void failed(State from, Throwable failure) { | ||
| if (from.ordinal() < State.RUNNING.ordinal()) { | ||
| result.completeExceptionally(failure); | ||
| } | ||
| } | ||
| }, executor); | ||
|
|
||
| service.startAsync(); | ||
| return result; | ||
| } | ||
|
|
||
| /** | ||
| * Equivalent service.stopAsync().awaitTerminated() except that this method returns a future that gets completed | ||
| * when the service terminated successfully. | ||
| * | ||
| * @param service service to be stopped | ||
| * @param executor executor on which the caller needs to be notified. | ||
| * @return future | ||
| */ | ||
| public static CompletableFuture<Void> awaitAsync(Service service, Executor executor) { | ||
| final CompletableFuture<Void> result = new CompletableFuture<>(); | ||
| service.addListener(new Listener() { | ||
| @Override | ||
| public void terminated(State from) { | ||
| result.complete(null); | ||
| } | ||
|
|
||
| @Override | ||
| public void failed(State from, Throwable failure) { | ||
| result.completeExceptionally(failure); | ||
| } | ||
| }, executor); | ||
|
|
||
| if (service.state() == State.FAILED) { | ||
| result.completeExceptionally(service.failureCause()); | ||
| } else if (service.state() == State.TERMINATED) { | ||
| result.complete(null); | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| public static CompletableFuture<Void> stopAsync(Service service, Executor executor) { | ||
| CompletableFuture<Void> result = awaitAsync(service, executor); | ||
| service.stopAsync(); | ||
| return result; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,8 +18,19 @@ | |
| import io.mantisrx.server.core.CoreConfiguration; | ||
| import io.mantisrx.server.core.master.MasterMonitor; | ||
| import io.mantisrx.server.core.zookeeper.CuratorService; | ||
| import io.mantisrx.server.master.resourcecluster.ClusterID; | ||
| import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway; | ||
| import io.mantisrx.server.master.resourcecluster.ResourceClusterGatewayClient; | ||
| import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService; | ||
| import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import rx.Scheduler; | ||
| import rx.Subscription; | ||
| import rx.schedulers.Schedulers; | ||
|
|
||
| /** | ||
| * HighAvailabilityServicesUtil helps you create HighAvailabilityServices instance based on the core configuration. | ||
|
|
@@ -43,6 +54,7 @@ private static class ZkHighAvailabilityServices extends AbstractIdleService impl | |
| HighAvailabilityServices { | ||
|
|
||
| private final CuratorService curatorService; | ||
| private final AtomicInteger rmConnections = new AtomicInteger(0); | ||
|
|
||
| public ZkHighAvailabilityServices(CoreConfiguration configuration) { | ||
| curatorService = new CuratorService(configuration, null); | ||
|
|
@@ -67,5 +79,48 @@ public MantisMasterGateway getMasterClientApi() { | |
| public MasterMonitor getMasterMonitor() { | ||
| return curatorService.getMasterMonitor(); | ||
| } | ||
|
|
||
| @Override | ||
| public ResourceLeaderConnection<ResourceClusterGateway> connectWithResourceManager( | ||
| ClusterID clusterID) { | ||
| return new ResourceLeaderConnection<ResourceClusterGateway>() { | ||
| final MasterMonitor masterMonitor = curatorService.getMasterMonitor(); | ||
|
|
||
| ResourceClusterGateway currentResourceClusterGateway = | ||
| new ResourceClusterGatewayClient(clusterID, masterMonitor.getLatestMaster()); | ||
|
|
||
| final String nameFormat = | ||
| "ResourceClusterGatewayCxn (" + rmConnections.getAndIncrement() + ")-%d"; | ||
| final Scheduler scheduler = | ||
| Schedulers | ||
| .from( | ||
| Executors | ||
| .newSingleThreadExecutor( | ||
| new ThreadFactoryBuilder().setNameFormat(nameFormat).build())); | ||
|
|
||
| final List<Subscription> subscriptions = new ArrayList<>(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where do we use these subscriptions?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nowhere right now. But, I think the intention when I wrote the code (a long time ago) was that we would close these subscriptions when the task executor is about to be shut down. So, it just makes for a clean, graceful shutdown. |
||
|
|
||
| @Override | ||
| public ResourceClusterGateway getCurrent() { | ||
| return currentResourceClusterGateway; | ||
| } | ||
|
|
||
| @Override | ||
| public void register(ResourceLeaderChangeListener<ResourceClusterGateway> changeListener) { | ||
| Subscription subscription = masterMonitor | ||
| .getMasterObservable() | ||
| .observeOn(scheduler) | ||
| .subscribe(nextDescription -> { | ||
| log.info("nextDescription={}", nextDescription); | ||
| ResourceClusterGateway previous = currentResourceClusterGateway; | ||
| currentResourceClusterGateway = | ||
| new ResourceClusterGatewayClient(clusterID, nextDescription); | ||
| changeListener.onResourceLeaderChanged(previous, currentResourceClusterGateway); | ||
| }); | ||
|
|
||
| subscriptions.add(subscription); | ||
| } | ||
| }; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /* | ||
| * Copyright 2022 Netflix, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.mantisrx.server.master.client; | ||
|
|
||
| /** | ||
| * ResourceLeaderConnection helps create a connection with the leader resource. You create a connection using the | ||
| * register method and on subsequent changes to the leader resource, the listener gets notified so that the caller | ||
| * can perform appropriate actions as necessary. | ||
| * | ||
| * @param <ResourceT> type of the resource being connected to. | ||
| */ | ||
| public interface ResourceLeaderConnection<ResourceT> { | ||
| ResourceT getCurrent(); | ||
|
|
||
| void register(ResourceLeaderChangeListener<ResourceT> changeListener); | ||
|
|
||
| interface ResourceLeaderChangeListener<ResourceT> { | ||
| void onResourceLeaderChanged(ResourceT previousResourceLeader, ResourceT newResourceLeader); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| /* | ||
| * Copyright 2022 Netflix, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package io.mantisrx.server.master.resourcecluster; | ||
|
|
||
| import static org.asynchttpclient.Dsl.asyncHttpClient; | ||
| import static org.asynchttpclient.Dsl.post; | ||
|
|
||
| import com.spotify.futures.CompletableFutures; | ||
| import io.mantisrx.common.Ack; | ||
| import io.mantisrx.server.core.master.MasterDescription; | ||
| import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; | ||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import lombok.ToString; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.asynchttpclient.AsyncHttpClient; | ||
| import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder; | ||
| import org.asynchttpclient.Request; | ||
|
|
||
| @ToString(of = {"masterDescription", "clusterID"}) | ||
| @Slf4j | ||
| public class ResourceClusterGatewayClient implements ResourceClusterGateway, Closeable { | ||
|
|
||
| private final int connectTimeout = 100; | ||
| private final int connectionRequestTimeout = 1000; | ||
| private final int socketTimeout = 2000; | ||
| private final ClusterID clusterID; | ||
| private final MasterDescription masterDescription; | ||
| private AsyncHttpClient client; | ||
| private final ObjectMapper mapper; | ||
|
|
||
| public ResourceClusterGatewayClient( | ||
| ClusterID clusterID, | ||
| MasterDescription masterDescription) { | ||
| this.clusterID = clusterID; | ||
| this.masterDescription = masterDescription; | ||
| this.mapper = new ObjectMapper(); | ||
| this.client = buildCloseableHttpClient(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| client.close(); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Ack> registerTaskExecutor(TaskExecutorRegistration registration) { | ||
| return performAction("registerTaskExecutor", registration); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Ack> heartBeatFromTaskExecutor(TaskExecutorHeartbeat heartbeat) { | ||
| return performAction("heartBeatFromTaskExecutor", heartbeat); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Ack> notifyTaskExecutorStatusChange( | ||
| TaskExecutorStatusChange taskExecutorStatusChange) { | ||
| return performAction("notifyTaskExecutorStatusChange", taskExecutorStatusChange); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Ack> disconnectTaskExecutor( | ||
| TaskExecutorDisconnection taskExecutorDisconnection) { | ||
| return performAction("disconnectTaskExecutor", taskExecutorDisconnection); | ||
| } | ||
|
|
||
| private CompletableFuture<Ack> performAction(String action, Object body) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: An enum or similar closed form solution might be safer than a free form string for the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, this is strictly within the class - so I'm not sure what value enum would particularly add. If this was exposed, I agree enums are definitely better as they are stronger types than free-form strings. |
||
| try { | ||
| final String bodyStr = mapper.writeValueAsString(body); | ||
| final Request request = post( | ||
| getActionUri(action)).setBody(bodyStr).addHeader("Content-Type", "application/json").build(); | ||
| log.debug("request={}", request); | ||
| return client.executeRequest(request).toCompletableFuture().thenCompose(response -> { | ||
| if (response.getStatusCode() == 200) { | ||
| return CompletableFuture.completedFuture(Ack.getInstance()); | ||
| } else { | ||
| try { | ||
| log.error("failed request {} with response {}", request, response.getResponseBody()); | ||
| return CompletableFutures.exceptionallyCompletedFuture( | ||
| mapper.readValue(response.getResponseBody(), Throwable.class)); | ||
| } catch (Exception e) { | ||
| return CompletableFutures.exceptionallyCompletedFuture( | ||
| new Exception(String.format("response=%s", response), e)); | ||
| } | ||
| } | ||
| }); | ||
| } catch (Exception e) { | ||
| return CompletableFutures.exceptionallyCompletedFuture(e); | ||
| } | ||
| } | ||
|
|
||
| private String getActionUri(String action) { | ||
| String uri = String.format("http://%s:%d/api/v1/resourceClusters/%s/actions/%s", | ||
| masterDescription.getHostname(), masterDescription.getApiPort(), clusterID.getResourceID(), | ||
| action); | ||
|
|
||
| log.debug("uri={}", uri); | ||
| return uri; | ||
| } | ||
|
|
||
| private AsyncHttpClient buildCloseableHttpClient() { | ||
| return asyncHttpClient( | ||
| new Builder().setConnectTimeout(connectTimeout).setRequestTimeout(connectionRequestTimeout) | ||
| .setReadTimeout(socketTimeout).build()); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.