Skip to content

Conversation

@sundargates
Copy link
Contributor

Context

Right now, the mantis worker assumes that the task it needs to perform will be seeded once as part of the environment or by MesosExecutor as part of the launchTask method. In the new Titus model, these tasks are received and acted upon asynchronously at the request of the mantis scheduler. This diff establishes the model for that by decoupling the act of running a task from how it is being received (for instance, Mesos Executor).

This will be followed by the TaskExecutor implementation which will use the building blocks created in this diff to execute tasks on demand.

Checklist

  • ./gradlew build compiles code correctly
  • Added new tests where applicable
  • ./gradlew test passes all tests
  • Extended README or added javadocs where applicable
  • Added copyright headers for new files from CONTRIBUTING.md



public class ExecuteStageRequest {
public class ExecuteStageRequest implements Serializable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment on why this needs to be serializable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do javadoc for these comments or is it fine to be not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a preference. let me know if you prefer one format over another. Added documentation though.

private List<Parameter> parameters = new LinkedList<Parameter>();
private SchedulingInfo schedulingInfo;
private MantisJobDurationType durationType;
private Optional<String> nameOfJobProviderClass;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Why jobProviderClass is in the request now? do we ignore the jobProviderClass inside the zip?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@calvin681 no this is just for testing purposes. Imagine running a job that's defined in the same codebase. In that case, the only definition that's needed is the job provider class.

WorkerExecutionOperations executionOperations,
Optional<String> jobProviderClass,
ClassLoaderHandle classLoaderHandle, Collection<URL> requiredClasspaths,
Job mantisJob) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: make this nullable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Job is used in the spring boot path. We need to figure out how to load class for spring boot and get the Job bean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked too deep into this but I'm guessing this should be some version of the code you have here: https://sourcegraph.netflix.net/stash.corp.netflix.com/CPIE/nfmantis/-/blob/runtime-spring-boot/src/main/java/com/netflix/mantis/runtime/command/SpringReadJobFromZip.java?L65#tab=references

Basically, create an AnnotationBasedApplicationContext based on the classloader and then get the bean for the job class. I'll try to get a test spring-boot app working after the nodequark job migration.

}
logger.info("Creating job classpath with pathLocation " + pathLocation);
ClassLoader cl = URLClassLoader.newInstance(new URL[] {pathLocation});

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanup

import org.apache.flink.util.UserCodeClassLoader;

@RequiredArgsConstructor(staticName = "of")
public class SimpleClassLoaderHandle implements ClassLoaderHandle {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation

import io.mantisrx.server.core.Service;
import java.util.function.Function;

public interface SinkSubscriptionStateHandler extends Service {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation



public class ExecuteStageRequest {
public class ExecuteStageRequest implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do javadoc for these comments or is it fine to be not.

executeRequest.getStatus().onError(e1);
return Observable.empty();
}
logger.info("Creating job classpath with pathLocation " + pathLocation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i prefer the logger.info("msg {}", pathLocation); format over string concat. It might be performant if we aren't logging the info log.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed for both performance and stylistic reasons.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this was existing code that's been deleted.


}

public static SinkSubscriptionStateHandler noop() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd convert this to a static final constant to promote reuse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is expected to be used only for testing purposes. I'm not sure if I want to expose this as a field and as a method.

subscriptionTimeoutSecs + " secs")
.subscribe();
// wait for kill to happen, we won't be running if it succeeds
try {Thread.sleep(60000);} // arbitrary sleep before we retry the kill
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting and also a constant preferably configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As before, this is existing code actually. If you refresh the PR, you should be able to see it now.

// let's call that instead.
onSinkSubscribed();
// let's shutdown the executor and reset it.
executor.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there could be a NPE here that needs to be handled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up.

this.masterClientApi = masterClientApi;
this.subscriptionTimeoutSecs = subscriptionTimeoutSecs;
this.minRuntimeSecs = minRuntimeSecs;
executor = this.subscriptionTimeoutSecs > 0L ? new ScheduledThreadPoolExecutor(1) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might be a good candidate for optional IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Do we have a team preference for the Java Optional or the Vavr one? I suppose this sort of a larger discussion about which libraries we do or don't consider part of the canonical set in our code bases. I know introducing something like Vavr has larger implications for how code should be written.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is existing code. I just deleted the file that this was supposed to replace, and you should be able to see the changes I made to this file now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have completely refactored the code. Can you guys take a look now? It should be way simpler than the original version.

Copy link
Contributor

@codyrioux codyrioux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with the exception of the noted changes.

executeRequest.getStatus().onError(e1);
return Observable.empty();
}
logger.info("Creating job classpath with pathLocation " + pathLocation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed for both performance and stylistic reasons.

final UserCodeClassLoader userCodeClassLoader =
classLoaderHandle.getOrResolveClassLoader(ImmutableList.of(executeStageRequest.getJobJarUrl().toURI()), requiredClasspaths);

log.info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be wise to also add a metric for this so we can alert in the event that our artifact downloads go latent.

*/
public static <T> T instantiate(Class<T> clazz) {
if (clazz == null) {
throw new NullPointerException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Give me an error message here so I know what was null. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is not expected at all as the clazz is always supposed to be some value. So, the null pointer exception along with the stack trace should be enough to understand the semantics.

final Class<? extends T> clazz;
try {
clazz = Class.forName(className, false, classLoader).asSubclass(targetType);
} catch (ClassNotFoundException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to add either specific catches for ExceptionInInitializerError and LinkageError. They aren't checked exceptions but this method can throw them. See the forName(String, boolean, ClassLoader) docs.

this.masterClientApi = masterClientApi;
this.subscriptionTimeoutSecs = subscriptionTimeoutSecs;
this.minRuntimeSecs = minRuntimeSecs;
executor = this.subscriptionTimeoutSecs > 0L ? new ScheduledThreadPoolExecutor(1) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Do we have a team preference for the Java Optional or the Vavr one? I suppose this sort of a larger discussion about which libraries we do or don't consider part of the canonical set in our code bases. I know introducing something like Vavr has larger implications for how code should be written.

this.masterClientApi = masterClientApi;
this.subscriptionTimeoutSecs = subscriptionTimeoutSecs;
this.minRuntimeSecs = minRuntimeSecs;
executor = this.subscriptionTimeoutSecs > 0L ? new ScheduledThreadPoolExecutor(1) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On this note, what if we have a subscription timeout of zero? Is that a valid value for users to input? We may want to consider >= 0L here. Further is it valid at all to instantiate this class if the executor will be null?


@Slf4j
@RequiredArgsConstructor
public class Task extends AbstractIdleService {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation for this one.

private List<Parameter> parameters = new LinkedList<Parameter>();
private SchedulingInfo schedulingInfo;
private MantisJobDurationType durationType;
private Optional<String> nameOfJobProviderClass;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Why jobProviderClass is in the request now? do we ignore the jobProviderClass inside the zip?

* @param request request for which the metrics need to be published
* @return MetricsServerService server
*/
public static MetricsServerService newMetricsServer(CoreConfiguration configuration, ExecuteStageRequest request) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are moving to a factory, should we make these singletons as well?

executionOperations.shutdownStage();
try {
executionOperations.shutdownStage();
} catch (IOException e) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you still close cl?

Copy link
Contributor Author

@sundargates sundargates Mar 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CL closure has been added recently. Can you refresh the set of commits and check if looks okay?

WorkerExecutionOperations executionOperations,
Optional<String> jobProviderClass,
ClassLoaderHandle classLoaderHandle, Collection<URL> requiredClasspaths,
Job mantisJob) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Job is used in the spring boot path. We need to figure out how to load class for spring boot and get the Job bean.

@sundargates sundargates merged commit 5f18638 into Netflix:master Mar 27, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants