dynamic categories and threads mostly working
Jeff Tharp committed Jun 13, 2015
1 parent 5128bec commit 9bd6933
Showing 1 changed file with 115 additions and 46 deletions.
161 changes: 115 additions & 46 deletions elasticstat.py
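This change makes the displayed stat categories and thread pools configurable instead of hard-coded. A rough sketch of driving the new constructor parameters directly (the host, port, credentials, and the particular categories/pools below are illustrative, not part of the commit):

    # Hypothetical usage of the new parameters; assumes elasticstat.py is importable.
    from elasticstat import Elasticstat

    stat = Elasticstat('localhost', 9200, None, None, 5,      # host, port, user, password, delay
                       categories=['os', 'jvm', 'threads'],   # 'general' is always prepended
                       threadpools=['search', 'bulk'])        # overrides DEFAULT_THREAD_POOLS
    stat.format_headings()  # build cluster/node headings once, as main() does below
    stat.print_stats()      # then fetch and print cluster and node stats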
@@ -23,7 +23,14 @@
CLUSTER_HEADINGS["timestamp"] = "time"

# node_name role load_avg mem% heap% old sz old gc young gc
NODES_TEMPLATE = """{name:24} {role:<6} {load_avg:>18} {used_mem:>4} {used_heap:>4} {old_gc_sz:8} {old_gc:8} {young_gc:8} {index_threads:<8} {search_threads:<8} {bulk_threads:<8} {get_threads:<8} {merge_threads:<8} {fielddata:^7} {http_conn:>6} {transport_conn:>6} {merge_time:>8} {store_throttle:>8} {docs}"""
NODES_TEMPLATE = {}
NODES_TEMPLATE['general'] = """{name:24} {role:<6}"""
NODES_TEMPLATE['os'] = """{load_avg:>18} {used_mem:>4}"""
NODES_TEMPLATE['jvm'] = """{used_heap:>4} {old_gc_sz:8} {old_gc:8} {young_gc:8}"""
NODES_TEMPLATE['threads'] = """{threads:<8}"""
NODES_TEMPLATE['fielddata'] = """{fielddata:^7}"""
NODES_TEMPLATE['connections'] = """{http_conn:>6} {transport_conn:>6}"""
NODES_TEMPLATE['data_nodes'] = """{merge_time:>8} {store_throttle:>8} {docs}"""
NODES_FAILED_TEMPLATE = """{name:24} {role:<6} (No data received, node may have left cluster)"""
NODE_HEADINGS = {}
NODE_HEADINGS["name"] = "nodes"
@@ -45,7 +52,8 @@
NODE_HEADINGS["merge_time"] = "merges"
NODE_HEADINGS["store_throttle"] = "idx st"
NODE_HEADINGS["docs"] = "docs"
THREAD_POOLS = ["index", "search", "bulk", "get", "merge"]
DEFAULT_THREAD_POOLS = ["index", "search", "bulk", "get", "merge"]
CATEGORIES = ['general', 'os', 'jvm', 'threads', 'fielddata', 'connections', 'data_nodes']

class ESArgParser(argparse.ArgumentParser):
"""ArgumentParser which prints help by default on any arg parsing error"""
@@ -56,7 +64,7 @@ def error(self, message):
class Elasticstat:
"""Elasticstat"""

def __init__(self, host, port, username, password, delay_interval):
def __init__(self, host, port, username, password, delay_interval, categories, threadpools):

self.sleep_interval = delay_interval
self.node_counters = {}
@@ -68,6 +76,13 @@ def __init__(self, host, port, username, password, delay_interval):
self.node_names = {} # node names, organized by id
self.new_nodes = [] # used to track new nodes that join the cluster
self.active_master = ""
self.threadpools = threadpools

# categories for display
if categories == 'all':
self.categories = CATEGORIES
else:
self.categories = ['general'] + categories

# check for port in host
if ':' in host:
@@ -159,57 +174,75 @@ def get_http_conns(self, node_id, http_conns):
self.node_counters['hconn'][node_id] = http_conns['total_opened']
return("{0}|{1}".format(http_conns['current_open'], open_delta))

def process_node(self, role, node_id, node):
processed_node = {}
# Node name and role
def process_node_general(self, role, node_id, node):
if node_id in self.new_nodes:
# Flag that this is a node that joined the cluster this round
processed_node['name'] = "+" + node['name']
node_name = node['name'] + "+"
else:
processed_node['name'] = node['name']
node_name = node['name']
if self.active_master == node_id:
# Flag active master in role column
processed_node['role'] = "*" + role
node_role = role + "*"
else:
processed_node['role'] = role

# Load / memory used / heap used
processed_node['load_avg'] = "/".join(str(x) for x in node['os']['load_average'])
processed_node['used_mem'] = "{0}%".format(node['os']['mem']['used_percent'])
processed_node['used_heap'] = "{0}%".format(node['jvm']['mem']['heap_used_percent'])
node_role = role
return(NODES_TEMPLATE['general'].format(name=node_name, role=node_role))

# GC counters and old region size
processed_node['old_gc_sz'] = node['jvm']['mem']['pools']['old']['used']
def process_node_os(self, role, node_id, node):
return(NODES_TEMPLATE['os'].format(load_avg="/".join(str(x) for x in node['os']['load_average']),
used_mem="{0}%".format(node['os']['mem']['used_percent'])))

def process_node_jvm(self, role, node_id, node):
processed_node_jvm = {}
processed_node_jvm['used_heap'] = "{0}%".format(node['jvm']['mem']['heap_used_percent'])
processed_node_jvm['old_gc_sz'] = node['jvm']['mem']['pools']['old']['used']
node_gc_stats = node['jvm']['gc']['collectors']
processed_node['old_gc'], processed_node['young_gc'] = self.get_gc_stats(node_id, node_gc_stats)

# Threads
for pool in THREAD_POOLS:
processed_node[pool + '_threads'] = "{0}|{1}|{2}".format(node['thread_pool'][pool]['active'],
node['thread_pool'][pool]['queue'],
node['thread_pool'][pool]['rejected'])

# Field data evictions (fde) | circuit breaker trips (fdt)
processed_node['fielddata'] = self.get_fd_stats(node_id,
node['indices']['fielddata']['evictions'],
node['breakers']['fielddata']['tripped'])

# Connections
processed_node['http_conn'] = self.get_http_conns(node_id, node['http'])
processed_node['transport_conn'] = node['transport']['server_open']
processed_node_jvm['old_gc'], processed_node_jvm['young_gc'] = self.get_gc_stats(node_id, node_gc_stats)
return(NODES_TEMPLATE['jvm'].format(**processed_node_jvm))


def process_node_threads(self, role, node_id, node):
thread_segments = []
for pool in self.threadpools:
if pool in node['thread_pool']:
threads = "{0}|{1}|{2}".format(node['thread_pool'][pool]['active'],
node['thread_pool'][pool]['queue'],
node['thread_pool'][pool]['rejected'])
thread_segments.append(NODES_TEMPLATE['threads'].format(threads=threads))
else:
thread_segments.append(NODES_TEMPLATE['threads'].format(threads='-|-|-'))
return(" ".join(thread_segments))

def process_node_fielddata(self, role, node_id, node):
return(self.get_fd_stats(node_id,
node['indices']['fielddata']['evictions'],
node['breakers']['fielddata']['tripped']))

def process_node_connections(self, role, node_id, node):
processed_node_conns = {}
processed_node_conns['http_conn'] = self.get_http_conns(node_id, node['http'])
processed_node_conns['transport_conn'] = node['transport']['server_open']
return(NODES_TEMPLATE['connections'].format(**processed_node_conns))

def process_node_data_nodes(self, role, node_id, node):
processed_node_dn = {}
# Data node specific metrics
if role in ['DATA', 'ALL']:
processed_node['merge_time'] = node['indices']['merges']['total_time']
processed_node['store_throttle'] = node['indices']['store']['throttle_time']
processed_node['docs'] = "{0}|{1}".format(node['indices']['docs']['count'],
node['indices']['docs']['deleted'])
processed_node_dn['merge_time'] = node['indices']['merges']['total_time']
processed_node_dn['store_throttle'] = node['indices']['store']['throttle_time']
processed_node_dn['docs'] = "{0}|{1}".format(node['indices']['docs']['count'],
node['indices']['docs']['deleted'])
else:
processed_node['merge_time'] = "-"
processed_node['store_throttle'] = "-"
processed_node['docs'] = "-|-"

return(NODES_TEMPLATE.format(**processed_node))
processed_node_dn['merge_time'] = "-"
processed_node_dn['store_throttle'] = "-"
processed_node_dn['docs'] = "-|-"
return(NODES_TEMPLATE['data_nodes'].format(**processed_node_dn))

def process_node(self, role, node_id, node):
node_segments = []
for category in self.categories:
category_func = getattr(self, 'process_node_' + category)
node_segments.append(category_func(role, node_id, node))
return(" ".join(node_segments))

def process_role(self, role, nodes_stats):
procs = []
@@ -239,7 +272,28 @@ def process_role(self, role, nodes_stats):
self.nodes_by_role.setdefault(current_role, []).append(node_id) # add to new role
self.nodes_by_role[role].remove(node_id) # remove from current role
print self.process_node(current_role, node_id, nodes_stats['nodes'][node_id])


def get_threads_headings(self):
thread_segments = []
for pool in self.threadpools:
thread_segments.append(NODES_TEMPLATE['threads'].format(threads=pool))
return(" ".join(thread_segments))

def format_headings(self):
"""Format both cluster and node headings once and then store for later output"""
node_heading_segments = []

# cluster headings
self.cluster_headings = CLUSTER_TEMPLATE.format(**CLUSTER_HEADINGS)

# node headings
for category in self.categories:
if category == 'threads':
node_heading_segments.append(self.get_threads_headings())
else:
node_heading_segments.append(NODES_TEMPLATE[category].format(**NODE_HEADINGS))
self.node_headings = " ".join(node_heading_segments)

def print_stats(self):
counter = 0

@@ -251,7 +305,7 @@ def print_stats(self):

# Print cluster health
cluster_health['timestamp'] = self.thetime()
print CLUSTER_TEMPLATE.format(**CLUSTER_HEADINGS)
print self.cluster_headings
print CLUSTER_TEMPLATE.format(**cluster_health)
print "" # space for readability

@@ -277,7 +331,7 @@ def print_stats(self):
self.nodes_by_role.setdefault(node_role, []).append(node_id)

# Print node stats
print NODES_TEMPLATE.format(**NODE_HEADINGS)
print self.node_headings
for role in self.nodes_by_role:
self.process_role(role, nodes_stats)
print "" # space out each run for readability
@@ -309,6 +363,20 @@ def main():
const='PROMPT',
default=None,
help='Password')
parser.add_argument('-c',
'--categories',
dest='categories',
default='all',
metavar='CATEGORY',
nargs='+',
help='Statistic categories to show')
parser.add_argument('-T',
'--threadpools',
dest='threadpools',
default=DEFAULT_THREAD_POOLS,
metavar='THREADPOOL',
nargs='+',
help='Thread pools to show')
parser.add_argument('delay_interval',
default='1',
nargs='?',
@@ -319,7 +387,8 @@ def main():
args = parser.parse_args()

signal.signal(signal.SIGINT, lambda signum, frame: sys.exit())
elasticstat = Elasticstat(args.hostlist, args.port, args.username, args.password, args.delay_interval)
elasticstat = Elasticstat(args.hostlist, args.port, args.username, args.password, args.delay_interval, args.categories, args.threadpools)
elasticstat.format_headings()
elasticstat.print_stats()
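
As a small, self-contained illustration of how the per-category templates compose into a single node row (the format strings are copied from the diff above; the sample values are made up):

    NODES_TEMPLATE = {
        'general': "{name:24} {role:<6}",
        'os': "{load_avg:>18} {used_mem:>4}",
        'threads': "{threads:<8}",
    }
    segments = [
        NODES_TEMPLATE['general'].format(name='node-1', role='DATA'),
        NODES_TEMPLATE['os'].format(load_avg='0.10/0.15/0.20', used_mem='62%'),
        NODES_TEMPLATE['threads'].format(threads='0|0|0'),  # active|queue|rejected for one pool
    ]
    print " ".join(segments)  # one row of the nodes table, in the same order as the headings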

