Skip to content

Commit 293a30b

Browse files
committed
ethstats: use connection wrapper instead of locks all over
1 parent acd88cd commit 293a30b

File tree

1 file changed

+55
-39
lines changed

1 file changed

+55
-39
lines changed

ethstats/ethstats.go

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,48 @@ type Service struct {
8282
pongCh chan struct{} // Pong notifications are fed into this channel
8383
histCh chan []uint64 // History request block numbers are fed into this channel
8484

85-
// Gorilla websocket docs:
86-
// Connections support one concurrent reader and one concurrent writer.
87-
// Applications are responsible for ensuring that no more than one goroutine calls the write methods
88-
// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel
89-
// concurrently and that no more than one goroutine calls the read methods
90-
// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler
91-
// concurrently.
92-
// The Close and WriteControl methods can be called concurrently with all other methods.
93-
//
94-
// In our case, we use a single mutex for both reading and writing.
95-
connMu sync.Mutex // Mutex to prevent concurrent write/read on the websocket connection
85+
}
86+
87+
// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
88+
// websocket.
89+
// From Gorilla websocket docs:
90+
// Connections support one concurrent reader and one concurrent writer.
91+
// Applications are responsible for ensuring that no more than one goroutine calls the write methods
92+
// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel
93+
// concurrently and that no more than one goroutine calls the read methods
94+
// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler
95+
// concurrently.
96+
// The Close and WriteControl methods can be called concurrently with all other methods.
97+
//
98+
// The connWrapper uses a single mutex for both reading and writing.
99+
type connWrapper struct {
100+
conn *websocket.Conn
101+
mu sync.Mutex
102+
}
103+
104+
func newConnectionWrapper(conn *websocket.Conn) *connWrapper {
105+
return &connWrapper{conn: conn}
106+
}
107+
108+
// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling
109+
func (w *connWrapper) WriteJSON(v interface{}) error {
110+
w.mu.Lock()
111+
defer w.mu.Unlock()
112+
return w.conn.WriteJSON(v)
113+
}
114+
115+
// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling
116+
func (w *connWrapper) ReadJSON(v interface{}) error {
117+
w.mu.Lock()
118+
defer w.mu.Unlock()
119+
return w.conn.ReadJSON(v)
120+
}
121+
122+
// Close wraps corresponding method on the websocket but is safe for concurrent calling
123+
func (w *connWrapper) Close() error {
124+
// The Close and WriteControl methods can be called concurrently with all other methods,
125+
// so the mutex is not used here
126+
return w.conn.Close()
96127
}
97128

98129
// New returns a monitoring service ready for stats reporting.
@@ -227,17 +258,19 @@ func (s *Service) loop() {
227258
case <-errTimer.C:
228259
// Establish a websocket connection to the server on any supported URL
229260
var (
230-
conn *websocket.Conn
261+
conn *connWrapper
231262
err error
232263
)
233264
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
234265
header := make(http.Header)
235266
header.Set("origin", "http://localhost")
236267
for _, url := range urls {
237-
conn, _, err = dialer.Dial(url, header)
238-
if err == nil {
268+
c, _, e := dialer.Dial(url, header)
269+
if e == nil {
270+
conn = newConnectionWrapper(c)
239271
break
240272
}
273+
err = e
241274
}
242275
if err != nil {
243276
log.Warn("Stats server unreachable", "err", err)
@@ -305,31 +338,26 @@ func (s *Service) loop() {
305338
// from the network socket. If any of them match an active request, it forwards
306339
// it, if they themselves are requests it initiates a reply, and lastly it drops
307340
// unknown packets.
308-
func (s *Service) readLoop(conn *websocket.Conn) {
341+
func (s *Service) readLoop(conn *connWrapper) {
309342
// If the read loop exists, close the connection
310343
defer conn.Close()
311344

312345
for {
313346
// Retrieve the next generic network packet and bail out on error
314347
var blob json.RawMessage
315-
s.connMu.Lock()
316348
if err := conn.ReadJSON(&blob); err != nil {
317349
log.Warn("Failed to retrieve stats server message", "err", err)
318-
s.connMu.Unlock()
319350
return
320351
}
321352
// If the network packet is a system ping, respond to it directly
322353
var ping string
323354
if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") {
324355
if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil {
325356
log.Warn("Failed to respond to system ping message", "err", err)
326-
s.connMu.Unlock()
327357
return
328358
}
329-
s.connMu.Unlock()
330359
continue
331360
}
332-
s.connMu.Unlock()
333361
// Not a system ping, try to decode an actual state message
334362
var msg map[string][]interface{}
335363
if err := json.Unmarshal(blob, &msg); err != nil {
@@ -419,7 +447,7 @@ type authMsg struct {
419447
}
420448

421449
// login tries to authorize the client at the remote server.
422-
func (s *Service) login(conn *websocket.Conn) error {
450+
func (s *Service) login(conn *connWrapper) error {
423451
// Construct and send the login authentication
424452
infos := s.server.NodeInfo()
425453

@@ -450,8 +478,6 @@ func (s *Service) login(conn *websocket.Conn) error {
450478
login := map[string][]interface{}{
451479
"emit": {"hello", auth},
452480
}
453-
s.connMu.Lock()
454-
defer s.connMu.Unlock()
455481
if err := conn.WriteJSON(login); err != nil {
456482
return err
457483
}
@@ -466,7 +492,7 @@ func (s *Service) login(conn *websocket.Conn) error {
466492
// report collects all possible data to report and send it to the stats server.
467493
// This should only be used on reconnects or rarely to avoid overloading the
468494
// server. Use the individual methods for reporting subscribed events.
469-
func (s *Service) report(conn *websocket.Conn) error {
495+
func (s *Service) report(conn *connWrapper) error {
470496
if err := s.reportLatency(conn); err != nil {
471497
return err
472498
}
@@ -484,7 +510,7 @@ func (s *Service) report(conn *websocket.Conn) error {
484510

485511
// reportLatency sends a ping request to the server, measures the RTT time and
486512
// finally sends a latency update.
487-
func (s *Service) reportLatency(conn *websocket.Conn) error {
513+
func (s *Service) reportLatency(conn *connWrapper) error {
488514
// Send the current time to the ethstats server
489515
start := time.Now()
490516

@@ -494,8 +520,6 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
494520
"clientTime": start.String(),
495521
}},
496522
}
497-
s.connMu.Lock()
498-
defer s.connMu.Unlock()
499523
if err := conn.WriteJSON(ping); err != nil {
500524
return err
501525
}
@@ -555,7 +579,7 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
555579
}
556580

557581
// reportBlock retrieves the current chain head and reports it to the stats server.
558-
func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
582+
func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
559583
// Gather the block details from the header or block chain
560584
details := s.assembleBlockStats(block)
561585

@@ -569,8 +593,6 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
569593
report := map[string][]interface{}{
570594
"emit": {"block", stats},
571595
}
572-
s.connMu.Lock()
573-
defer s.connMu.Unlock()
574596
return conn.WriteJSON(report)
575597
}
576598

@@ -629,7 +651,7 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
629651

630652
// reportHistory retrieves the most recent batch of blocks and reports it to the
631653
// stats server.
632-
func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
654+
func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
633655
// Figure out the indexes that need reporting
634656
indexes := make([]uint64, 0, historyUpdateRange)
635657
if len(list) > 0 {
@@ -685,8 +707,6 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
685707
report := map[string][]interface{}{
686708
"emit": {"history", stats},
687709
}
688-
s.connMu.Unlock()
689-
defer s.connMu.Unlock()
690710
return conn.WriteJSON(report)
691711
}
692712

@@ -697,7 +717,7 @@ type pendStats struct {
697717

698718
// reportPending retrieves the current number of pending transactions and reports
699719
// it to the stats server.
700-
func (s *Service) reportPending(conn *websocket.Conn) error {
720+
func (s *Service) reportPending(conn *connWrapper) error {
701721
// Retrieve the pending count from the local blockchain
702722
var pending int
703723
if s.eth != nil {
@@ -717,8 +737,6 @@ func (s *Service) reportPending(conn *websocket.Conn) error {
717737
report := map[string][]interface{}{
718738
"emit": {"pending", stats},
719739
}
720-
s.connMu.Lock()
721-
defer s.connMu.Unlock()
722740
return conn.WriteJSON(report)
723741
}
724742

@@ -735,7 +753,7 @@ type nodeStats struct {
735753

736754
// reportPending retrieves various stats about the node at the networking and
737755
// mining layer and reports it to the stats server.
738-
func (s *Service) reportStats(conn *websocket.Conn) error {
756+
func (s *Service) reportStats(conn *connWrapper) error {
739757
// Gather the syncing and mining infos from the local miner instance
740758
var (
741759
mining bool
@@ -774,7 +792,5 @@ func (s *Service) reportStats(conn *websocket.Conn) error {
774792
report := map[string][]interface{}{
775793
"emit": {"stats", stats},
776794
}
777-
s.connMu.Lock()
778-
defer s.connMu.Unlock()
779795
return conn.WriteJSON(report)
780796
}

0 commit comments

Comments
 (0)