Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,39 @@
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.server.core.domain.WorkerId;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import io.mantisrx.shaded.com.google.common.base.Optional;
import java.io.Serializable;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;

/**
* ExecuteStageRequest represents the data structure that defines the StageTask workload a given worker needs to run.
* The data structure is sent over the wire using java serialization when the server requests a given task executor to
* perform a certain stage task.
*/
public class ExecuteStageRequest implements Serializable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment on why this needs to be serializable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do javadoc for these comments or is it fine to be not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a preference. let me know if you prefer one format over another. Added documentation though.


public class ExecuteStageRequest {

// indicates whether this is stage 0 or not. stage 0 runs the autoscaler for the mantis job.
private final boolean hasJobMaster;
// subscription threshold for when a sink should considered to be inactive so that ephemeral jobs producing the sink
// can be shutdown.
private final long subscriptionTimeoutSecs;
private final long minRuntimeSecs;
private final WorkerPorts workerPorts;
private String jobName;
// jobId represents the instance of the job.
private String jobId;
// index of the worker in that stage
private int workerIndex;
// rolling counter of workers for that stage
private int workerNumber;
private URL jobJarUrl;
// index of the stage
private int stage;
private int totalNumStages;
private int metricsPort;
Expand All @@ -47,6 +61,8 @@ public class ExecuteStageRequest {
private List<Parameter> parameters = new LinkedList<Parameter>();
private SchedulingInfo schedulingInfo;
private MantisJobDurationType durationType;
// class name that provides the job provider.
private Optional<String> nameOfJobProviderClass;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Why jobProviderClass is in the request now? do we ignore the jobProviderClass inside the zip?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@calvin681 no this is just for testing purposes. Imagine running a job that's defined in the same codebase. In that case, the only definition that's needed is the job provider class.


@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -65,15 +81,16 @@ public ExecuteStageRequest(@JsonProperty("jobName") String jobName,
@JsonProperty("durationType") MantisJobDurationType durationType,
@JsonProperty("subscriptionTimeoutSecs") long subscriptionTimeoutSecs,
@JsonProperty("minRuntimeSecs") long minRuntimeSecs,
@JsonProperty("workerPorts") WorkerPorts workerPorts
) {
@JsonProperty("workerPorts") WorkerPorts workerPorts,
@JsonProperty("jobProviderClass") Optional<String> nameOfJobProviderClass) {
this.jobName = jobName;
this.jobId = jobId;
this.workerIndex = workerIndex;
this.workerNumber = workerNumber;
this.jobJarUrl = jobJarUrl;
this.stage = stage;
this.totalNumStages = totalNumStages;
this.nameOfJobProviderClass = nameOfJobProviderClass;
this.ports.addAll(ports);
this.metricsPort = metricsPort;
this.timeoutToReportStart = timeoutToReportStart;
Expand Down Expand Up @@ -158,6 +175,10 @@ public long getMinRuntimeSecs() {
return minRuntimeSecs;
}

public Optional<String> getNameOfJobProviderClass() {
return nameOfJobProviderClass;
}

@Override
public String toString() {
return "ExecuteStageRequest{" +
Expand All @@ -180,4 +201,8 @@ public String toString() {
", workerPorts=" + workerPorts +
'}';
}

public WorkerId getWorkerId() {
return new WorkerId(jobId, workerIndex, workerNumber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mantisrx.server.core.metrics;

import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.stats.MetricStringConstants;
import java.util.HashMap;
import java.util.Map;

public class MetricsFactory {

/**
* Returns a metrics server, publishing metrics every 1 second
*
* @param request request for which the metrics need to be published
* @return MetricsServerService server
*/
public static MetricsServerService newMetricsServer(CoreConfiguration configuration, ExecuteStageRequest request) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are moving to a factory, should we make these singletons as well?


// todo(sundaram): get rid of the dependency on the metrics port defined at the ExecuteStageRequest level
// because that's a configuration of the task manager rather than the request.
return new MetricsServerService(request.getMetricsPort(), 1, getCommonTags(request));
}

public static MetricsPublisherService newMetricsPublisher(CoreConfiguration config, ExecuteStageRequest request) {
return new MetricsPublisherService(config.getMetricsPublisher(),
config.getMetricsPublisherFrequencyInSeconds(), getCommonTags(request));
}

private static Map<String, String> getCommonTags(ExecuteStageRequest request) {
// provide common tags to metrics publishing service
Map<String, String> commonTags = new HashMap<>();
commonTags.put(MetricStringConstants.MANTIS_WORKER_NUM, Integer.toString(request.getWorkerNumber()));
commonTags.put(MetricStringConstants.MANTIS_STAGE_NUM, Integer.toString(request.getStage()));
commonTags.put(MetricStringConstants.MANTIS_WORKER_INDEX, Integer.toString(request.getWorkerIndex()));
commonTags.put(MetricStringConstants.MANTIS_JOB_NAME, request.getJobName());
commonTags.put(MetricStringConstants.MANTIS_JOB_ID, request.getJobId());

return commonTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.google.common.base.Optional;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand Down Expand Up @@ -162,7 +163,8 @@ private Collection<TaskInfo> createTaskInfo(Protos.SlaveID slaveID, final Launch
scheduleRequest.getDurationType(),
scheduleRequest.getJobMetadata().getSubscriptionTimeoutSecs(),
scheduleRequest.getJobMetadata().getMinRuntimeSecs() - (System.currentTimeMillis() - scheduleRequest.getJobMetadata().getMinRuntimeSecs()),
launchTaskRequest.getPorts()
launchTaskRequest.getPorts(),
Optional.absent()
);
taskInfoBuilder
.setName(name)
Expand Down
3 changes: 3 additions & 0 deletions mantis-server/mantis-server-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ dependencies {
compile project(":mantis-control-plane:mantis-control-plane-core")
compile project(":mantis-server:mantis-server-worker-client")

api libraries.flinkRpcApi
implementation libraries.flinkRpcImpl

compile "org.apache.mesos:mesos:$mesosVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "org.slf4j:slf4j-log4j12:$slf4jVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mantisrx.server.worker;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;

/**
* Handle to retrieve a user code class loader for the associated job.
*/
interface ClassLoaderHandle extends Closeable {

/**
* Gets or resolves the user code class loader for the associated job.
*
* <p>In order to retrieve the user code class loader the caller has to specify the required
* jars and class paths. Upon calling this method first for a job, it will make sure that
* the required jars are present and potentially cache the created user code class loader.
* Every subsequent call to this method, will ensure that created user code class loader can
* fulfill the required jar files and class paths.
*
* @param requiredJarFiles requiredJarFiles the user code class loader needs to load
* @return the user code class loader fulfilling the requirements
* @throws IOException if the required jar files cannot be downloaded
* @throws IllegalStateException if the cached user code class loader does not fulfill the
* requirements
*/
UserCodeClassLoader getOrResolveClassLoader(Collection<URI> requiredJarFiles,
Collection<URL> requiredClasspaths) throws IOException;

/**
* ClassLoaderHandle that just returns the classloader field that's assigned at the time of construction
* on query for any new handles.
* <p>
* This is primarily used for testing purposes when the classloader doesn't need to get any files.
*/
static ClassLoaderHandle fixed(ClassLoader classLoader) {
return new ClassLoaderHandle() {
@Override
public UserCodeClassLoader getOrResolveClassLoader(Collection<URI> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException {
return SimpleUserCodeClassLoader.create(classLoader);
}

@Override
public void close() throws IOException {
}
};
}
}
Loading