From 4fd7eb2e14c2ac30150060adb2c8aca4c5e295f3 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 20 Sep 2024 15:40:42 +0200 Subject: [PATCH] feat: add capability detection to metrics-devtools (#2708) Adds the ability to find configured services that possess a given capability and interact with that service via rpc. Starts with a configured PubSub service. --- packages/metrics-devtools/package.json | 3 +- packages/metrics-devtools/src/index.ts | 34 ++++++++++- packages/metrics-devtools/src/rpc/index.ts | 59 +++++++++++++------ packages/metrics-devtools/src/rpc/rpc.ts | 32 +++++++--- .../src/utils/find-capability.ts | 9 +++ .../src/utils/gather-capabilities.ts | 14 +++++ .../metrics-devtools/src/utils/get-pubsub.ts | 12 ++++ 7 files changed, 136 insertions(+), 27 deletions(-) create mode 100644 packages/metrics-devtools/src/utils/find-capability.ts create mode 100644 packages/metrics-devtools/src/utils/gather-capabilities.ts create mode 100644 packages/metrics-devtools/src/utils/get-pubsub.ts diff --git a/packages/metrics-devtools/package.json b/packages/metrics-devtools/package.json index 9f3829c8f6..baf76067dd 100644 --- a/packages/metrics-devtools/package.json +++ b/packages/metrics-devtools/package.json @@ -64,7 +64,8 @@ "doc-check": "aegir doc-check", "build": "aegir build", "test": "aegir test -t browser", - "test:chrome": "aegir test -t browser --cov" + "test:chrome": "aegir test -t browser --cov", + "test:firefox": "aegir test -t browser --browser firefox" }, "dependencies": { "@libp2p/interface": "^2.0.1", diff --git a/packages/metrics-devtools/src/index.ts b/packages/metrics-devtools/src/index.ts index cd988e060a..0e520768c7 100644 --- a/packages/metrics-devtools/src/index.ts +++ b/packages/metrics-devtools/src/index.ts @@ -16,7 +16,7 @@ * for Chrome or Firefox to inspect the state of your running node. */ -import { serviceCapabilities, start, stop } from '@libp2p/interface' +import { isPubSub, serviceCapabilities, start, stop } from '@libp2p/interface' import { simpleMetrics } from '@libp2p/simple-metrics' import { pipe } from 'it-pipe' import { pushable } from 'it-pushable' @@ -25,10 +25,11 @@ import { base64 } from 'multiformats/bases/base64' import { valueCodecs } from './rpc/index.js' import { metricsRpc } from './rpc/rpc.js' import { debounce } from './utils/debounce.js' +import { findCapability } from './utils/find-capability.js' import { getPeers } from './utils/get-peers.js' import { getSelf } from './utils/get-self.js' import type { DevToolsRPC } from './rpc/index.js' -import type { ComponentLogger, Connection, Libp2pEvents, Logger, Metrics, MultiaddrConnection, PeerId, PeerStore, Stream, ContentRouting, PeerRouting, TypedEventTarget, Startable } from '@libp2p/interface' +import type { ComponentLogger, Connection, Libp2pEvents, Logger, Metrics, MultiaddrConnection, PeerId, PeerStore, Stream, ContentRouting, PeerRouting, TypedEventTarget, Startable, Message, SubscriptionChangeData } from '@libp2p/interface' import type { TransportManager, Registrar, ConnectionManager, AddressManager } from '@libp2p/interface-internal' import type { Pushable } from 'it-pushable' @@ -175,6 +176,10 @@ class DevToolsMetrics implements Metrics, Startable { this.onSelfUpdate = debounce(this.onSelfUpdate.bind(this), 1000) this.onIncomingMessage = this.onIncomingMessage.bind(this) + // relay pubsub messages to dev tools panel + this.onPubSubMessage = this.onPubSubMessage.bind(this) + this.onPubSubSubscriptionChange = this.onPubSubSubscriptionChange.bind(this) + // collect metrics this.simpleMetrics = simpleMetrics({ intervalMs: this.intervalMs, @@ -257,6 +262,13 @@ class DevToolsMetrics implements Metrics, Startable { .catch(err => { this.log.error('error while reading RPC messages', err) }) + + const pubsub = findCapability('@libp2p/pubsub', this.components) + + if (isPubSub(pubsub)) { + pubsub.addEventListener('message', this.onPubSubMessage) + pubsub.addEventListener('subscription-change', this.onPubSubSubscriptionChange) + } } async stop (): Promise { @@ -287,6 +299,24 @@ class DevToolsMetrics implements Metrics, Startable { } } + private onPubSubMessage (event: CustomEvent): void { + this.devTools.safeDispatchEvent('pubsub:message', { + detail: event.detail + }) + .catch(err => { + this.log.error('error relaying pubsub message', err) + }) + } + + private onPubSubSubscriptionChange (event: CustomEvent): void { + this.devTools.safeDispatchEvent('pubsub:subscription-change', { + detail: event.detail + }) + .catch(err => { + this.log.error('error relaying pubsub subscription change', err) + }) + } + private onSelfUpdate (): void { Promise.resolve() .then(async () => { diff --git a/packages/metrics-devtools/src/rpc/index.ts b/packages/metrics-devtools/src/rpc/index.ts index 3ec19f14b2..b1f0f87c51 100644 --- a/packages/metrics-devtools/src/rpc/index.ts +++ b/packages/metrics-devtools/src/rpc/index.ts @@ -2,7 +2,7 @@ import { cidCodec } from './codecs/cid.js' import { customProgressEventCodec } from './codecs/custom-progress-event.js' import { multiaddrCodec } from './codecs/multiaddr.js' import { peerIdCodec } from './codecs/peer-id.js' -import type { ContentRouting, PeerId, PeerRouting, AbortOptions } from '@libp2p/interface' +import type { ContentRouting, PeerId, PeerRouting, AbortOptions, PubSubRPCMessage, SubscriptionChangeData } from '@libp2p/interface' import type { OpenConnectionOptions } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' import type { ValueCodec } from 'it-rpc' @@ -14,21 +14,6 @@ export const valueCodecs: Array> = [ customProgressEventCodec ] -export interface NodeAddress { - multiaddr: Multiaddr - listen?: boolean - announce?: boolean - observed?: boolean - default?: boolean -} - -export interface NodeStatus { - peerId: PeerId - agent?: string - addresses: NodeAddress[] - protocols: string[] -} - export interface PeerAddress { multiaddr: Multiaddr isConnected?: boolean @@ -69,7 +54,7 @@ export interface MetricsRPC { /** * Called by DevTools on initial connect */ - init(options?: AbortOptions): Promise<{ self: Peer, peers: Peer[], debug: string }> + init(options?: AbortOptions): Promise<{ self: Peer, peers: Peer[], debug: string, capabilities: Record }> /** * Update the currently active debugging namespaces @@ -95,6 +80,36 @@ export interface MetricsRPC { * Make peer routing queries */ peerRouting: PeerRouting + + /** + * PubSub operations + */ + pubsub: { + /** + * Subscribe to a PubSub topic + */ + subscribe(component: string, topic: string): Promise + + /** + * Unsubscribe from a PubSub topic + */ + unsubscribe(component: string, topic: string): Promise + + /** + * Get the list of subscriptions for the current node + */ + getTopics (component: string): Promise + + /** + * Get the list of peers we know about who subscribe to the topic + */ + getSubscribers (component: string, topic: string): Promise + + /** + * Publish a message to a given topic + */ + publish (component: string, topic: string, message: Uint8Array): Promise + } } export interface DevToolsEvents { @@ -112,6 +127,16 @@ export interface DevToolsEvents { * The node's connected peers have changed */ 'peers': CustomEvent + + /** + * A pubsub message was received + */ + 'pubsub:message': CustomEvent + + /** + * The subscriptions of a peer have changed + */ + 'pubsub:subscription-change': CustomEvent } /** diff --git a/packages/metrics-devtools/src/rpc/rpc.ts b/packages/metrics-devtools/src/rpc/rpc.ts index f1a4bef734..5d23d66a75 100644 --- a/packages/metrics-devtools/src/rpc/rpc.ts +++ b/packages/metrics-devtools/src/rpc/rpc.ts @@ -1,14 +1,14 @@ import { enable, disable } from '@libp2p/logger' import { peerIdFromString } from '@libp2p/peer-id' import { multiaddr } from '@multiformats/multiaddr' +import { gatherCapabilities } from '../utils/gather-capabilities.js' import { getPeers } from '../utils/get-peers.js' +import { getPubSub } from '../utils/get-pubsub.js' import { getSelf } from '../utils/get-self.js' import type { MetricsRPC } from './index.js' import type { DevToolsMetricsComponents } from '../index.js' import type { PeerId } from '@libp2p/interface' -import type { OpenConnectionOptions } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' -import type { AbortOptions } from 'it-pushable' export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC { const log = components.logger.forComponent('libp2p:devtools-metrics:metrics-rpc') @@ -18,10 +18,11 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC { return { self: await getSelf(components), peers: await getPeers(components, log), - debug: localStorage.getItem('debug') ?? '' + debug: localStorage.getItem('debug') ?? '', + capabilities: gatherCapabilities(components) } }, - setDebug: async (namespace?: string) => { + setDebug: async (namespace?) => { if (namespace?.length != null && namespace?.length > 0) { enable(namespace) localStorage.setItem('debug', namespace) @@ -30,7 +31,7 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC { localStorage.removeItem('debug') } }, - openConnection: async (peerIdOrMultiaddr: string, options?: OpenConnectionOptions) => { + openConnection: async (peerIdOrMultiaddr, options?) => { let peer: PeerId | Multiaddr try { @@ -41,7 +42,7 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC { await components.connectionManager.openConnection(peer, options) }, - closeConnection: async (peerId: PeerId, options?: AbortOptions) => { + closeConnection: async (peerId, options?) => { await Promise.all( components.connectionManager.getConnections(peerId) .map(async connection => { @@ -54,6 +55,23 @@ export function metricsRpc (components: DevToolsMetricsComponents): MetricsRPC { ) }, contentRouting: components.contentRouting, - peerRouting: components.peerRouting + peerRouting: components.peerRouting, + pubsub: { + async getTopics (component) { + return getPubSub(component, components).getTopics() + }, + async subscribe (component, topic) { + getPubSub(component, components).subscribe(topic) + }, + async unsubscribe (component, topic) { + getPubSub(component, components).unsubscribe(topic) + }, + async publish (component, topic, message) { + await getPubSub(component, components).publish(topic, message) + }, + async getSubscribers (component: string, topic: string) { + return getPubSub(component, components).getSubscribers(topic) + } + } } } diff --git a/packages/metrics-devtools/src/utils/find-capability.ts b/packages/metrics-devtools/src/utils/find-capability.ts new file mode 100644 index 0000000000..b38563de9f --- /dev/null +++ b/packages/metrics-devtools/src/utils/find-capability.ts @@ -0,0 +1,9 @@ +import { gatherCapabilities } from './gather-capabilities.js' + +export function findCapability (capability: string, components: any): any | undefined { + for (const [name, capabilities] of Object.entries(gatherCapabilities(components))) { + if (capabilities.includes(capability)) { + return components[name] + } + } +} diff --git a/packages/metrics-devtools/src/utils/gather-capabilities.ts b/packages/metrics-devtools/src/utils/gather-capabilities.ts new file mode 100644 index 0000000000..ecc971a64b --- /dev/null +++ b/packages/metrics-devtools/src/utils/gather-capabilities.ts @@ -0,0 +1,14 @@ +import { serviceCapabilities } from '@libp2p/interface' + +export function gatherCapabilities (components: any): Record { + const capabilities: Record = {} + const services: Record = components.components ?? components + + Object.entries(services).forEach(([name, component]) => { + if (component?.[serviceCapabilities] != null && Array.isArray(component[serviceCapabilities])) { + capabilities[name] = component[serviceCapabilities] + } + }) + + return capabilities +} diff --git a/packages/metrics-devtools/src/utils/get-pubsub.ts b/packages/metrics-devtools/src/utils/get-pubsub.ts new file mode 100644 index 0000000000..361d7c0106 --- /dev/null +++ b/packages/metrics-devtools/src/utils/get-pubsub.ts @@ -0,0 +1,12 @@ +import { InvalidParametersError, isPubSub } from '@libp2p/interface' +import type { PubSub } from '@libp2p/interface' + +export function getPubSub (component: string, components: any): PubSub { + const pubsub = components[component] + + if (!isPubSub(pubsub)) { + throw new InvalidParametersError(`Component ${component} did not implement the PubSub interface`) + } + + return pubsub +}