Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ ext {
vavrVersion = '0.9.2'
}

ext.libraries = [
flinkRpcApi: [
"org.apache.flink:flink-rpc-core:1.14.2"
],
flinkRpcImpl: [
"org.apache.flink:flink-rpc-akka:1.14.2",
"org.apache.flink:flink-rpc-akka-loader:1.14.2"
]
]

allprojects {
apply plugin: 'nebula.netflixoss'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
api project(":mantis-common")

api "org.skife.config:config-magic:$configMagicVersion"
api libraries.flinkRpcApi

api "org.apache.mesos:mesos:$mesosVersion"
api "org.json:json:$jsonVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mantisrx.server.worker;

import io.mantisrx.common.Ack;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.domain.WorkerId;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import org.apache.flink.runtime.rpc.RpcGateway;

/**
* Gateway to talk to the task executor running on the mantis-agent.
*/
public interface TaskExecutorGateway extends RpcGateway {
/**
* submit a new task to be run on the task executor. The definition of the task is represented by
* {@link ExecuteStageRequest}.
*
* @param request Task that needs to be run on the executor.
* @return Ack to indicate that the gateway was able to receive the task.
* @throws TaskAlreadyRunningException wrapped inside {@link java.util.concurrent.CompletionException}
* in case there's already an existing task that's running on the task executor.
*/
CompletableFuture<Ack> submitTask(ExecuteStageRequest request);

/**
* cancel the currently running task and get rid of all of the associated resources.
*
* @param workerId of the task that needs to be cancelled.
* @return Ack to indicate that the gateway was able to receive the request and the worker ID represents the currently
* running task.
* @throws TaskNotFoundException wrapped inside a {@link java.util.concurrent.CompletionException} in case
* workerId is not running on the executor.
*/
CompletableFuture<Ack> cancelTask(WorkerId workerId);

/**
* request a thread dump on the worker to see what threads are running on it.
*
* @return thread dump in the string format.
*/
CompletableFuture<String> requestThreadDump();

@RequiredArgsConstructor
class TaskAlreadyRunningException extends Exception {
private final WorkerId currentlyRunningWorkerTask;
}

class TaskNotFoundException extends Exception {

public TaskNotFoundException(WorkerId workerId) {
super(String.format("Task %s not found", workerId.toString()));
}
}
}