Skip to content

Commit 697dd15

Browse files
committed
Move ballista standalone mode to client
1 parent 51e5445 commit 697dd15

File tree

16 files changed

+190
-112
lines changed

16 files changed

+190
-112
lines changed

ballista/rust/client/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,14 @@ edition = "2018"
2727

2828
[dependencies]
2929
ballista-core = { path = "../core" }
30+
ballista-executor = { path = "../executor", optional = true }
31+
ballista-scheduler = { path = "../scheduler", optional = true }
3032
futures = "0.3"
3133
log = "0.4"
3234
tokio = "1.0"
3335

3436
datafusion = { path = "../../../datafusion" }
37+
38+
[features]
39+
default = []
40+
standalone = ["ballista-executor", "ballista-scheduler"]

ballista/rust/client/src/columnar_batch.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ use datafusion::scalar::ScalarValue;
2929
pub type MaybeColumnarBatch = Result<Option<ColumnarBatch>>;
3030

3131
/// Batch of columnar data.
32-
#[allow(dead_code)]
3332
#[derive(Debug, Clone)]
34-
3533
pub struct ColumnarBatch {
3634
schema: Arc<Schema>,
3735
columns: HashMap<String, ColumnarValue>,
@@ -112,9 +110,7 @@ impl ColumnarBatch {
112110
}
113111

114112
/// A columnar value can either be a scalar value or an Arrow array.
115-
#[allow(dead_code)]
116113
#[derive(Debug, Clone)]
117-
118114
pub enum ColumnarValue {
119115
Scalar(ScalarValue, usize),
120116
Columnar(ArrayRef),

ballista/rust/client/src/context.rs

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,49 +44,81 @@ use futures::future;
4444
use futures::StreamExt;
4545
use log::{error, info};
4646

47-
#[allow(dead_code)]
4847
struct BallistaContextState {
4948
/// Scheduler host
5049
scheduler_host: String,
5150
/// Scheduler port
5251
scheduler_port: u16,
5352
/// Tables that have been registered with this context
5453
tables: HashMap<String, LogicalPlan>,
55-
/// General purpose settings
56-
settings: HashMap<String, String>,
5754
}
5855

5956
impl BallistaContextState {
60-
pub fn new(
61-
scheduler_host: String,
62-
scheduler_port: u16,
63-
settings: HashMap<String, String>,
64-
) -> Self {
57+
pub fn new(scheduler_host: String, scheduler_port: u16) -> Self {
6558
Self {
6659
scheduler_host,
6760
scheduler_port,
6861
tables: HashMap::new(),
69-
settings,
7062
}
7163
}
72-
}
7364

74-
#[allow(dead_code)]
65+
/// Start an in-process scheduler and executor and build state that points at them.
///
/// `concurrent_tasks` is forwarded unchanged to
/// `ballista_executor::new_standalone_executor`.
///
/// # Errors
///
/// Returns an error if `ballista_scheduler::new_standalone_scheduler` or
/// `ballista_executor::new_standalone_executor` fails. Connecting to the
/// scheduler is retried every 100 ms with no upper bound, so this future does
/// not resolve until a connection succeeds.
#[cfg(feature = "standalone")]
pub async fn new_standalone(
    concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
    info!("Running in local mode. Scheduler will be run in-proc");

    let addr = ballista_scheduler::new_standalone_scheduler().await?;
    // The URL is identical for every attempt, so format it once up front.
    let scheduler_url = format!("http://localhost:{}", addr.port());

    let scheduler = loop {
        match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
            Ok(scheduler) => break scheduler,
            Err(e) => {
                // Report why the attempt failed instead of discarding the error,
                // so a scheduler that never becomes reachable can be diagnosed.
                info!("Could not connect to in-proc scheduler yet: {}", e);
                tokio::time::sleep(Duration::from_millis(100)).await;
                info!("Attempting to connect to in-proc scheduler...");
            }
        }
    };

    ballista_executor::new_standalone_executor(scheduler, concurrent_tasks).await?;
    Ok(Self {
        scheduler_host: "localhost".to_string(),
        scheduler_port: addr.port(),
        tables: HashMap::new(),
    })
}
95+
}
7596

7697
pub struct BallistaContext {
7798
state: Arc<Mutex<BallistaContextState>>,
7899
}
79100

80101
impl BallistaContext {
81102
/// Create a context for executing queries against a remote Ballista scheduler instance
82-
pub fn remote(host: &str, port: u16, settings: HashMap<String, String>) -> Self {
83-
let state = BallistaContextState::new(host.to_owned(), port, settings);
103+
pub fn remote(host: &str, port: u16) -> Self {
104+
let state = BallistaContextState::new(host.to_owned(), port);
84105

85106
Self {
86107
state: Arc::new(Mutex::new(state)),
87108
}
88109
}
89110

111+
/// Create a context backed by a scheduler and executor running inside this process.
///
/// `concurrent_tasks` is passed through to `BallistaContextState::new_standalone`;
/// any error it returns is propagated to the caller.
#[cfg(feature = "standalone")]
pub async fn standalone(
    concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
    BallistaContextState::new_standalone(concurrent_tasks)
        .await
        .map(|standalone_state| Self {
            state: Arc::new(Mutex::new(standalone_state)),
        })
}
121+
90122
/// Create a DataFrame representing a Parquet table scan
91123
92124
pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
@@ -268,3 +300,15 @@ impl BallistaContext {
268300
}
269301
}
270302
}
303+
304+
#[cfg(test)]
mod tests {
    /// Smoke test: bring up a standalone context with one concurrent task and
    /// run a trivial query through it. Only compiled with the `standalone` feature.
    #[cfg(feature = "standalone")]
    #[tokio::test]
    async fn test_standalone_mode() {
        // Imported here rather than at module level so nothing is unused when
        // the `standalone` feature is disabled.
        use super::*;

        let ctx = BallistaContext::standalone(1).await.unwrap();
        let plan = ctx.sql("SELECT 1;").unwrap().to_logical_plan();
        ctx.collect(&plan).await.unwrap();
    }
}

ballista/rust/executor/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ snmalloc = ["snmalloc-rs"]
3232
anyhow = "1"
3333
async-trait = "0.1.36"
3434
ballista-core = { path = "../core" }
35-
ballista-scheduler = { path = "../scheduler" }
3635
configure_me = "0.4.0"
3736
env_logger = "0.8"
3837
futures = "0.3"

ballista/rust/executor/executor_config_spec.toml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@ type = "u16"
3636
default = "50050"
3737
doc = "scheduler port"
3838

39-
[[switch]]
40-
name = "local"
41-
doc = "Running in local mode will launch a standalone scheduler inside the executor process. This will create a single-executor cluster, and is useful for development scenarios."
42-
4339
[[param]]
4440
name = "bind_host"
4541
type = "String"
@@ -69,8 +65,3 @@ name = "concurrent_tasks"
6965
type = "usize"
7066
default = "4"
7167
doc = "Max concurrent tasks."
72-
73-
[[param]]
74-
name = "scheduler_data_path"
75-
type = "String"
76-
doc = "Path for standalone data"

ballista/rust/executor/src/execution_loop.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ use ballista_core::serde::protobuf::{
2929
self, scheduler_grpc_client::SchedulerGrpcClient, task_status, FailedTask,
3030
PartitionId, PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus,
3131
};
32-
use ballista_executor::executor::Executor;
3332
use protobuf::CompletedTask;
3433

34+
use crate::executor::Executor;
35+
3536
pub async fn poll_loop(
3637
mut scheduler: SchedulerGrpcClient<Channel>,
3738
executor: Arc<Executor>,

ballista/rust/executor/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,9 @@
1818
//! Core executor logic for executing queries and storing results in memory.
1919
2020
pub mod collect;
21+
pub mod execution_loop;
2122
pub mod executor;
2223
pub mod flight_service;
24+
25+
mod standalone;
26+
pub use standalone::new_standalone_executor;

ballista/rust/executor/src/main.rs

Lines changed: 4 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717

1818
//! Ballista Rust executor binary.
1919
20-
use std::{
21-
net::{IpAddr, Ipv4Addr},
22-
sync::Arc,
23-
};
20+
use std::sync::Arc;
2421

2522
use anyhow::{Context, Result};
2623
use arrow_flight::flight_service_server::FlightServiceServer;
27-
use futures::future::MaybeDone;
24+
use ballista_executor::execution_loop;
2825
use log::info;
2926
use tempfile::TempDir;
3027
use tonic::transport::Server;
@@ -34,17 +31,11 @@ use ballista_core::serde::protobuf::{
3431
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
3532
ExecutorRegistration,
3633
};
37-
use ballista_core::{
38-
print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
39-
BALLISTA_VERSION,
40-
};
34+
use ballista_core::{print_version, BALLISTA_VERSION};
4135
use ballista_executor::executor::Executor;
4236
use ballista_executor::flight_service::BallistaFlightService;
43-
use ballista_scheduler::{state::StandaloneClient, SchedulerServer};
4437
use config::prelude::*;
4538

46-
mod execution_loop;
47-
4839
#[macro_use]
4940
extern crate configure_me;
5041

@@ -82,11 +73,7 @@ async fn main() -> Result<()> {
8273
.parse()
8374
.with_context(|| format!("Could not parse address: {}", addr))?;
8475

85-
let scheduler_host = if opt.local {
86-
"localhost".to_string()
87-
} else {
88-
opt.scheduler_host
89-
};
76+
let scheduler_host = opt.scheduler_host;
9077
let scheduler_port = opt.scheduler_port;
9178
let scheduler_url = format!("http://{}:{}", scheduler_host, scheduler_port);
9279

@@ -109,58 +96,6 @@ async fn main() -> Result<()> {
10996
port: port as u32,
11097
};
11198

112-
if opt.local {
113-
info!("Running in local mode. Scheduler will be run in-proc");
114-
115-
let client = match opt.scheduler_data_path {
116-
Some(v) => StandaloneClient::try_new(v)
117-
.context("Could not create standalone config backend")?,
118-
None => StandaloneClient::try_new_temporary()
119-
.context("Could not create standalone config backend")?,
120-
};
121-
122-
let server = SchedulerGrpcServer::new(SchedulerServer::new(
123-
Arc::new(client),
124-
"ballista".to_string(),
125-
IpAddr::V4(Ipv4Addr::LOCALHOST),
126-
));
127-
let addr = format!("localhost:{}", scheduler_port);
128-
let addr = addr
129-
.parse()
130-
.with_context(|| format!("Could not parse {}", addr))?;
131-
info!(
132-
"Ballista v{} Rust Scheduler listening on {:?}",
133-
BALLISTA_VERSION, addr
134-
);
135-
let scheduler_future =
136-
tokio::spawn(Server::builder().add_service(server).serve(addr));
137-
let mut scheduler_result = futures::future::maybe_done(scheduler_future);
138-
139-
// Ensure scheduler is ready to receive connections
140-
while SchedulerGrpcClient::connect(scheduler_url.clone())
141-
.await
142-
.is_err()
143-
{
144-
let scheduler_future = match scheduler_result {
145-
MaybeDone::Future(f) => f,
146-
MaybeDone::Done(Err(e)) => return Err(e).context("Tokio error"),
147-
MaybeDone::Done(Ok(Err(e))) => {
148-
return Err(e).context("Scheduler failed to initialize correctly")
149-
}
150-
MaybeDone::Done(Ok(Ok(()))) => {
151-
return Err(anyhow::format_err!(
152-
"Scheduler unexpectedly finished successfully"
153-
))
154-
}
155-
MaybeDone::Gone => {
156-
panic!("Received Gone from recently created MaybeDone")
157-
}
158-
};
159-
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
160-
scheduler_result = futures::future::maybe_done(scheduler_future);
161-
}
162-
}
163-
16499
let scheduler = SchedulerGrpcClient::connect(scheduler_url)
165100
.await
166101
.context("Could not connect to scheduler")?;
ballista/rust/executor/src/standalone.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::sync::Arc;
2+
3+
use arrow_flight::flight_service_server::FlightServiceServer;
4+
use ballista_core::{
5+
error::Result,
6+
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration},
7+
BALLISTA_VERSION,
8+
};
9+
use log::info;
10+
use tempfile::TempDir;
11+
use tokio::net::TcpListener;
12+
use tonic::transport::{Channel, Server};
13+
use uuid::Uuid;
14+
15+
use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
16+
17+
pub async fn new_standalone_executor(
18+
scheduler: SchedulerGrpcClient<Channel>,
19+
concurrent_tasks: usize,
20+
) -> Result<()> {
21+
let work_dir = TempDir::new()?
22+
.into_path()
23+
.into_os_string()
24+
.into_string()
25+
.unwrap();
26+
let executor = Arc::new(Executor::new(&work_dir));
27+
28+
let service = BallistaFlightService::new(executor.clone());
29+
30+
let server = FlightServiceServer::new(service);
31+
// Let the OS assign a random, free port
32+
let listener = TcpListener::bind("localhost:0").await?;
33+
let addr = listener.local_addr()?;
34+
info!(
35+
"Ballista v{} Rust Executor listening on {:?}",
36+
BALLISTA_VERSION, addr
37+
);
38+
tokio::spawn(
39+
Server::builder().add_service(server).serve_with_incoming(
40+
tokio_stream::wrappers::TcpListenerStream::new(listener),
41+
),
42+
);
43+
let executor_meta = ExecutorRegistration {
44+
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
45+
optional_host: None,
46+
port: addr.port() as u32,
47+
};
48+
tokio::spawn(execution_loop::poll_loop(
49+
scheduler,
50+
executor,
51+
executor_meta,
52+
concurrent_tasks,
53+
));
54+
Ok(())
55+
}

ballista/rust/scheduler/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ edition = "2018"
2828
[features]
2929
default = ["etcd", "sled"]
3030
etcd = ["etcd-client"]
31-
sled = ["sled_package"]
31+
sled = ["sled_package", "tokio-stream"]
3232

3333
[dependencies]
3434
anyhow = "1"
@@ -48,6 +48,7 @@ rand = "0.8"
4848
serde = {version = "1", features = ["derive"]}
4949
sled_package = { package = "sled", version = "0.34", optional = true }
5050
tokio = { version = "1.0", features = ["full"] }
51+
tokio-stream = { version = "0.1", features = ["net"], optional = true }
5152
tonic = "0.4"
5253
tower = { version = "0.4" }
5354
warp = "0.3"

0 commit comments

Comments
 (0)