Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

feat: close server on maxConnections #218

Merged
merged 5 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as mafmt from '@multiformats/mafmt'
import errCode from 'err-code'
import { logger } from '@libp2p/logger'
import { toMultiaddrConnection } from './socket-to-conn.js'
import { TCPListener } from './listener.js'
import { CloseServerOnMaxConnectionsOpts, TCPListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from '@libp2p/interfaces/errors'
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
Expand Down Expand Up @@ -36,6 +36,12 @@ export interface TCPOptions {
* https://nodejs.org/api/net.html#servermaxconnections
*/
maxConnections?: number

/**
* Close server (stop listening for new connections) if connections exceed a limit.
* Open server (start listening for new connections) if connections fall below a limit.
*/
closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
}

/**
Expand Down Expand Up @@ -191,6 +197,7 @@ class TCP implements Transport {
return new TCPListener({
...options,
maxConnections: this.opts.maxConnections,
closeServerOnMaxConnections: this.opts.closeServerOnMaxConnections,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.components.metrics
Expand Down
115 changes: 97 additions & 18 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
import { CODE_P2P } from './constants.js'
import {
getMultiaddrs,
multiaddrToNetConfig
multiaddrToNetConfig,
NetConfig
} from './utils.js'
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events'
import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection'
Expand All @@ -26,13 +27,22 @@ async function attemptClose (maConn: MultiaddrConnection) {
}
}

export interface CloseServerOnMaxConnectionsOpts {
/** Server listens once connection count is less than `listenBelow` */
listenBelow: number
/** Close server once connection count is greater than or equal to `closeAbove` */
closeAbove: number
onListenError?: (err: Error) => void
}

interface Context extends TCPCreateListenerOptions {
handler?: (conn: Connection) => void
upgrader: Upgrader
socketInactivityTimeout?: number
socketCloseTimeout?: number
maxConnections?: number
metrics?: Metrics
closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
}

const SERVER_STATUS_UP = 1
Expand All @@ -44,7 +54,12 @@ export interface TCPListenerMetrics {
events: CounterGroup
}

type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
type Status = {started: false} | {
started: true
listeningAddr: Multiaddr
peerId: string | null
netConfig: NetConfig
}

export class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
private readonly server: net.Server
Expand All @@ -69,6 +84,13 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.server.maxConnections = context.maxConnections
}

if (context.closeServerOnMaxConnections != null) {
// Sanity check options
if (context.closeServerOnMaxConnections.closeAbove < context.closeServerOnMaxConnections.listenBelow) {
throw Error('closeAbove must be >= listenBelow')
}
}

this.server
.on('listening', () => {
if (context.metrics != null) {
Expand Down Expand Up @@ -159,12 +181,33 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene

socket.once('close', () => {
this.connections.delete(maConn)

if (
this.context.closeServerOnMaxConnections != null &&
this.connections.size < this.context.closeServerOnMaxConnections.listenBelow
) {
// The most likely case of error is if the port taken by this application is binded by
// another process during the time the server if closed. In that case there's not much
// we can do. netListen() will be called again every time a connection is dropped, which
// acts as an eventual retry mechanism. onListenError allows the consumer act on this.
this.netListen().catch(e => {
log.error('error attempting to listen server once connection count under limit', e)
this.context.closeServerOnMaxConnections?.onListenError?.(e as Error)
})
}
})

if (this.context.handler != null) {
this.context.handler(conn)
}

if (
this.context.closeServerOnMaxConnections != null &&
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
) {
this.netClose()
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
})
.catch(async err => {
Expand Down Expand Up @@ -220,34 +263,70 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

async listen (ma: Multiaddr) {
if (this.status.started) {
throw Error('server is already listening')
}

const peerId = ma.getPeerId()
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma

this.status = { started: true, listeningAddr, peerId }
this.status = {
started: true,
listeningAddr,
peerId,
netConfig: multiaddrToNetConfig(listeningAddr)
}

return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
this.server.on('error', (err) => {
reject(err)
})
this.server.listen(options, () => {
log('Listening on %s', this.server.address())
resolve()
})
})
await this.netListen()
}

async close () {
if (!this.server.listening) {
return
}

await Promise.all(
Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn))
)

// netClose already checks if server.listening
this.netClose()
}

private async netListen (): Promise<void> {
if (!this.status.started || this.server.listening) {
return
}

const netConfig = this.status.netConfig

await new Promise<void>((resolve, reject) => {
this.server.close(err => (err != null) ? reject(err) : resolve())
// NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error'
this.server.once('error', reject)
this.server.listen(netConfig, resolve)
})

log('Listening on %s', this.server.address())
}

private netClose (): void {
if (!this.status.started || !this.server.listening) {
return
}

log('Closing server on %s', this.server.address())

// NodeJS implementation tracks listening status with `this._handle` property.
// - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown
// - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675
// - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN
//
// NOTE: Both listen and close are technically not async actions, so it's not necessary to track
// states 'pending-close' or 'pending-listen'

// From docs https://nodejs.org/api/net.html#serverclosecallback
// Stops the server from accepting new connections and keeps existing connections.
// 'close' event is emitted only emitted when all connections are ended.
// The optional callback will be called once the 'close' event occurs.
//
// NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
// to pass a callback to close.
this.server.close()
}
}
4 changes: 3 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import path from 'path'

const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }

export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) {
export type NetConfig = ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts)

export function multiaddrToNetConfig (addr: Multiaddr): NetConfig {
const listenPath = addr.getPath()

// unix socket listening
Expand Down
118 changes: 118 additions & 0 deletions test/max-connections-close.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import net from 'node:net'
import { promisify } from 'util'
import { expect } from 'aegir/chai'
import { mockUpgrader } from '@libp2p/interface-mocks'
import { multiaddr } from '@multiformats/multiaddr'
import { tcp } from '../src/index.js'
import type { TCPListener } from '../src/listener.js'

describe('close server on maxConnections', () => {
const afterEachCallbacks: Array<() => Promise<any> | any> = []
afterEach(async () => {
await Promise.all(afterEachCallbacks.map(fn => fn()))
afterEachCallbacks.length = 0
})

it('reject dial of connection above closeAbove', async () => {
const listenBelow = 2
const closeAbove = 3
const port = 9900

const seenRemoteConnections = new Set<string>()
const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })()

const upgrader = mockUpgrader()
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))

listener.addEventListener('connection', (conn) => {
seenRemoteConnections.add(conn.detail.remoteAddr.toString())
})

function createSocket (): net.Socket {
const socket = net.connect({ port })

// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.unshift(async () => {
if (!socket.destroyed) {
socket.destroy()
await new Promise((resolve) => socket.on('close', resolve))
}
})

return socket
}

async function assertConnectedSocket (i: number) {
const socket = createSocket()

await new Promise<void>((resolve, reject) => {
socket.once('connect', () => {
resolve()
})
socket.once('error', (err) => {
err.message = `Socket[${i}] ${err.message}`
reject(err)
})
})

return socket
}

async function assertRefusedSocket (i: number) {
const socket = createSocket()

await new Promise<void>((resolve, reject) => {
socket.once('connect', () => {
reject(Error(`Socket[${i}] connected but was expected to reject`))
})
socket.once('error', (err) => {
if (err.message.includes('ECONNREFUSED')) {
resolve()
} else {
err.message = `Socket[${i}] unexpected error ${err.message}`
reject(err)
}
})
})
}

async function assertServerConnections (connections: number) {
// Expect server connections but allow time for sockets to connect or disconnect
for (let i = 0; i < 100; i++) {
// eslint-disable-next-line @typescript-eslint/dot-notation
if (listener['connections'].size === connections) {
return
} else {
await promisify(setTimeout)(10)
}
}
// eslint-disable-next-line @typescript-eslint/dot-notation
expect(listener['connections'].size).equals(connections, 'Wrong server connections')
}

const socket1 = await assertConnectedSocket(1)
const socket2 = await assertConnectedSocket(2)
const socket3 = await assertConnectedSocket(3)
await assertServerConnections(3)
// Limit reached, server should be closed here
await assertRefusedSocket(4)
await assertRefusedSocket(5)
// Destroy sockets to be have connections < listenBelow
socket1.destroy()
socket2.destroy()
await assertServerConnections(1)
// Attempt to connect more sockets
const socket6 = await assertConnectedSocket(6)
const socket7 = await assertConnectedSocket(7)
await assertServerConnections(3)
// Limit reached, server should be closed here
await assertRefusedSocket(8)

expect(socket3.destroyed).equals(false, 'socket3 must not destroyed')
expect(socket6.destroyed).equals(false, 'socket6 must not destroyed')
expect(socket7.destroyed).equals(false, 'socket7 must not destroyed')
})
})