[Problem] How to deploy multiple schedulers in standalone mode (not Docker) #803
Could someone give me some advice, please?
Hi @smallzhongfeng. Do you want to deploy Ballista with multiple schedulers outside of Kubernetes? Standalone mode does not currently support multiple schedulers, as all it does is spin up the scheduler server in a tokio task with a hardcoded bind address.
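As a minimal sketch of that pattern (this is not the actual Ballista source; the address and function name are illustrative): the server is spawned inside a tokio task on a fixed address, so a second scheduler in the same process has nowhere to bind.

```rust
use tokio::net::TcpListener;

async fn spawn_scheduler_task() -> std::io::Result<()> {
    // Illustrative fixed address; the real binary hardcodes its own bind address.
    let listener = TcpListener::bind("127.0.0.1:50050").await?;
    tokio::spawn(async move {
        loop {
            // Accept and serve scheduler RPC connections here.
            let _conn = listener.accept().await;
        }
    });
    Ok(())
}
```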
Thanks for your patient reply! Is there any …
There is a Helm chart and an example kubernetes.yaml.
So if I want to deploy multiple schedulers, should I change this? Does this provide …
I just want to be able to deploy multiple schedulers to ensure high availability of the scheduler. @thinkharderdev
I have not deployed Ballista in a production environment, much less HA, so I will let others share their experience. My understanding is that as long as you set up shared storage (etcd, sled, etc.) that is all that is required, plus a load balancer, as mentioned before (I think the Helm chart sets that up).
So you have two options:
1. Use a shared state backend such as etcd, so that all schedulers see the same cluster and job state.
2. Run multiple schedulers, each with its own in-memory state, behind a load balancer.
Option 2 is what we have done in our deployment. We have multiple schedulers, each using an in-memory state backend. One downside of this approach is that the job state is volatile, so if a scheduler dies then all jobs running on it are lost. If you are running relatively short-duration queries then this is not a huge issue (at least for us), since the scheduler will try to complete any in-flight jobs before it shuts down, so you can set up your deployment such that the schedulers have a shutdown grace period sufficient to complete any outstanding work.
Thanks for your patient reply. I got it. 👍
Yes, currently the active job state is stored in memory on the scheduler, and if the scheduler shuts down/restarts before a job completes then the job will fail. The scheduler will try to wait for all its active jobs to complete once it receives a shutdown signal.
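As a rough sketch of that graceful-shutdown pattern (the `Scheduler` struct and the polling loop here are illustrative assumptions, not Ballista's actual implementation):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::signal::unix::{signal, SignalKind};

// Hypothetical stand-in for the scheduler's in-flight job counter.
struct Scheduler {
    active_jobs: AtomicUsize,
}

async fn shutdown_gracefully(scheduler: &Scheduler, grace: Duration) {
    let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
    sigterm.recv().await; // block until the deployment signals shutdown
    let deadline = tokio::time::Instant::now() + grace;
    // Keep running until in-flight jobs drain or the grace period expires.
    while scheduler.active_jobs.load(Ordering::SeqCst) > 0
        && tokio::time::Instant::now() < deadline
    {
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}
```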
Thanks for your reply again!
These days I am testing the performance of etcd as a storage backend, and I found that the performance is very poor, especially when reserving and releasing resources. The time spent there is much longer than in the local storage mode. I suspect the cause is the global distributed lock. Do you have any suggestions here? @thinkharderdev @avantgardnerio
Hi @smallzhongfeng, could you explain the reason for using multiple schedulers? Is it just for HA, or are you worried about the performance of a single scheduler for task scheduling? If it is for HA, is it acceptable that all jobs scheduled on a scheduler fail when that scheduler goes down?
Yes, we found the same with etcd. I would suggest that if you don't need HA then use a single scheduler with in-memory state. If you need HA, what we did is implement a custom `ClusterState` backed by Redis, using Lua scripts so the slot accounting executes atomically:

```rust
use std::collections::{HashMap, HashSet};

use log::debug;
use redis::Script;
// Also assumed in scope: Ballista's `ClusterState`, `ExecutorReservation`,
// `TaskDistribution`, `BallistaError`, and `Result` types (module paths vary
// by Ballista version), plus our own `MyRedisState` and `into_ballista_error`.

const RESERVATION_SCRIPT: &str = r#"
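-- KEYS[1]  : hash mapping executor id -> available task slots
-- ARGV[1]  : total number of slots desired
-- ARGV[2..]: candidate executor ids to reserve from
-- Returns a JSON array of slots reserved per executor, in ARGV order.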
local desired_slots = tonumber(ARGV[1])
local s = {}
for i=2, #ARGV do
    local exists = redis.call('HEXISTS', KEYS[1], ARGV[i])
    if( exists == 1 ) then
        local value = redis.call('HGET', KEYS[1], ARGV[i])
        local slots = tonumber(value)
        if( slots >= desired_slots ) then
            s[i - 1] = desired_slots
            local inc = -desired_slots
            redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
            desired_slots = 0
        elseif slots == 0 then
            s[i - 1] = 0
        else
            s[i - 1] = slots
            local inc = -slots
            redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
            desired_slots = desired_slots - slots
        end
    else
        s[i - 1] = 0
    end
    if( desired_slots <= 0 ) then
        break
    end
end
return cjson.encode(s)
"#;
const CANCEL_SCRIPT: &str = r#"
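-- KEYS[1]  : hash mapping executor id -> available task slots
-- KEYS[2..]: executor ids whose reservations are being cancelled
-- ARGV[1..]: slot counts to return to the corresponding executor
-- Returns the total number of slots returned.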
local cancelled = 0
for i=2, #KEYS do
    local exists = redis.call('HEXISTS', KEYS[1], KEYS[i])
    if( exists == 1 ) then
        local inc = tonumber(ARGV[i - 1])
        redis.call('HINCRBY', KEYS[1], KEYS[i], inc)
        cancelled = cancelled + inc
    end
end
return cancelled
"#;
// Redis hash holding the available slot count per executor.
const SLOTS_KEY: &str = "task-slots";
#[async_trait::async_trait]
impl ClusterState for MyRedisState {
    async fn reserve_slots(
        &self,
        num_slots: u32,
        _distribution: TaskDistribution,
        executors: Option<HashSet<String>>,
    ) -> Result<Vec<ExecutorReservation>> {
        if num_slots == 0 {
            return Ok(vec![]);
        }

        if let Some(executors) = executors {
            let mut connection = self.get_connection().await?;

            let script = Script::new(RESERVATION_SCRIPT);
            let mut script = script.key(SLOTS_KEY);
            script.arg(num_slots);

            if !executors.is_empty() {
                let executor_ids: Vec<String> = executors.into_iter().collect();
                for executor_id in &executor_ids {
                    script.arg(executor_id);
                }

                let result: String = script
                    .invoke_async(&mut connection)
                    .await
                    .map_err(into_ballista_error)?;

                let reservations = serde_json::from_str::<Vec<u32>>(&result).map_err(|e| {
                    BallistaError::General(format!(
                        "Error executing reservations, unexpected response from redis: {e:?}"
                    ))
                })?;

                // Expand the per-executor counts into one reservation per slot.
                let reservations: Vec<ExecutorReservation> = executor_ids
                    .into_iter()
                    .zip(reservations)
                    .flat_map(|(id, reserved)| {
                        (0..reserved).map(move |_| ExecutorReservation::new_free(id.clone()))
                    })
                    .collect();

                return Ok(reservations);
            }
        }

        Ok(vec![])
    }
    async fn cancel_reservations(&self, reservations: Vec<ExecutorReservation>) -> Result<()> {
        if reservations.is_empty() {
            return Ok(());
        }

        let mut connection = self.get_connection().await?;

        let script = Script::new(CANCEL_SCRIPT);
        let mut script = script.key(SLOTS_KEY);

        // Count reservations per executor. A plain HashMap fold avoids the
        // pitfall of itertools' `group_by`, which only groups consecutive keys.
        let mut counts: HashMap<String, usize> = HashMap::new();
        for reservation in reservations {
            *counts.entry(reservation.executor_id).or_insert(0) += 1;
        }

        for (executor_id, slots) in counts {
            script.key(executor_id);
            script.arg(slots);
        }

        let cancelled: u64 = script
            .invoke_async(&mut connection)
            .await
            .map_err(into_ballista_error)?;
        debug!("Cancelled {} reservations", cancelled);

        Ok(())
    }
}
```

Note that this only supports a single task-distribution strategy (the `_distribution` parameter is ignored above).
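For context, here is a sketch of how the `task-slots` hash these scripts read and decrement might be seeded when an executor registers (the function and connection handling are illustrative assumptions, not part of the snippet above):

```rust
use redis::AsyncCommands;

// Illustrative helper: give a newly registered executor its slot budget in
// the shared `task-slots` hash used by the Lua scripts above.
async fn seed_executor_slots(
    con: &mut redis::aio::MultiplexedConnection,
    executor_id: &str,
    task_slots: u32,
) -> redis::RedisResult<()> {
    con.hset("task-slots", executor_id, task_slots).await
}
```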
Hi @yahoNanJing. At present, we intend to support HA.
We cannot accept all jobs failing during the failover process, so we are adding support for retrying the jobs that are in flight during the switch.
As the title says.