-
Notifications
You must be signed in to change notification settings - Fork 212
[Part 5]: Rewriting Mantis Worker to be able to support starting tasks dynamically from the mantis master #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1dad4dc
e88862b
92d7f33
51c9414
246b9c0
161687c
01f4780
8ceb5c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,25 +20,39 @@ | |
| import io.mantisrx.runtime.MantisJobDurationType; | ||
| import io.mantisrx.runtime.descriptor.SchedulingInfo; | ||
| import io.mantisrx.runtime.parameter.Parameter; | ||
| import io.mantisrx.server.core.domain.WorkerId; | ||
| import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator; | ||
| import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty; | ||
| import io.mantisrx.shaded.com.google.common.base.Optional; | ||
| import java.io.Serializable; | ||
| import java.net.URL; | ||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * ExecuteStageRequest represents the data structure that defines the StageTask workload a given worker needs to run. | ||
| * The data structure is sent over the wire using java serialization when the server requests a given task executor to | ||
| * perform a certain stage task. | ||
| */ | ||
| public class ExecuteStageRequest implements Serializable { | ||
|
|
||
| public class ExecuteStageRequest { | ||
|
|
||
| // indicates whether this is stage 0 or not. stage 0 runs the autoscaler for the mantis job. | ||
| private final boolean hasJobMaster; | ||
| // subscription threshold for when a sink should be considered inactive so that ephemeral jobs producing the sink | ||
| // can be shut down. | ||
| private final long subscriptionTimeoutSecs; | ||
| private final long minRuntimeSecs; | ||
| private final WorkerPorts workerPorts; | ||
| private String jobName; | ||
| // jobId represents the instance of the job. | ||
| private String jobId; | ||
| // index of the worker in that stage | ||
| private int workerIndex; | ||
| // rolling counter of workers for that stage | ||
| private int workerNumber; | ||
| private URL jobJarUrl; | ||
| // index of the stage | ||
| private int stage; | ||
| private int totalNumStages; | ||
| private int metricsPort; | ||
|
|
@@ -47,6 +61,8 @@ public class ExecuteStageRequest { | |
| private List<Parameter> parameters = new LinkedList<Parameter>(); | ||
| private SchedulingInfo schedulingInfo; | ||
| private MantisJobDurationType durationType; | ||
| // name of the class that serves as the job provider. | ||
| private Optional<String> nameOfJobProviderClass; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Documentation.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1. Why is jobProviderClass in the request now? Do we ignore the jobProviderClass inside the zip?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @calvin681 No, this is just for testing purposes. Imagine running a job that's defined in the same codebase. In that case, the only definition that's needed is the job provider class. |
||
|
|
||
| @JsonCreator | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
|
|
@@ -65,15 +81,16 @@ public ExecuteStageRequest(@JsonProperty("jobName") String jobName, | |
| @JsonProperty("durationType") MantisJobDurationType durationType, | ||
| @JsonProperty("subscriptionTimeoutSecs") long subscriptionTimeoutSecs, | ||
| @JsonProperty("minRuntimeSecs") long minRuntimeSecs, | ||
| @JsonProperty("workerPorts") WorkerPorts workerPorts | ||
| ) { | ||
| @JsonProperty("workerPorts") WorkerPorts workerPorts, | ||
| @JsonProperty("jobProviderClass") Optional<String> nameOfJobProviderClass) { | ||
| this.jobName = jobName; | ||
| this.jobId = jobId; | ||
| this.workerIndex = workerIndex; | ||
| this.workerNumber = workerNumber; | ||
| this.jobJarUrl = jobJarUrl; | ||
| this.stage = stage; | ||
| this.totalNumStages = totalNumStages; | ||
| this.nameOfJobProviderClass = nameOfJobProviderClass; | ||
| this.ports.addAll(ports); | ||
| this.metricsPort = metricsPort; | ||
| this.timeoutToReportStart = timeoutToReportStart; | ||
|
|
@@ -158,6 +175,10 @@ public long getMinRuntimeSecs() { | |
| return minRuntimeSecs; | ||
| } | ||
|
|
||
| public Optional<String> getNameOfJobProviderClass() { | ||
| return nameOfJobProviderClass; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "ExecuteStageRequest{" + | ||
|
|
@@ -180,4 +201,8 @@ public String toString() { | |
| ", workerPorts=" + workerPorts + | ||
| '}'; | ||
| } | ||
|
|
||
| public WorkerId getWorkerId() { | ||
| return new WorkerId(jobId, workerIndex, workerNumber); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Copyright 2022 Netflix, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.mantisrx.server.core.metrics; | ||
|
|
||
| import io.mantisrx.server.core.CoreConfiguration; | ||
| import io.mantisrx.server.core.ExecuteStageRequest; | ||
| import io.mantisrx.server.core.stats.MetricStringConstants; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| public class MetricsFactory { | ||
|
|
||
| /** | ||
| * Returns a metrics server, publishing metrics every 1 second | ||
| * | ||
| * @param request request for which the metrics need to be published | ||
| * @return MetricsServerService server | ||
| */ | ||
| public static MetricsServerService newMetricsServer(CoreConfiguration configuration, ExecuteStageRequest request) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we are moving to a factory, should we make these singletons as well? |
||
|
|
||
| // todo(sundaram): get rid of the dependency on the metrics port defined at the ExecuteStageRequest level | ||
| // because that's a configuration of the task manager rather than the request. | ||
| return new MetricsServerService(request.getMetricsPort(), 1, getCommonTags(request)); | ||
| } | ||
|
|
||
| public static MetricsPublisherService newMetricsPublisher(CoreConfiguration config, ExecuteStageRequest request) { | ||
| return new MetricsPublisherService(config.getMetricsPublisher(), | ||
| config.getMetricsPublisherFrequencyInSeconds(), getCommonTags(request)); | ||
| } | ||
|
|
||
| private static Map<String, String> getCommonTags(ExecuteStageRequest request) { | ||
| // provide common tags to metrics publishing service | ||
| Map<String, String> commonTags = new HashMap<>(); | ||
| commonTags.put(MetricStringConstants.MANTIS_WORKER_NUM, Integer.toString(request.getWorkerNumber())); | ||
| commonTags.put(MetricStringConstants.MANTIS_STAGE_NUM, Integer.toString(request.getStage())); | ||
| commonTags.put(MetricStringConstants.MANTIS_WORKER_INDEX, Integer.toString(request.getWorkerIndex())); | ||
| commonTags.put(MetricStringConstants.MANTIS_JOB_NAME, request.getJobName()); | ||
| commonTags.put(MetricStringConstants.MANTIS_JOB_ID, request.getJobId()); | ||
|
|
||
| return commonTags; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| /* | ||
| * Copyright 2022 Netflix, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.mantisrx.server.worker; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.net.URI; | ||
| import java.net.URL; | ||
| import java.util.Collection; | ||
| import org.apache.flink.util.SimpleUserCodeClassLoader; | ||
| import org.apache.flink.util.UserCodeClassLoader; | ||
|
|
||
| /** | ||
| * Handle to retrieve a user code class loader for the associated job. | ||
| */ | ||
| interface ClassLoaderHandle extends Closeable { | ||
|
|
||
| /** | ||
| * Gets or resolves the user code class loader for the associated job. | ||
| * | ||
| * <p>In order to retrieve the user code class loader the caller has to specify the required | ||
| * jars and class paths. Upon the first call to this method for a job, it will make sure that | ||
| * the required jars are present and potentially cache the created user code class loader. | ||
| * Every subsequent call to this method will ensure that the created user code class loader can | ||
| * fulfill the required jar files and class paths. | ||
| * | ||
| * @param requiredJarFiles jar files the user code class loader needs to load | ||
| * @param requiredClasspaths class paths the user code class loader needs to load | ||
| * @return the user code class loader fulfilling the requirements | ||
| * @throws IOException if the required jar files cannot be downloaded | ||
| * @throws IllegalStateException if the cached user code class loader does not fulfill the | ||
| * requirements | ||
| */ | ||
| UserCodeClassLoader getOrResolveClassLoader(Collection<URI> requiredJarFiles, | ||
| Collection<URL> requiredClasspaths) throws IOException; | ||
|
|
||
| /** | ||
| * ClassLoaderHandle that ignores the requested jar files and class paths and always returns a user code class | ||
| * loader wrapping the class loader supplied at construction time. | ||
| * <p> | ||
| * This is primarily used for testing purposes when the classloader doesn't need to get any files. | ||
| */ | ||
| static ClassLoaderHandle fixed(ClassLoader classLoader) { | ||
| return new ClassLoaderHandle() { | ||
| @Override | ||
| public UserCodeClassLoader getOrResolveClassLoader(Collection<URI> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException { | ||
| return SimpleUserCodeClassLoader.create(classLoader); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| } | ||
| }; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment on why this needs to be serializable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use Javadoc for these comments, or is it fine not to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a preference. Let me know if you prefer one format over another. I've added the documentation, though.