Skip to content

Commit

Permalink
feat: add server ping
Browse files Browse the repository at this point in the history
Adds an optional `heartbeatMs` key to the server config that will
ping each connected client on that interval - if they have not
responded with a pong since the last interval, the client will be
disconnected.

Fixes #113
  • Loading branch information
achingbrain committed Jul 30, 2024
1 parent 9881c23 commit 138a3ee
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 3 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
"ws": "^8.4.0"
},
"devDependencies": {
"@types/sinon": "^17.0.3",
"aegir": "^44.0.1",
"delay": "^6.0.0",
"it-all": "^3.0.1",
Expand All @@ -207,6 +208,7 @@
"it-ndjson": "^1.0.0",
"it-pipe": "^3.0.1",
"p-defer": "^4.0.0",
"sinon": "^18.0.0",
"wherearewe": "^2.0.1",
"wsurl": "^1.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export { default as duplex } from './duplex.js'
export { default as source } from './source.js'
export { default as sink } from './sink.js'
export { createServer } from './server.js'
export { createServer, type WebSocketServer } from './server.js'
export { connect } from './client.js'
42 changes: 40 additions & 2 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,23 @@ import duplex, { type DuplexWebSocket } from './duplex.js'
import type WebSocket from './web-socket.js'
import type { VerifyClientCallbackSync, VerifyClientCallbackAsync, AddressInfo } from 'ws'

export interface ClientWebSocket extends WebSocket {
alive?: boolean
}

export interface ServerOptions {
key?: string
cert?: string
server?: http.Server | https.Server
verifyClient?: VerifyClientCallbackAsync | VerifyClientCallbackSync
onConnection?(connection: DuplexWebSocket): void

/**
* If specified, send a PING to every connected client, if
* they do not respond with a PONG before the next interval,
* terminate the connection
*/
heartbeatMs?: number
}

export interface WebSocketServer extends EventEmitter {
Expand All @@ -23,6 +34,8 @@ export interface WebSocketServer extends EventEmitter {
class Server extends EventEmitter {
private readonly server: http.Server | https.Server
private readonly wsServer: WSServer
private readonly heartbeatMs?: number
private heartbeatInterval?: ReturnType<typeof setInterval>

constructor (server: http.Server | https.Server, opts?: ServerOptions) {
super()
Expand All @@ -34,9 +47,26 @@ class Server extends EventEmitter {
verifyClient: opts.verifyClient
})
this.wsServer.on('connection', this.onWsServerConnection.bind(this))
this.heartbeatMs = opts?.heartbeatMs
}

async listen (addrInfo: { port: number } | number): Promise<WebSocketServer> {
if (this.heartbeatMs != null) {
this.heartbeatInterval = setInterval(() => {
this.wsServer.clients.forEach((client: ClientWebSocket) => {
// the client did not send a pong since the last heartbeat so
// terminate the connection
if (client.alive === false) {
client.terminate()
return
}

client.alive = false
client.ping()
})
}, this.heartbeatMs)
}

return new Promise<WebSocketServer>((resolve, reject) => {
this.wsServer.once('error', (e) => { reject(e) })
this.wsServer.once('listening', () => { resolve(this) })
Expand All @@ -45,6 +75,10 @@ class Server extends EventEmitter {
}

async close (): Promise<void> {
if (this.heartbeatInterval != null) {
clearInterval(this.heartbeatInterval)
}

await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err != null) {
Expand All @@ -60,7 +94,7 @@ class Server extends EventEmitter {
return this.server.address()
}

onWsServerConnection (socket: WebSocket, req: http.IncomingMessage): void {
onWsServerConnection (socket: ClientWebSocket, req: http.IncomingMessage): void {
let addr: string | AddressInfo | null

try {
Expand All @@ -83,6 +117,10 @@ class Server extends EventEmitter {
return
}

socket.on('pong', () => {
socket.alive = true
})

const stream: DuplexWebSocket = {
...duplex(socket, {
remoteAddress: req.socket.remoteAddress,
Expand All @@ -100,7 +138,7 @@ export function createServer (opts?: ServerOptions): WebSocketServer {
opts = opts ?? {}

const server = opts.server ?? (opts.key != null && opts.cert != null ? https.createServer(opts) : http.createServer())
const wss = new Server(server)
const wss = new Server(server, opts)

if (opts.onConnection != null) {
wss.on('connection', opts.onConnection)
Expand Down
56 changes: 56 additions & 0 deletions test/server-ping.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { expect } from 'aegir/chai'
import delay from 'delay'
import Sinon from 'sinon'
import { isNode, isElectronMain } from 'wherearewe'
import * as WS from '../src/index.js'
import WebSocket from '../src/web-socket.js'

describe('ping', () => {
if (!(isNode || isElectronMain)) {
return
}

let server: WS.WebSocketServer
let client: WebSocket

afterEach(async () => {
if (client != null) {
client.close()
}

if (server != null) {
await server.close()
}
})

it('server should ping connected clients', async () => {
server = WS.createServer({
heartbeatMs: 10
})
await server.listen(55214)

client = new WebSocket('http://127.0.0.1:55214')
const pongSpy = Sinon.spy(client, 'pong')

await delay(200)

expect(client).to.have.property('readyState', WebSocket.OPEN)
expect(pongSpy).to.have.property('called', true)
})

it('server should disconnected unresponsive clients', async () => {
server = WS.createServer({
heartbeatMs: 10
})
await server.listen(55214)

client = new WebSocket('http://127.0.0.1:55214')

// make sure the client will not respond to a pong
client.pong = () => {}

await delay(200)

expect(client).to.have.property('readyState', WebSocket.CLOSED)
})
})

0 comments on commit 138a3ee

Please sign in to comment.