Skip to content

Commit

Permalink
Split loop
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Apr 19, 2023
1 parent a9f5cb0 commit 15fa17e
Showing 1 changed file with 113 additions and 101 deletions.
214 changes: 113 additions & 101 deletions websockets/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ func (w *webSocket) emitConnectionMetrics(ctx context.Context, start time.Time,

const writeWait = 10 * time.Second

//nolint:funlen,gocognit
func (w *webSocket) loop() {
// Pass ping/pong events through the main control loop
pingChan := make(chan string)
Expand All @@ -324,7 +323,6 @@ func (w *webSocket) loop() {

// readCloseChan := make(chan int)
// readErrChan := make(chan error)
samplesOutput := w.vu.State().Samples
ctx := w.vu.Context()
wg := new(sync.WaitGroup)

Expand All @@ -347,107 +345,10 @@ func (w *webSocket) loop() {
}()
wg.Add(1)
// Wraps a couple of channels around conn.ReadMessage
go func() { // copied from k6/ws
defer wg.Done()
for {
messageType, data, err := w.conn.ReadMessage()
if err != nil {
if !websocket.IsUnexpectedCloseError(
err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
w.tq.Queue(func() error {
return w.connectionClosedWithError(nil)
})
return
}
w.tq.Queue(func() error {
_ = w.conn.Close() // TODO fix this
return w.connectionClosedWithError(err)
})
return
}

w.queueMessage(&message{
mtype: messageType,
data: data,
t: time.Now(),
})
}
}()
go w.readPump(wg)

wg.Add(1)
go func() {
defer wg.Done()
wg.Add(1)
writeChannel := make(chan message)
go func() {
defer wg.Done()
for {
select {
case msg, ok := <-writeChannel:
if !ok {
return
}
size := len(msg.data)

err := func() error {
if msg.mtype != websocket.PingMessage {
return w.conn.WriteMessage(msg.mtype, msg.data)
}

// WriteControl is concurrently okay
return w.conn.WriteControl(msg.mtype, msg.data, msg.t.Add(writeWait))
}()
if err != nil {
w.tq.Queue(func() error {
_ = w.conn.Close() // TODO fix
closeErr := w.connectionClosedWithError(err)
return closeErr
})
return
}
// This from the specification needs to happen like that instead of with
// atomics or locks outside of the event loop
w.tq.Queue(func() error {
w.bufferedAmount -= size
return nil
})

metrics.PushIfNotDone(ctx, samplesOutput, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: w.builtinMetrics.WSMessagesSent,
Tags: w.tagsAndMeta.Tags,
},
Time: time.Now(),
Metadata: w.tagsAndMeta.Metadata,
Value: 1,
})
case <-w.done:
return
}
}
}()
{
defer close(writeChannel)
queue := make([]message, 0)
var wch chan message
var msg message
for {
wch = nil // this way if nothing to read it will just block
if len(queue) > 0 {
msg = queue[0]
wch = writeChannel
}
select {
case msg = <-w.writeQueueCh:
queue = append(queue, msg)
case wch <- msg:
queue = queue[:copy(queue, queue[1:])]
case <-w.done:
return
}
}
}
}()
go w.writePump(wg)
ctxDone := ctx.Done()
for {
select {
Expand Down Expand Up @@ -529,6 +430,117 @@ func (w *webSocket) queueMessage(msg *message) {
})
}

func (w *webSocket) readPump(wg *sync.WaitGroup) {
defer wg.Done()
for {
messageType, data, err := w.conn.ReadMessage()
if err == nil {
w.queueMessage(&message{
mtype: messageType,
data: data,
t: time.Now(),
})

continue
}

if !websocket.IsUnexpectedCloseError(
err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
// maybe still log it with debug level?
err = nil
}

if err != nil {
w.tq.Queue(func() error {
_ = w.conn.Close() // TODO fix this
return nil
})
}

w.tq.Queue(func() error {
return w.connectionClosedWithError(err)
})

return
}
}

func (w *webSocket) writePump(wg *sync.WaitGroup) {
defer wg.Done()
wg.Add(1)
samplesOutput := w.vu.State().Samples
ctx := w.vu.Context()
writeChannel := make(chan message)
go func() {
defer wg.Done()
for {
select {
case msg, ok := <-writeChannel:
if !ok {
return
}
size := len(msg.data)

err := func() error {
if msg.mtype != websocket.PingMessage {
return w.conn.WriteMessage(msg.mtype, msg.data)
}

// WriteControl is concurrently okay
return w.conn.WriteControl(msg.mtype, msg.data, msg.t.Add(writeWait))
}()
if err != nil {
w.tq.Queue(func() error {
_ = w.conn.Close() // TODO fix
closeErr := w.connectionClosedWithError(err)
return closeErr
})
return
}
// This from the specification needs to happen like that instead of with
// atomics or locks outside of the event loop
w.tq.Queue(func() error {
w.bufferedAmount -= size
return nil
})

metrics.PushIfNotDone(ctx, samplesOutput, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: w.builtinMetrics.WSMessagesSent,
Tags: w.tagsAndMeta.Tags,
},
Time: time.Now(),
Metadata: w.tagsAndMeta.Metadata,
Value: 1,
})
case <-w.done:
return
}
}
}()
{
defer close(writeChannel)
queue := make([]message, 0)
var wch chan message
var msg message
for {
wch = nil // this way if nothing to read it will just block
if len(queue) > 0 {
msg = queue[0]
wch = writeChannel
}
select {
case msg = <-w.writeQueueCh:
queue = append(queue, msg)
case wch <- msg:
queue = queue[:copy(queue, queue[1:])]
case <-w.done:
return
}
}
}
}

func (w *webSocket) send(msg goja.Value) {
w.assertStateOpen()

Expand Down

0 comments on commit 15fa17e

Please sign in to comment.