-
Notifications
You must be signed in to change notification settings - Fork 212
[Part 5]: Rewriting Mantis Worker to be able to support starting tasks dynamically from the mantis master #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
|
||
|
|
||
| public class ExecuteStageRequest { | ||
| public class ExecuteStageRequest implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment on why this needs to be serializable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we do javadoc for these comments or is it fine to be not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a preference. let me know if you prefer one format over another. Added documentation though.
...ane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/ExecuteStageRequest.java
Outdated
Show resolved
Hide resolved
| private List<Parameter> parameters = new LinkedList<Parameter>(); | ||
| private SchedulingInfo schedulingInfo; | ||
| private MantisJobDurationType durationType; | ||
| private Optional<String> nameOfJobProviderClass; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Why jobProviderClass is in the request now? do we ignore the jobProviderClass inside the zip?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@calvin681 no this is just for testing purposes. Imagine running a job that's defined in the same codebase. In that case, the only definition that's needed is the job provider class.
| WorkerExecutionOperations executionOperations, | ||
| Optional<String> jobProviderClass, | ||
| ClassLoaderHandle classLoaderHandle, Collection<URL> requiredClasspaths, | ||
| Job mantisJob) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: make this nullable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Job is used in the spring boot path. We need to figure out how to load class for spring boot and get the Job bean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't looked too deep into this but I'm guessing this should be some version of the code you have here: https://sourcegraph.netflix.net/stash.corp.netflix.com/CPIE/nfmantis/-/blob/runtime-spring-boot/src/main/java/com/netflix/mantis/runtime/command/SpringReadJobFromZip.java?L65#tab=references
Basically, create an AnnotationBasedApplicationContext based on the classloader and then get the bean for the job class. I'll try to get a test spring-boot app working after the nodequark job migration.
| } | ||
| logger.info("Creating job classpath with pathLocation " + pathLocation); | ||
| ClassLoader cl = URLClassLoader.newInstance(new URL[] {pathLocation}); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cleanup
| import org.apache.flink.util.UserCodeClassLoader; | ||
|
|
||
| @RequiredArgsConstructor(staticName = "of") | ||
| public class SimpleClassLoaderHandle implements ClassLoaderHandle { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documentation
| import io.mantisrx.server.core.Service; | ||
| import java.util.function.Function; | ||
|
|
||
| public interface SinkSubscriptionStateHandler extends Service { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documentation
|
|
||
|
|
||
| public class ExecuteStageRequest { | ||
| public class ExecuteStageRequest implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we do javadoc for these comments or is it fine to be not.
| executeRequest.getStatus().onError(e1); | ||
| return Observable.empty(); | ||
| } | ||
| logger.info("Creating job classpath with pathLocation " + pathLocation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i prefer the logger.info("msg {}", pathLocation); format over string concat. It might be performant if we aren't logging the info log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed for both performance and stylistic reasons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this was existing code that's been deleted.
|
|
||
| } | ||
|
|
||
| public static SinkSubscriptionStateHandler noop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd convert this to a static final constant to promote reuse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this is expected to be used only for testing purposes. I'm not sure if I want to expose this as a field and as a method.
| subscriptionTimeoutSecs + " secs") | ||
| .subscribe(); | ||
| // wait for kill to happen, we won't be running if it succeeds | ||
| try {Thread.sleep(60000);} // arbitrary sleep before we retry the kill |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
formatting and also a constant preferably configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As before, this is existing code actually. If you refresh the PR, you should be able to see it now.
| // let's call that instead. | ||
| onSinkSubscribed(); | ||
| // let's shutdown the executor and reset it. | ||
| executor.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there could be a NPE here that needs to be handled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleaned up.
| this.masterClientApi = masterClientApi; | ||
| this.subscriptionTimeoutSecs = subscriptionTimeoutSecs; | ||
| this.minRuntimeSecs = minRuntimeSecs; | ||
| executor = this.subscriptionTimeoutSecs > 0L ? new ScheduledThreadPoolExecutor(1) : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this might be a good candidate for optional IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Do we have a team preference for the Java Optional or the Vavr one? I suppose this sort of a larger discussion about which libraries we do or don't consider part of the canonical set in our code bases. I know introducing something like Vavr has larger implications for how code should be written.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is existing code. I just deleted the file that this was supposed to replace, and you should be able to see the changes I made to this file now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have completely refactored the code. Can you guys take a look now? It should be way simpler than the original version.
codyrioux
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with the exception of the noted changes.
| executeRequest.getStatus().onError(e1); | ||
| return Observable.empty(); | ||
| } | ||
| logger.info("Creating job classpath with pathLocation " + pathLocation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed for both performance and stylistic reasons.
| final UserCodeClassLoader userCodeClassLoader = | ||
| classLoaderHandle.getOrResolveClassLoader(ImmutableList.of(executeStageRequest.getJobJarUrl().toURI()), requiredClasspaths); | ||
|
|
||
| log.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be wise to also add a metric for this so we can alert in the event that our artifact downloads go latent.
| */ | ||
| public static <T> T instantiate(Class<T> clazz) { | ||
| if (clazz == null) { | ||
| throw new NullPointerException(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Give me an error message here so I know what was null. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this is not expected at all as the clazz is always supposed to be some value. So, the null pointer exception along with the stack trace should be enough to understand the semantics.
| final Class<? extends T> clazz; | ||
| try { | ||
| clazz = Class.forName(className, false, classLoader).asSubclass(targetType); | ||
| } catch (ClassNotFoundException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may want to add either specific catches for ExceptionInInitializerError and LinkageError. They aren't checked exceptions but this method can throw them. See the forName(String, boolean, ClassLoader) docs.
| this.masterClientApi = masterClientApi; | ||
| this.subscriptionTimeoutSecs = subscriptionTimeoutSecs; | ||
| this.minRuntimeSecs = minRuntimeSecs; | ||
| executor = this.subscriptionTimeoutSecs > 0L ? new ScheduledThreadPoolExecutor(1) : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Do we have a team preference for the Java Optional or the Vavr one? I suppose this sort of a larger discussion about which libraries we do or don't consider part of the canonical set in our code bases. I know introducing something like Vavr has larger implications for how code should be written.
| this.masterClientApi = masterClientApi; | ||
| this.subscriptionTimeoutSecs = subscriptionTimeoutSecs; | ||
| this.minRuntimeSecs = minRuntimeSecs; | ||
| executor = this.subscriptionTimeoutSecs > 0L ? new ScheduledThreadPoolExecutor(1) : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On this note, what if we have a subscription timeout of zero? Is that a valid value for users to input? We may want to consider >= 0L here. Further is it valid at all to instantiate this class if the executor will be null?
|
|
||
| @Slf4j | ||
| @RequiredArgsConstructor | ||
| public class Task extends AbstractIdleService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documentation for this one.
| private List<Parameter> parameters = new LinkedList<Parameter>(); | ||
| private SchedulingInfo schedulingInfo; | ||
| private MantisJobDurationType durationType; | ||
| private Optional<String> nameOfJobProviderClass; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Why jobProviderClass is in the request now? do we ignore the jobProviderClass inside the zip?
| * @param request request for which the metrics need to be published | ||
| * @return MetricsServerService server | ||
| */ | ||
| public static MetricsServerService newMetricsServer(CoreConfiguration configuration, ExecuteStageRequest request) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are moving to a factory, should we make these singletons as well?
| executionOperations.shutdownStage(); | ||
| try { | ||
| executionOperations.shutdownStage(); | ||
| } catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you still close cl?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CL closure has been added recently. Can you refresh the set of commits and check if looks okay?
| WorkerExecutionOperations executionOperations, | ||
| Optional<String> jobProviderClass, | ||
| ClassLoaderHandle classLoaderHandle, Collection<URL> requiredClasspaths, | ||
| Job mantisJob) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Job is used in the spring boot path. We need to figure out how to load class for spring boot and get the Job bean.
Context
Right now, the mantis worker assumes that the task it needs to perform will be seeded once as part of the environment or by MesosExecutor as part of the launchTask method. In the new Titus model, these tasks are received and acted upon asynchronously at the request of the mantis scheduler. This diff establishes the model for that by decoupling the act of running a task from how it is being received (for instance, Mesos Executor).
This will be followed by the TaskExecutor implementation which will use the building blocks created in this diff to execute tasks on demand.
Checklist
./gradlew buildcompiles code correctly./gradlew testpasses all testsCONTRIBUTING.md