Skip to content

Commit

Permalink
use WithTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
mattn committed Jul 22, 2024
1 parent 70a1daf commit abcb4ce
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
15 changes: 6 additions & 9 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
defer s.clientsMu.Unlock()
s.clients[conn] = struct{}{}
ticker := time.NewTicker(pingPeriod)
stop := make(chan struct{})

ip := conn.RemoteAddr().String()
if realIP := r.Header.Get("X-Forwarded-For"); realIP != "" {
Expand All @@ -374,12 +373,13 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
)
}

ctx, cancel := context.WithCancel(context.Background())

// reader
go func() {
defer func() {
cancel()
ticker.Stop()
stop <- struct{}{}
close(stop)
s.clientsMu.Lock()
if _, ok := s.clients[conn]; ok {
conn.Close()
Expand All @@ -388,8 +388,6 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
}
s.clientsMu.Unlock()
s.Log.Infof("disconnected from %s", ip)

ctx.Done()
}()

conn.SetReadLimit(maxMessageSize)
Expand Down Expand Up @@ -432,17 +430,16 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
continue
}

go s.handleMessage(context.TODO(), ws, message, store)
go s.handleMessage(ctx, ws, message, store)
}
}()

// writer
go func() {
defer func() {
cancel()
ticker.Stop()
conn.Close()
for range stop {
}
}()

for {
Expand All @@ -454,7 +451,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
return
}
s.Log.Infof("pinging for %s", ip)
case <-stop:
case <-ctx.Done():
return
}
}
Expand Down
1 change: 1 addition & 0 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func removeListenerId(ws *WebSocket, id string) {
func removeListener(ws *WebSocket) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
clear(listeners[ws])
delete(listeners, ws)
}

Expand Down

0 comments on commit abcb4ce

Please sign in to comment.