Skip to content

Commit

Permalink
Merge pull request #3 from marcos-gonalons/dev
Browse files Browse the repository at this point in the history
v2.1.0
  • Loading branch information
marcos-gonalons authored Mar 30, 2021
2 parents 1227def + a25d3c1 commit e23a475
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 21 deletions.
71 changes: 50 additions & 21 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *Socket) checkFirstReceivedMessage() (err error) {
}

if p["session_id"] == nil {
err = errors.New("Cannot recognize the first received message after establishing the connection")
err = errors.New("cannot recognize the first received message after establishing the connection")
s.onError(err, FirstMessageWithoutSessionIdErrorContext)
return
}
Expand Down Expand Up @@ -146,8 +146,9 @@ func (s *Socket) sendSocketMessage(p *SocketMessage) (err error) {

func (s *Socket) connectionLoop() {
var readMsgError error
var writeKeepAliveMsgError error

for readMsgError == nil {
for readMsgError == nil && writeKeepAliveMsgError == nil {
if s.isClosed {
break
}
Expand All @@ -156,28 +157,32 @@ func (s *Socket) connectionLoop() {
var msg []byte
msgType, msg, readMsgError = s.conn.ReadMessage()

if msgType != websocket.TextMessage {
continue
}
go func() {
if msgType != websocket.TextMessage {
return
}

if isKeepAliveMsg(msg) {
err := s.conn.WriteMessage(msgType, msg)
if err != nil {
s.onError(err, SendKeepAliveMessageErrorContext+" - "+string(msg))
if isKeepAliveMsg(msg) {
writeKeepAliveMsgError = s.conn.WriteMessage(msgType, msg)
return
}
continue
}

s.parsePacket(msg)
go s.parsePacket(msg)
}()
}

if readMsgError != nil {
s.onError(readMsgError, ReadMessageErrorContext)
}
if writeKeepAliveMsgError != nil {
s.onError(writeKeepAliveMsgError, SendKeepAliveMessageErrorContext)
}
}

func (s *Socket) parsePacket(packet []byte) {
var symbolsArr []string
var dataArr []*QuoteData

index := 0
for index < len(packet) {
payloadLength, err := getPayloadLength(packet[index:])
Expand All @@ -190,13 +195,31 @@ func (s *Socket) parsePacket(packet []byte) {
payload := packet[index+headerLength : index+headerLength+payloadLength]
index = index + headerLength + len(payload)

s.parseJSON(payload)
symbol, data, err := s.parseJSON(payload)
if err != nil {
break
}

dataArr = append(dataArr, data)
symbolsArr = append(symbolsArr, symbol)
}

for i := 0; i < len(dataArr); i++ {
isDuplicate := false
for j := i + 1; j < len(dataArr); j++ {
if GetStringRepresentation(dataArr[i]) == GetStringRepresentation(dataArr[j]) {
isDuplicate = true
break
}
}
if !isDuplicate {
s.OnReceiveMarketDataCallback(symbolsArr[i], dataArr[i])
}
}
}

func (s *Socket) parseJSON(msg []byte) {
func (s *Socket) parseJSON(msg []byte) (symbol string, data *QuoteData, err error) {
var decodedMessage *SocketMessage
var err error

err = json.Unmarshal(msg, &decodedMessage)
if err != nil {
Expand All @@ -205,22 +228,26 @@ func (s *Socket) parseJSON(msg []byte) {
}

if decodedMessage.Message == "critical_error" || decodedMessage.Message == "error" {
s.onError(errors.New("Error -> "+string(msg)), DecodedMessageHasErrorPropertyErrorContext)
err = errors.New("Error -> " + string(msg))
s.onError(err, DecodedMessageHasErrorPropertyErrorContext)
return
}

if decodedMessage.Message != "qsd" {
err = errors.New("ignored message - Not QSD")
return
}

if decodedMessage.Payload == nil {
s.onError(errors.New("Msg does not include 'p' -> "+string(msg)), DecodedMessageDoesNotIncludePayloadErrorContext)
err = errors.New("Msg does not include 'p' -> " + string(msg))
s.onError(err, DecodedMessageDoesNotIncludePayloadErrorContext)
return
}

p, isPOk := decodedMessage.Payload.([]interface{})
if !isPOk || len(p) != 2 {
s.onError(errors.New("There is something wrong with the payload - can't be parsed -> "+string(msg)), PayloadCantBeParsedErrorContext)
err = errors.New("There is something wrong with the payload - can't be parsed -> " + string(msg))
s.onError(err, PayloadCantBeParsedErrorContext)
return
}

Expand All @@ -232,11 +259,13 @@ func (s *Socket) parseJSON(msg []byte) {
}

if decodedQuoteMessage.Status != "ok" || decodedQuoteMessage.Symbol == "" || decodedQuoteMessage.Data == nil {
s.onError(errors.New("There is something wrong with the payload - couldn't be parsed -> "+string(msg)), FinalPayloadHasMissingPropertiesErrorContext)
err = errors.New("There is something wrong with the payload - couldn't be parsed -> " + string(msg))
s.onError(err, FinalPayloadHasMissingPropertiesErrorContext)
return
}

s.OnReceiveMarketDataCallback(decodedQuoteMessage.Symbol, decodedQuoteMessage.Data)
symbol = decodedQuoteMessage.Symbol
data = decodedQuoteMessage.Data
return
}

func (s *Socket) onError(err error, context string) {
Expand Down
6 changes: 6 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tradingview

import (
"encoding/json"
"math/rand"
"time"
)
Expand Down Expand Up @@ -29,3 +30,8 @@ func GetRandomString(length int) string {
return string(requestID)
}

// GetStringRepresentation ...
func GetStringRepresentation(data interface{}) string {
str, _ := json.Marshal(data)
return string(str)
}

0 comments on commit e23a475

Please sign in to comment.