Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mantisrx.common.utils;

import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service.Listener;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class Services {

/**
* Equivalent of service.startAsync().awaitRunning() except that this provides a future that completes when
* the service reaches the RUNNING state. if the service on the other hand fails to start, then the future is completed
* exceptionally with the Throwable cause.
*
* @param service service that needs to be started
* @param executor executor to be used for notifying the caller.
* @return a future
*/
public static CompletableFuture<Void> startAsync(Service service, Executor executor) {
Preconditions.checkArgument(service.state() == State.NEW, "Assumes the service has not been started yet");
final CompletableFuture<Void> result = new CompletableFuture<>();
service.addListener(new Listener() {
@Override
public void running() {
result.complete(null);
}

@Override
public void failed(State from, Throwable failure) {
if (from.ordinal() < State.RUNNING.ordinal()) {
result.completeExceptionally(failure);
}
}
}, executor);

service.startAsync();
return result;
}

/**
* Equivalent service.stopAsync().awaitTerminated() except that this method returns a future that gets completed
* when the service terminated successfully.
*
* @param service service to be stopped
* @param executor executor on which the caller needs to be notified.
* @return future
*/
public static CompletableFuture<Void> awaitAsync(Service service, Executor executor) {
final CompletableFuture<Void> result = new CompletableFuture<>();
service.addListener(new Listener() {
@Override
public void terminated(State from) {
result.complete(null);
}

@Override
public void failed(State from, Throwable failure) {
result.completeExceptionally(failure);
}
}, executor);

if (service.state() == State.FAILED) {
result.completeExceptionally(service.failureCause());
} else if (service.state() == State.TERMINATED) {
result.complete(null);
}
return result;
}

public static CompletableFuture<Void> stopAsync(Service service, Executor executor) {
CompletableFuture<Void> result = awaitAsync(service, executor);
service.stopAsync();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.mantisrx.common;

import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnore;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -120,6 +121,11 @@ && isValidPort(customPort)
&& isValidPort(sinkPort);
}

@JsonIgnore
public int getNumberOfPorts() {
return ports.size();
}

/**
* A port with 0 is technically correct, but we disallow it because there would be an inconsistency between
* what unused port the OS selects (some port number) and what this object's metadata holds (0).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public boolean isValidStateChgTo(MantisJobState newState) {
return false;
}

public boolean isTerminalState() {
return isTerminalState(this);
}

public static boolean isTerminalState(MantisJobState state) {
switch (state) {
case Failed:
Expand Down Expand Up @@ -105,4 +109,4 @@ public static boolean isOnSlaveState(MantisJobState state) {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@
package io.mantisrx.server.master.client;

import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;

/**
* HighAvailabilityServices is a container for a group of services which are considered to be highly available because
* of multiple standbys capable of handling the service in case the leader goes down for instance.
*
* <p>
* In Mantis, the following services are considered highly-available:
* 1. Mantis master which handles all the job-cluster/job/stage/worker interactions.
* 2. Resource Manager which handles all the resource specific interactions such as resource status updates,
* registrations and heartbeats.
*
* <p>
* These services can be obtained from the HighAvailabilityServices implementation.
*/
public interface HighAvailabilityServices extends Service {
MantisMasterGateway getMasterClientApi();
MantisMasterGateway getMasterClientApi();

MasterMonitor getMasterMonitor();

MasterMonitor getMasterMonitor();
ResourceLeaderConnection<ResourceClusterGateway> connectWithResourceManager(ClusterID clusterID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,19 @@
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.core.zookeeper.CuratorService;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGatewayClient;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;

/**
* HighAvailabilityServicesUtil helps you create HighAvailabilityServices instance based on the core configuration.
Expand All @@ -43,6 +54,7 @@ private static class ZkHighAvailabilityServices extends AbstractIdleService impl
HighAvailabilityServices {

private final CuratorService curatorService;
private final AtomicInteger rmConnections = new AtomicInteger(0);

public ZkHighAvailabilityServices(CoreConfiguration configuration) {
curatorService = new CuratorService(configuration, null);
Expand All @@ -67,5 +79,48 @@ public MantisMasterGateway getMasterClientApi() {
public MasterMonitor getMasterMonitor() {
return curatorService.getMasterMonitor();
}

@Override
public ResourceLeaderConnection<ResourceClusterGateway> connectWithResourceManager(
ClusterID clusterID) {
return new ResourceLeaderConnection<ResourceClusterGateway>() {
final MasterMonitor masterMonitor = curatorService.getMasterMonitor();

ResourceClusterGateway currentResourceClusterGateway =
new ResourceClusterGatewayClient(clusterID, masterMonitor.getLatestMaster());

final String nameFormat =
"ResourceClusterGatewayCxn (" + rmConnections.getAndIncrement() + ")-%d";
final Scheduler scheduler =
Schedulers
.from(
Executors
.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(nameFormat).build()));

final List<Subscription> subscriptions = new ArrayList<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we use these subscriptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nowhere right now. But, I think the intention when I wrote the code (a long time ago) was that we would close these subscriptions when the task executor is about to be shut down. So, it just makes for a clean, graceful shutdown.


@Override
public ResourceClusterGateway getCurrent() {
return currentResourceClusterGateway;
}

@Override
public void register(ResourceLeaderChangeListener<ResourceClusterGateway> changeListener) {
Subscription subscription = masterMonitor
.getMasterObservable()
.observeOn(scheduler)
.subscribe(nextDescription -> {
log.info("nextDescription={}", nextDescription);
ResourceClusterGateway previous = currentResourceClusterGateway;
currentResourceClusterGateway =
new ResourceClusterGatewayClient(clusterID, nextDescription);
changeListener.onResourceLeaderChanged(previous, currentResourceClusterGateway);
});

subscriptions.add(subscription);
}
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mantisrx.server.master.client;

/**
* ResourceLeaderConnection helps create a connection with the leader resource. You create a connection using the
* register method and on subsequent changes to the leader resource, the listener gets notified so that the caller
* can perform appropriate actions as necessary.
*
* @param <ResourceT> type of the resource being connected to.
*/
public interface ResourceLeaderConnection<ResourceT> {
ResourceT getCurrent();

void register(ResourceLeaderChangeListener<ResourceT> changeListener);

interface ResourceLeaderChangeListener<ResourceT> {
void onResourceLeaderChanged(ResourceT previousResourceLeader, ResourceT newResourceLeader);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.server.master.resourcecluster;

import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.post;

import com.spotify.futures.CompletableFutures;
import io.mantisrx.common.Ack;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
import org.asynchttpclient.Request;

@ToString(of = {"masterDescription", "clusterID"})
@Slf4j
public class ResourceClusterGatewayClient implements ResourceClusterGateway, Closeable {

private final int connectTimeout = 100;
private final int connectionRequestTimeout = 1000;
private final int socketTimeout = 2000;
private final ClusterID clusterID;
private final MasterDescription masterDescription;
private AsyncHttpClient client;
private final ObjectMapper mapper;

public ResourceClusterGatewayClient(
ClusterID clusterID,
MasterDescription masterDescription) {
this.clusterID = clusterID;
this.masterDescription = masterDescription;
this.mapper = new ObjectMapper();
this.client = buildCloseableHttpClient();
}

@Override
public void close() throws IOException {
client.close();
}

@Override
public CompletableFuture<Ack> registerTaskExecutor(TaskExecutorRegistration registration) {
return performAction("registerTaskExecutor", registration);
}

@Override
public CompletableFuture<Ack> heartBeatFromTaskExecutor(TaskExecutorHeartbeat heartbeat) {
return performAction("heartBeatFromTaskExecutor", heartbeat);
}

@Override
public CompletableFuture<Ack> notifyTaskExecutorStatusChange(
TaskExecutorStatusChange taskExecutorStatusChange) {
return performAction("notifyTaskExecutorStatusChange", taskExecutorStatusChange);
}

@Override
public CompletableFuture<Ack> disconnectTaskExecutor(
TaskExecutorDisconnection taskExecutorDisconnection) {
return performAction("disconnectTaskExecutor", taskExecutorDisconnection);
}

private CompletableFuture<Ack> performAction(String action, Object body) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: An enum or similar closed form solution might be safer than a free form string for the action parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, this is strictly within the class - so I'm not sure what value enum would particularly add. If this was exposed, I agree enums are definitely better as they are stronger types than free-form strings.

try {
final String bodyStr = mapper.writeValueAsString(body);
final Request request = post(
getActionUri(action)).setBody(bodyStr).addHeader("Content-Type", "application/json").build();
log.debug("request={}", request);
return client.executeRequest(request).toCompletableFuture().thenCompose(response -> {
if (response.getStatusCode() == 200) {
return CompletableFuture.completedFuture(Ack.getInstance());
} else {
try {
log.error("failed request {} with response {}", request, response.getResponseBody());
return CompletableFutures.exceptionallyCompletedFuture(
mapper.readValue(response.getResponseBody(), Throwable.class));
} catch (Exception e) {
return CompletableFutures.exceptionallyCompletedFuture(
new Exception(String.format("response=%s", response), e));
}
}
});
} catch (Exception e) {
return CompletableFutures.exceptionallyCompletedFuture(e);
}
}

private String getActionUri(String action) {
String uri = String.format("http://%s:%d/api/v1/resourceClusters/%s/actions/%s",
masterDescription.getHostname(), masterDescription.getApiPort(), clusterID.getResourceID(),
action);

log.debug("uri={}", uri);
return uri;
}

private AsyncHttpClient buildCloseableHttpClient() {
return asyncHttpClient(
new Builder().setConnectTimeout(connectTimeout).setRequestTimeout(connectionRequestTimeout)
.setReadTimeout(socketTimeout).build());
}
}
Loading