@@ -28,6 +28,7 @@ import (
28
28
"runtime"
29
29
"strconv"
30
30
"strings"
31
+ "sync"
31
32
"time"
32
33
33
34
"github.com/ethereum/go-ethereum/common"
@@ -80,6 +81,8 @@ type Service struct {
80
81
81
82
pongCh chan struct {} // Pong notifications are fed into this channel
82
83
histCh chan []uint64 // History request block numbers are fed into this channel
84
+
85
+ connMu sync.Mutex // Mutex to prevent concurrent write on the websocket connection
83
86
}
84
87
85
88
// New returns a monitoring service ready for stats reporting.
@@ -306,10 +309,13 @@ func (s *Service) readLoop(conn *websocket.Conn) {
306
309
// If the network packet is a system ping, respond to it directly
307
310
var ping string
308
311
if err := json .Unmarshal (blob , & ping ); err == nil && strings .HasPrefix (ping , "primus::ping::" ) {
312
+ s .connMu .Lock ()
309
313
if err := conn .WriteJSON (strings .Replace (ping , "ping" , "pong" , - 1 )); err != nil {
314
+ s .connMu .Unlock ()
310
315
log .Warn ("Failed to respond to system ping message" , "err" , err )
311
316
return
312
317
}
318
+ s .connMu .Unlock ()
313
319
continue
314
320
}
315
321
// Not a system ping, try to decode an actual state message
@@ -432,6 +438,8 @@ func (s *Service) login(conn *websocket.Conn) error {
432
438
login := map [string ][]interface {}{
433
439
"emit" : {"hello" , auth },
434
440
}
441
+ s .connMu .Lock ()
442
+ defer s .connMu .Unlock ()
435
443
if err := conn .WriteJSON (login ); err != nil {
436
444
return err
437
445
}
@@ -474,6 +482,8 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
474
482
"clientTime" : start .String (),
475
483
}},
476
484
}
485
+ s .connMu .Lock ()
486
+ defer s .connMu .Unlock ()
477
487
if err := conn .WriteJSON (ping ); err != nil {
478
488
return err
479
489
}
@@ -547,6 +557,8 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
547
557
report := map [string ][]interface {}{
548
558
"emit" : {"block" , stats },
549
559
}
560
+ s .connMu .Lock ()
561
+ defer s .connMu .Unlock ()
550
562
return conn .WriteJSON (report )
551
563
}
552
564
@@ -661,6 +673,8 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
661
673
report := map [string ][]interface {}{
662
674
"emit" : {"history" , stats },
663
675
}
676
+ s .connMu .Unlock ()
677
+ defer s .connMu .Unlock ()
664
678
return conn .WriteJSON (report )
665
679
}
666
680
@@ -691,6 +705,8 @@ func (s *Service) reportPending(conn *websocket.Conn) error {
691
705
report := map [string ][]interface {}{
692
706
"emit" : {"pending" , stats },
693
707
}
708
+ s .connMu .Lock ()
709
+ defer s .connMu .Unlock ()
694
710
return conn .WriteJSON (report )
695
711
}
696
712
@@ -746,5 +762,7 @@ func (s *Service) reportStats(conn *websocket.Conn) error {
746
762
report := map [string ][]interface {}{
747
763
"emit" : {"stats" , stats },
748
764
}
765
+ s .connMu .Lock ()
766
+ defer s .connMu .Unlock ()
749
767
return conn .WriteJSON (report )
750
768
}
0 commit comments