Skip to content

Commit

Permalink
feat: add capability detection to metrics-devtools (#2708)
Browse files Browse the repository at this point in the history
Adds the ability to find configured services that possess a given
capability and interact with that service via rpc.

Starts with a configured PubSub service.
  • Loading branch information
achingbrain authored Sep 20, 2024
1 parent 6ccbb06 commit 4fd7eb2
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 27 deletions.
3 changes: 2 additions & 1 deletion packages/metrics-devtools/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
"doc-check": "aegir doc-check",
"build": "aegir build",
"test": "aegir test -t browser",
"test:chrome": "aegir test -t browser --cov"
"test:chrome": "aegir test -t browser --cov",
"test:firefox": "aegir test -t browser --browser firefox"
},
"dependencies": {
"@libp2p/interface": "^2.0.1",
Expand Down
34 changes: 32 additions & 2 deletions packages/metrics-devtools/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* for Chrome or Firefox to inspect the state of your running node.
*/

import { serviceCapabilities, start, stop } from '@libp2p/interface'
import { isPubSub, serviceCapabilities, start, stop } from '@libp2p/interface'
import { simpleMetrics } from '@libp2p/simple-metrics'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
Expand All @@ -25,10 +25,11 @@ import { base64 } from 'multiformats/bases/base64'
import { valueCodecs } from './rpc/index.js'
import { metricsRpc } from './rpc/rpc.js'
import { debounce } from './utils/debounce.js'
import { findCapability } from './utils/find-capability.js'
import { getPeers } from './utils/get-peers.js'
import { getSelf } from './utils/get-self.js'
import type { DevToolsRPC } from './rpc/index.js'
import type { ComponentLogger, Connection, Libp2pEvents, Logger, Metrics, MultiaddrConnection, PeerId, PeerStore, Stream, ContentRouting, PeerRouting, TypedEventTarget, Startable } from '@libp2p/interface'
import type { ComponentLogger, Connection, Libp2pEvents, Logger, Metrics, MultiaddrConnection, PeerId, PeerStore, Stream, ContentRouting, PeerRouting, TypedEventTarget, Startable, Message, SubscriptionChangeData } from '@libp2p/interface'
import type { TransportManager, Registrar, ConnectionManager, AddressManager } from '@libp2p/interface-internal'
import type { Pushable } from 'it-pushable'

Expand Down Expand Up @@ -175,6 +176,10 @@ class DevToolsMetrics implements Metrics, Startable {
this.onSelfUpdate = debounce(this.onSelfUpdate.bind(this), 1000)
this.onIncomingMessage = this.onIncomingMessage.bind(this)

// relay pubsub messages to dev tools panel
this.onPubSubMessage = this.onPubSubMessage.bind(this)
this.onPubSubSubscriptionChange = this.onPubSubSubscriptionChange.bind(this)

// collect metrics
this.simpleMetrics = simpleMetrics({
intervalMs: this.intervalMs,
Expand Down Expand Up @@ -257,6 +262,13 @@ class DevToolsMetrics implements Metrics, Startable {
.catch(err => {
this.log.error('error while reading RPC messages', err)
})

const pubsub = findCapability('@libp2p/pubsub', this.components)

if (isPubSub(pubsub)) {
pubsub.addEventListener('message', this.onPubSubMessage)
pubsub.addEventListener('subscription-change', this.onPubSubSubscriptionChange)
}
}

async stop (): Promise<void> {
Expand Down Expand Up @@ -287,6 +299,24 @@ class DevToolsMetrics implements Metrics, Startable {
}
}

private onPubSubMessage (event: CustomEvent<Message>): void {
this.devTools.safeDispatchEvent('pubsub:message', {
detail: event.detail
})
.catch(err => {
this.log.error('error relaying pubsub message', err)
})
}

private onPubSubSubscriptionChange (event: CustomEvent<SubscriptionChangeData>): void {
this.devTools.safeDispatchEvent('pubsub:subscription-change', {
detail: event.detail
})
.catch(err => {
this.log.error('error relaying pubsub subscription change', err)
})
}

private onSelfUpdate (): void {
Promise.resolve()
.then(async () => {
Expand Down
59 changes: 42 additions & 17 deletions packages/metrics-devtools/src/rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { cidCodec } from './codecs/cid.js'
import { customProgressEventCodec } from './codecs/custom-progress-event.js'
import { multiaddrCodec } from './codecs/multiaddr.js'
import { peerIdCodec } from './codecs/peer-id.js'
import type { ContentRouting, PeerId, PeerRouting, AbortOptions } from '@libp2p/interface'
import type { ContentRouting, PeerId, PeerRouting, AbortOptions, PubSubRPCMessage, SubscriptionChangeData } from '@libp2p/interface'
import type { OpenConnectionOptions } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ValueCodec } from 'it-rpc'
Expand All @@ -14,21 +14,6 @@ export const valueCodecs: Array<ValueCodec<any>> = [
customProgressEventCodec
]

export interface NodeAddress {
multiaddr: Multiaddr
listen?: boolean
announce?: boolean
observed?: boolean
default?: boolean
}

export interface NodeStatus {
peerId: PeerId
agent?: string
addresses: NodeAddress[]
protocols: string[]
}

export interface PeerAddress {
multiaddr: Multiaddr
isConnected?: boolean
Expand Down Expand Up @@ -69,7 +54,7 @@ export interface MetricsRPC {
/**
* Called by DevTools on initial connect
*/
init(options?: AbortOptions): Promise<{ self: Peer, peers: Peer[], debug: string }>
init(options?: AbortOptions): Promise<{ self: Peer, peers: Peer[], debug: string, capabilities: Record<string, string[]> }>

/**
* Update the currently active debugging namespaces
Expand All @@ -95,6 +80,36 @@ export interface MetricsRPC {
* Make peer routing queries
*/
peerRouting: PeerRouting

/**
* PubSub operations
*/
pubsub: {
/**
* Subscribe to a PubSub topic
*/
subscribe(component: string, topic: string): Promise<void>

/**
* Unsubscribe from a PubSub topic
*/
unsubscribe(component: string, topic: string): Promise<void>

/**
* Get the list of subscriptions for the current node
*/
getTopics (component: string): Promise<string[]>

/**
* Get the list of peers we know about who subscribe to the topic
*/
getSubscribers (component: string, topic: string): Promise<PeerId[]>

/**
* Publish a message to a given topic
*/
publish (component: string, topic: string, message: Uint8Array): Promise<void>
}
}

export interface DevToolsEvents {
Expand All @@ -112,6 +127,16 @@ export interface DevToolsEvents {
* The node's connected peers have changed
*/
'peers': CustomEvent<Peer[]>

/**
* A pubsub message was received
*/
'pubsub:message': CustomEvent<PubSubRPCMessage>

/**
* The subscriptions of a peer have changed
*/
'pubsub:subscription-change': CustomEvent<SubscriptionChangeData>
}

/**
Expand Down
32 changes: 25 additions & 7 deletions packages/metrics-devtools/src/rpc/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { enable, disable } from '@libp2p/logger'
import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { gatherCapabilities } from '../utils/gather-capabilities.js'
import { getPeers } from '../utils/get-peers.js'
import { getPubSub } from '../utils/get-pubsub.js'
import { getSelf } from '../utils/get-self.js'
import type { MetricsRPC } from './index.js'
import type { DevToolsMetricsComponents } from '../index.js'
import type { PeerId } from '@libp2p/interface'
import type { OpenConnectionOptions } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { AbortOptions } from 'it-pushable'

export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
const log = components.logger.forComponent('libp2p:devtools-metrics:metrics-rpc')
Expand All @@ -18,10 +18,11 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
return {
self: await getSelf(components),
peers: await getPeers(components, log),
debug: localStorage.getItem('debug') ?? ''
debug: localStorage.getItem('debug') ?? '',
capabilities: gatherCapabilities(components)
}
},
setDebug: async (namespace?: string) => {
setDebug: async (namespace?) => {
if (namespace?.length != null && namespace?.length > 0) {
enable(namespace)
localStorage.setItem('debug', namespace)
Expand All @@ -30,7 +31,7 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
localStorage.removeItem('debug')
}
},
openConnection: async (peerIdOrMultiaddr: string, options?: OpenConnectionOptions) => {
openConnection: async (peerIdOrMultiaddr, options?) => {
let peer: PeerId | Multiaddr

try {
Expand All @@ -41,7 +42,7 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {

await components.connectionManager.openConnection(peer, options)
},
closeConnection: async (peerId: PeerId, options?: AbortOptions) => {
closeConnection: async (peerId, options?) => {
await Promise.all(
components.connectionManager.getConnections(peerId)
.map(async connection => {
Expand All @@ -54,6 +55,23 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC {
)
},
contentRouting: components.contentRouting,
peerRouting: components.peerRouting
peerRouting: components.peerRouting,
pubsub: {
async getTopics (component) {
return getPubSub(component, components).getTopics()
},
async subscribe (component, topic) {
getPubSub(component, components).subscribe(topic)
},
async unsubscribe (component, topic) {
getPubSub(component, components).unsubscribe(topic)
},
async publish (component, topic, message) {
await getPubSub(component, components).publish(topic, message)
},
async getSubscribers (component: string, topic: string) {
return getPubSub(component, components).getSubscribers(topic)
}
}
}
}
9 changes: 9 additions & 0 deletions packages/metrics-devtools/src/utils/find-capability.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { gatherCapabilities } from './gather-capabilities.js'

export function findCapability (capability: string, components: any): any | undefined {
for (const [name, capabilities] of Object.entries(gatherCapabilities(components))) {
if (capabilities.includes(capability)) {
return components[name]
}
}
}
14 changes: 14 additions & 0 deletions packages/metrics-devtools/src/utils/gather-capabilities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { serviceCapabilities } from '@libp2p/interface'

export function gatherCapabilities (components: any): Record<string, string[]> {
const capabilities: Record<string, string[]> = {}
const services: Record<string, any> = components.components ?? components

Object.entries(services).forEach(([name, component]) => {
if (component?.[serviceCapabilities] != null && Array.isArray(component[serviceCapabilities])) {
capabilities[name] = component[serviceCapabilities]
}
})

return capabilities
}
12 changes: 12 additions & 0 deletions packages/metrics-devtools/src/utils/get-pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { InvalidParametersError, isPubSub } from '@libp2p/interface'
import type { PubSub } from '@libp2p/interface'

export function getPubSub (component: string, components: any): PubSub {
const pubsub = components[component]

if (!isPubSub(pubsub)) {
throw new InvalidParametersError(`Component ${component} did not implement the PubSub interface`)
}

return pubsub
}

0 comments on commit 4fd7eb2

Please sign in to comment.