-
Notifications
You must be signed in to change notification settings - Fork 125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: stop persisting executor metadata #1291
Conversation
return Ok(SchedulerUpdateRequest { | ||
ChangeType::HandleAbandonedAllocations => { | ||
// Get all executor IDs from allocation_by_executor that aren't in executor_ids | ||
let missing_executors: Vec<String> = indexes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could probably collect all the allocations that needs to be removed in this iteration. Pass the allocation down to the unallocate function instead of the executor ids to remove 2 x iteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is the map called allocations_by_fn which is executor id -> fn_name -> Allocation we should use that map instead of allocations_by_executor and try to remove this map altogether. Its not used anywhere else I believe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allocation we should use that map instead of allocations_by_executor and try to remove this map altogether
I removed allocations_by_executor in my PR. Thanks for mentioning it. the /internal/allocations also returns fn allocations now.
Pass the allocation down to the unallocate function instead of the executor ids to remove 2 x iteration.
I refactored things a lot more to allocate on deregistration. Take a look. I kept the 2 loop for now, but it only happens on startup and allows us to keep the same code for deregistration and handling of abandoned allocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should consider using threadpools at some point to run the task allocator and task creators to not block the Tokio tasks.
https://docs.rs/tokio-threadpool/0.1.18/tokio_threadpool/struct.ThreadPool.html
_ => { | ||
error!("unhandled change type: {:?}", change); | ||
return Err(anyhow!("unhandled change type")); | ||
} | ||
} | ||
} | ||
|
||
pub fn unallocate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could also try to re-allocate the tasks if there are any executors available. This will save some back and forth between state machine and task allocator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! I also reduce the number of state changes happening for executor removal and also fixed an issue where updated_tasks could contain the same task more than once.
dc138ba
to
0858bd2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR is looking good. Left a comment about not requiring an additional state change type.
@@ -238,7 +235,8 @@ impl GraphProcessor { | |||
} | |||
ChangeType::ExecutorAdded(_) | | |||
ChangeType::ExecutorRemoved(_) | | |||
ChangeType::TombStoneExecutor(_) => { | |||
ChangeType::TombStoneExecutor(_) | | |||
ChangeType::HandleAbandonedAllocations => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this new state here? If you did the reconciliation of which executors are not available anymore in the executor manager, you could simply write new Deregister(or delete, forget the request name) Executor requests into the state machine, and that would follow the path of tombstone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not batching anymore, but should be fine.
} | ||
ChangeType::HandleAbandonedAllocations => { | ||
// Get all executor IDs of executors that haven't registered. | ||
let missing_executor_ids: Vec<String> = indexes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the comment above. You would do this logic in executor manager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
e0ee000
to
4014354
Compare
Context
As part of changing the protocol between executors and servers, it has become obvious that persisting executors is not required and adds complexity.
What
This PR moves executor to only live in memory. Additionally, we look at persisted allocations to determine whether some allocations were abandoned when the server restarts for executors that did not reconnect within 30s.
The graph processor is also now running within its own thread.
Testing
Contribution Checklist
make fmt
in the package directory.make fmt
inserver/
.