Skip to content

Commit

Permalink
feat: provide callback for connection alive messages (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
sermojohn authored Mar 4, 2023
1 parent bb54675 commit aa8b2bc
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
27 changes: 17 additions & 10 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,17 @@ type SubscriptionProtocol interface {
// SubscriptionContext represents a shared context for protocol implementations with the websocket connection inside
type SubscriptionContext struct {
context.Context
websocketConn WebsocketConn
OnConnected func()
onDisconnected func()
cancel context.CancelFunc
subscriptions map[string]Subscription
disabledLogTypes []OperationMessageType
log func(args ...interface{})
acknowledged int32
exitStatusCodes []int
mutex sync.Mutex
websocketConn WebsocketConn
OnConnected func()
onDisconnected func()
onConnectionAlive func()
cancel context.CancelFunc
subscriptions map[string]Subscription
disabledLogTypes []OperationMessageType
log func(args ...interface{})
acknowledged int32
exitStatusCodes []int
mutex sync.Mutex
}

// Log prints condition logging with message type filters
Expand Down Expand Up @@ -419,6 +420,12 @@ func (sc *SubscriptionClient) OnDisconnected(fn func()) *SubscriptionClient {
return sc
}

// OnConnectionAlive event is triggered when the websocket receive a connection alive message (differs per protocol)
func (sc *SubscriptionClient) OnConnectionAlive(fn func()) *SubscriptionClient {
sc.context.onConnectionAlive = fn
return sc
}

// get internal client status
func (sc *SubscriptionClient) getClientStatus() int32 {
return atomic.LoadInt32(&sc.clientStatus)
Expand Down
3 changes: 3 additions & 0 deletions subscription_graphql_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func (gws *graphqlWS) OnMessage(ctx *SubscriptionContext, subscription Subscript
_ = gws.Unsubscribe(ctx, message.ID)
case GQLPing:
ctx.Log(message, "server", GQLPing)
if ctx.onConnectionAlive != nil {
ctx.onConnectionAlive()
}
// send pong response message back to the server
msg := OperationMessage{
Type: GQLPong,
Expand Down
3 changes: 3 additions & 0 deletions subscriptions_transport_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ func (stw *subscriptionsTransportWS) OnMessage(ctx *SubscriptionContext, subscri
_ = stw.Unsubscribe(ctx, message.ID)
case GQLConnectionKeepAlive:
ctx.Log(message, "server", GQLConnectionKeepAlive)
if ctx.onConnectionAlive != nil {
ctx.onConnectionAlive()
}
case GQLConnectionAck:
// Expected response to the ConnectionInit message from the client acknowledging a successful connection with the server.
// The client is now ready to request subscription operations.
Expand Down

0 comments on commit aa8b2bc

Please sign in to comment.