diff --git a/dggchat.go b/dggchat.go index 922a566..8b9d76e 100644 --- a/dggchat.go +++ b/dggchat.go @@ -18,7 +18,7 @@ func New(args ...string) (*Session, error) { } s := &Session{ - AttempToReconnect: true, + attempToReconnect: true, state: newState(), wsURL: wsURL, dialer: websocket.DefaultDialer, diff --git a/eventHandlers.go b/eventHandlers.go index e45cfc1..757ca3a 100644 --- a/eventHandlers.go +++ b/eventHandlers.go @@ -2,6 +2,7 @@ package dggchat type handlers struct { msgHandler func(Message, *Session) + namesHandler func(Names, *Session) muteHandler func(Mute, *Session) unmuteHandler func(Mute, *Session) banHandler func(Ban, *Session) @@ -13,6 +14,8 @@ type handlers struct { broadcastHandler func(Broadcast, *Session) pingHandler func(Ping, *Session) subOnlyHandler func(SubOnly, *Session) + + socketErrorHandler func(error, *Session) } // AddMessageHandler adds a function that will be called every time a message is received @@ -20,6 +23,11 @@ func (s *Session) AddMessageHandler(fn func(Message, *Session)) { s.handlers.msgHandler = fn } +// AddNamesHandler adds a function that will be called every time a names message is received +func (s *Session) AddNamesHandler(fn func(Names, *Session)) { + s.handlers.namesHandler = fn +} + // AddMuteHandler adds a function that will be called every time a mute message is received func (s *Session) AddMuteHandler(fn func(Mute, *Session)) { s.handlers.muteHandler = fn @@ -74,3 +82,8 @@ func (s *Session) AddPingHandler(fn func(Ping, *Session)) { func (s *Session) AddSubOnlyHandler(fn func(SubOnly, *Session)) { s.handlers.subOnlyHandler = fn } + +// AddSocketErrorHandler adds a function that will be called every time a socket error occurs +func (s *Session) AddSocketErrorHandler(fn func(error, *Session)) { + s.handlers.socketErrorHandler = fn +} diff --git a/messageStructs.go b/messageStructs.go index d1f4314..c6c0def 100644 --- a/messageStructs.go +++ b/messageStructs.go @@ -26,6 +26,7 @@ const ( FeatureBot2 = "flair11" FeatureBroadcaster = "flair12" FeatureTier1 = "flair13" + FeatureBirthday = "flair15" ) // Constants for different types of errors the chat can return @@ -76,7 +77,8 @@ type ( Online bool } - namesMessage struct { + // Names reprents the initial status message containing user information + Names struct { Connections int `json:"connectioncount"` Users []User `json:"users"` } @@ -116,13 +118,9 @@ type ( // Broadcast represents a chat broadcast Broadcast struct { - Message string + Sender User Timestamp time.Time - } - - broadcast struct { - Data string `json:"data"` - Timestamp int64 `json:"timestamp"` + Message string } // Ping represents a pong response from the server diff --git a/parsers.go b/parsers.go index ee7a48e..ac6a905 100644 --- a/parsers.go +++ b/parsers.go @@ -74,14 +74,14 @@ func parseBan(s string, sess *Session) (Ban, error) { return ban, nil } -func parseNames(s string) (namesMessage, error) { - var nm namesMessage - err := json.Unmarshal([]byte(s), &nm) +func parseNames(s string) (Names, error) { + var n Names + err := json.Unmarshal([]byte(s), &n) if err != nil { - return namesMessage{}, err + return Names{}, err } - return nm, nil + return n, nil } func parseRoomAction(s string) (RoomAction, error) { @@ -134,16 +134,22 @@ func parsePrivateMessage(s string, sess *Session) (PrivateMessage, error) { } func parseBroadcast(s string) (Broadcast, error) { - var b broadcast + var m message - err := json.Unmarshal([]byte(s), &b) + err := json.Unmarshal([]byte(s), &m) if err != nil { return Broadcast{}, err } + user := User{ + Nick: m.Nick, + Features: m.Features, + } + broadcast := Broadcast{ - Message: b.Data, - Timestamp: unixToTime(b.Timestamp), + Sender: user, + Message: m.Data, + Timestamp: unixToTime(m.Timestamp), } return broadcast, nil diff --git a/session.go b/session.go index 797c536..93cae2b 100644 --- a/session.go +++ b/session.go @@ -17,16 +17,15 @@ import ( type Session struct { sync.RWMutex // If true, attempt to reconnect on error - AttempToReconnect bool - - readOnly bool - loginKey string - listening chan bool - wsURL url.URL - ws *websocket.Conn - handlers handlers - state *state - dialer *websocket.Dialer + attempToReconnect bool + + readOnly bool + loginKey string + wsURL url.URL + ws *websocket.Conn + handlers handlers + state *state + dialer *websocket.Dialer } type messageOut struct { @@ -44,10 +43,11 @@ type muteOut struct { } type banOut struct { - Nick string `json:"nick"` - Reason string `json:"reason,omitempty"` - Duration int64 `json:"duration,omitempty"` - Banip bool `json:"banip,omitempty"` + Nick string `json:"nick"` + Reason string `json:"reason,omitempty"` + Duration int64 `json:"duration,omitempty"` + Banip bool `json:"banip,omitempty"` + Ispermanent bool `json:"ispermanent"` } type pingOut struct { @@ -66,25 +66,42 @@ var wsURL = url.URL{Scheme: "wss", Host: "www.destiny.gg", Path: "/ws"} // SetURL changes the url that will be used when connecting to the socket server. // This should be done before calling *session.Open() func (s *Session) SetURL(u url.URL) { + s.Lock() + defer s.Unlock() s.wsURL = u } // SetDialer changes the websocket dialer that will be used when connecting to the socket server. func (s *Session) SetDialer(d websocket.Dialer) { + s.Lock() + defer s.Unlock() s.dialer = &d } // Open opens a websocket connection to destinygg chat. func (s *Session) Open() error { + s.Lock() defer s.Unlock() + // Only support a single Open() call. if s.ws != nil { return ErrAlreadyOpen } + return s.open() +} - header := http.Header{} +// call with locks held +func (s *Session) open() error { + // Repeatedly calling Open() acts like reconnect. + // this makes sure any old routines die. + if s.ws != nil { + _ = s.ws.Close() + s.ws = nil + } + + header := http.Header{} if !s.readOnly { header.Add("Cookie", fmt.Sprintf("authtoken=%s", s.loginKey)) } @@ -93,17 +110,22 @@ func (s *Session) Open() error { if err != nil { return err } - s.ws = ws - s.listening = make(chan bool) - go s.listen(s.ws, s.listening) + go s.listen() return nil } // Close cleanly closes the connection and stops running listeners func (s *Session) Close() error { + + s.Lock() + defer s.Unlock() + + // Assume if Close() is explicitly called, we do not want reconnection behaviour + s.attempToReconnect = false + if s.ws == nil { return nil } @@ -114,33 +136,40 @@ func (s *Session) Close() error { } s.ws = nil - return nil } -// GetUsers returns a list of users currently online -func (s *Session) GetUsers() []User { - s.state.RLock() - defer s.state.RUnlock() - u := make([]User, len(s.state.users)) - copy(u, s.state.users) - return u +func (s *Session) reconnect() { + + wait := 1 + for { + s.Lock() + err := s.open() + s.Unlock() + + if err == nil { + return + } + + wait *= 2 + if wait > 32 { + wait = 32 + } + time.Sleep(time.Duration(wait) * time.Second) + } } -func (s *Session) listen(ws *websocket.Conn, listening <-chan bool) { +func (s *Session) listen() { for { _, message, err := s.ws.ReadMessage() if err != nil { - if ws != s.ws { - return + if s.handlers.socketErrorHandler != nil { + s.handlers.socketErrorHandler(err, s) } - - err := ws.Close() - if err != nil { - return + if s.attempToReconnect { + s.reconnect() } - - s.reconnect() + return } mslice := strings.SplitN(string(message[:]), " ", 2) @@ -210,8 +239,8 @@ func (s *Session) listen(ws *websocket.Conn, listening <-chan bool) { s.handlers.pmHandler(pm, s) case "PRIVMSGSENT": - //TODO confirms sending of a PM was successful - + // confirms sending of a PM was successful. + // If not successful, an ERR message is sent anyways. Ignore this. case "PING": case "PONG": p, err := parsePing(mContent) @@ -232,8 +261,13 @@ func (s *Session) listen(ws *websocket.Conn, listening <-chan bool) { if err != nil { continue } + s.state.Lock() s.state.users = n.Users - s.state.connections = n.Connections + s.state.Unlock() + + if s.handlers.namesHandler != nil { + s.handlers.namesHandler(n, s) + } case "JOIN": ra, err := parseRoomAction(mContent) @@ -260,36 +294,13 @@ func (s *Session) listen(ws *websocket.Conn, listening <-chan bool) { } case "REFRESH": - // TODO voluntary reconnect when server determines state should be refreshed - // happens e.g. when user-flair is modified in the database - } - - select { - case <-listening: - return - default: - } - } -} - -func (s *Session) reconnect() { - if !s.AttempToReconnect { - return - } + // This message is received immediately before the server closes the + // connection because user information was changed, and we need to reinitialize. - wait := 1 - for { - err := s.Open() - if err == nil || err == ErrAlreadyOpen { + // TODO possibly add an eventhandler here + s.reconnect() return } - - wait *= 2 - <-time.After(time.Duration(wait) * time.Second) - - if wait > 600 { - wait = 600 - } } } @@ -297,8 +308,8 @@ func (s *Session) reconnect() { // If the user is found, returns the user and true, // otherwise false is returned as the second parameter. func (s *Session) GetUser(name string) (User, bool) { - s.RLock() - defer s.RUnlock() + s.state.RLock() + defer s.state.RUnlock() for _, user := range s.state.users { if strings.EqualFold(name, user.Nick) { @@ -309,6 +320,15 @@ func (s *Session) GetUser(name string) (User, bool) { return User{}, false } +// GetUsers returns a list of users currently online +func (s *Session) GetUsers() []User { + s.state.RLock() + defer s.state.RUnlock() + u := make([]User, len(s.state.users)) + copy(u, s.state.users) + return u +} + func (s *Session) send(message interface{}, mType string) error { if s.readOnly { return ErrReadOnly @@ -317,6 +337,13 @@ func (s *Session) send(message interface{}, mType string) error { if err != nil { return err } + + s.Lock() + defer s.Unlock() + // Close() might have been called for some reason, this prevents panicing in those cases + if s.ws == nil { + return errors.New("connection not established") + } return s.ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("%s %s", mType, m))) } @@ -359,6 +386,18 @@ func (s *Session) SendBan(nick string, reason string, duration time.Duration, ba return s.send(b, "BAN") } +// SendPermanentBan bans the user with the given nick permanently. +// Bans require a ban reason to be specified. +func (s *Session) SendPermanentBan(nick string, reason string, banip bool) error { + b := banOut{ + Nick: nick, + Reason: reason, + Banip: banip, + Ispermanent: true, + } + return s.send(b, "BAN") +} + // SendUnban unbans the user with the given nick. // Unbanning also removes mutes. func (s *Session) SendUnban(nick string) error { @@ -393,6 +432,12 @@ func (s *Session) SendSubOnly(subonly bool) error { return s.send(so, "SUBONLY") } +// SendBroadcast sends a broadcast message to chat +func (s *Session) SendBroadcast(message string) error { + b := messageOut{Data: message} + return s.send(b, "BROADCAST") +} + // SendPing sends a ping to the server with the current timestamp. func (s *Session) SendPing() error { t := pingOut{Timestamp: timeToUnix(time.Now())} diff --git a/state.go b/state.go index 84b02e1..9a6a0ea 100644 --- a/state.go +++ b/state.go @@ -7,8 +7,7 @@ import ( type state struct { sync.RWMutex - connections int - users []User + users []User } func (s *state) removeUser(nick string) { @@ -18,7 +17,7 @@ func (s *state) removeUser(nick string) { for i, user := range s.users { if strings.EqualFold(user.Nick, nick) { s.users = append(s.users[:i], s.users[i+1:]...) - s.connections-- + break } } } @@ -26,15 +25,23 @@ func (s *state) removeUser(nick string) { func (s *state) addUser(user User) { s.Lock() defer s.Unlock() + + // If you are not in chat (0 instances of your user), and join, + // chat backend includes your name in the NAMES command, and ALSO + // sends a JOIN command with your name. This makes sure we do not + // include ourself 2 times. Otherwise this check would not be needed. + for _, u := range s.users { + if strings.EqualFold(user.Nick, u.Nick) { + return + } + } + s.users = append(s.users, user) - s.connections++ } func newState() *state { s := &state{ - connections: 0, - users: make([]User, 0), + users: make([]User, 0), } - return s }