Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

feat: add metrics #217

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { Connection } from '@libp2p/interface-connection'
import { getMetrics, Metrics, MetricsRegister } from './metrics.js'

const log = logger('libp2p:tcp')

Expand Down Expand Up @@ -57,9 +58,11 @@ export interface TCPCreateListenerOptions extends CreateListenerOptions, TCPSock

class TCP implements Transport {
private readonly opts: TCPOptions
private readonly metrics: Metrics | null

constructor (options: TCPOptions = {}) {
constructor (options: TCPOptions = {}, metricsRegistry?: MetricsRegister | null) {
this.opts = options
this.metrics = metricsRegistry != null ? getMetrics(metricsRegistry) : null
}

get [symbol] (): true {
Expand All @@ -78,6 +81,7 @@ class TCP implements Transport {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.socketEvents.inc({ event: 'error' })
})

const maConn = toMultiaddrConnection(socket, {
Expand Down Expand Up @@ -113,6 +117,7 @@ class TCP implements Transport {

const onTimeout = () => {
log('connection timeout %s', cOptsStr)
this.metrics?.listenerErrors.inc({ error: 'outbound_connection_timeout' })

const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
Expand Down Expand Up @@ -166,7 +171,8 @@ class TCP implements Transport {
...options,
maxConnections: this.opts.maxConnections,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.metrics
})
}

Expand Down
65 changes: 35 additions & 30 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'
import { ServerStatusMetric, Metrics } from './metrics.js'

const log = logger('libp2p:tcp:listener')

Expand All @@ -31,6 +32,7 @@ interface Context extends TCPCreateListenerOptions {
socketInactivityTimeout?: number
socketCloseTimeout?: number
maxConnections?: number
metrics: Metrics | null
}

type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
Expand All @@ -39,6 +41,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()
private readonly metrics: Metrics | null

private status: Status = { started: false }

Expand All @@ -47,7 +50,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene

context.keepAlive = context.keepAlive ?? true

this.server = net.createServer(context, this.onSocket.bind(this))
this.server = net.createServer(context, socket => {
this.onSocket(socket).catch(e => log('onSocket error', e))
})

// https://nodejs.org/api/net.html#servermaxconnections
// If set reject connections when the server's connection count gets high
Expand All @@ -60,58 +65,58 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
.on('listening', () => this.dispatchEvent(new CustomEvent('listening')))
.on('error', err => this.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => this.dispatchEvent(new CustomEvent('close')))

this.metrics = context.metrics
this.metrics?.connections.addCollect(() => {
this.metrics?.connections.set(this.connections.size)
this.metrics?.serverStatus.set(this.status.started ? ServerStatusMetric.started : ServerStatusMetric.stopped)
})
}

private onSocket (socket: net.Socket) {
private async onSocket (socket: net.Socket) {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.socketEvents.inc({ event: 'error' })
})

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics
})
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.listenerErrors.inc({ error: 'inbound_to_connection' })
return
}

log('new inbound connection %s', maConn.remoteAddr)
try {
this.context.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
this.connections.delete(maConn)
})

if (this.context.handler != null) {
this.context.handler(conn)
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
})
.catch(async err => {
log.error('inbound connection failed', err)

await attemptClose(maConn)
})
.catch(err => {
log.error('closing inbound connection failed', err)
})
const conn = await this.context.upgrader.upgradeInbound(maConn)
log('inbound connection %s upgraded', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
this.connections.delete(maConn)
})

if (this.context.handler != null) {
this.context.handler(conn)
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.listenerErrors.inc({ error: 'inbound_upgrade' })

attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
})
attemptClose(maConn).catch(err => {
log.error('closing inbound connection failed', err)
this.metrics?.listenerErrors.inc({ error: 'inbound_closing_failed' })
})
}
}

Expand Down
64 changes: 64 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
export enum ServerStatusMetric {
stopped = 0,
started = 1
}

export function getMetrics (register: MetricsRegister) {
return {
serverStatus: register.gauge({
name: 'libp2p_tcp_server_status',
help: 'Current status of the TCP server'
}),

connections: register.gauge({
name: 'libp2p_tcp_connections_count',
help: 'Current active connections in TCP listener'
}),

listenerErrors: register.gauge<{ error: string }>({
name: 'libp2p_tcp_listener_errors_total',
help: 'Total count of TCP listener errors by error type',
labelNames: ['error']
}),

socketEvents: register.gauge<{ event: string }>({
name: 'libp2p_tcp_socket_events',
help: 'Total count of TCP socket events by event',
labelNames: ['event']
})
}
}

export type Metrics = ReturnType<typeof getMetrics>

/* eslint-disable etc/prefer-interface, @typescript-eslint/method-signature-style */

export interface MetricsRegister {
gauge<T extends LabelsGeneric>(config: GaugeConfig<T>): Gauge<T>
}

interface GaugeConfig<Labels extends LabelsGeneric> {
name: string
help: string
labelNames?: keyof Labels extends string ? Array<keyof Labels> : undefined
}

type LabelsGeneric = Record<string, string | undefined>
type CollectFn<Labels extends LabelsGeneric> = (metric: Gauge<Labels>) => void

interface Gauge<Labels extends LabelsGeneric = never> {
// Follows `prom-client` API choices, to require less middleware on consumer
inc(value?: number): void
inc(labels: Labels, value?: number): void
inc(arg1?: Labels | number, arg2?: number): void

dec(value?: number): void
dec(labels: Labels, value?: number): void
dec(arg1?: Labels | number, arg2?: number): void

set(value: number): void
set(labels: Labels, value: number): void
set(arg1?: Labels | number, arg2?: number): void

addCollect(collectFn: CollectFn<Labels>): void
}
9 changes: 7 additions & 2 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interface-connection'
import type { Metrics } from './metrics.js'

const log = logger('libp2p:tcp:socket')

Expand All @@ -19,14 +20,15 @@ interface ToConnectionOptions {
signal?: AbortSignal
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: Metrics | null
}

/**
* Convert a socket into a MultiaddrConnection
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => {
options = options ?? {}
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => {
const metrics = options.metrics
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

Expand Down Expand Up @@ -61,6 +63,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.socketEvents.inc({ event: 'timeout' })

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -75,6 +78,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti

socket.once('close', () => {
log('%s socket read timeout', lOptsStr)
metrics?.socketEvents.inc({ event: 'close' })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand All @@ -88,6 +92,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
metrics?.socketEvents.inc({ event: 'end' })
})

const maConn: MultiaddrConnection = {
Expand Down