Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/goal/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,13 @@ var changeOnlineCmd = &cobra.Command{
}

dataDir := ensureSingleDataDir()
client := ensureFullClient(dataDir)
var client libgoal.Client
if statusChangeTxFile != "" {
// writing out a txn, don't need kmd
client = ensureAlgodClient(dataDir)
} else {
client = ensureFullClient(dataDir)
}

var part *algodAcct.Participation
if partKeyFile != "" {
Expand Down
19 changes: 15 additions & 4 deletions cmd/goal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,17 +371,25 @@ func ensureFullClient(dataDir string) libgoal.Client {
return ensureGoalClient(dataDir, libgoal.FullClient)
}

func ensureGoalClient(dataDir string, clientType libgoal.ClientType) libgoal.Client {
// getGoalClient builds a libgoal.Client of the requested type for the given
// data directory. Unlike ensureGoalClient it returns the error instead of
// exiting, so callers can decide how to handle client-construction failure.
//
// Reconstructed merged version: the pasted diff interleaved the pre-change
// lines (`client, err := ...` shadowing the named return, and the removed
// reportErrorf call) with the post-change ones.
func getGoalClient(dataDir string, clientType libgoal.ClientType) (client libgoal.Client, err error) {
	clientConfig := libgoal.ClientConfig{
		AlgodDataDir: dataDir,
		KMDDataDir:   resolveKmdDataDir(dataDir),
		CacheDir:     ensureCacheDir(dataDir),
	}
	// assign to the named return (no `:=`) so the constructed client is
	// what the function actually returns
	client, err = libgoal.MakeClientFromConfig(clientConfig, clientType)
	if err != nil {
		return
	}
	client.SetAPIVersionAffinity(algodclient.APIVersionV2, kmdclient.APIVersionV1)
	return
}

// ensureGoalClient returns a libgoal.Client of the requested type for dataDir.
// On construction failure it calls reportErrorf (which exits the process),
// so callers may assume the returned client is usable.
func ensureGoalClient(dataDir string, clientType libgoal.ClientType) libgoal.Client {
client, err := getGoalClient(dataDir, clientType)
if err != nil {
reportErrorf(errorNodeStatus, err)
}
return client
}

Expand All @@ -404,7 +412,10 @@ func getWalletHandleMaybePassword(dataDir string, walletName string, getPassword
var dup bool

accountList := makeAccountsList(dataDir)
kmd := ensureKmdClient(dataDir)
kmd, err := getGoalClient(dataDir, libgoal.KmdClient)
if err != nil {
return nil, nil, fmt.Errorf("kmd client init error: %w", err)
}

// If the user didn't manually specify a wallet, use the default wallet ID
if walletName == "" {
Expand Down
105 changes: 105 additions & 0 deletions test/heapwatch/client_ram_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!/usr/bin/env python3

import argparse
import glob
import json
import logging
import os
import re
import sys
import subprocess

from metrics_delta import parse_metrics, gather_metrics_files_by_nick

logger = logging.getLogger(__name__)

# go tool pprof -sample_index=inuse_space -text Primary.20210708_131740.heap|grep ^Showing.\*total\$
# Showing nodes accounting for 82.08MB, 100% of 82.08MB total

total_inuse_re = re.compile(r'Showing nodes accounting for [^,]+, .* of ([0-9.]+)([kKmMgGtT]?B) total', re.MULTILINE)

multipliers = {
'B': 1,
'KB': 1024,
'MB': 1024*1024,
'GB': 1024*1024*1024,
'TB': 1024*1024*1024*1024,
}

def dapp(d, k, v):
    """Append v to the list at d[k], creating the list if the key is absent.

    d has the shape {key: [value, ...]}; d is mutated in place.
    """
    d.setdefault(k, []).append(v)

def get_heap_inuse_totals(dirpath):
    '''Scan dirpath for {nick}.{timestamp}.heap pprof files and return
    {"node nickname": [[YYYYmmdd_HHMMSS, bytes_in_use], ...], ...}.

    Totals are extracted by running `go tool pprof -text` on each heap file.
    Results are cached in dirpath/heap_inuse_totals.json; heap files older
    than the cache mtime are skipped and their cached values reused.

    Raises if pprof output for a heap file contains no recognizable total.
    '''
    cache_mtime = 0
    cache_path = os.path.join(dirpath, 'heap_inuse_totals.json')
    if os.path.exists(cache_path):
        cache_mtime = os.path.getmtime(cache_path)
        with open(cache_path, 'r') as fin:
            cached = json.load(fin)
    else:
        cached = {}

    heap_name_re = re.compile(r'(.*)\.(.*)\.heap')
    bynick = {}
    skipcount = 0
    for path in glob.glob(os.path.join(dirpath, '*.*.heap')):
        if os.path.getmtime(path) < cache_mtime:
            # already accounted for in the cached totals
            skipcount += 1
            continue
        fname = os.path.basename(path)
        m = heap_name_re.match(fname)
        if not m:
            logger.warning('could not parse heap filename: %r', path)
            continue
        nick = m.group(1)
        timestamp = m.group(2)
        # `go tool pprof -text` prints a line such as:
        # "Showing nodes accounting for 82.08MB, 100% of 82.08MB total"
        cmd = ['go', 'tool', 'pprof', '-sample_index=inuse_space', '-text', path]
        result = subprocess.run(cmd, capture_output=True)
        text = result.stdout.decode()
        m = total_inuse_re.search(text)
        if not m:
            logger.error('could not find total in output: %s', text)
            # BUG FIX: the message was passed %-style args that Exception
            # never formats; format it explicitly.
            raise Exception('could not find total in output of: {}'.format(
                ' '.join(repr(x) for x in cmd)))
        bytesinuse = float(m.group(1)) * multipliers[m.group(2).upper()]
        # store as a list so entries compare cleanly against the
        # JSON-decoded (list-of-lists) cache; sorted() on a mix of lists
        # and tuples raises TypeError in Python 3
        dapp(bynick, nick, [timestamp, bytesinuse])
        logger.debug('%s ok, %s %f', path, timestamp, bytesinuse)

    logger.debug('%d skipped older than cache', skipcount)
    # merge fresh measurements into the cached history, keeping it sorted
    for nick, recs in bynick.items():
        old = cached.get(nick)
        if old is None:
            cached[nick] = sorted(recs)
        else:
            cached[nick] = sorted(old + recs)
    if cached and bynick:
        # only rewrite the cache when something new was measured.
        # BUG FIX: json.dump writes str, so the cache file must be opened
        # in text mode; opening with 'wb' raised TypeError on first write.
        with open(cache_path, 'w') as fout:
            json.dump(cached, fout)
    return cached


def main():
    """Entry point: group *.metrics files by node and compute heap totals."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--dir', required=True, help='dir path to find /*.metrics in')
    parser.add_argument('--verbose', default=False, action='store_true')
    args = parser.parse_args()

    # --verbose switches on debug logging
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level)

    metrics_files = glob.glob(os.path.join(args.dir, '*.metrics'))
    filesByNick = gather_metrics_files_by_nick(metrics_files)

    heap_totals = get_heap_inuse_totals(args.dir)

    return 0

if __name__ == '__main__':
    sys.exit(main())
43 changes: 27 additions & 16 deletions test/heapwatch/heapWatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,28 @@ def get_goroutine_snapshot(self, snapshot_name=None, outdir=None):

def get_metrics(self, snapshot_name=None, outdir=None):
    """Fetch /metrics from this node and write it to {nick}.{snapshot_name}.metrics.

    Logs and returns on any fetch failure instead of raising, so one dead
    node does not abort the snapshot pass over the whole fleet.
    (Reconstructed merged version: the pasted diff also carried the
    pre-change, try-less lines of this method.)
    """
    url = 'http://' + self.net + '/metrics'
    try:
        response = urllib.request.urlopen(urllib.request.Request(url, headers=self.headers))
        if response.code != 200:
            # BUG FIX: was "self.path. url" — an attribute-access typo
            # (missing comma) that raised AttributeError instead of
            # logging, and left the %r placeholder without an argument.
            logger.error('could not fetch %s from %s via %r', snapshot_name, self.path, url)
            return
        blob = response.read()
    except Exception as e:
        logger.error('could not fetch %s from %s via %r: %s', snapshot_name, self.path, url, e)
        return
    outpath = os.path.join(outdir or '.', self.nick + '.' + snapshot_name + '.metrics')
    with open(outpath, 'wb') as fout:
        fout.write(blob)
    logger.debug('%s -> %s', self.nick, outpath)

def get_blockinfo(self, snapshot_name=None, outdir=None):
algod = self.algod()
status = algod.status()
try:
algod = self.algod()
status = algod.status()
except Exception as e:
logger.error('could not get blockinfo from %s: %s', self.net, e)
self._algod = None
return
bi = msgpack.loads(algod.block_info(status['last-round'], response_format='msgpack'), strict_map_key=False)
if snapshot_name is None:
snapshot_name = time.strftime('%Y%m%d_%H%M%S', time.gmtime())
Expand All @@ -154,7 +163,7 @@ def get_blockinfo(self, snapshot_name=None, outdir=None):
#txncount = bi['block']['tc']

def psHeap(self):
# return rss, vsz
# return rss, vsz (in kilobytes)
# ps -o rss,vsz $(cat ${ALGORAND_DATA}/algod.pid)
subp = subprocess.Popen(['ps', '-o', 'rss,vsz', str(self.pid())], stdout=subprocess.PIPE)
try:
Expand Down Expand Up @@ -243,15 +252,16 @@ def do_snap(self, now):
if self.args.blockinfo:
for ad in self.they:
ad.get_blockinfo(snapshot_name, outdir=self.args.out)
logger.debug('snapped, processing...')
# make absolute and differential plots
for path, snappath in newsnapshots.items():
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse.svg', snappath])
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc.svg', snappath])
prev = self.prevsnapshots.get(path)
if prev:
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse_diff.svg', '-base='+prev, snappath])
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc_diff.svg', '-diff_base='+prev, snappath])
if self.args.svg:
logger.debug('snapped, processing...')
# make absolute and differential plots
for path, snappath in newsnapshots.items():
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse.svg', snappath])
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc.svg', snappath])
prev = self.prevsnapshots.get(path)
if prev:
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse_diff.svg', '-base='+prev, snappath])
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc_diff.svg', '-diff_base='+prev, snappath])
self.prevsnapshots = newsnapshots

def main():
Expand All @@ -268,6 +278,7 @@ def main():
ap.add_argument('--admin-token', default='', help='default algod admin-api token to use')
ap.add_argument('--tf-roles', default='relay', help='comma separated list of terraform roles to follow')
ap.add_argument('--tf-name-re', action='append', default=[], help='regexp to match terraform node names, may be repeated')
ap.add_argument('--no-svg', dest='svg', default=True, action='store_false', help='do not automatically run `go tool pprof` to generate svg from collected data')
ap.add_argument('-o', '--out', default=None, help='directory to write to')
ap.add_argument('--verbose', default=False, action='store_true')
args = ap.parse_args()
Expand Down
51 changes: 31 additions & 20 deletions test/heapwatch/metrics_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ def dictMin(dest, more):
dest[k] = min(ov,v)
return dest

def meanOrZero(seq):
    """Return the arithmetic mean of seq, or 0 when seq is empty."""
    return statistics.mean(seq) if seq else 0

class summary:
def __init__(self):
self.tpsMeanSum = 0
Expand All @@ -125,9 +130,9 @@ def __call__(self, ttr, nick):
return
self.nodes[nick] = ttr
logger.debug('%d points from %s', len(ttr.tpsList), nick)
self.tpsMeanSum += statistics.mean(ttr.tpsList)
self.txBpsMeanSum += statistics.mean(ttr.txBpsList)
self.rxBpsMeanSum += statistics.mean(ttr.rxBpsList)
self.tpsMeanSum += meanOrZero(ttr.tpsList)
self.txBpsMeanSum += meanOrZero(ttr.txBpsList)
self.rxBpsMeanSum += meanOrZero(ttr.rxBpsList)
self.sumsCount += 1

def byMsg(self):
Expand Down Expand Up @@ -191,6 +196,28 @@ def anynickre(nick_re, nicks):
return True
return False

def gather_metrics_files_by_nick(metrics_files, metrics_dirs=None):
    '''Group metrics file paths by node nickname.

    metrics_files: iterable of paths named {nick}.{timestamp}.metrics.
    metrics_dirs: optional set; when provided, the directory of each metrics
        file is added to it (mutated in place).

    Returns {"node nickname": [path, ...], ...}. Unparseable names are
    logged and skipped.
    '''
    # BUG FIX: escape the final dot so ".metrics" is matched literally.
    metrics_fname_re = re.compile(r'(.*)\.(.*)\.metrics')
    filesByNick = {}
    nonick = []
    tf_inventory_path = None
    for path in metrics_files:
        fname = os.path.basename(path)
        if fname == 'terraform-inventory.host':
            # NOTE(review): tf_inventory_path is recorded here but never
            # returned, so callers cannot see it — the original inline code
            # in metrics_delta.main used it afterwards. Confirm whether it
            # should become part of this function's return value.
            tf_inventory_path = path
            continue
        if metrics_dirs is not None:
            metrics_dirs.add(os.path.dirname(path))
        m = metrics_fname_re.match(fname)
        if not m:
            logger.error('could not parse metrics file name %r', fname)
            nonick.append(path)
            continue
        nick = m.group(1)
        dapp(filesByNick, nick, path)
    return filesByNick

def main():
test_metric_line_re()
ap = argparse.ArgumentParser()
Expand All @@ -213,23 +240,7 @@ def main():
if args.dir:
metrics_dirs.add(args.dir)
metrics_files += glob.glob(os.path.join(args.dir, '*.metrics'))
metrics_fname_re = re.compile(r'(.*)\.(.*).metrics')
filesByNick = {}
nonick = []
tf_inventory_path = None
for path in metrics_files:
fname = os.path.basename(path)
if fname == 'terraform-inventory.host':
tf_inventory_path = path
continue
metrics_dirs.add(os.path.dirname(path))
m = metrics_fname_re.match(fname)
if not m:
logger.error('could not parse metrics file name %r', fname)
nonick.append(path)
continue
nick = m.group(1)
dapp(filesByNick, nick, path)
filesByNick = gather_metrics_files_by_nick(metrics_files, metrics_dirs)
if not tf_inventory_path:
for md in metrics_dirs:
tp = os.path.join(md, 'terraform-inventory.host')
Expand Down
Loading