Skip to content

Commit

Permalink
Fix nodes leaving/joining cluster
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jeff Tharp committed Jun 5, 2015
1 parent 2c640bf commit 88ba156
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions elasticstat.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -160,13 +160,16 @@ def get_http_conns(self, node_name, http_conns):
self.node_counters['hconn'][node_name] = http_conns['total_opened']
return("{0}|{1}".format(http_conns['current_open'], open_delta))

def process_node(self, role, node):
def process_node(self, role, node_id, node):
processed_node = {}
processed_node['name'] = node['name']
processed_node['role'] = role
if processed_node['name'] == self.active_master:
if self.active_master == node_id:
# Flag active master in role column
processed_node['role'] += "*"
if node_id in self.new_nodes:
# Flag that this is a node that joined the cluster this round
processed_node['name'] = "+" + processed_node['name']

# Load / mem / heap
processed_node['load_avg'] = "/".join(str(x) for x in node['os']['load_average'])
Expand Down Expand Up @@ -212,18 +215,29 @@ def process_role(self, role, nodes_stats):
for node_id in self.nodes_by_role[role]:
if node_id not in nodes_stats['nodes']:
# did not get any data on this node, likely it left the cluster
failed_node = {}
failed_node['name'] = self.node_names[node_id]
failed_node['role'] = "({0})".format(role) # Role it had when we last saw this node in the cluster
print NODES_FAILED_TEMPLATE.format(**failed_node)
else:
# make sure node's role hasn't changed
current_role = self.get_role(nodes_stats['nodes'][node_id]['attributes'])
if current_role != role:
# Role changed, update lists so output will be correct on next iteration
self.nodes_by_role.setdefault(current_role, []).append(node_id) # add to new role
self.nodes_by_role[role].remove(node_id) # remove from current role
print self.process_node(current_role, nodes_stats['nodes'][node_id])
# ...however it may have re-joined the cluster under a new node_id (such as a node restart)
failed_node_name = self.node_names[node_id]
new_nodes_by_name = {nodes_stats['nodes'][id]['name']: id for id in self.new_nodes}
if failed_node_name in new_nodes_by_name:
# ...found it! Remove the old node_id, we've already added the new node at this point
new_node_id = new_nodes_by_name[failed_node_name]
self.new_nodes.remove(new_node_id) # So we don't flag this as a new node visually
self.nodes_list.remove(node_id)
self.node_names.pop(node_id)
self.nodes_by_role[role].remove(node_id)
else:
failed_node = {}
failed_node['name'] = "-" + failed_node_name
failed_node['role'] = "({0})".format(role) # Role it had when we last saw this node in the cluster
print NODES_FAILED_TEMPLATE.format(**failed_node)
continue
# make sure node's role hasn't changed
current_role = self.get_role(nodes_stats['nodes'][node_id]['attributes'])
if current_role != role:
# Role changed, update lists so output will be correct on next iteration
self.nodes_by_role.setdefault(current_role, []).append(node_id) # add to new role
self.nodes_by_role[role].remove(node_id) # remove from current role
print self.process_node(current_role, node_id, nodes_stats['nodes'][node_id])

def printStats(self):
counter = 0
Expand All @@ -232,7 +246,7 @@ def printStats(self):
while True:
cluster_health = self.es_client.cluster.health()
nodes_stats = self.es_client.nodes.stats(human=True)
self.active_master = self.es_client.cat.master(h="node").strip() # needed to remove trailing newline
self.active_master = self.es_client.cat.master(h="id").strip() # needed to remove trailing newline

# Print cluster health
cluster_health['timestamp'] = self.thetime()
Expand All @@ -252,10 +266,9 @@ def printStats(self):
self.nodes_by_role.setdefault(node_role, []).append(node_id)
else:
# Check for new nodes that have joined the cluster
self.new_nodes = []
if len(nodes_stats['nodes']) > current_nodes_count:
# At least one new node found, so add it to the list
self.new_nodes = list(set(nodes_stats['nodes']) - set(self.nodes_list))
self.new_nodes = list(set(nodes_stats['nodes']) - set(self.nodes_list))
if len(self.new_nodes) > 0:
# At least one new node id found, so add to the list
for node_id in self.new_nodes:
self.nodes_list.append(node_id)
self.node_names[node_id] = nodes_stats['nodes'][node_id]['name']
Expand Down

0 comments on commit 88ba156

Please sign in to comment.