Skip to content

Commit

Permalink
CBG-3834-prereq Add cancellable context to blip.Context (#77)
Browse files Browse the repository at this point in the history
* CBG-3834-prereq Add cancellable context to blip.Context

Allows callers to specify an optional cancellable context on a blipContext disconnect any connected blip clients when that context is used to create a WebSocketServer.

* Test cleanup, accessors for optional cancelCtx

* lint fix

* Move cancellation context to ContextOptions

* Tidy example go.sum
  • Loading branch information
adamcfraser authored Oct 14, 2024
1 parent 73db215 commit 13a798c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 2 deletions.
15 changes: 15 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type Context struct {

bytesSent atomic.Uint64 // Number of bytes sent
bytesReceived atomic.Uint64 // Number of bytes received

cancelCtx context.Context // When cancelled, closes all connections. Terminates receiveLoop(s), which triggers sender and parseLoop stop
}

// Defines a logging interface for use within the blip codebase. Implemented by Context.
Expand All @@ -84,6 +86,9 @@ type ContextOptions struct {
ProtocolIds []string
// Patterns that the Origin header must match (if non-empty). This matches only on hostname: ["example.com", "*"]
Origin []string
// Cancellation context. If specified, when context is cancelled the websocket connect will be closed,
// by terminating receiveLoop (which triggers sender and parseLoop stop). This will not send a close message.
CancelCtx context.Context
}

// Creates a new Context with an empty dispatch table.
Expand All @@ -106,6 +111,7 @@ func NewContextCustomID(id string, opts ContextOptions) (*Context, error) {
ID: id,
SupportedSubProtocols: formatWebSocketSubProtocols(opts.ProtocolIds...),
origin: opts.Origin,
cancelCtx: opts.CancelCtx,
}, nil
}

Expand Down Expand Up @@ -133,6 +139,14 @@ func (blipCtx *Context) GetBytesReceived() uint64 {
return blipCtx.bytesReceived.Load()
}

// GetCancelCtx returns a cancellation context if it has been set in the ContextOptions. Otherwise returns non-cancellable context.
func (blipCtx *Context) GetCancelCtx() context.Context {
if blipCtx.cancelCtx != nil {
return blipCtx.cancelCtx
}
return context.TODO()
}

// DialOptions is used by DialConfig to oepn a BLIP connection.
type DialOptions struct {
URL string
Expand Down Expand Up @@ -208,6 +222,7 @@ func (blipCtx *Context) ActiveSubprotocol() string {

type BlipWebsocketServer struct {
blipCtx *Context
ctx context.Context // Cancellable context to trigger server stop
PostHandshakeCallback func(err error)
}

Expand Down
133 changes: 133 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ licenses/APL2.txt.
package blip

import (
"context"
"fmt"
"log"
"net"
Expand Down Expand Up @@ -577,6 +578,138 @@ func TestOrigin(t *testing.T) {
}
}

// TestServerContextClose tests closing server using cancellable context, ensure that clients are disconnected
//
// Test:
// - Start two blip contexts: an echo server and an echo client
// - The echo server is configured to respond to incoming echo requests and return responses
// - The echo client sends echo requests on a loop
// - Expected: the echo client should receive some sort of error when the server closes the connection, and should not block
func TestServerContextClose(t *testing.T) {

serverCancelCtx, cancelFunc := context.WithCancel(context.Background())
contextOptionsWithCancel := ContextOptions{
ProtocolIds: []string{BlipTestAppProtocolId},
CancelCtx: serverCancelCtx,
}
blipContextEchoServer, err := NewContext(contextOptionsWithCancel)
if err != nil {
t.Fatal(err)
}

receivedRequests := sync.WaitGroup{}

// ----------------- Setup Echo Server that will be closed via cancellation context -------------------------

// Create a blip profile handler to respond to echo requests
dispatchEcho := func(request *Message) {
defer receivedRequests.Done()
body, err := request.Body()
if err != nil {
log.Printf("ERROR reading body of %s: %s", request, err)
return
}
if request.Properties["Content-Type"] != "application/octet-stream" {
t.Fatalf("Incorrect properties: %#x", request.Properties)
}
if response := request.Response(); response != nil {
response.SetBody(body)
response.Properties["Content-Type"] = request.Properties["Content-Type"]
}
}

// Blip setup
blipContextEchoServer.HandlerForProfile["BLIPTest/EchoData"] = dispatchEcho
blipContextEchoServer.LogMessages = true
blipContextEchoServer.LogFrames = true

// Websocket Server
server := blipContextEchoServer.WebSocketServer()

// HTTP Handler wrapping websocket server
http.Handle("/TestServerContextClose", server)
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
defer listener.Close()
go func() {
err := http.Serve(listener, nil)
log.Printf("server goroutine closed with error: %v", err)
}()

// ----------------- Setup Echo Client ----------------------------------------
blipContextEchoClient, err := NewContext(defaultContextOptions)
if err != nil {
t.Fatal(err)
}
port := listener.Addr().(*net.TCPAddr).Port
destUrl := fmt.Sprintf("ws://localhost:%d/TestServerContextClose", port)
sender, err := blipContextEchoClient.Dial(destUrl)
if err != nil {
t.Fatalf("Error opening WebSocket: %v", err)
}

var closeWg, delayWg sync.WaitGroup

// Start a goroutine to send echo request every 100 ms, time out after 30s (if test fails)
delayWg.Add(1) // wait for connection and messages to be sent before cancelling server context
closeWg.Add(1) // wait for client to disconnect before exiting test
go func() {
defer closeWg.Done()
timeout := time.After(time.Second * 30)
ticker := time.NewTicker(time.Millisecond * 50)
echoCount := 0
for {
select {
case <-timeout:
t.Error("Echo client connection wasn't closed before timeout expired")
return
case <-ticker.C:
{
echoCount++
// After sending 10 echoes, close delayWg to trigger server-side cancellation
log.Printf("Sending echo %v", echoCount)
if echoCount == 10 {
delayWg.Done()
}
// Create echo request
echoResponseBody := []byte("hello")
echoRequest := NewRequest()
echoRequest.SetProfile("BLIPTest/EchoData")
echoRequest.Properties["Content-Type"] = "application/octet-stream"
echoRequest.SetBody(echoResponseBody)
receivedRequests.Add(1)
sent := sender.Send(echoRequest)
assert.True(t, sent)

// Read the echo response. Closed connection will result in empty response, as EOF message
// isn't currently returned by blip client
response := echoRequest.Response()
responseBody, err := response.Body()
assert.True(t, err == nil)
if len(responseBody) == 0 {
log.Printf("empty response, connection closed")
return
}

assert.Equal(t, echoResponseBody, responseBody)
}
}
}
}()

// Wait for client to start sending echo messages before stopping server
delayWg.Wait()

// Cancel context on server
cancelFunc()

// Wait for client echo loop to exit due to closed connection before exiting test
closeWg.Wait()

}

// assert that the server handshake callback is called with an error.
func assertHandlerError(t *testing.T, server *BlipWebsocketServer, wg *sync.WaitGroup) {
wg.Add(1)
Expand Down
3 changes: 1 addition & 2 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package blip

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -74,7 +73,7 @@ func (r *receiver) receiveLoop() error {

for {
// Receive the next raw WebSocket frame:
_, frame, err := r.conn.Read(context.TODO())
_, frame, err := r.conn.Read(r.context.GetCancelCtx())
if err != nil {
if isCloseError(err) {
// lower log level for close
Expand Down

0 comments on commit 13a798c

Please sign in to comment.