Skip to content

Commit bd5a000

Browse files
authored
per-client RAM measurement scripts (#2783)
New scripts runNodeHost.py and nodeHostTarget.py manage starting a new EC2 node with algod (or several) pointed at a specific relay, so that we can test per-client usage on that one relay. Improve goal: don't crash if kmd isn't present when we don't need it. Tweaks to other heapWatch scripts for measuring this data.
1 parent 98ed08f commit bd5a000

File tree

7 files changed

+854
-41
lines changed

7 files changed

+854
-41
lines changed

cmd/goal/account.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,13 @@ var changeOnlineCmd = &cobra.Command{
777777
}
778778

779779
dataDir := ensureSingleDataDir()
780-
client := ensureFullClient(dataDir)
780+
var client libgoal.Client
781+
if statusChangeTxFile != "" {
782+
// writing out a txn, don't need kmd
783+
client = ensureAlgodClient(dataDir)
784+
} else {
785+
client = ensureFullClient(dataDir)
786+
}
781787

782788
var part *algodAcct.Participation
783789
if partKeyFile != "" {

cmd/goal/commands.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -371,17 +371,25 @@ func ensureFullClient(dataDir string) libgoal.Client {
371371
return ensureGoalClient(dataDir, libgoal.FullClient)
372372
}
373373

374-
func ensureGoalClient(dataDir string, clientType libgoal.ClientType) libgoal.Client {
374+
func getGoalClient(dataDir string, clientType libgoal.ClientType) (client libgoal.Client, err error) {
375375
clientConfig := libgoal.ClientConfig{
376376
AlgodDataDir: dataDir,
377377
KMDDataDir: resolveKmdDataDir(dataDir),
378378
CacheDir: ensureCacheDir(dataDir),
379379
}
380-
client, err := libgoal.MakeClientFromConfig(clientConfig, clientType)
380+
client, err = libgoal.MakeClientFromConfig(clientConfig, clientType)
381381
if err != nil {
382-
reportErrorf(errorNodeStatus, err)
382+
return
383383
}
384384
client.SetAPIVersionAffinity(algodclient.APIVersionV2, kmdclient.APIVersionV1)
385+
return
386+
}
387+
388+
func ensureGoalClient(dataDir string, clientType libgoal.ClientType) libgoal.Client {
389+
client, err := getGoalClient(dataDir, clientType)
390+
if err != nil {
391+
reportErrorf(errorNodeStatus, err)
392+
}
385393
return client
386394
}
387395

@@ -404,7 +412,10 @@ func getWalletHandleMaybePassword(dataDir string, walletName string, getPassword
404412
var dup bool
405413

406414
accountList := makeAccountsList(dataDir)
407-
kmd := ensureKmdClient(dataDir)
415+
kmd, err := getGoalClient(dataDir, libgoal.KmdClient)
416+
if err != nil {
417+
return nil, nil, fmt.Errorf("kmd client init error: %w", err)
418+
}
408419

409420
// If the user didn't manually specify a wallet, use the default wallet ID
410421
if walletName == "" {
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
#!/usr/bin/env python3
2+
3+
import argparse
4+
import glob
5+
import json
6+
import logging
7+
import os
8+
import re
9+
import sys
10+
import subprocess
11+
12+
from metrics_delta import parse_metrics, gather_metrics_files_by_nick
13+
14+
logger = logging.getLogger(__name__)
15+
16+
# go tool pprof -sample_index=inuse_space -text Primary.20210708_131740.heap|grep ^Showing.\*total\$
17+
# Showing nodes accounting for 82.08MB, 100% of 82.08MB total
18+
19+
total_inuse_re = re.compile(r'Showing nodes accounting for [^,]+, .* of ([0-9.]+)([kKmMgGtT]?B) total', re.MULTILINE)
20+
21+
multipliers = {
22+
'B': 1,
23+
'KB': 1024,
24+
'MB': 1024*1024,
25+
'GB': 1024*1024*1024,
26+
'TB': 1024*1024*1024*1024,
27+
}
28+
29+
# d = {k:[v,...]}
30+
def dapp(d, k, v):
31+
l = d.get(k)
32+
if l is None:
33+
d[k] = [v]
34+
else:
35+
l.append(v)
36+
37+
def get_heap_inuse_totals(dirpath):
38+
'''return {"node nickname":[(YYYYmmdd_HHMMSS, bytes), ...], ...}'''
39+
cache_mtime = 0
40+
cache_path = os.path.join(dirpath, 'heap_inuse_totals.json')
41+
if os.path.exists(cache_path):
42+
cache_mtime = os.path.getmtime(cache_path)
43+
with open(cache_path, 'rb') as fin:
44+
cached = json.load(fin)
45+
else:
46+
cached = {}
47+
48+
heap_name_re = re.compile(r'(.*)\.(.*).heap')
49+
bynick = {}
50+
skipcount = 0
51+
for path in glob.glob(os.path.join(dirpath, '*.*.heap')):
52+
if os.path.getmtime(path) < cache_mtime:
53+
skipcount += 1
54+
continue
55+
fname = os.path.basename(path)
56+
m = heap_name_re.match(fname)
57+
if not m:
58+
logger.warning('could not parse heap filename: %r', path)
59+
continue
60+
nick = m.group(1)
61+
timestamp = m.group(2)
62+
cmd = ['go', 'tool', 'pprof', '-sample_index=inuse_space', '-text', path]
63+
result = subprocess.run(cmd, capture_output=True)
64+
text = result.stdout.decode()
65+
m = total_inuse_re.search(text)
66+
if not m:
67+
logger.error('could not find total in output: %s', text)
68+
raise Exception('could not find total in output of: %s', ' '.join([repr(x) for x in cmd]))
69+
bytesinuse = float(m.group(1)) * multipliers[m.group(2).upper()]
70+
dapp(bynick, nick, (timestamp, bytesinuse))
71+
logger.debug('%s ok, %s %f', path, timestamp, bytesinuse)
72+
73+
logger.debug('%d skipped older than cache', skipcount)
74+
for nick, recs in bynick.items():
75+
old = cached.get(nick)
76+
if old is None:
77+
cached[nick] = sorted(recs)
78+
else:
79+
cached[nick] = sorted(old + recs)
80+
if cached and bynick:
81+
with open(cache_path, 'wb') as fout:
82+
json.dump(cached, fout)
83+
return cached
84+
85+
86+
def main():
87+
ap = argparse.ArgumentParser()
88+
ap.add_argument('-d', '--dir', required=True, help='dir path to find /*.metrics in')
89+
ap.add_argument('--verbose', default=False, action='store_true')
90+
args = ap.parse_args()
91+
92+
if args.verbose:
93+
logging.basicConfig(level=logging.DEBUG)
94+
else:
95+
logging.basicConfig(level=logging.INFO)
96+
97+
metrics_files = glob.glob(os.path.join(args.dir, '*.metrics'))
98+
filesByNick = gather_metrics_files_by_nick(metrics_files)
99+
100+
heap_totals = get_heap_inuse_totals(args.dir)
101+
102+
return 0
103+
104+
if __name__ == '__main__':
105+
sys.exit(main())

test/heapwatch/heapWatch.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -129,19 +129,28 @@ def get_goroutine_snapshot(self, snapshot_name=None, outdir=None):
129129

130130
def get_metrics(self, snapshot_name=None, outdir=None):
131131
url = 'http://' + self.net + '/metrics'
132-
response = urllib.request.urlopen(urllib.request.Request(url, headers=self.headers))
133-
if response.code != 200:
134-
logger.error('could not fetch %s from %s via %r', name, self.path. url)
132+
try:
133+
response = urllib.request.urlopen(urllib.request.Request(url, headers=self.headers))
134+
if response.code != 200:
135+
logger.error('could not fetch %s from %s via %r', snapshot_name, self.path. url)
136+
return
137+
blob = response.read()
138+
except Exception as e:
139+
logger.error('could not fetch %s from %s via %r: %s', snapshot_name, self.path, url, e)
135140
return
136-
blob = response.read()
137141
outpath = os.path.join(outdir or '.', self.nick + '.' + snapshot_name + '.metrics')
138142
with open(outpath, 'wb') as fout:
139143
fout.write(blob)
140144
logger.debug('%s -> %s', self.nick, outpath)
141145

142146
def get_blockinfo(self, snapshot_name=None, outdir=None):
143-
algod = self.algod()
144-
status = algod.status()
147+
try:
148+
algod = self.algod()
149+
status = algod.status()
150+
except Exception as e:
151+
logger.error('could not get blockinfo from %s: %s', self.net, e)
152+
self._algod = None
153+
return
145154
bi = msgpack.loads(algod.block_info(status['last-round'], response_format='msgpack'), strict_map_key=False)
146155
if snapshot_name is None:
147156
snapshot_name = time.strftime('%Y%m%d_%H%M%S', time.gmtime())
@@ -154,7 +163,7 @@ def get_blockinfo(self, snapshot_name=None, outdir=None):
154163
#txncount = bi['block']['tc']
155164

156165
def psHeap(self):
157-
# return rss, vsz
166+
# return rss, vsz (in kilobytes)
158167
# ps -o rss,vsz $(cat ${ALGORAND_DATA}/algod.pid)
159168
subp = subprocess.Popen(['ps', '-o', 'rss,vsz', str(self.pid())], stdout=subprocess.PIPE)
160169
try:
@@ -243,15 +252,16 @@ def do_snap(self, now):
243252
if self.args.blockinfo:
244253
for ad in self.they:
245254
ad.get_blockinfo(snapshot_name, outdir=self.args.out)
246-
logger.debug('snapped, processing...')
247-
# make absolute and differential plots
248-
for path, snappath in newsnapshots.items():
249-
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse.svg', snappath])
250-
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc.svg', snappath])
251-
prev = self.prevsnapshots.get(path)
252-
if prev:
253-
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse_diff.svg', '-base='+prev, snappath])
254-
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc_diff.svg', '-diff_base='+prev, snappath])
255+
if self.args.svg:
256+
logger.debug('snapped, processing...')
257+
# make absolute and differential plots
258+
for path, snappath in newsnapshots.items():
259+
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse.svg', snappath])
260+
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc.svg', snappath])
261+
prev = self.prevsnapshots.get(path)
262+
if prev:
263+
subprocess.call(['go', 'tool', 'pprof', '-sample_index=inuse_space', '-svg', '-output', snappath + '.inuse_diff.svg', '-base='+prev, snappath])
264+
subprocess.call(['go', 'tool', 'pprof', '-sample_index=alloc_space', '-svg', '-output', snappath + '.alloc_diff.svg', '-diff_base='+prev, snappath])
255265
self.prevsnapshots = newsnapshots
256266

257267
def main():
@@ -268,6 +278,7 @@ def main():
268278
ap.add_argument('--admin-token', default='', help='default algod admin-api token to use')
269279
ap.add_argument('--tf-roles', default='relay', help='comma separated list of terraform roles to follow')
270280
ap.add_argument('--tf-name-re', action='append', default=[], help='regexp to match terraform node names, may be repeated')
281+
ap.add_argument('--no-svg', dest='svg', default=True, action='store_false', help='do not automatically run `go tool pprof` to generate svg from collected data')
271282
ap.add_argument('-o', '--out', default=None, help='directory to write to')
272283
ap.add_argument('--verbose', default=False, action='store_true')
273284
args = ap.parse_args()

test/heapwatch/metrics_delta.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ def dictMin(dest, more):
112112
dest[k] = min(ov,v)
113113
return dest
114114

115+
def meanOrZero(seq):
116+
if not seq:
117+
return 0
118+
return statistics.mean(seq)
119+
115120
class summary:
116121
def __init__(self):
117122
self.tpsMeanSum = 0
@@ -125,9 +130,9 @@ def __call__(self, ttr, nick):
125130
return
126131
self.nodes[nick] = ttr
127132
logger.debug('%d points from %s', len(ttr.tpsList), nick)
128-
self.tpsMeanSum += statistics.mean(ttr.tpsList)
129-
self.txBpsMeanSum += statistics.mean(ttr.txBpsList)
130-
self.rxBpsMeanSum += statistics.mean(ttr.rxBpsList)
133+
self.tpsMeanSum += meanOrZero(ttr.tpsList)
134+
self.txBpsMeanSum += meanOrZero(ttr.txBpsList)
135+
self.rxBpsMeanSum += meanOrZero(ttr.rxBpsList)
131136
self.sumsCount += 1
132137

133138
def byMsg(self):
@@ -191,6 +196,28 @@ def anynickre(nick_re, nicks):
191196
return True
192197
return False
193198

199+
def gather_metrics_files_by_nick(metrics_files, metrics_dirs=None):
200+
'''return {"node nickname":[path, path, ...], ...}'''
201+
metrics_fname_re = re.compile(r'(.*)\.(.*).metrics')
202+
filesByNick = {}
203+
nonick = []
204+
tf_inventory_path = None
205+
for path in metrics_files:
206+
fname = os.path.basename(path)
207+
if fname == 'terraform-inventory.host':
208+
tf_inventory_path = path
209+
continue
210+
if metrics_dirs is not None:
211+
metrics_dirs.add(os.path.dirname(path))
212+
m = metrics_fname_re.match(fname)
213+
if not m:
214+
logger.error('could not parse metrics file name %r', fname)
215+
nonick.append(path)
216+
continue
217+
nick = m.group(1)
218+
dapp(filesByNick, nick, path)
219+
return filesByNick
220+
194221
def main():
195222
test_metric_line_re()
196223
ap = argparse.ArgumentParser()
@@ -213,23 +240,7 @@ def main():
213240
if args.dir:
214241
metrics_dirs.add(args.dir)
215242
metrics_files += glob.glob(os.path.join(args.dir, '*.metrics'))
216-
metrics_fname_re = re.compile(r'(.*)\.(.*).metrics')
217-
filesByNick = {}
218-
nonick = []
219-
tf_inventory_path = None
220-
for path in metrics_files:
221-
fname = os.path.basename(path)
222-
if fname == 'terraform-inventory.host':
223-
tf_inventory_path = path
224-
continue
225-
metrics_dirs.add(os.path.dirname(path))
226-
m = metrics_fname_re.match(fname)
227-
if not m:
228-
logger.error('could not parse metrics file name %r', fname)
229-
nonick.append(path)
230-
continue
231-
nick = m.group(1)
232-
dapp(filesByNick, nick, path)
243+
filesByNick = gather_metrics_files_by_nick(metrics_files, metrics_dirs)
233244
if not tf_inventory_path:
234245
for md in metrics_dirs:
235246
tp = os.path.join(md, 'terraform-inventory.host')

0 commit comments

Comments
 (0)