Skip to content

Commit 71757bb

Browse files
yahoNanJing and yangzhong authored
Introduce push-based task scheduling for Ballista (#1560)
* Remove call_ip in the SchedulerServer
* Introduce push-based task scheduling

Co-authored-by: yangzhong <yangzhong@ebay.com>
1 parent 9c5ccae commit 71757bb

File tree

17 files changed

+1245
-103
lines changed

17 files changed

+1245
-103
lines changed

ballista/rust/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ tokio = "1.0"
4242
tonic = "0.6"
4343
uuid = { version = "0.8", features = ["v4"] }
4444
chrono = { version = "0.4", default-features = false }
45+
clap = "2"
46+
parse_arg = "0.1.3"
4547

4648
arrow-flight = { version = "7.0.0" }
4749
datafusion = { path = "../../../datafusion", version = "6.0.0" }

ballista/rust/core/proto/ballista.proto

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,7 @@ message ExecutorMetadata {
838838
string id = 1;
839839
string host = 2;
840840
uint32 port = 3;
841+
uint32 grpc_port = 4;
841842
}
842843

843844
message ExecutorRegistration {
@@ -848,12 +849,46 @@ message ExecutorRegistration {
848849
string host = 2;
849850
}
850851
uint32 port = 3;
852+
uint32 grpc_port = 4;
851853
}
852854

853855
// Heartbeat message carrying an executor's metadata, a timestamp and its state.
message ExecutorHeartbeat {
  ExecutorMetadata meta = 1;
  // Unix epoch-based timestamp in seconds
  uint64 timestamp = 2;
  // Executor state attached to this heartbeat
  ExecutorState state = 3;
}
861+
862+
// State of an executor, expressed as a list of metrics.
message ExecutorState {
  repeated ExecutorMetric metrics = 1;
}
865+
866+
// A single executor metric; exactly one kind of metric is set per message.
message ExecutorMetric {
  // TODO add more metrics
  oneof metric {
    uint64 available_memory = 1;
  }
}
872+
873+
// Specification of an executor, expressed as a list of resources.
message ExecutorSpecification {
  repeated ExecutorResource resources = 1;
}
876+
877+
// A single executor resource; exactly one kind of resource is set per message.
message ExecutorResource {
  // TODO add more resources
  oneof resource {
    uint32 task_slots = 1;
  }
}
883+
884+
// Resource bookkeeping for one executor, identified by `executor_id`.
message ExecutorData {
  string executor_id = 1;
  // One total/available pair per resource
  repeated ExecutorResourcePair resources = 2;
}
888+
889+
// Pairs the total amount of a resource with the amount currently available.
message ExecutorResourcePair {
  ExecutorResource total = 1;
  ExecutorResource available = 2;
}
858893

859894
message RunningTask {
@@ -906,6 +941,41 @@ message PollWorkResult {
906941
TaskDefinition task = 1;
907942
}
908943

944+
// Request of the RegisterExecutor rpc: who the executor is and what it offers.
message RegisterExecutorParams {
  ExecutorRegistration metadata = 1;
  ExecutorSpecification specification = 2;
}
948+
949+
// Response of the RegisterExecutor rpc.
message RegisterExecutorResult {
  bool success = 1;
}
952+
953+
// Request of the SendHeartBeat rpc: executor identity plus its current state.
message SendHeartBeatParams {
  ExecutorRegistration metadata = 1;
  ExecutorState state = 2;
}
957+
958+
// Response of the SendHeartBeat rpc.
message SendHeartBeatResult {
  // TODO it's from Spark for BlockManager
  // NOTE(review): presumably asks the executor to register again - confirm
  bool reregister = 1;
}
962+
963+
// Request of the StopExecutor rpc; currently carries no fields.
message StopExecutorParams {
}
965+
966+
// Response of the StopExecutor rpc; currently carries no fields.
message StopExecutorResult {
}
968+
969+
// Request of the UpdateTaskStatus rpc: executor identity plus task statuses.
message UpdateTaskStatusParams {
  ExecutorRegistration metadata = 1;
  // All tasks must be reported until they reach the failed or completed state
  repeated TaskStatus task_status = 2;
}
974+
975+
// Response of the UpdateTaskStatus rpc.
message UpdateTaskStatusResult {
  bool success = 1;
}
978+
909979
message ExecuteQueryParams {
910980
oneof query {
911981
LogicalPlanNode logical_plan = 1;
@@ -965,17 +1035,41 @@ message FilePartitionMetadata {
9651035
repeated string filename = 1;
9661036
}
9671037

1038+
// Request of the LaunchTask rpc.
message LaunchTaskParams {
  // Allow to launch a task set to an executor at once
  repeated TaskDefinition task = 1;
}
1042+
1043+
// Response of the LaunchTask rpc.
message LaunchTaskResult {
  bool success = 1;
  // TODO when part of the task set are scheduled successfully
}
1047+
9681048
// gRPC service exposed by the scheduler.
service SchedulerGrpc {
  // Executors must poll the scheduler for heartbeat and to receive tasks
  rpc PollWork (PollWorkParams) returns (PollWorkResult) {}

  // Registers an executor together with its resource specification
  rpc RegisterExecutor(RegisterExecutorParams) returns (RegisterExecutorResult) {}

  // Push-based task scheduler will only leverage this interface
  // rather than the PollWork interface to report executor states
  rpc SendHeartBeat (SendHeartBeatParams) returns (SendHeartBeatResult) {}

  // Reports task statuses for an executor
  rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}

  rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}

  rpc ExecuteQuery (ExecuteQueryParams) returns (ExecuteQueryResult) {}

  rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}
}
9781066

1067+
// gRPC service with the LaunchTask and StopExecutor rpcs.
// NOTE(review): presumably served by each executor on its grpc_port - confirm
service ExecutorGrpc {
  // Launches a set of tasks (see LaunchTaskParams)
  rpc LaunchTask (LaunchTaskParams) returns (LaunchTaskResult) {}

  rpc StopExecutor (StopExecutorParams) returns (StopExecutorResult) {}
}
1072+
9791073
///////////////////////////////////////////////////////////////////////////////////////////////////
9801074
// Arrow Data Types
9811075
///////////////////////////////////////////////////////////////////////////////////////////////////

ballista/rust/core/src/config.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
//! Ballista configuration
2020
21+
use clap::arg_enum;
2122
use core::fmt;
2223
use std::collections::HashMap;
2324
use std::result;
@@ -196,6 +197,22 @@ impl BallistaConfig {
196197
}
197198
}
198199

200+
// an enum used to configure the scheduler policy
201+
// needs to be visible to code generated by configure_me
202+
arg_enum! {
203+
#[derive(Clone, Copy, Debug, serde::Deserialize)]
204+
pub enum TaskSchedulingPolicy {
205+
PullStaged,
206+
PushStaged,
207+
}
208+
}
209+
210+
impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
211+
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
212+
write!(writer, "The scheduler policy for the scheduler")
213+
}
214+
}
215+
199216
#[cfg(test)]
200217
mod tests {
201218
use super::*;

ballista/rust/core/src/serde/scheduler/mod.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ pub struct ExecutorMeta {
7777
pub id: String,
7878
pub host: String,
7979
pub port: u16,
80+
pub grpc_port: u16,
8081
}
8182

8283
#[allow(clippy::from_over_into)]
@@ -86,6 +87,7 @@ impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
8687
id: self.id,
8788
host: self.host,
8889
port: self.port as u32,
90+
grpc_port: self.grpc_port as u32,
8991
}
9092
}
9193
}
@@ -96,10 +98,149 @@ impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
9698
id: meta.id,
9799
host: meta.host,
98100
port: meta.port as u16,
101+
grpc_port: meta.grpc_port as u16,
99102
}
100103
}
101104
}
102105

106+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
107+
pub struct ExecutorSpecification {
108+
pub task_slots: u32,
109+
}
110+
111+
#[allow(clippy::from_over_into)]
112+
impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
113+
fn into(self) -> protobuf::ExecutorSpecification {
114+
protobuf::ExecutorSpecification {
115+
resources: vec![protobuf::executor_resource::Resource::TaskSlots(
116+
self.task_slots,
117+
)]
118+
.into_iter()
119+
.map(|r| protobuf::ExecutorResource { resource: Some(r) })
120+
.collect(),
121+
}
122+
}
123+
}
124+
125+
impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
    /// Decodes a protobuf specification.
    ///
    /// `task_slots` is taken from the last `TaskSlots` resource entry in the
    /// message and is `0` when the message contains no such entry.
    fn from(input: protobuf::ExecutorSpecification) -> Self {
        // Searching from the back yields the entry that a front-to-back
        // "overwrite on every match" loop would end up with.
        let task_slots = input
            .resources
            .into_iter()
            .rev()
            .find_map(|entry| match entry.resource {
                Some(protobuf::executor_resource::Resource::TaskSlots(slots)) => {
                    Some(slots)
                }
                _ => None,
            })
            .unwrap_or(0);
        Self { task_slots }
    }
}
138+
139+
#[derive(Debug, Clone, Serialize)]
140+
pub struct ExecutorData {
141+
pub executor_id: String,
142+
pub total_task_slots: u32,
143+
pub available_task_slots: u32,
144+
}
145+
146+
struct ExecutorResourcePair {
147+
total: protobuf::executor_resource::Resource,
148+
available: protobuf::executor_resource::Resource,
149+
}
150+
151+
#[allow(clippy::from_over_into)]
152+
impl Into<protobuf::ExecutorData> for ExecutorData {
153+
fn into(self) -> protobuf::ExecutorData {
154+
protobuf::ExecutorData {
155+
executor_id: self.executor_id,
156+
resources: vec![ExecutorResourcePair {
157+
total: protobuf::executor_resource::Resource::TaskSlots(
158+
self.total_task_slots,
159+
),
160+
available: protobuf::executor_resource::Resource::TaskSlots(
161+
self.available_task_slots,
162+
),
163+
}]
164+
.into_iter()
165+
.map(|r| protobuf::ExecutorResourcePair {
166+
total: Some(protobuf::ExecutorResource {
167+
resource: Some(r.total),
168+
}),
169+
available: Some(protobuf::ExecutorResource {
170+
resource: Some(r.available),
171+
}),
172+
})
173+
.collect(),
174+
}
175+
}
176+
}
177+
178+
impl From<protobuf::ExecutorData> for ExecutorData {
    /// Decodes a protobuf executor record.
    ///
    /// Both slot counters start at `0`. Every resource pair is visited in
    /// order, and each `TaskSlots` value found in `total` / `available`
    /// overwrites the corresponding counter, so the last one wins.
    fn from(input: protobuf::ExecutorData) -> Self {
        // Extracts the task-slot count from an optional resource message.
        let slots_of = |resource: Option<protobuf::ExecutorResource>| {
            match resource.and_then(|r| r.resource) {
                Some(protobuf::executor_resource::Resource::TaskSlots(slots)) => {
                    Some(slots)
                }
                _ => None,
            }
        };

        let mut data = Self {
            executor_id: input.executor_id,
            total_task_slots: 0,
            available_task_slots: 0,
        };
        for pair in input.resources {
            if let Some(slots) = slots_of(pair.total) {
                data.total_task_slots = slots;
            }
            if let Some(slots) = slots_of(pair.available) {
                data.available_task_slots = slots;
            }
        }
        data
    }
}
206+
207+
#[derive(Debug, Clone, Copy, Serialize)]
208+
pub struct ExecutorState {
209+
// in bytes
210+
pub available_memory_size: u64,
211+
}
212+
213+
#[allow(clippy::from_over_into)]
214+
impl Into<protobuf::ExecutorState> for ExecutorState {
215+
fn into(self) -> protobuf::ExecutorState {
216+
protobuf::ExecutorState {
217+
metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
218+
self.available_memory_size,
219+
)]
220+
.into_iter()
221+
.map(|m| protobuf::ExecutorMetric { metric: Some(m) })
222+
.collect(),
223+
}
224+
}
225+
}
226+
227+
impl From<protobuf::ExecutorState> for ExecutorState {
    /// Decodes a protobuf executor state.
    ///
    /// `available_memory_size` is taken from the last `AvailableMemory`
    /// metric in the message and is `u64::MAX` when the message contains no
    /// such metric.
    fn from(input: protobuf::ExecutorState) -> Self {
        // Searching from the back yields the metric that a front-to-back
        // "overwrite on every match" loop would end up with.
        let available_memory_size = input
            .metrics
            .into_iter()
            .rev()
            .find_map(|entry| match entry.metric {
                Some(protobuf::executor_metric::Metric::AvailableMemory(bytes)) => {
                    Some(bytes)
                }
                _ => None,
            })
            .unwrap_or(u64::MAX);
        Self {
            available_memory_size,
        }
    }
}
243+
103244
/// Summary of executed partition
104245
#[derive(Debug, Copy, Clone, Default)]
105246
pub struct PartitionStats {

ballista/rust/executor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
4545
tokio-stream = { version = "0.1", features = ["net"] }
4646
tonic = "0.6"
4747
uuid = { version = "0.8", features = ["v4"] }
48+
hyper = "0.14.4"
4849

4950
[dev-dependencies]
5051

ballista/rust/executor/executor_config_spec.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ type = "u16"
5454
default = "50051"
5555
doc = "bind port"
5656

57+
[[param]]
58+
name = "bind_grpc_port"
59+
type = "u16"
60+
default = "50052"
61+
doc = "bind grpc service port"
62+
5763
[[param]]
5864
name = "work_dir"
5965
type = "String"
@@ -65,3 +71,10 @@ name = "concurrent_tasks"
6571
type = "usize"
6672
default = "4"
6773
doc = "Max concurrent tasks."
74+
75+
[[param]]
76+
abbr = "s"
77+
name = "task_scheduling_policy"
78+
type = "ballista_core::config::TaskSchedulingPolicy"
79+
doc = "The task scheduling policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged"
80+
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"

0 commit comments

Comments
 (0)