Add heartbeat functionality between master and slaves #422
Conversation
This change adds a heartbeat endpoint on the master where slaves can post heartbeats. Each slave runs a thread that periodically sends an authenticated HTTP POST request to the master to indicate that it is alive.
There are two parts to this change:
1. The master updates slave objects to keep track of when a heartbeat request was last received from the corresponding slave.
2. A scheduled heartbeat thread periodically goes through the slave list and identifies any slaves that have not been sending heartbeats. Those slaves are marked offline.
This change adds two new configuration values in the general section of the config file, as described below:
- heartbeat_frequency: interval, in seconds, between two heartbeats sent from a slave
- heartbeat_count_threshold: number of heartbeat failures after which a slave determines the master is unreachable
In addition, the master runs its own threshold check after every heartbeat_frequency * heartbeat_count_threshold seconds. An illustrative config snippet follows.
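For illustration only, a rough sketch of how those values might look in the general section of the config file. The numbers are placeholders, not defaults from this PR:

[general]
# seconds between two heartbeat posts from a slave
heartbeat_frequency = 10
# consecutive failures after which a slave decides the master is unreachable
heartbeat_count_threshold = 3
# with these values the master's own unresponsiveness check runs every 10 * 3 = 30 seconds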
Verified that @mayurva has signed the CLA. Thanks for the pull request!
I just flooded you with comments, mostly minor, but overall I'm pretty excited for this feature! 😻 Let me know if you have any questions about what I meant.
requirements.txt
Outdated
@@ -3,6 +3,7 @@
# Downgrade the setuptools version to workaround the issue replacing the setuptools dependency
# IOError METADATA no such file. Check https://github.com/pypa/setuptools/issues/951 for more details.
setuptools==33.1.1
apscheduler==3.5.1
What are your thoughts on the tradeoffs around using this library? I looked at it briefly and it looked like it might be overkill for what we need for this feature. It seems more oriented around generic multithreaded task scheduling than a periodic function that runs on a single thread.
Did you look at using the sched module in the standard library? It seems like we could run a sched.scheduler on a thread and get the necessary behavior without adding a new requirement. Another thought I had was that we could run the slave service's heartbeat loop in a SafeThread so that if an error occurs the slave is taken down. I'm not sure what the apscheduler lib's behavior is when a scheduled task raises an exception.
Yeah. @cmcginty and I briefly discussed apscheduler being overkill, but it seemed pretty straightforward to set up and use. I was hoping to use their native TornadoScheduler, but the current tornado version we use doesn't support it. I'll check out the sched module. Also, good point on using SafeThread. I'll verify the behavior on uncaught exceptions.
Hmm, I'm not sure I'd be crazy about using a TornadoScheduler either. It seems weird to do cron-type work on the Tornado thread where it could potentially block handling web requests. There's currently a very intentional and well-defined separation between the service classes and the API/web layer so I'd recommend against tying those together. Ideally nothing from ClusterMaster/ClusterSlave on down should know about Tornado.
As for apscheduler being easy to set up and use -- I totally agree. The code itself that you had around it is pretty clean. I'm less concerned about that than just the size and functionality of the library that we won't (and might never) leverage.
I don't think using sched would be much more complex. Something like:
def __init__(self):
    self._hb_scheduler = sched.scheduler()
    # this could also go in a public start() method similar to BuildRequestHandler
    SafeThread(target=self._start_beatin, name='HeartbeatLoop', daemon=True).start()

def _start_beatin(self):
    self._hb_scheduler.enter(0, 0, self._do_heartbeat)
    self._hb_scheduler.run()

def _do_heartbeat(self):
    # network request calls, error handling, metrics recording here
    # schedule next beat. (if you wanted to be super precise you could subtract from the frequency
    # the amount of time this method took to execute, but that shouldn't matter much for the
    # intervals we're talking about.)
    self._hb_scheduler.enter(self._hb_freq, 0, self._do_heartbeat)
uhh.. I hate how GitHub handles comments, but I reimplemented the heartbeat using sched. I'm still not fully satisfied, but I think it's better than before.
app/master/slave.py
Outdated
@@ -26,6 +27,7 @@ def __init__(self, slave_url, num_executors, slave_session_id=None):
self._num_executors_in_use = Counter()
self._network = Network(min_connection_poolsize=num_executors)
self.current_build_id = None
self._heartbeat = datetime.datetime.now()
Might be clearer to rename to _last_heartbeat_dt or something like that.
app/master/slave.py
Outdated
@@ -236,6 +238,14 @@ def _expected_session_header(self):
return headers

def set_heartbeat(self, time):
self._heartbeat = time
Can you use current_time or current_dt instead of time (more specific, and there is a stdlib module named time)? Alternatively, just use now(). Also, please add docstrings with type hints (or variable annotations).
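A minimal sketch of what that could look like, assuming the datetime-based field from the diff above and a module-level import of datetime (the parameter name and annotation here are illustrative, not the final API):

def set_heartbeat(self, current_dt: datetime.datetime) -> None:
    """
    Record when the most recent heartbeat was received from this slave.

    :param current_dt: the time at which the heartbeat request arrived
    """
    self._heartbeat = current_dt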
I would prefer now() if you are setting it as part of a blocking (NOT async) operation. It's clean and readable.
For some reason which I can't recall anymore, I thought passing in a time stamp was a good idea. But now that I think about it, it's probably better to just set the value to now()
app/slave/cluster_slave.py
Outdated
self._heartbeat_count = 0
self._heartbeat_count_threshold = Configuration['heartbeat_count_threshold']
self._heartbeat_frequency = Configuration['heartbeat_frequency']
self.scheduler = BackgroundScheduler()
self.scheduler and self.heartbeat() should be private?
app/slave/cluster_slave.py
Outdated
@@ -56,6 +58,29 @@ def __init__(self, port, host, num_executors=10):
self._build_teardown_coin = None
self._base_executor_index = None

self._heartbeat_count = 0
self._heartbeat_count_threshold = Configuration['heartbeat_count_threshold']
How about heartbeat_failure_threshold?
@authenticated
def post(self, slave_id):
self._cluster_master.receive_heartbeat_from_slave(slave_id)
self.write({'message': 'heartbeat API POST'})
I think empty response is fine here. Did you have an intent for the message?
not really. I think I just used it for testing and forgot to clean up.
app/slave/cluster_slave.py
Outdated
self._heartbeat_count += 1
if self._heartbeat_count >= self._heartbeat_count_threshold:
self._logger.warning('Master is not responding to heartbeats')
self._heartbeat_job.remove()
Do you plan to do more here? It seems like this is a good point to shut down the slave service. This is related to my other comment about running in a SafeThread.
Yes. The idea is to have the slave take an action like trying to reconnect to master. I just felt that should be a separate pull request.
On second thought, since the slave is just going to sit idle after this point, it probably makes sense to just kill it. We can change the behavior in the future.
If that's the case then you should add a TODO here explaining the intention. Right now it looks incomplete (because it is)
Also, how/why is the request to reconnect to the master different from the heartbeat request? Why can't you just keep sending requests to the master forever, regardless of whether the master is responding (i.e. never call self._heartbeat_job.remove())? When the master comes back online and the first successful heartbeat is received, the master connects that slave.
You may have already explained this in your design doc. If you did, sorry for not noticing this until now.
Interesting point. The TODO is to call connect_to_master for reconnection instead of just a simple post to heartbeat when the master is dead. The idea is that the master's identity might have changed, as it probably restarted or something. I guess in the future heartbeat/connect_to_master can be combined and behave differently based on the current state of the connection, but we are probably not there yet. In any case, I'll clarify this in a TODO comment.
app/master/cluster_master.py
Outdated
self._heartbeat_job = self._scheduler.add_job(self.heartbeat, 'interval', seconds=self._heartbeat_frequency)
return self._scheduler

def heartbeat(self):
It seems like the word "heartbeat" refers to two different things. On the slave it's an API call to the master. On the master it's a periodic cleanup of the slave list. Should these have different names?
I think this method is pretty terribly named. The name doesn't describe what the method does at all. Something like disconnect_unresponsive_slaves() seems much more readable to me.
A heuristic I like to use is that a function's name should be a verb or a verb phrase, but this function's name is a noun. A function is a thing that takes action, thus its name should be a verb or a verb phrase (do X, make this thing happen, calculate this). This function isn't "doing heartbeat" (I'm not even sure what that would mean); it's disconnecting slaves.
The big exception I can think of to my "functions should have verb phrase names" heuristic is that a function's name is allowed to be a noun phrase if its name describes the thing it's returning. For example, a function is allowed to be named slaves() or dead_slaves() if it calculates and returns that thing.
yeah. I'm working on refactoring this section. Hopefully it will look a lot better then.
app/master/cluster_master.py
Outdated
@@ -334,3 +357,9 @@ def get_path_for_build_results_archive(self, build_id: int, is_tar_request: bool
raise ItemNotReadyError('Build artifact file is not yet ready. Try again later.')

return archive_file

def receive_heartbeat_from_slave(self, slave_id):
self._thread_pool_executor.submit(self._async_receive_heartbeat_from_slave(int(slave_id)))
I don't think this needs to be async, does it? It's only setting a single value. It should only be necessary to spawn a thread if the work triggered by an API call is going to take a significant amount of time.
I agree with Joey. Also it is spawning a thread for each heartbeat for each slave.
app/master/cluster_master.py
Outdated
t = datetime.datetime.now()
slaves_to_disconnect = []
for slave in self._all_slaves_by_url.values():
if slave.is_alive() and not slave.is_responsive(t, self._heartbeat_frequency):
Maybe it makes more sense to expose slave.last_heartbeat_time() and do the comparison here. It feels a bit strange to ask a Slave instance if it's responsive while also telling it the entire context for which it will be considered responsive.
ah. Interesting point. Thanks!
One other thought: what is the behavior now if the master calls self._disconnect_slave(slave) because the slave hasn't sent a heartbeat in a while, but then the slave sends a build result?
I think right now it will eventually hit raise DeadSlaveError when it tries to start a new subjob on the same slave. That won't bring down the master since execute_next_subjob_or_free_executor is being run in the thread pool, but it feels messy.
app/master/cluster_master.py
Outdated
for slave in slaves_to_disconnect:
self._disconnect_slave(slave)
self._logger.warning('Slave {} marked offline as it is not sending heartbeats.'.format(
This should probably be more than a warning. I think this should be logged as an error and/or increase a Prometheus error metric.
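For what it's worth, a hedged sketch of the metric half of that suggestion using prometheus_client (the metric name and the small helper are made up for illustration; the project's existing collectors may prefer a different pattern):

from prometheus_client import Counter

# hypothetical metric name, for illustration only
unresponsive_slaves_disconnected = Counter(
    'cluster_master_unresponsive_slaves_disconnected_total',
    'Slaves disconnected because they stopped sending heartbeats',
)

def record_unresponsive_slave_disconnected():
    # called from the cleanup loop whenever a slave is marked offline
    unresponsive_slaves_disconnected.inc()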
I think set_heartbeat can happen when the master receives any valid communication from the slave. Currently it looks like it's only set when the slave hits the heartbeat endpoint. So even when the slave sends a subjob result to the master, we can update the last_heartbeat_seen time.
app/master/cluster_master.py
Outdated
@@ -35,6 +38,10 @@ def __init__(self):
self._build_request_handler.start()
self._slave_allocator = SlaveAllocator(self._scheduler_pool)
self._slave_allocator.start()

self._heartbeat_frequency = Configuration['heartbeat_frequency'] * Configuration['heartbeat_count_threshold']
What's the difference between Configuration['heartbeat_frequency'] and Configuration['heartbeat_count_threshold']?
app/master/cluster_master.py
Outdated
@@ -55,6 +62,22 @@ def __init__(self):

SlavesCollector.register_slaves_metrics_collector(lambda: self.all_slaves_by_id().values())

def configure_heartbeat(self):
self._heartbeat_job = self._scheduler.add_job(self.heartbeat, 'interval', seconds=self._heartbeat_frequency)
return self._scheduler
- Do we need to return the scheduler if it's already an attribute of the class?
- This function doesn't seem large enough or complicated enough to warrant its own method. Just inline line 66 into the constructor. I see that configure_heartbeat is being called from master_subcommand; that seems unnecessary and confusing.
app/master/cluster_master.py
Outdated
@@ -35,6 +38,10 @@ def __init__(self):
self._build_request_handler.start()
self._slave_allocator = SlaveAllocator(self._scheduler_pool)
self._slave_allocator.start()

self._heartbeat_frequency = Configuration['heartbeat_frequency'] * Configuration['heartbeat_count_threshold']
self._scheduler = BackgroundScheduler()
I'm not familiar with BackgroundScheduler, but it seems a bit heavy to bring this in for such a simple task. I'd probably prefer to use tornado.PeriodicCallback (new in version 4.1; you'd have to upgrade tornado in requirements.txt to use it, and it's also worth noting that I've never used it before) or do:

ioloop = tornado.ioloop.IOLoop.current()

@tornado.gen.coroutine  # the coroutine decorator is needed for the yield below to actually pause
def f():
    # do some stuff
    yield tornado.gen.sleep(interval)
    ioloop.add_callback(f)

ioloop.add_callback(f)
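For reference, a minimal sketch of the PeriodicCallback alternative mentioned above (the interval is given in milliseconds, and this assumes the cleanup work is quick enough not to block request handling on the IOLoop):

import tornado.ioloop

def disconnect_unresponsive_slaves():
    # the master-side cleanup work would go here
    pass

check_interval_seconds = 30  # placeholder value
callback = tornado.ioloop.PeriodicCallback(disconnect_unresponsive_slaves, check_interval_seconds * 1000)
callback.start()
tornado.ioloop.IOLoop.current().start()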
app/master/cluster_master.py
Outdated
def heartbeat(self):
t = datetime.datetime.now()
slaves_to_disconnect = []
for slave in self._all_slaves_by_url.values():
I find a list comprehension clearer here:

slaves_to_disconnect = [slave
                        for slave in self._all_slaves_by_url.values()
                        if slave.is_alive() and not slave.is_responsive(t, self._heartbeat_frequency)]

It's more declarative and makes it clear at a glance that the only purpose of this loop is to populate the slaves_to_disconnect list. I know some people that were previously on this team would disagree with me here (Greg, Will, TJ). Not sure what the current philosophy is, @josephharrington?
Oh, I didn't even know something like this existed. I'll wait to get Joey's thoughts too.
app/slave/cluster_slave.py
Outdated
def configure_heartbeat(self):
self._heartbeat_job = self.scheduler.add_job(self.heartbeat, 'interval', seconds=self._heartbeat_frequency)
return self.scheduler
Same comment about not needing to return the scheduler, and about inlining this into the constructor
app/master/cluster_master.py
Outdated
self._heartbeat_frequency = Configuration['heartbeat_frequency'] * Configuration['heartbeat_count_threshold']
self._scheduler = BackgroundScheduler()
self._heartbeat_job = None
In general, I don't like explicitly assigning None to variables. Sometimes that can't be avoided, but it's rare.
In this example, inlining configure_heartbeat would do the trick
app/slave/cluster_slave.py
Outdated
def heartbeat(self):
state_url = self._master_api.url('slave', self._slave_id, 'heartbeat')
try:
self._network.post_with_digest(state_url, request_params={'slave': {'heartbeat': True}},
This is unfortunate. I'd really like to see this be
yield self._network.post_with_digest(...)
Unfortunately post_with_digest doesn't return a future, and rewriting it to return a future would be pretty involved. It definitely doesn't belong in this PR. Can you add a TODO to:
- Rewrite network.post_with_digest to return a future/be a coroutine
- Run this code on tornado's event loop instead of on its own thread
Also, put type hints on all your functions pls. Most functions in CR don't have type hints, because we just started adding them recently (probably why you didn't realize you should be doing it), but every new function in CR should have type hints. Examples
self.assertEqual(slave.set_heartbeat.call_count, 1, 'incoming heartbeat sets timestamp for correct slave')

@genty_dataset (
slave_unresponsive=(True,False,),
Trailing commas on these tuples are unnecessary:
(True, False)
master = ClusterMaster()
slave_url = "url"
slave_id = master.connect_slave(slave_url, 1)
slave = Mock()
Add a spec to this
https://docs.python.org/3/library/unittest.mock.html#unittest.mock.Mock
slave = Mock(ClusterSlave)
alternatively
slave = Mock(spec=ClusterSlave)
The convention on PE is to omit the spec=, which is possible because spec is the first argument to Mock
slave_url = "url" | ||
slave_id = master.connect_slave(slave_url, 1) | ||
slave = Mock() | ||
master.get_slave = MagicMock(return_value=slave) |
master isn't a Mock, it's a real object. I'm not a fan of clobbering a function of a real object
slave_id = master.connect_slave(slave_url, 1)
slave = Mock()
master.get_slave = MagicMock(return_value=slave)
master._async_receive_heartbeat_from_slave(slave_id['slave_id'])
Not a fan of calling a private method in a test. It's an indication that you're testing an implementation detail, which in turn is an indication that this test might be brittle. Brittle tests are a huge pet peeve of mine, and something that this team has suffered heavily from in the past.
The easiest way to fix this test is to call master.receive_heartbeat_from_slave directly, patch the slave constructor, and assert that when you call master.receive_heartbeat_from_slave, the return value of the slave constructor (i.e. the slave) gets set_heartbeat called.
An improvement upon that test: instead of calling master.receive_heartbeat_from_slave, send a POST to the heartbeat endpoint and assert that slave.set_heartbeat is called. Tornado's AsyncHTTPTestCase will be helpful here. Look at get_app() and fetch. See the internal Box code that I slack'd to you. (A rough sketch follows.)
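Purely a sketch of that shape of test, glossing over details such as the digest auth the endpoint requires and how the master's tornado application is actually built (make_master_application and the URL path below are assumptions, not the project's real names):

from unittest.mock import patch
from tornado.testing import AsyncHTTPTestCase

class TestHeartbeatEndpoint(AsyncHTTPTestCase):
    def get_app(self):
        # hypothetical helper that builds the master's tornado Application
        self.cluster_master = ClusterMaster()
        return make_master_application(self.cluster_master)

    def test_post_to_heartbeat_endpoint_sets_heartbeat_on_slave(self):
        with patch('app.master.cluster_master.Slave') as mock_slave_ctor:
            mock_slave = mock_slave_ctor.return_value
            connect_response = self.cluster_master.connect_slave('some_slave_url', 1)
            # assumed URL layout; request auth is omitted here for brevity
            self.fetch('/v1/slave/{}/heartbeat'.format(connect_response['slave_id']), method='POST', body='{}')
            self.assertEqual(mock_slave.set_heartbeat.call_count, 1)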
slave = master.connect_slave(slave_url, 1)
slave = master.get_slave(int(slave['slave_id']))

slave.is_alive = MagicMock(return_value=slave_alive)
Same comment about clobbering a method from a real object.
Instead of clobbering slave.is_alive, use patch to patch out the Slave() constructor; that way you have a reference to a mock slave object and can set the attributes on that.
Instead of clobbering master._disconnect_slave and asserting that it's called, patch the Slave constructor and assert that slave.mark_dead() is called. (A sketch of this approach follows.)
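A rough sketch of that approach, reusing the self.patch helper that appears later in this thread (the dataset variables and the exact assertion are illustrative):

# patch the Slave constructor before the master creates any slaves
mock_slave = self.patch('app.master.cluster_master.Slave').return_value
mock_slave.is_alive.return_value = slave_alive                        # from the genty dataset
mock_slave.get_last_heartbeat_time.return_value = last_heartbeat_time

master = ClusterMaster()
master.connect_slave('some_slave_url', 1)
master._disconnect_non_heartbeating_slaves()  # ideally exercised through a public entry point

expected_mark_dead_calls = 1 if slave_should_be_disconnected else 0   # hypothetical dataset flag
self.assertEqual(mock_slave.mark_dead.call_count, expected_mark_dead_calls)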
app/master/slave.py
Outdated
@@ -236,6 +238,12 @@ def _expected_session_header(self):
return headers

def set_last_heartbeat_time(self):
Minor (can ignore): Say update instead of set. IMO set usually sets the value to whatever is passed in as an argument.
Relatively minor comments that I don't care that much about. Lgtm
app/master/cluster_master.py
Outdated
def start_heartbeat_tracker_thread(self):
self._logger.info('Heartbeat tracker will run every {} seconds'.format(
self._unresponsive_slaves_cleanup_interval))
SafeThread(target=self._start_heartbeat_tracker, name='HeartbeatTrackerThread', daemon=True).start()
I'm not sure I agree with this use of SafeThread (or with most uses of SafeThread). I know @josephharrington explicitly said to use it here, but I disagree. SafeThread will kill the whole process if an error bubbles to the top of it. My understanding of the history of SafeThread is that it was introduced in a time before we had alerts in ClusterRunner. The only way of knowing if an error happened was to kill the whole process.
But now we have alerts (at least on the master). If an error bubbles up to here, I wouldn't want to kill the whole process; I'd just want to log an error, trigger an alert, and keep chugging along. Especially considering that this thread isn't handling anything super duper business critical. It's not as if failures in this thread will make CR unable to run builds or anything like that. Crashing the master seems like an overreaction.
I intended to say we should use it on the slave, not the master.
To clarify, I agree with Evan that this should definitely not be a SafeThread. The main exception to that that I can think of is if we ever extend this to be a generic periodic task thread using an event loop (for example to support generic periodic tasks like doing git gc on repos, purging old build data, etc.) Even if we do make it a SafeThread in that case, we'd still probably want a try/except around each individual task execution. (Individual tasks should be allowed to fail without killing the entire periodic execution mechanism.)
def start_heartbeat_thread(self):
self._logger.info('Heartbeat will run every {} seconds'.format(self._heartbeat_interval))
SafeThread(target=self._start_heartbeat, name='HeartbeatThread', daemon=True).start()
Sort of the same comment about SafeThread here, but I'm less confident that this is bad than the use of SafeThread in the master. We should probably talk about this.
- Crashing a slave is much less devastating than crashing the master
- We don't have alerts on the slave ... so maybe crashing the slave is the best we can do? Maybe we shouldn't even make alerts on the slave. Maybe the proper response for a slave erring is just crashing the slave and relying on the master/kubernetes to spin up a new one. (hehe, kubernetes. I can dream)
Yeah, I'd vote that this should be a SafeThread. Let's not build assumptions about short-lived/ephemeral slave nodes into core functionality of ClusterRunner since this is open source and should be usable for cases simpler than Box's. I agree with not adding alerts (directly) from the slave services. It seems more tractable to have a slave tell the master its current state (perhaps eventually in content of heartbeat calls) and have alerting/error logic on the master.
From the master's perspective it doesn't matter if a slave service goes down when it stops sending heartbeat requests since it's going to disconnect it anyway. This is more of a cleanup step to make sure we don't leave slave services running forever on a random host that we forgot about.
I would say it's acceptable for this to be a SafeThread, but I wouldn't go so far as to say it should be a SafeThread. Given that we're not assuming ephemeral slaves, the ideal for "maximum robustness" is that the master and slave should be able to tolerate arbitrarily long periods of time without being able to communicate with one another. When communication becomes possible again, they should seamlessly reconnect. What happens if there's a network partition between the master and slaves that lasts an hour? What happens if the master's vm dies and it takes us 30 minutes to bring it back up? I think the standard for perfection in those cases is that the slave seamlessly reconnects itself back to the master once the issue is resolved, not that the slave kills itself.
I say it's acceptable for the slave to kill itself is because in practice if the slave can't communicate with the master for more than ~5 minutes then something has probably gone seriously wrong and we probably will need an engineer to intervene anyway. But I do think intentionally killing the slave is intentionally giving up on perfection.
the master and slave should be able to tolerate arbitrarily long periods of time without being able to communicate with one another
...
I think the standard for perfection in those cases is that the slave seamlessly reconnects itself back to the master once the issue is resolved
@ethomas2 Let's discuss this offline. It sounds like we have very different thoughts on what the behavior should be in the case of network failure.
# TODO: Right now the slave simply dies when it does not hear back from master. The next step would
# be to try to reconnect to master at this point. In future the heartbeat and connect_to_master
# methods can combined into one. This combined method will behave differently based on current state.
self.kill()
Why even kill it? Why not just let it stay up forever? Indefinitely spamming the master with requests.
I suppose this is gonna change really soon anyway, so nbd.
I was just talking about the comment @mayurva left:
"The next step would be to try to reconnect to master at this point. In future the heartbeat and connect_to_master methods can combined into one. This combined method will behave differently based on current state."
I think the plan is for this to happen soon
Looking pretty good! Mostly minor comments with one or two major ones in there. Sorry if there's overlap with previous discussion -- I may have missed some of the outdated comments so if you guys outvote me on any of these issues feel free to proceed.
Just a note, it may be worth adding a functional test or two for the heartbeat stuff, both to verify the behavior of the slave when it cannot contact the master and to verify the behavior of the master.
@@ -41,6 +42,10 @@ def async_run(self, port, log_level, eventlog_file):
log_startup = functools.partial(self._logger.info, 'Master service is running on {}:{}.'.format(hostname, port))
ioloop.add_callback(log_startup)

# start heartbeat tracker once ioloop starts
start_master_heartbeat_tracker = functools.partial(cluster_master.start_heartbeat_tracker_thread)
ioloop.add_callback(start_master_heartbeat_tracker)
What do you think of starting this in the master class constructor? The "heartbeat tracker" seems less related to if the ioloop is running (web layer) and more related to if a slave is connected to the master (service layer). Another way to ask that question is: Would we ever want to create a MasterService without this loop running?
self._logger.error('Master is not responding to heartbeats')

# TODO: Right now the slave simply dies when it does not hear back from master. The next step would
# be to try to reconnect to master at this point. In future the heartbeat and connect_to_master
I don't think I understand this comment. Why would we try to reconnect to the master vs. just increasing the failure threshold (on master or slave or both)?
If the idea behind this is related to having a timeout for individual builds, I'd rather us build that directly instead of doing gymnastics around slave service connectivity.
@@ -54,9 +54,11 @@ def __init__(self, cluster_master):
RouteNode(r'queue', _QueueHandler),
RouteNode(r'slave', _SlavesHandler, 'slaves').add_children([
RouteNode(r'(\d+)', _SlaveHandler, 'slave').add_children([
RouteNode(r'shutdown', _SlaveShutdownHandler, 'shutdown')
RouteNode(r'shutdown', _SlaveShutdownHandler, 'shutdown'),
RouteNode(r'heartbeat', _SlavesHeartbeatHandler)
I'd argue that heartbeat is a state change -- the state being the last time the slave was able to make a successful API call to the master. Not a huge deal either way though. Probably not worth changing at this point.
slave.is_alive = MagicMock(return_value=slave_alive)
slave.get_last_heartbeat_time = MagicMock(return_value=last_heartbeat_time)
slave.mark_dead = Mock()
Do you need a real slave instance here? It's a bit surprising to see methods on a real object directly replaced with mocks via assignment. It can lead to surprising results and stale tests (for example, if someone renamed the mark_dead method but did not rename it here, your assertion about it never being called would pass trivially). I believe the patch functionality does checks to guard against this.
More specifically, you should be able to patch out the Slave constructor so that you can inject a full mock Slave object that also asserts method calls match the real object API.
slave.get_last_heartbeat_time = MagicMock(return_value=last_heartbeat_time)
slave.mark_dead = Mock()

master._disconnect_non_heartbeating_slaves()
I think there was a comment about this previously, but ideally we don't test via private methods. There are exceptions, but most of the time it's a code smell.
slave_url = "url" | ||
slave = master.connect_slave(slave_url, 1) | ||
slave = master.get_slave(int(slave['slave_id'])) | ||
last_heartbeat_time = datetime.now() - timedelta(seconds=seconds_since_last_heartbeat) |
Don't use real datetime.now in tests. Mock it out so that we have deterministic test inputs.
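One hedged way to do that with the project's patch helper (the patch target below assumes cluster_master does "from datetime import datetime"; adjust it to wherever the module actually gets datetime from):

from datetime import datetime, timedelta

frozen_now = datetime(2018, 1, 1, 12, 0, 0)  # arbitrary fixed point in time
mock_datetime = self.patch('app.master.cluster_master.datetime')
mock_datetime.now.return_value = frozen_now

last_heartbeat_time = frozen_now - timedelta(seconds=seconds_since_last_heartbeat)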
slave = self._create_cluster_slave()
slave.connect_to_master(self._FAKE_MASTER_URL)
slave.kill = Mock()
Same comment as above. Also, I'd rather us not mock out methods on the class under test. Assert on the behavior of the method, not the method itself being called.
if not is_master_responsive:
self.mock_network.post_with_digest.side_effect = requests.ConnectionError

slave._run_heartbeat()
Same comment as above about testing private methods. If you can't find a way around that, maybe at least have a supporting test that verifies start_heartbeat_thread sets up slave._run_heartbeat to run periodically. (Currently it looks like you have no tests that hit the public methods you added to the service classes.)
I didn't try writing this so it may be harder than I think, but my first instinct would be to:
- patch out SafeThread so that it executes its target synchronously (sketched below)
- set configuration so that the Nth post call will fail
- set configuration (or stub calls to datetime.now) so that there is no delay between scheduler calls
- assert that calling start_heartbeat_thread ends up sending N-1 post calls to the master
Anyway, just a suggestion. This is more of a test robustness issue than an actual issue with the feature code.
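A minimal sketch of the SafeThread part of that suggestion, assuming cluster_slave imports SafeThread directly (the patch target is a guess and would need to match the real import path):

from unittest.mock import patch

class _InlineThread:
    """Test double that runs the thread target synchronously when start() is called."""
    def __init__(self, target=None, name=None, daemon=None, **kwargs):
        self._target = target

    def start(self):
        self._target()

with patch('app.slave.cluster_slave.SafeThread', new=_InlineThread):
    pass  # configure the slave so the Nth heartbeat post raises, then call start_heartbeat_thread()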
Force-pushed from 765b4f1 to 3f5052b.
- The last heartbeat time is updated in the slave object
- Heartbeat time is also updated when a slave posts to the /slave/id endpoint
- The heartbeat time is updated synchronously
Allow the heartbeat configuration values to be separate so that they can be tweaked independently of each other.
Force-pushed from d1a4e76 to fc2b250.
Looks good to me. Only minor comments.
import os
import sched
Nice find. I haven't seen this lib before.
@@ -53,8 +58,36 @@ def __init__(self):
fs.async_delete(self._master_results_path)
fs.create_dir(self._master_results_path)

# Configure heartbeat tracking
self._unresponsive_slaves_cleanup_interval = Configuration['unresponsive_slaves_cleanup_interval']
I wonder if there is a shorter but equally descriptive name we could use for this.
@@ -386,6 +388,7 @@ def put(self, slave_id):
new_slave_state = self.decoded_body.get('slave', {}).get('state')
slave = self._cluster_master.get_slave(int(slave_id))
self._cluster_master.handle_slave_state_update(slave, new_slave_state)
self._cluster_master.update_slave_last_heartbeat_time(slave)
I'm not sure why the master directly exposes the slave through the API. Maybe self._cluster_master.update_slave_last_heartbeat_time(slave_id) would be easier (and would simplify the POST handler).
Good point. It seems like a lot of code depends upon the get_slave functionality, like line 390, so I'm not sure if it is worth breaking that consistency.
app/master/cluster_master.py
Outdated
self._disconnect_non_heartbeating_slaves)

def _is_slave_responsive(self, slave: ClusterSlave) -> bool:
return (datetime.now() - slave.get_last_heartbeat_time()).seconds < self._unresponsive_slaves_cleanup_interval
Consider extracting the left side logic into a variable here. The line is already pretty long.
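For example, roughly (the name of the temporary variable is up for grabs):

def _is_slave_responsive(self, slave: ClusterSlave) -> bool:
    seconds_since_last_heartbeat = (datetime.now() - slave.get_last_heartbeat_time()).seconds
    return seconds_since_last_heartbeat < self._unresponsive_slaves_cleanup_interval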
@@ -26,6 +27,7 @@ def __init__(self, slave_url, num_executors, slave_session_id=None):
self._num_executors_in_use = Counter()
self._network = Network(min_connection_poolsize=num_executors)
self.current_build_id = None
self._last_heartbeat_time = datetime.now()
Better to just call update_last_heartbeat_time() here.
Umm yeah. I could do that. I think the way it is currently written makes it clear that the variable is being initialized. Calling the function might obscure it a bit
mock_slave = self.patch('app.master.cluster_master.Slave').return_value
# self.patch('app.master.cluster_master.Slave', new=lambda *args: mock_slave)
# master.connect_slave('slave_url', 1)
remove if you no longer need this.
@@ -213,6 +219,40 @@ def test_updating_slave_to_nonexistent_state_should_raise_bad_request_error(self
with self.assertRaises(BadRequestError):
master.handle_slave_state_update(slave, 'NONEXISTENT_STATE')

def test_update_slave_last_heartbeat_time_calls_update_last_heartbeat_time_on_slave(self):
This type of unit test is not extremely useful and makes the code more brittle to changes.
last_heartbeat_time = self._mock_current_datetime - timedelta(seconds=seconds_since_last_heartbeat)
master = ClusterMaster()

mock_slave = Mock()
Try spec'ing the object like Mock(spec_set=ClusterSlave) if you can.
Evan and I ran into a lot of issues mocking this particular class. If I specify spec, I get the error below. It is because of the way we use the public member id. A proper fix may be to change id to a property, but I'm not sure if that should be a part of this PR:
AttributeError: Mock object has no attribute 'id'
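A possible middle ground until id becomes a property, sketched under the assumption that the failure comes from id being an instance attribute assigned in __init__ (spec only sees class attributes, but a plain spec, unlike spec_set, still allows attaching extras by hand):

from unittest.mock import Mock
from app.master.slave import Slave  # assumed import path

mock_slave = Mock(spec=Slave)
mock_slave.id = 1                   # re-attach the instance attributes the test relies on
mock_slave.get_last_heartbeat_time.return_value = last_heartbeat_time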
else:
if heartbeat_failure_threshold == 1:
self.assertEqual(self._mock_sys.exit.call_count, 1,
'slave dies when it decides that master is dead')
can you fix the indent here.
@@ -237,6 +245,33 @@ def test_execute_subjob_passes_base_executor_index_to_executor(self):

executor.execute_subjob.assert_called_with(1, 2, [], 12)

@genty_dataset(
responsive_master=(True,1,),
You only need the trailing comma if the tuple only has one value. For example:
responsive_master=(True,) # required trailing comma
responsive_master=(True,1) # two args, only 1 comma needed
Instead of the slave checking whether it is responsive, it now returns the last heartbeat time and the master performs the responsiveness check.
Instead of using apscheduler, we will be using sched in combination with threading
- Verify that the master marks the slaves as dead if the slave process dies
- Verify that the slave dies if the master process dies