
Add heartbeat functionality between master and slaves #422

Merged: 10 commits merged into master from the heartbeat branch on Apr 17, 2018

Conversation


@mayurva mayurva commented Apr 5, 2018

No description provided.

mayurva added 3 commits April 4, 2018 17:25
This change adds a heartbeat endpoint on the master where slaves can post
heartbeats. The slaves run a periodic thread that sends authenticated HTTP POST requests to the master to indicate they are alive.
There are two parts to this change:
1. First, the master updates slave objects to keep track of when a heartbeat
request was received from corresponding slave
2. A scheduled heartbeat thread periodically goes through the slave list and
identifies any slaves that have not been sending heartbeats. Those slaves are
marked offline
This change adds two new configuration values in the general section of the
config file as described below
- heartbeat_frequency: interval between two heartbeats sent from a slave
- heartbeat_count_threshold: number of heartbeat failures after which a slave
  determines the master is unreachable

In addition, the master runs its own threshold check every 'heartbeat_frequency
* heartbeat_count_threshold' seconds
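
A rough sketch of how these two values fit together (the numeric values here are made up for illustration):

heartbeat_frequency = 10           # seconds between heartbeat POSTs sent by each slave
heartbeat_count_threshold = 3      # consecutive failures before a slave decides the master is unreachable

# The master sweeps for unresponsive slaves once per full window of allowed missed beats.
master_check_interval = heartbeat_frequency * heartbeat_count_threshold   # 30 seconds in this example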

boxcla commented Apr 5, 2018

Verified that @mayurva has signed the CLA. Thanks for the pull request!

Contributor

@josephharrington josephharrington left a comment

I just flooded you with comments, mostly minor, but overall I'm pretty excited for this feature! 😻 Let me know if you have any questions about what I meant.

requirements.txt Outdated
@@ -3,6 +3,7 @@
# Downgrade the setuptools version to workaround the issue replacing the setuptools dependency
# IOError METADATA no such file. Check https://github.com/pypa/setuptools/issues/951 for more details.
setuptools==33.1.1
apscheduler==3.5.1
Contributor

What are your thoughts on the tradeoffs around using this library? I looked at it briefly and it looked like it might be overkill for what we need for this feature. It seems more oriented around generic multithreaded task scheduling than a periodic function that runs on a single thread.

Did you look at using the sched module in the standard library? It seems like we could run a sched.scheduler on a thread and get the necessary behavior without adding a new requirement. Another thought I had was that we could run the slave service's heartbeat loop in a SafeThread so that if an error occurs the slave is taken down. I'm not sure what the apscheduler lib's behavior is when a scheduled task raises an exception.

Contributor Author

Yeah. @cmcginty and I briefly discussed apscheduler being overkill, but it seemed pretty straightforward to set up and use. I was hoping to use their native TornadoScheduler, but the tornado version we currently use doesn't support it. I'll check out the sched module. Also, good point on using SafeThread. I'll verify the behavior on uncaught exceptions.

Contributor

Hmm, I'm not sure I'd be crazy about using a TornadoScheduler either. It seems weird to do cron-type work on the Tornado thread where it could potentially block handling web requests. There's currently a very intentional and well-defined separation between the service classes and the API/web layer so I'd recommend against tying those together. Ideally nothing from ClusterMaster/ClusterSlave on down should know about Tornado.

Contributor

@josephharrington josephharrington Apr 5, 2018

As for apscheduler being easy to set up and use -- I totally agree. The code itself that you had around it is pretty clean. I'm less concerned about that than just the size and functionality of the library that we won't (and might never) leverage.

I don't think using sched would be much more complex. Something like:

def __init__(self):
    self._hb_scheduler = sched.scheduler()
    SafeThread(target=self._start_beatin, name='HeartbeatLoop', daemon=True).start()  # this could also go in a public start() method similar to BuildRequestHandler

def _start_beatin(self):
    self._hb_scheduler.enter(0, 0, self._do_heartbeat)
    self._hb_scheduler.run()

def _do_heartbeat(self):
    # network request calls, error handling, metrics recording here
    self._hb_scheduler.enter(self._hb_freq, 0, self._do_heartbeat)  # schedule next beat. (if you wanted to be super precise you could subtract from the frequency the amount of time this method took to execute, but that shouldn't matter much for the intervals we're talking about.)

Contributor Author

Uhh.. I hate how GitHub handles comments, but I reimplemented the heartbeat using sched. I'm still not fully satisfied, but I think it's better than before.

@@ -26,6 +27,7 @@ def __init__(self, slave_url, num_executors, slave_session_id=None):
self._num_executors_in_use = Counter()
self._network = Network(min_connection_poolsize=num_executors)
self.current_build_id = None
self._heartbeat = datetime.datetime.now()
Contributor

Might be clearer to rename to _last_heartbeat_dt or something like that.

@@ -236,6 +238,14 @@ def _expected_session_header(self):

return headers

def set_heartbeat(self, time):
self._heartbeat = time
Contributor

Can you use current_time or current_dt instead of time (more specific, and there is a stdlib module named time)? Alternatively just use now(). Also, please add docstrings with type hints (or variable annotations).


I'd prefer now() if you are setting it as part of a blocking (NOT async) operation. It's clean and readable.

Contributor Author

For some reason which I can't recall anymore, I thought passing in a time stamp was a good idea. But now that I think about it, it's probably better to just set the value to now()
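
A minimal sketch of the argument-free version being discussed, with a docstring and annotation as requested; the method and attribute names match the ones that eventually land later in this PR:

import datetime


class Slave:  # stub class so the sketch stands alone
    def __init__(self):
        self._last_heartbeat_time = datetime.datetime.now()

    def set_last_heartbeat_time(self) -> None:
        """Record the current time as the most recent heartbeat received from this slave."""
        self._last_heartbeat_time = datetime.datetime.now()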

self._heartbeat_count = 0
self._heartbeat_count_threshold = Configuration['heartbeat_count_threshold']
self._heartbeat_frequency = Configuration['heartbeat_frequency']
self.scheduler = BackgroundScheduler()
Contributor

self.scheduler and self.heartbeat() should be private?

@@ -56,6 +58,29 @@ def __init__(self, port, host, num_executors=10):
self._build_teardown_coin = None
self._base_executor_index = None

self._heartbeat_count = 0
self._heartbeat_count_threshold = Configuration['heartbeat_count_threshold']
Contributor

How about heartbeat_failure_threshold?

@authenticated
def post(self, slave_id):
self._cluster_master.receive_heartbeat_from_slave(slave_id)
self.write({'message': 'heartbeat API POST'})
Contributor

I think empty response is fine here. Did you have an intent for the message?

Contributor Author

not really. I think I just used it for testing and forgot to clean up.

self._heartbeat_count += 1
if self._heartbeat_count >= self._heartbeat_count_threshold:
self._logger.warning('Master is not responding to heartbeats')
self._heartbeat_job.remove()
Contributor

Do you plan to do more here? It seems like this is a good point to shut down the slave service. This is related to my other comment about running in a SafeThread.

Contributor Author

Yes. The idea is to have the slave take an action like trying to reconnect to master. I just felt that should be a separate pull request.

Contributor Author

On second thought, since the slave is just going to sit idle after this point, it probably makes sense to just kill it. We can change the behavior in the future.


If that's the case then you should add a TODO here explaining the intention. Right now it looks incomplete (because it is)


Also, how/why is the request to reconnect to the master different than the heartbeat request? Why can't you just keep sending requests to the master forever, regardless of if the master is responding (i.e. never call self._heartbeat_job.remove()).

When the master comes back online and the first successful heartbeat is received the master connects that slave.

You may have already explained this in your design doc. If you did, sorry for not noticing this until now.

Contributor Author

Interesting point. The TODO is to call connect_to_master for reconnection instead of just a simple post to the heartbeat endpoint when the master is dead. The idea is that the master's identity might have changed, since it probably restarted or something. I guess in the future heartbeat/connect_to_master can be combined and behave differently based on the current state of the connection, but we are probably not there yet. In any case, I'll clarify this in a TODO comment.

self._heartbeat_job = self._scheduler.add_job(self.heartbeat, 'interval', seconds=self._heartbeat_frequency)
return self._scheduler

def heartbeat(self):
Contributor

It seems like the word "heartbeat" refers to two different things. On the slave it's an API call to the master. On the master it's a periodic cleanup of the slave list. Should these have different names?


I think this method is pretty terribly named. The name doesn't describe what the method does at all. Something like disconnect_unresponsive_slaves() seems much more readable to me.

A heuristic I like to use is that a function's name should be a verb or a verb phrase, but this function's name is a noun. A function is a thing that takes action, thus its name should be a verb or a verb phrase (do X, make this thing happen, calculate this). This function isn't "doing heartbeat" (I'm not even sure what that would mean); it's disconnecting slaves.

The big exception I can think of to my "functions should have verb phrase names" heuristic is that a function's name is allowed to be a noun phrase if its name describes the thing it's returning. For example a function is allowed to be named slaves() or dead_slaves() if it calculates and returns that thing

Contributor Author

yeah. I'm working on refactoring this section. Hopefully it will look a lot better then.

@@ -334,3 +357,9 @@ def get_path_for_build_results_archive(self, build_id: int, is_tar_request: bool
raise ItemNotReadyError('Build artifact file is not yet ready. Try again later.')

return archive_file

def receive_heartbeat_from_slave(self, slave_id):
self._thread_pool_executor.submit(self._async_receive_heartbeat_from_slave(int(slave_id)))
Contributor

I don't think this needs to be async, does it? It's only setting a single value. It should only be necessary to spawn a thread if the work triggered by an API call is going to take a significant amount of time.


I agree with Joey. Also it is spawning a thread for each heartbeat for each slave.
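
A minimal sketch of the synchronous version being suggested, assuming the get_slave lookup and the slave timestamp setter that appear later in this PR:

def receive_heartbeat_from_slave(self, slave_id):
    """Update the slave's last-heartbeat timestamp directly; a single assignment does not need the thread pool."""
    slave = self.get_slave(int(slave_id))
    slave.set_last_heartbeat_time()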

t = datetime.datetime.now()
slaves_to_disconnect = []
for slave in self._all_slaves_by_url.values():
if slave.is_alive() and not slave.is_responsive(t, self._heartbeat_frequency):
Contributor

Maybe it makes more sense to expose slave.last_heartbeat_time() and do the comparison here. It feels a bit strange to ask a Slave instance if it's responsive while also telling it the entire context for which it will be considered responsive.

Contributor Author

ah. Interesting point. Thanks!

Contributor

@josephharrington josephharrington left a comment

One other thought: what is the behavior now if the master calls self._disconnect_slave(slave) because the slave hasn't sent a heartbeat in a while, but then the slave sends a build result?

I think right now it will eventually hit raise DeadSlaveError when it tries to start a new subjob on the same slave. That won't bring down the master since execute_next_subjob_or_free_executor is being run in the thread pool, but it feels messy.


for slave in slaves_to_disconnect:
self._disconnect_slave(slave)
self._logger.warning('Slave {} marked offline as it is not sending heartbeats.'.format(
Contributor

This should probably be more than a warning. I think this should be logged as an error and/or increase a Prometheus error metric.

@shriganeshs-zz shriganeshs-zz left a comment

I think set_heartbeat can happen when the master receives any valid communication from a slave. Currently it looks like it's only set when the slave hits the heartbeat endpoint. So even when the slave sends a subjob result to the master we can update the last-heartbeat-seen time.


@@ -35,6 +38,10 @@ def __init__(self):
self._build_request_handler.start()
self._slave_allocator = SlaveAllocator(self._scheduler_pool)
self._slave_allocator.start()

self._heartbeat_frequency = Configuration['heartbeat_frequency'] * Configuration['heartbeat_count_threshold']

What's the difference between Configuration['heartbeat_frequency'] and Configuration['heartbeat_count_threshold']?

@@ -55,6 +62,22 @@ def __init__(self):

SlavesCollector.register_slaves_metrics_collector(lambda: self.all_slaves_by_id().values())

def configure_heartbeat(self):
self._heartbeat_job = self._scheduler.add_job(self.heartbeat, 'interval', seconds=self._heartbeat_frequency)
return self._scheduler

  1. Do we need to return the scheduler if it's already an attribute of the class
  2. This function doesn't seem large enough or complicated enough to warrant its own method. Just inline line 66 into the constructor. I see that configure_heartbeat is being called from master_subcommand; that seems unnecessary and confusing.

@@ -35,6 +38,10 @@ def __init__(self):
self._build_request_handler.start()
self._slave_allocator = SlaveAllocator(self._scheduler_pool)
self._slave_allocator.start()

self._heartbeat_frequency = Configuration['heartbeat_frequency'] * Configuration['heartbeat_count_threshold']
self._scheduler = BackgroundScheduler()

I'm not familiar with BackgroundScheduler, but it seems a bit heavy to bring this in for such a simple task. I'd probably prefer to use

tornado.ioloop.PeriodicCallback (new in version 4.1; you'd have to upgrade tornado in requirements.txt to use this. It's also worth noting that I've never used this before)

or do

ioloop = tornado.ioloop.IOLoop.current()

@tornado.gen.coroutine
def f():
    # do some stuff
    yield tornado.gen.sleep(interval)
    ioloop.add_callback(f)

ioloop.add_callback(f)

def heartbeat(self):
t = datetime.datetime.now()
slaves_to_disconnect = []
for slave in self._all_slaves_by_url.values():

I find a list comprehension clearer here

slaves_to_disconnect = [
    slave for slave in self._all_slaves_by_url.values()
    if slave.is_alive() and not slave.is_responsive(t, self._heartbeat_frequency)
]

more declarative. Makes it clear at a glance that the only purpose of this loop is to populate the slaves_to_disconnect list. I know some people that were previously on this team would disagree with me here (Greg, Will, TJ). Not sure what the current philosophy is
@josephharrington ?

Contributor Author

Oh, I didn't even know something like this existed. I'll wait to get Joey's thoughts too.



def configure_heartbeat(self):
self._heartbeat_job = self.scheduler.add_job(self.heartbeat, 'interval', seconds=self._heartbeat_frequency)
return self.scheduler

Same comment about not needing to return the scheduler, and about inlining this into the constructor


self._heartbeat_frequency = Configuration['heartbeat_frequency'] * Configuration['heartbeat_count_threshold']
self._scheduler = BackgroundScheduler()
self._heartbeat_job = None

In general, I don't like explicitly assigning None to variables. Sometimes that can't be avoided, but it's rare.

In this example, inlining configure_heartbeat would do the trick
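
A sketch of that inlining, built from the attributes already shown in this diff (the surrounding constructor code is elided):

def __init__(self):
    ...
    self._heartbeat_frequency = Configuration['heartbeat_frequency'] * Configuration['heartbeat_count_threshold']
    self._scheduler = BackgroundScheduler()
    # Inlined from configure_heartbeat(): schedule the periodic check directly in the constructor.
    self._heartbeat_job = self._scheduler.add_job(self.heartbeat, 'interval', seconds=self._heartbeat_frequency)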

def heartbeat(self):
state_url = self._master_api.url('slave', self._slave_id, 'heartbeat')
try:
self._network.post_with_digest(state_url, request_params={'slave': {'heartbeat': True}},

This is unfortunate. I'd really like to see this be
yield self._network.post_with_digest(...)

Unfortunately post_with_digest doesn't return a future. And rewriting it to return a future would be pretty involved. Definitely doesn't belong in this PR. Can you add a TODO to

  1. Rewrite network.post_with_digest to return a future/be a coroutine
  2. Run this code on tornado's event loop instead of on its own thread
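
Not part of this PR, but a rough sketch of where that TODO could lead once post_with_digest can return a Future; the names mirror the ones in this diff except _handle_heartbeat_failure, which is a hypothetical helper:

import requests
from tornado import gen


@gen.coroutine
def _run_heartbeat(self):
    heartbeat_url = self._master_api.url('slave', self._slave_id, 'heartbeat')
    while True:
        try:
            # Assumes post_with_digest has been rewritten to return a Future;
            # the remaining arguments stay as in the existing blocking call.
            yield self._network.post_with_digest(heartbeat_url, request_params={'slave': {'heartbeat': True}})
        except requests.ConnectionError:
            self._handle_heartbeat_failure()  # hypothetical helper wrapping the threshold logic above
        yield gen.sleep(self._heartbeat_interval)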



ethomas2 commented Apr 6, 2018

Also, put type hints on all your functions pls. Most functions in CR don't have type hints, because we just started adding them recently (probably why you didn't realize you should be doing it), but every new function in CR should have type hints

Examples

mbp-IT002597:ClusterRunner ethomas$ ack "def.*\(.*:.*\)"
app/client/cluster_api_client.py
210:    def connect_slave(self, slave_url: str, num_executors: int=10) -> int:
227:    def get_slave_status(self, slave_id: int) -> dict:
237:    def block_until_slave_offline(self, slave_id: int, timeout: int=None) -> bool:

app/common/console_output.py
14:    def from_plaintext(cls, path: str) -> 'ConsoleOutput':
22:    def from_zipfile(cls, zip_path: str, path_in_archive: str) -> 'ConsoleOutput':
34:    def __init__(self, file: BinaryIO):
41:    def segment(self, max_lines: int=50, offset_line: Optional[int]=None) -> ConsoleOut

self.assertEqual(slave.set_heartbeat.call_count, 1, 'incoming heartbeat sets timestamp for correct slave')

@genty_dataset (
slave_unresponsive=(True,False,),

Trailing commas on these tuples are unnecessary
(True, False)

master = ClusterMaster()
slave_url = "url"
slave_id = master.connect_slave(slave_url, 1)
slave = Mock()

Add a spec to this
https://docs.python.org/3/library/unittest.mock.html#unittest.mock.Mock

slave = Mock(ClusterSlave)

alternatively
slave = Mock(spec=ClusterSlave)

The convention on PE is to omit the spec=, which is possible because spec is the first argument to Mock

slave_url = "url"
slave_id = master.connect_slave(slave_url, 1)
slave = Mock()
master.get_slave = MagicMock(return_value=slave)

master isn't a Mock, it's a real object. I'm not a fan of clobbering a function of a real object

slave_id = master.connect_slave(slave_url, 1)
slave = Mock()
master.get_slave = MagicMock(return_value=slave)
master._async_receive_heartbeat_from_slave(slave_id['slave_id'])

Not a fan of calling a private method in a test. It's an indication that you're testing an implementation detail, which in turn is an indication that this test might be brittle. Brittle tests are a huge pet peeve of mine, and something that this team has suffered heavily from in the past.

The easiest way to fix this test is to call master.receive_heartbeat_from_slave directly, patch the Slave constructor, and assert that set_heartbeat gets called on the return value of the Slave constructor (i.e. the slave).

An improvement upon that test is, instead of calling master.receive_heartbeat_from_slave, to send a POST to the heartbeat endpoint and assert that slave.set_heartbeat is called. Tornado's AsyncHTTPTestCase will be helpful here. Look at get_app() and fetch.

See the internal box code that I slack'd to you
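
A rough sketch of the simpler of those two approaches (patching the Slave constructor rather than going through the HTTP layer); the patch target is the same one used later in this PR's tests:

def test_receive_heartbeat_from_slave_updates_slave_timestamp(self):
    # Patch the Slave constructor so the master holds a mock slave we can assert against.
    mock_slave = self.patch('app.master.cluster_master.Slave').return_value
    master = ClusterMaster()
    connect_response = master.connect_slave('slave_url', 1)

    master.receive_heartbeat_from_slave(connect_response['slave_id'])

    self.assertEqual(mock_slave.set_heartbeat.call_count, 1,
                     'incoming heartbeat sets timestamp for the correct slave')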

slave = master.connect_slave(slave_url, 1)
slave = master.get_slave(int(slave['slave_id']))

slave.is_alive = MagicMock(return_value=slave_alive)

Same comment about clobbering a method from a real object.

Instead of clobbering slave.is_alive, use patch to patch out the Slave() constructor, that way you have a reference to a mock slave object, and set the attributes on that.

Instead of clobbering master._disconnect_slave and asserting that it's called, patch the Slave constructor and assert that slave.mark_dead() is called.

@@ -236,6 +238,12 @@ def _expected_session_header(self):

return headers

def set_last_heartbeat_time(self):


Minor (can ignore): Say update instead of set. IMO set usually sets the value to whatever is passed in as an argument.

@ethomas2 ethomas2 left a comment

Relatively minor comments that I don't care that much about. Lgtm

def start_heartbeat_tracker_thread(self):
self._logger.info('Heartbeat tracker will run every {} seconds'.format(
self._unresponsive_slaves_cleanup_interval))
SafeThread(target=self._start_heartbeat_tracker, name='HeartbeatTrackerThread', daemon=True).start()


I'm not sure I agree with this use of SafeThread (or with most uses of SafeThread). I know @josephharrington explicitly said to use it here, but I disagree. SafeThread will kill the whole process if an error bubbles to the top of it. My understanding of the history of SafeThread is that it was introduced in a time before we had alerts in cluster runner. The only way of knowing if an error happened was to kill the whole process.

But now we have alerts (at least on the master). If an error bubbles up to here, I wouldn't want to kill the whole process; I'd just want to log an error, trigger an alert and keep chugging along. Especially considering that this thread isn't handling anything super duper business critical. It's not as if failures in this thread will make CR unable to run builds or anything like that. Crashing the master seems like an overreaction.

Contributor

@josephharrington josephharrington Apr 10, 2018


I intended to say we should use it on the slave, not the master.

Contributor

To clarify, I agree with Evan that this should definitely not be a SafeThread. The main exception to that that I can think of is if we ever extend this to be a generic periodic task thread using an event loop (for example to support generic periodic tasks like doing git gc on repos, purging old build data, etc.) Even if we do make it a SafeThread in that case, we'd still probably want a try/except around each individual task execution. (Individual tasks should be allowed to fail without killing the entire periodic execution mechanism.)


def start_heartbeat_thread(self):
self._logger.info('Heartbeat will run every {} seconds'.format(self._heartbeat_interval))
SafeThread(target=self._start_heartbeat, name='HeartbeatThread', daemon=True).start()


Sort of the same comment about SafeThread here, but I'm less confident that this is bad than the use of SafeThread in the master. We should probably talk about this.

  1. Crashing a slave is much less devastating than crashing the master
  2. We don't have alerts on the slave ... so maybe crashing the slave is the best we can do? Maybe we shouldn't even make alerts on the slave. Maybe the proper response for a slave erring is just crashing the slave and relying on the master/kubernetes to spin up a new one. (hehe, kubernetes. I can dream)

Contributor

Yeah, I'd vote that this should be a SafeThread. Let's not build assumptions about short-lived/ephemeral slave nodes into core functionality of ClusterRunner since this is open source and should be usable for cases simpler than Box's. I agree with not adding alerts (directly) from the slave services. It seems more tractable to have a slave tell the master its current state (perhaps eventually in content of heartbeat calls) and have alerting/error logic on the master.

From the master's perspective it doesn't matter if a slave service goes down when it stops sending heartbeat requests since it's going to disconnect it anyway. This is more of a cleanup step to make sure we don't leave slave services running forever on a random host that we forgot about.


I would say it's acceptable for this to be a SafeThread, but I wouldn't go so far as to say it should be a SafeThread. Given that we're not assuming ephemeral slaves, the ideal for "maximum robustness" is that the master and slave should be able to tolerate arbitrarily long periods of time without being able to communicate with one another. When communication becomes possible again, they should seamlessly reconnect. What happens if there's a network partition between the master and slaves that lasts an hour? What happens if the master's vm dies and it takes us 30 minutes to bring it back up? I think the standard for perfection in those cases is that the slave seamlessly reconnects itself back to the master once the issue is resolved, not that the slave kills itself.

The reason I say it's acceptable for the slave to kill itself is that in practice, if the slave can't communicate with the master for more than ~5 minutes, then something has probably gone seriously wrong and we will probably need an engineer to intervene anyway. But I do think intentionally killing the slave is intentionally giving up on perfection.

Contributor

@josephharrington josephharrington Apr 11, 2018

the master and slave should be able to tolerate arbitrarily long periods of time without being able to communicate with one another
...
I think the standard for perfection in those cases is that the slave seamlessly reconnects itself back to the master once the issue is resolved

@ethomas2 Let's discuss this offline. It sounds like we have very different thoughts on what the behavior should be in the case of network failure.

# TODO: Right now the slave simply dies when it does not hear back from master. The next step would
# be to try to reconnect to master at this point. In future the heartbeat and connect_to_master
# methods can be combined into one. This combined method will behave differently based on current state.
self.kill()


Why even kill it? Why not just let it stay up forever? Indefinitely spamming the master with requests.

I suppose this is gonna change really soon anyway, so nbd.

Contributor

@ethomas2 Can you elaborate on how it's going to change?

@mayurva This is fine, but if you keep this a SafeThread a valid alternative would be to just call raise here.

@ethomas2 ethomas2 Apr 11, 2018

I was just talking about the comment @mayurva left

The next step would be to try to reconnect to master at this point. In future
the heartbeat and connect_to_master methods can be combined into one. This
combined method will behave differently based on current state.

I think the plan is for this to happen soon

Contributor

@josephharrington josephharrington left a comment

Looking pretty good! Mostly minor comments with one or two major ones in there. Sorry if there's overlap with previous discussion -- I may have missed some of the outdated comments so if you guys outvote me on any of these issues feel free to proceed.

Just a note, it may be worth adding a functional test or two for the heartbeat stuff, both to verify the behavior of the slave when it cannot contact the master and to verify the behavior of the master.

@@ -41,6 +42,10 @@ def async_run(self, port, log_level, eventlog_file):
log_startup = functools.partial(self._logger.info, 'Master service is running on {}:{}.'.format(hostname, port))
ioloop.add_callback(log_startup)

# start heartbeat tracker once ioloop starts
start_master_heartbeat_tracker = functools.partial(cluster_master.start_heartbeat_tracker_thread)
ioloop.add_callback(start_master_heartbeat_tracker)
Contributor

What do you think of starting this in the master class constructor? The "heartbeat tracker" seems less related to if the ioloop is running (web layer) and more related to if a slave is connected to the master (service layer). Another way to ask that question is: Would we ever want to create a MasterService without this loop running?



self._logger.error('Master is not responding to heartbeats')

# TODO: Right now the slave simply dies when it does not hear back from master. The next step would
# be to try to reconnect to master at this point. In future the heartbeat and connect_to_master
Contributor

I don't think I understand this comment. Why would we try to reconnect to the master vs. just increasing the failure threshold (on master or slave or both)?

If the idea behind this is related to having a timeout for individual builds, I'd rather us build that directly instead of doing gymnastics around slave service connectivity.


@@ -54,9 +54,11 @@ def __init__(self, cluster_master):
RouteNode(r'queue', _QueueHandler),
RouteNode(r'slave', _SlavesHandler, 'slaves').add_children([
RouteNode(r'(\d+)', _SlaveHandler, 'slave').add_children([
RouteNode(r'shutdown', _SlaveShutdownHandler, 'shutdown')
RouteNode(r'shutdown', _SlaveShutdownHandler, 'shutdown'),
RouteNode(r'heartbeat', _SlavesHeartbeatHandler)
Contributor

I'd argue that heartbeat is a state change -- the state being the last time the slave was able to make a successful API call to the master. Not a huge deal either way though. Probably not worth changing at this point.


slave.is_alive = MagicMock(return_value=slave_alive)
slave.get_last_heartbeat_time = MagicMock(return_value=last_heartbeat_time)
slave.mark_dead = Mock()
Contributor

@josephharrington josephharrington Apr 10, 2018

Do you need a real slave instance here? It's a bit surprising to see methods on a real object directly replaced with mocks via assignment. It can lead to surprising results and stale tests (for example, if someone renamed the mark_dead method but did not rename it here, your assertion about it never being called would pass trivially). I believe the patch functionality does checks to guard against this.

More specifically, you should be able to patch out the Slave constructor so that you can inject a full mock Slave object that also asserts method calls match the real object api.

slave.get_last_heartbeat_time = MagicMock(return_value=last_heartbeat_time)
slave.mark_dead = Mock()

master._disconnect_non_heartbeating_slaves()
Contributor

I think there was a comment about this previously, but ideally we don't test via private methods. There are exceptions, but most of the time it's a code smell.

slave_url = "url"
slave = master.connect_slave(slave_url, 1)
slave = master.get_slave(int(slave['slave_id']))
last_heartbeat_time = datetime.now() - timedelta(seconds=seconds_since_last_heartbeat)
Contributor

Don't use real datetime.now in tests. Mock it out so that we have deterministic test inputs.
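
One hypothetical way to do that, assuming the test base class's patch helper and that cluster_master imports datetime with a from-import (the patch target is a guess):

# Freeze "now" so the responsiveness check has deterministic inputs.
mock_datetime = self.patch('app.master.cluster_master.datetime')
mock_datetime.now.return_value = datetime(2018, 4, 17, 12, 0, 0)
last_heartbeat_time = datetime(2018, 4, 17, 12, 0, 0) - timedelta(seconds=seconds_since_last_heartbeat)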


slave = self._create_cluster_slave()
slave.connect_to_master(self._FAKE_MASTER_URL)
slave.kill = Mock()
Contributor

Same comment as above. Also, I'd rather us not mock out methods on the class under test. Assert on the behavior of the method, not the method itself being called.

if not is_master_responsive:
self.mock_network.post_with_digest.side_effect = requests.ConnectionError

slave._run_heartbeat()
Contributor

Same comment as above about testing private methods. If you can't find a way around that, maybe at least have a supporting test that verifies start_heartbeat_thread sets up slave._run_heartbeat to run periodically. (Currently it looks like you have no tests that hit the public methods you added to the service classes.)

I didn't try writing this so it may be harder than I think, but my first instinct would be to

  • patch out SafeThread so that it executes its target synchronously
  • set configuration so that the Nth post call will fail
  • set configuration (or stub calls to datetime.now) so that there is no delay between scheduler calls
  • assert that calling start_heartbeat_thread ends up sending N-1 post calls to the master

Anyway, just a suggestion. This is more of a test robustness issue than an actual issue with the feature code.
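
For the first bullet, a hypothetical sketch of making SafeThread run its target synchronously in a test (the patch target path is a guess; point it at wherever cluster_slave imports SafeThread from):

class FakeSafeThread:
    """Test double that runs the thread target inline instead of spawning a thread."""
    def __init__(self, target, name=None, daemon=None):
        self._target = target

    def start(self):
        self._target()

self.patch('app.slave.cluster_slave.SafeThread', new=FakeSafeThread)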

@mayurva mayurva force-pushed the heartbeat branch 4 times, most recently from 765b4f1 to 3f5052b on April 12, 2018 19:39
mayurva added 2 commits April 12, 2018 13:28
- The last heartbeat time is updated in the slave object
- Heartbeat time is also updated when a slave posts to /slave/id
  endpoint.
- The heartbeat time is updated synchronously
Allow the heartbeat configuration values to be separate so that they can be
tweaked independently of each other.
@mayurva mayurva force-pushed the heartbeat branch 4 times, most recently from d1a4e76 to fc2b250 on April 13, 2018 06:33
Contributor

@cmcginty cmcginty left a comment

Looks good to me. Only minor comments.

import os
import sched
Contributor

Nice find. I haven't seen this lib before.

@@ -53,8 +58,36 @@ def __init__(self):
fs.async_delete(self._master_results_path)
fs.create_dir(self._master_results_path)

# Configure heartbeat tracking
self._unresponsive_slaves_cleanup_interval = Configuration['unresponsive_slaves_cleanup_interval']
Contributor

I wonder if there is a shorter, but equal descriptive name we could use for this.

@@ -386,6 +388,7 @@ def put(self, slave_id):
new_slave_state = self.decoded_body.get('slave', {}).get('state')
slave = self._cluster_master.get_slave(int(slave_id))
self._cluster_master.handle_slave_state_update(slave, new_slave_state)
self._cluster_master.update_slave_last_heartbeat_time(slave)
Contributor

I'm not sure why the master directly exposes the slave through the API. Maybe self._cluster_master.update_slave_last_heartbeat_time(slave_id) would be easier (and would simplify the POST handler).

Contributor Author

@mayurva mayurva Apr 16, 2018

Good point. It seems like a lot of code depends upon the get_slave functionality, like line 390, so I'm not sure if it is worth breaking that consistency.

self._disconnect_non_heartbeating_slaves)

def _is_slave_responsive(self, slave: ClusterSlave) -> bool:
return (datetime.now() - slave.get_last_heartbeat_time()).seconds < self._unresponsive_slaves_cleanup_interval
Contributor

Consider extracting the left side logic into a variable here. The line is already pretty long.
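
A sketch of that extraction, keeping the same comparison:

def _is_slave_responsive(self, slave: ClusterSlave) -> bool:
    seconds_since_last_heartbeat = (datetime.now() - slave.get_last_heartbeat_time()).seconds
    return seconds_since_last_heartbeat < self._unresponsive_slaves_cleanup_interval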

@@ -26,6 +27,7 @@ def __init__(self, slave_url, num_executors, slave_session_id=None):
self._num_executors_in_use = Counter()
self._network = Network(min_connection_poolsize=num_executors)
self.current_build_id = None
self._last_heartbeat_time = datetime.now()
Contributor

Better to just call update_last_heartbeat_time() here.

Contributor Author

Umm yeah. I could do that. I think the way it is currently written makes it clear that the variable is being initialized. Calling the function might obscure it a bit


mock_slave = self.patch('app.master.cluster_master.Slave').return_value
# self.patch('app.master.cluster_master.Slave', new=lambda *args: mock_slave)
# master.connect_slave('slave_url', 1)
Contributor

remove if you no longer need this.

@@ -213,6 +219,40 @@ def test_updating_slave_to_nonexistent_state_should_raise_bad_request_error(self
with self.assertRaises(BadRequestError):
master.handle_slave_state_update(slave, 'NONEXISTENT_STATE')

def test_update_slave_last_heartbeat_time_calls_update_last_heartbeat_time_on_slave(self):
Contributor

This type of unit test is not extremely useful and makes the code more brittle to changes.

last_heartbeat_time = self._mock_current_datetime - timedelta(seconds=seconds_since_last_heartbeat)
master = ClusterMaster()

mock_slave = Mock()
Contributor

Try spec'ing the object like Mock(spec_set=ClusterSlave) if you can.

Contributor Author

@mayurva mayurva Apr 17, 2018

Evan and I ran into a lot of issues mocking this particular class. If I do specify spec, I get the error below. It is because of the way we use the public member id. A proper fix may be to change id to a property, but I'm not sure if that should be part of this PR:

AttributeError: Mock object has no attribute 'id'

else:
if heartbeat_failure_threshold == 1:
self.assertEqual(self._mock_sys.exit.call_count, 1,
'slave dies when it decides that master is dead')
Contributor

Can you fix the indent here?

@@ -237,6 +245,33 @@ def test_execute_subjob_passes_base_executor_index_to_executor(self):

executor.execute_subjob.assert_called_with(1, 2, [], 12)

@genty_dataset(
responsive_master=(True,1,),
Contributor

You only need the trailing comma if the tuple only has one value. For example:

responsive_master=(True,)    # required trailing comma
responsive_master=(True,1)   # two args, only 1 comma needed

mayurva added 3 commits April 16, 2018 18:39
Instead of the slave checking whether it is responsive, it now returns the last
heartbeat time and the master performs the responsiveness check
Instead of using apscheduler, we will be using sched in combination with
threading
- Verify that the master marks the slaves as dead if the slave process dies
- Verify that the slave dies if the master process dies
@mayurva mayurva merged commit 4ff9b19 into master Apr 17, 2018
@mayurva mayurva deleted the heartbeat branch April 17, 2018 18:28