Skip to content

Commit 6b0c57d

Browse files
authored
fixes log to reflect active nodes (#288)
1 parent 73fa974 commit 6b0c57d

File tree

4 files changed

+226
-117
lines changed

4 files changed

+226
-117
lines changed

driver/norma/progress.go

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"github.com/Fantom-foundation/Norma/driver"
6+
"github.com/Fantom-foundation/Norma/driver/monitoring"
7+
netmon "github.com/Fantom-foundation/Norma/driver/monitoring/network"
8+
nodemon "github.com/Fantom-foundation/Norma/driver/monitoring/node"
9+
"github.com/Fantom-foundation/Norma/driver/network/local"
10+
"golang.org/x/exp/constraints"
11+
"log"
12+
"sort"
13+
"sync"
14+
"time"
15+
)
16+
17+
// progressLogger is a helper struct that logs the progress of the network.
18+
// It lists nodes and logs the progress of the network periodically.
19+
type progressLogger struct {
20+
monitor *monitoring.Monitor
21+
stop chan<- bool
22+
done <-chan bool
23+
}
24+
25+
// startProgressLogger starts a progress logger that logs the progress of the network.
26+
func startProgressLogger(monitor *monitoring.Monitor, net *local.LocalNetwork) *progressLogger {
27+
stop := make(chan bool)
28+
done := make(chan bool)
29+
30+
activeNodes := &activeNodes{
31+
data: make(map[driver.NodeID]struct{}),
32+
}
33+
net.RegisterListener(activeNodes)
34+
for _, node := range net.GetActiveNodes() {
35+
activeNodes.AfterNodeCreation(node)
36+
}
37+
38+
go func() {
39+
defer close(done)
40+
ticker := time.NewTicker(time.Second)
41+
for {
42+
select {
43+
case <-stop:
44+
return
45+
case <-ticker.C:
46+
logState(monitor, activeNodes)
47+
}
48+
}
49+
}()
50+
51+
return &progressLogger{
52+
monitor,
53+
stop,
54+
done,
55+
}
56+
}
57+
58+
type activeNodes struct {
59+
data map[driver.NodeID]struct{}
60+
mutex sync.Mutex
61+
}
62+
63+
func (l *activeNodes) AfterNodeCreation(node driver.Node) {
64+
l.mutex.Lock()
65+
defer l.mutex.Unlock()
66+
l.data[driver.NodeID(node.GetLabel())] = struct{}{}
67+
}
68+
69+
func (l *activeNodes) AfterNodeRemoval(node driver.Node) {
70+
l.mutex.Lock()
71+
defer l.mutex.Unlock()
72+
delete(l.data, driver.NodeID(node.GetLabel()))
73+
}
74+
75+
func (l *activeNodes) AfterApplicationCreation(app driver.Application) {
76+
// noop
77+
}
78+
79+
func (l *activeNodes) containsId(id driver.NodeID) bool {
80+
l.mutex.Lock()
81+
defer l.mutex.Unlock()
82+
83+
_, exists := l.data[id]
84+
return exists
85+
}
86+
87+
func (l *progressLogger) shutdown() {
88+
close(l.stop)
89+
<-l.done
90+
}
91+
92+
func logState(monitor *monitoring.Monitor, nodes *activeNodes) {
93+
numNodes := getNumNodes(monitor)
94+
blockStatuses := getBlockStatuses(monitor, nodes)
95+
txPers := getTxPerSec(monitor, nodes)
96+
txs := getNumTxs(monitor)
97+
gas := getGasUsed(monitor)
98+
processingTimes := getBlockProcessingTimes(monitor, nodes)
99+
100+
log.Printf("Nodes: %s, block heights: %v, tx/s: %v, txs: %v, gas: %s, block processing: %v", numNodes, blockStatuses, txPers, txs, gas, processingTimes)
101+
}
102+
103+
func getNumNodes(monitor *monitoring.Monitor) string {
104+
data, exists := monitoring.GetData(monitor, monitoring.Network{}, netmon.NumberOfNodes)
105+
return getLastValAsString[monitoring.Time, int](exists, data)
106+
}
107+
108+
func getNumTxs(monitor *monitoring.Monitor) string {
109+
data, exists := monitoring.GetData(monitor, monitoring.Network{}, netmon.BlockNumberOfTransactions)
110+
return getLastValAsString[monitoring.BlockNumber, int](exists, data)
111+
}
112+
113+
func getTxPerSec(monitor *monitoring.Monitor, nodes *activeNodes) []string {
114+
metric := nodemon.TransactionsThroughput
115+
return getLastValAllSubjects[monitoring.BlockNumber, float32](monitor, metric, nodes)
116+
}
117+
118+
func getGasUsed(monitor *monitoring.Monitor) string {
119+
data, exists := monitoring.GetData(monitor, monitoring.Network{}, netmon.BlockGasUsed)
120+
return getLastValAsString[monitoring.BlockNumber, int](exists, data)
121+
}
122+
123+
func getBlockStatuses(monitor *monitoring.Monitor, nodes *activeNodes) []string {
124+
metric := nodemon.NodeBlockStatus
125+
return getLastValAllSubjects[
126+
monitoring.Time,
127+
monitoring.BlockStatus,
128+
monitoring.Series[monitoring.Time, monitoring.BlockStatus]](
129+
monitor, metric, nodes)
130+
}
131+
132+
func getBlockProcessingTimes(monitor *monitoring.Monitor, nodes *activeNodes) []string {
133+
metric := nodemon.BlockEventAndTxsProcessingTime
134+
return getLastValAllSubjects[
135+
monitoring.BlockNumber,
136+
time.Duration,
137+
monitoring.Series[monitoring.BlockNumber, time.Duration]](
138+
monitor, metric, nodes)
139+
}
140+
141+
func getLastValAllSubjects[K constraints.Ordered, T any, X monitoring.Series[K, T]](
142+
monitor *monitoring.Monitor,
143+
metric monitoring.Metric[monitoring.Node, X],
144+
activeNodes *activeNodes) []string {
145+
146+
nodes := monitoring.GetSubjects(monitor, metric)
147+
sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
148+
149+
res := make([]string, 0, len(nodes))
150+
for _, node := range nodes {
151+
if exists := activeNodes.containsId(driver.NodeID(node)); exists {
152+
data, exists := monitoring.GetData(monitor, node, metric)
153+
d := getLastValAsString[K, T](exists, data)
154+
res = append(res, d)
155+
}
156+
}
157+
return res
158+
}
159+
160+
func getLastValAsString[K constraints.Ordered, T any](exists bool, series monitoring.Series[K, T]) string {
161+
if !exists || series == nil {
162+
return "N/A"
163+
}
164+
point := series.GetLatest()
165+
if point == nil {
166+
return "N/A"
167+
}
168+
return fmt.Sprintf("%v", point.Value)
169+
}

driver/norma/progress_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"github.com/Fantom-foundation/Norma/driver"
6+
"go.uber.org/mock/gomock"
7+
"math/rand"
8+
"testing"
9+
)
10+
11+
func TestActiveNodes_TrackActiveNodes(t *testing.T) {
12+
nodes := activeNodes{
13+
data: make(map[driver.NodeID]struct{}),
14+
}
15+
16+
shadow := make(map[driver.NodeID]struct{})
17+
const N = 100
18+
19+
ctrl := gomock.NewController(t)
20+
for i := 0; i < N; i++ {
21+
id := fmt.Sprintf("%d", i)
22+
shadow[driver.NodeID(id)] = struct{}{}
23+
node := driver.NewMockNode(ctrl)
24+
node.EXPECT().GetLabel().Return(id)
25+
nodes.AfterNodeCreation(node)
26+
}
27+
28+
for i := 0; i < N; i++ {
29+
r := rand.Intn(2)
30+
if r == 0 {
31+
id := fmt.Sprintf("%d", i)
32+
node := driver.NewMockNode(ctrl)
33+
node.EXPECT().GetLabel().Return(id)
34+
delete(shadow, driver.NodeID(id))
35+
nodes.AfterNodeRemoval(node)
36+
}
37+
}
38+
39+
for i := 0; i < N; i++ {
40+
id := driver.NodeID(fmt.Sprintf("%d", i))
41+
_, exists := shadow[id]
42+
if got, want := nodes.containsId(id), exists; got != want {
43+
t.Errorf("sahdow and active nodes do not match: got: %v != want: %v", got, want)
44+
}
45+
}
46+
}

driver/norma/run.go

+1-106
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ package main
1919
import (
2020
"fmt"
2121
"io/ioutil"
22-
"log"
2322
"os"
2423
"path/filepath"
25-
"sort"
2624
"time"
2725

2826
"github.com/Fantom-foundation/Norma/driver/checking"
@@ -32,14 +30,11 @@ import (
3230
"github.com/Fantom-foundation/Norma/driver/executor"
3331
"github.com/Fantom-foundation/Norma/driver/monitoring"
3432
_ "github.com/Fantom-foundation/Norma/driver/monitoring/app"
35-
netmon "github.com/Fantom-foundation/Norma/driver/monitoring/network"
36-
nodemon "github.com/Fantom-foundation/Norma/driver/monitoring/node"
3733
prometheusmon "github.com/Fantom-foundation/Norma/driver/monitoring/prometheus"
3834
_ "github.com/Fantom-foundation/Norma/driver/monitoring/user"
3935
"github.com/Fantom-foundation/Norma/driver/network/local"
4036
"github.com/Fantom-foundation/Norma/driver/parser"
4137
"github.com/urfave/cli/v2"
42-
"golang.org/x/exp/constraints"
4338
)
4439

4540
// Run with `go run ./driver/norma run <scenario.yml>`
@@ -215,7 +210,7 @@ func run(ctx *cli.Context) (err error) {
215210

216211
// Run scenario.
217212
fmt.Printf("Running '%s' ...\n", path)
218-
logger := startProgressLogger(monitor)
213+
logger := startProgressLogger(monitor, net)
219214
defer logger.shutdown()
220215
err = executor.Run(clock, net, &scenario, outputDir)
221216
if err != nil {
@@ -236,103 +231,3 @@ func run(ctx *cli.Context) (err error) {
236231

237232
return nil
238233
}
239-
240-
type progressLogger struct {
241-
monitor *monitoring.Monitor
242-
stop chan<- bool
243-
done <-chan bool
244-
}
245-
246-
func startProgressLogger(monitor *monitoring.Monitor) *progressLogger {
247-
stop := make(chan bool)
248-
done := make(chan bool)
249-
250-
go func() {
251-
defer close(done)
252-
ticker := time.NewTicker(time.Second)
253-
for {
254-
select {
255-
case <-stop:
256-
return
257-
case <-ticker.C:
258-
logState(monitor)
259-
}
260-
}
261-
}()
262-
263-
return &progressLogger{
264-
monitor,
265-
stop,
266-
done,
267-
}
268-
}
269-
270-
func (l *progressLogger) shutdown() {
271-
close(l.stop)
272-
<-l.done
273-
}
274-
275-
func logState(monitor *monitoring.Monitor) {
276-
numNodes := getNumNodes(monitor)
277-
blockStatuses := getBlockStatuses(monitor)
278-
txPers := getTxPerSec(monitor)
279-
txs := getNumTxs(monitor)
280-
gas := getGasUsed(monitor)
281-
processingTimes := getBlockProcessingTimes(monitor)
282-
283-
log.Printf("Nodes: %s, block heights: %v, tx/s: %v, txs: %v, gas: %s, block processing: %v", numNodes, blockStatuses, txPers, txs, gas, processingTimes)
284-
}
285-
286-
func getNumNodes(monitor *monitoring.Monitor) string {
287-
data, exists := monitoring.GetData(monitor, monitoring.Network{}, netmon.NumberOfNodes)
288-
return getLastValAsString[monitoring.Time, int](exists, data)
289-
}
290-
291-
func getNumTxs(monitor *monitoring.Monitor) string {
292-
data, exists := monitoring.GetData(monitor, monitoring.Network{}, netmon.BlockNumberOfTransactions)
293-
return getLastValAsString[monitoring.BlockNumber, int](exists, data)
294-
}
295-
296-
func getTxPerSec(monitor *monitoring.Monitor) []string {
297-
metric := nodemon.TransactionsThroughput
298-
return getLastValAllSubjects[monitoring.BlockNumber, float32](monitor, metric)
299-
}
300-
301-
func getGasUsed(monitor *monitoring.Monitor) string {
302-
data, exists := monitoring.GetData(monitor, monitoring.Network{}, netmon.BlockGasUsed)
303-
return getLastValAsString[monitoring.BlockNumber, int](exists, data)
304-
}
305-
306-
func getBlockStatuses(monitor *monitoring.Monitor) []string {
307-
metric := nodemon.NodeBlockStatus
308-
return getLastValAllSubjects[monitoring.Time, monitoring.BlockStatus, monitoring.Series[monitoring.Time, monitoring.BlockStatus]](monitor, metric)
309-
}
310-
311-
func getBlockProcessingTimes(monitor *monitoring.Monitor) []string {
312-
metric := nodemon.BlockEventAndTxsProcessingTime
313-
return getLastValAllSubjects[monitoring.BlockNumber, time.Duration, monitoring.Series[monitoring.BlockNumber, time.Duration]](monitor, metric)
314-
}
315-
316-
func getLastValAllSubjects[K constraints.Ordered, T any, X monitoring.Series[K, T]](monitor *monitoring.Monitor, metric monitoring.Metric[monitoring.Node, X]) []string {
317-
nodes := monitoring.GetSubjects(monitor, metric)
318-
sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
319-
320-
res := make([]string, 0, len(nodes))
321-
for _, node := range nodes {
322-
data, exists := monitoring.GetData(monitor, node, metric)
323-
var d string = getLastValAsString[K, T](exists, data)
324-
res = append(res, d)
325-
}
326-
return res
327-
}
328-
329-
func getLastValAsString[K constraints.Ordered, T any](exists bool, series monitoring.Series[K, T]) string {
330-
if !exists || series == nil {
331-
return "N/A"
332-
}
333-
point := series.GetLatest()
334-
if point == nil {
335-
return "N/A"
336-
}
337-
return fmt.Sprintf("%v", point.Value)
338-
}

0 commit comments

Comments
 (0)