From f60da79937cdb008c471ee6a1d5594881733dc5b Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 15 Oct 2019 12:42:24 +0200 Subject: [PATCH] feat: use registrar --- .aegir.js | 63 ------ README.md | 131 +++++++++++- package.json | 28 +-- src/index.js | 396 +++++++++++++++-------------------- src/message/sign.js | 64 +++--- src/peer.js | 31 ++- test/fixtures/test-peer.json | 5 - test/instance.spec.js | 72 +++++++ test/pubsub.spec.js | 382 ++++++++++----------------------- test/sign.spec.js | 104 ++++----- test/utils/browser-bundle.js | 31 --- test/utils/constants.js | 40 ---- test/utils/index.js | 76 +++++-- test/utils/nodejs-bundle.js | 26 --- 14 files changed, 622 insertions(+), 827 deletions(-) delete mode 100644 .aegir.js delete mode 100644 test/fixtures/test-peer.json create mode 100644 test/instance.spec.js delete mode 100644 test/utils/browser-bundle.js delete mode 100644 test/utils/constants.js delete mode 100644 test/utils/nodejs-bundle.js diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index 6b18a9c..0000000 --- a/.aegir.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' - -const pull = require('pull-stream') -const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous') - -const Node = require('./test/utils/nodejs-bundle.js') -const { - getPeerRelay, - WS_RENDEZVOUS_MULTIADDR -} = require('./test/utils/constants.js') - -let wsRendezvous -let node - -const before = async () => { - [wsRendezvous, node] = await Promise.all([ - WebSocketStarRendezvous.start({ - port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, - refreshPeerListIntervalMS: 1000, - strictMultiaddr: false, - cryptoChallenge: true - }), - new Promise(async (resolve) => { - const peerInfo = await getPeerRelay() - const n = new Node({ - peerInfo, - config: { - relay: { - enabled: true, - hop: { - enabled: true, - active: true - } - } - } - }) - - n.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) - await n.start() - - resolve(n) - }) - ]) -} - -const after = () => { - return new Promise((resolve) => { - setTimeout(async () => { - await Promise.all([ - node.stop(), - wsRendezvous.stop() - ]) - resolve() - }, 2000) - }) -} - -module.exports = { - hooks: { - pre: before, - post: after - } -} diff --git a/README.md b/README.md index 047d0e3..8ac7de9 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ js-libp2p-pubsub [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) [![](https://img.shields.io/badge/pm-waffle-yellow.svg?style=flat-square)](https://waffle.io/libp2p/js-libp2p-pubsub) -> libp2p-pubsub consits on the base protocol for libp2p pubsub implementation. This module is responsible for all the logic regarding peer connections. +> libp2p-pubsub consists on the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol in libp2p, as well as all the logic regarding pubsub connections with other peers. ## Lead Maintainer @@ -22,6 +22,7 @@ js-libp2p-pubsub - [Install](#install) - [Usage](#usage) +- [API](#api) - [Contribute](#contribute) - [License](#license) @@ -33,23 +34,34 @@ js-libp2p-pubsub ## Usage -A pubsub implementation **MUST** override the `_processConnection`, `publish`, `subscribe` and `unsubscribe` functions. +`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algortithm, instead of also needing to create the setup for it. -Other functions, such as `_addPeer`, `_removePeer`, `_onDial`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for mounting the pubsub protocol onto the libp2p node and sending its' subscriptions to every peer connected, while the `stop` function is responsible for unmounting the pubsub protocol and shutting down every connection +A pubsub implementation **MUST** override the `_processMessages`, `publish`, `subscribe`, `unsubscribe` and `getTopics` functions. + +Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for registering the pubsub protocol onto the libp2p node, while the `stop` function is responsible for unregistering the pubsub protocol and shutting down every connection All the remaining functions **MUST NOT** be overwritten. The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic. +TODO: add explanation for registrar! + ```JavaScript const Pubsub = require('libp2p-pubsub') class PubsubImplementation extends Pubsub { - constructor(libp2p) { - super('libp2p:pubsub', '/pubsub-implementation/1.0.0', libp2p) + constructor(peerInfo, registrar, options = {}) { + super({ + debugName: 'libp2p:pubsub', + multicodecs: '/pubsub-implementation/1.0.0', + peerInfo: peerInfo, + registrar: registrar, + signMessages: options.signMessages, + strictSigning: options.strictSigning + }) } - _processConnection(idB58Str, conn, peer) { + _processMessages(idB58Str, conn, peer) { // Required to be implemented by the subclass // Process each message accordingly } @@ -65,9 +77,114 @@ class PubsubImplementation extends Pubsub { unsubscribe() { // Required to be implemented by the subclass } + + getTopics() { + // Required to be implemented by the subclass + } } ``` +## API + +The following specified API should be the base API for a pubsub implementation on top of `libp2p`. + +### Start + +Start the pubsub subsystem. The protocol will be registered to `libp2p`, which will notify about peers being connected and disconnected with the protocol. + +#### `pubsub.start()` + +##### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves once pubsub starts | + +### Stop + +Stop the pubsub subsystem. The protocol will be unregistered to `libp2p`, which will remove all listeners for the protocol and the streams with other peers will be closed. + +#### `pubsub.stop()` + +##### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves once pubsub stops | + +### Publish + +Publish data messages to pubsub topics. + +#### `pubsub.publish(topics, messages)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topics | `Array|string` | set of pubsub topics | +| messages | `Array|any` | set of messages to publish | + +##### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves once messages are published to the network | + +### Subscribe + +Subscribe to the given topic(s). + +#### `pubsub.subscribe(topics)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topics | `Array|string` | set of pubsub topics | + +### Unsubscribe + +Unsubscribe from the given topic(s). + +#### `pubsub.unsubscribe(topics)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topics | `Array|string` | set of pubsub topics | + +### Get Topics + +Get the list of topics which the peer is subscribed to. + +#### `pubsub.getTopics()` + +##### Returns + +| Type | Description | +|------|-------------| +| `Array` | Array of subscribed topics | + +### Get Peers Subscribed to a topic + +Get a list of the peer-ids that are subscribed to one topic. + +#### `pubsub.getPeersSubscribed(topic)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | pubsub topic | + +##### Returns + +| Type | Description | +|------|-------------| +| `Array` | Array of base-58 peer id's | + ### Validate Validates the signature of a message. @@ -99,8 +216,6 @@ Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/js- This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). -[![](https://cdn.rawgit.com/jbenet/contribute-ipfs-gif/master/img/contribute.gif)](https://github.com/ipfs/community/blob/master/contributing.md) - ## License Copyright (c) Protocol Labs, Inc. under the **MIT License**. See [LICENSE file](./LICENSE) for details. diff --git a/package.json b/package.json index 5b1a5d3..78ceb4c 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,6 @@ "coverage": "aegir coverage", "coverage-publish": "aegir coverage --provider coveralls" }, - "browser": { - "test/utils/nodejs-bundle": "./test/utils/browser-bundle.js" - }, "files": [ "src", "dist" @@ -45,35 +42,26 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-pubsub#readme", "devDependencies": { - "aegir": "^20.3.1", + "aegir": "^20.4.1", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "dirty-chai": "^2.0.1", - "libp2p": "~0.26.2", - "libp2p-secio": "~0.11.1", - "libp2p-spdy": "~0.13.3", - "libp2p-tcp": "~0.13.1", - "libp2p-websocket-star": "~0.10.2", - "libp2p-websocket-star-rendezvous": "~0.4.1", - "lodash": "^4.17.15", + "it-pair": "^1.0.0", "multiaddr": "^6.1.0", - "peer-id": "~0.12.2", - "peer-info": "~0.15.1" + "peer-id": "~0.13.3", + "peer-info": "~0.17.0" }, "dependencies": { - "async": "^2.6.2", "bs58": "^4.0.1", "debug": "^4.1.1", "err-code": "^2.0.0", - "length-prefixed-stream": "^2.0.0", + "it-length-prefixed": "^2.0.0", + "it-pipe": "^1.0.1", + "it-pushable": "^1.3.2", "libp2p-crypto": "~0.17.0", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.3", - "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.14", - "sinon": "^7.5.0", - "time-cache": "~0.3.0" + "sinon": "^7.5.0" }, "contributors": [ "Cayman ", diff --git a/src/index.js b/src/index.js index 753f9d1..54188ab 100644 --- a/src/index.js +++ b/src/index.js @@ -1,57 +1,63 @@ 'use strict' -const EventEmitter = require('events') -const pull = require('pull-stream/pull') -const empty = require('pull-stream/sources/empty') -const TimeCache = require('time-cache') +const assert = require('assert') const debug = require('debug') +const EventEmitter = require('events') const errcode = require('err-code') -const Peer = require('./peer') +const PeerInfo = require('peer-info') + const message = require('./message') +const Peer = require('./peer') +const utils = require('./utils') const { signMessage, verifySignature } = require('./message/sign') -const utils = require('./utils') /** * PubsubBaseProtocol handles the peers and connections logic for pubsub routers */ class PubsubBaseProtocol extends EventEmitter { /** - * @param {String} debugName - * @param {String} multicodec - * @param {Object} libp2p libp2p implementation - * @param {Object} options - * @param {boolean} options.signMessages if messages should be signed, defaults to true - * @param {boolean} options.strictSigning if message signing should be required, defaults to true - * @constructor + * @param {Object} props + * @param {String} props.debugName log namespace + * @param {Array|string} props.multicodecs protocol identificers to connect + * @param {PeerInfo} props.peerInfo peer's peerInfo + * @param {Object} props.registrar registrar for libp2p protocols + * @param {function} props.registrar.register + * @param {function} props.registrar.unregister + * @param {boolean} [props.signMessages] if messages should be signed, defaults to true + * @param {boolean} [props.strictSigning] if message signing should be required, defaults to true + * @abstract */ - constructor (debugName, multicodec, libp2p, options) { - super() + constructor ({ + debugName, + multicodecs, + peerInfo, + registrar, + signMessages = true, + strictSigning = true + }) { + assert(debugName && typeof debugName === 'string', 'a debugname `string` is required') + assert(multicodecs, 'multicodecs are required') + assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') + + // registrar handling + assert(registrar && typeof registrar === 'object', 'a registrar object is required') // TODO: isRegistrar when it's implemented + assert(typeof registrar.register === 'function', 'a register function must be provided in registrar') + assert(typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') - options = { - signMessages: true, - strictSigning: true, - ...options - } + super() this.log = debug(debugName) this.log.err = debug(`${debugName}:error`) - this.multicodec = multicodec - this.libp2p = libp2p - this.started = false - if (options.signMessages) { - this.peerId = this.libp2p.peerInfo.id - } + this.multicodecs = utils.ensureArray(multicodecs) + this.peerInfo = peerInfo + this.registrar = registrar - /** - * If message signing should be required for incoming messages - * @type {boolean} - */ - this.strictSigning = options.strictSigning + this.started = false /** * Map of topics to which peers are subscribed to @@ -60,13 +66,6 @@ class PubsubBaseProtocol extends EventEmitter { */ this.topics = new Map() - /** - * Cache of seen messages - * - * @type {TimeCache} - */ - this.seenCache = new TimeCache() - /** * Map of peers. * @@ -74,181 +73,151 @@ class PubsubBaseProtocol extends EventEmitter { */ this.peers = new Map() - // Dials that are currently in progress - this._dials = new Set() - - this._onConnection = this._onConnection.bind(this) - this._dialPeer = this._dialPeer.bind(this) - } - - /** - * Add a new connected peer to the peers map. - * @private - * @param {PeerInfo} peer peer info - * @returns {PeerInfo} - */ - _addPeer (peer) { - const id = peer.info.id.toB58String() - - /* - Always use an existing peer. + // Message signing + if (signMessages) { + this.peerId = this.peerInfo.id + } - What is happening here is: "If the other peer has already dialed to me, we already have - an establish link between the two, what might be missing is a - Connection specifically between me and that Peer" + /** + * If message signing should be required for incoming messages + * @type {boolean} */ - let existing = this.peers.get(id) - if (!existing) { - this.log('new peer', id) - this.peers.set(id, peer) - existing = peer - - peer.once('close', () => this._removePeer(peer)) - } - ++existing._references + this.strictSigning = strictSigning - return existing + this._onPeerConnected = this._onPeerConnected.bind(this) + this._onPeerDisconnected = this._onPeerDisconnected.bind(this) } /** - * Remove a peer from the peers map if it has no references. - * @private - * @param {Peer} peer peer state - * @returns {PeerInfo} + * Register the pubsub protocol onto the libp2p node. + * @returns {Promise} */ - _removePeer (peer) { - const id = peer.info.id.toB58String() - - this.log('remove', id, peer._references) - // Only delete when no one else is referencing this peer. - if (--peer._references === 0) { - this.log('delete peer', id) - this.peers.delete(id) + async start () { + if (this.started) { + return } + this.log('starting') - return peer + // register protocol with multicodec and handlers + await this.registrar.register(this.multicodecs, { + onConnect: this._onPeerConnected, + onDisconnect: this._onPeerDisconnected + }) + + this.log('started') + this.started = true } /** - * Dial a received peer. - * @private - * @param {PeerInfo} peerInfo peer info + * Unregister the pubsub protocol and the streams with other peers will be closed. * @returns {Promise} */ - _dialPeer (peerInfo) { - const idB58Str = peerInfo.id.toB58String() - - // If already have a PubSub conn, ignore - const peer = this.peers.get(idB58Str) - if (peer && peer.isConnected) { - return Promise.resolve() - } - - // If already dialing this peer, ignore - if (this._dials.has(idB58Str)) { - this.log('already dialing %s, ignoring dial attempt', idB58Str) - return Promise.resolve() + async stop () { + if (!this.started) { + return } - this._dials.add(idB58Str) - - this.log('dialing %s', idB58Str) - - return new Promise((resolve) => { - this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { - this.log('dial to %s complete', idB58Str) - // If the dial is not in the set, it means that pubsub has been - // stopped - const pubsubStopped = !this._dials.has(idB58Str) - this._dials.delete(idB58Str) + // unregister protocol and handlers + await this.registrar.unregister(this.multicodecs) - if (err) { - this.log.err(err) - return resolve() - } - - // pubsub has been stopped, so we should just bail out - if (pubsubStopped) { - this.log('pubsub was stopped, not processing dial to %s', idB58Str) - return resolve() - } + this.log('stopping') + this.peers.forEach((peer) => peer.close()) - this._onDial(peerInfo, conn) - resolve() - }) - }) + this.peers = new Map() + this.started = false + this.log('stopped') } /** - * Dial a received peer. + * Registrar notifies a connection successfully with pubsub protocol. * @private - * @param {PeerInfo} peerInfo peer info + * @param {PeerInfo} peerInfo remote peer info * @param {Connection} conn connection to the peer */ - _onDial (peerInfo, conn) { + _onPeerConnected (peerInfo, conn) { const idB58Str = peerInfo.id.toB58String() this.log('connected', idB58Str) const peer = this._addPeer(new Peer(peerInfo)) peer.attachConnection(conn) + + this._processMessages(idB58Str, conn, peer) } /** - * On successful connection event. + * Registrar notifies a closing connection with pubsub protocol. * @private - * @param {String} protocol connection protocol - * @param {Connection} conn connection to the peer + * @param {PeerInfo} peerInfo peer info + * @param {Error} err error for connection end */ - _onConnection (protocol, conn) { - conn.getPeerInfo((err, peerInfo) => { - if (err) { - this.log.err('Failed to identify incomming conn', err) - return pull(empty(), conn) - } - - const idB58Str = peerInfo.id.toB58String() - const peer = this._addPeer(new Peer(peerInfo)) + _onPeerDisconnected (peerInfo, err) { + const idB58Str = peerInfo.id.toB58String() + const peer = this.peers.get(idB58Str) - this._processConnection(idB58Str, conn, peer) - }) + this.log('connection ended', idB58Str, err ? err.message : '') + this._removePeer(peer) } /** - * Overriding the implementation of _processConnection should keep the connection and is - * responsible for processing each RPC message received by other peers. - * @abstract - * @param {string} idB58Str peer id string in base58 - * @param {Connection} conn connection + * Add a new connected peer to the peers map. + * @private * @param {PeerInfo} peer peer info - * @returns {undefined} - * + * @returns {PeerInfo} */ - _processConnection (idB58Str, conn, peer) { - throw errcode('_processConnection must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + _addPeer (peer) { + const id = peer.info.id.toB58String() + + let existing = this.peers.get(id) + if (!existing) { + this.log('new peer', id) + this.peers.set(id, peer) + existing = peer + + peer.once('close', () => this._removePeer(peer)) + } + + return existing } /** - * On connection end event. + * Remove a peer from the peers map. * @private - * @param {string} idB58Str peer id string in base58 - * @param {PeerInfo} peer peer info - * @param {Error} err error for connection end + * @param {Peer} peer peer state + * @returns {PeerInfo} */ - _onConnectionEnd (idB58Str, peer, err) { - // socket hang up, means the one side canceled - if (err && err.message !== 'socket hang up') { - this.log.err(err) + _removePeer (peer) { + const id = peer.info.id.toB58String() + + this.log('delete peer', id) + this.peers.delete(id) + return peer + } + + /** + * Validates the given message. The signature will be checked for authenticity. + * @param {rpc.RPC.Message} message + * @returns {Promise} + */ + async validate (message) { // eslint-disable-line require-await + // If strict signing is on and we have no signature, abort + if (this.strictSigning && !message.signature) { + this.log('Signing required and no signature was present, dropping message:', message) + return Promise.resolve(false) } - this.log('connection ended', idB58Str, err ? err.message : '') - this._removePeer(peer) + // Check the message signature if present + if (message.signature) { + return verifySignature(message) + } else { + return Promise.resolve(true) + } } /** * Normalizes the message and signs it, if signing is enabled - * + * @private * @param {Message} message - * @returns {Message} + * @returns {Promise} */ _buildMessage (message) { const msg = utils.normalizeOutRpcMessage(message) @@ -259,17 +228,36 @@ class PubsubBaseProtocol extends EventEmitter { } } + /** + * Get a list of the peer-ids that are subscribed to one topic. + * @param {string} topic + * @returns {Array} + */ + getPeersSubscribed (topic) { + if (!this.started) { + throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') + } + + if (!topic || typeof topic !== 'string') { + throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC') + } + + return Array.from(this.peers.values()) + .filter((peer) => peer.topics.has(topic)) + .map((peer) => peer.info.id.toB58String()) + } + /** * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. * For example, a Floodsub implementation might simply publish each message to each topic for every peer * @abstract * @param {Array|string} topics * @param {Array|any} messages - * @returns {undefined} + * @returns {Promise} * */ publish (topics, messages) { - throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** @@ -277,10 +265,10 @@ class PubsubBaseProtocol extends EventEmitter { * For example, a Floodsub implementation might simply send a message for every peer showing interest in the topics * @abstract * @param {Array|string} topics - * @returns {undefined} + * @returns {void} */ subscribe (topics) { - throw errcode('subscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('subscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** @@ -288,78 +276,34 @@ class PubsubBaseProtocol extends EventEmitter { * For example, a Floodsub implementation might simply send a message for every peer revoking interest in the topics * @abstract * @param {Array|string} topics - * @returns {undefined} + * @returns {void} */ unsubscribe (topics) { - throw errcode('unsubscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('unsubscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** - * Mounts the pubsub protocol onto the libp2p node and sends our - * subscriptions to every peer conneceted - * @returns {Promise} + * Overriding the implementation of getTopics should handle the appropriate algorithms for the publish/subscriber implementation. + * Get the list of subscriptions the peer is subscribed to. + * @abstract + * @returns {Array} */ - async start () { - if (this.started) { - throw errcode(new Error('already started'), 'ERR_ALREADY_STARTED') - } - this.log('starting') - - this.libp2p.handle(this.multicodec, this._onConnection) - - // Speed up any new peer that comes in my way - this.libp2p.on('peer:connect', this._dialPeer) - - // Dial already connected peers - const peerInfos = Object.values(this.libp2p.peerBook.getAll()) - - await Promise.all(peerInfos.map((peer) => this._dialPeer(peer))) - - this.log('started') - this.started = true + getTopics () { + throw errcode(new Error('getTopics must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** - * Unmounts the pubsub protocol and shuts down every connection + * Overriding the implementation of _processMessages should keep the connection and is + * responsible for processing each RPC message received by other peers. + * @abstract + * @param {string} idB58Str peer id string in base58 + * @param {Connection} conn connection + * @param {PeerInfo} peer peer info * @returns {void} + * */ - stop () { - if (!this.started) { - throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') - } - - this.libp2p.unhandle(this.multicodec) - this.libp2p.removeListener('peer:connect', this._dialPeer) - - // Prevent any dials that are in flight from being processed - this._dials = new Set() - - this.log('stopping') - this.peers.forEach((peer) => peer.close()) - - this.log('stopped') - this.peers = new Map() - this.started = false - } - - /** - * Validates the given message. The signature will be checked for authenticity. - * @param {rpc.RPC.Message} message - * @returns {Promise} - */ - async validate (message) { // eslint-disable-line require-await - // If strict signing is on and we have no signature, abort - if (this.strictSigning && !message.signature) { - this.log('Signing required and no signature was present, dropping message:', message) - return Promise.resolve(false) - } - - // Check the message signature if present - if (message.signature) { - return verifySignature(message) - } else { - return Promise.resolve(true) - } + _processMessages (idB58Str, conn, peer) { + throw errcode(new Error('_processMessages must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } } diff --git a/src/message/sign.js b/src/message/sign.js index 7e7e70b..1ccf118 100644 --- a/src/message/sign.js +++ b/src/message/sign.js @@ -11,25 +11,20 @@ const SignPrefix = Buffer.from('libp2p-pubsub:') * @param {Message} message * @returns {Promise} */ -function signMessage (peerId, message) { +async function signMessage (peerId, message) { // Get the message in bytes, and prepend with the pubsub prefix const bytes = Buffer.concat([ SignPrefix, Message.encode(message) ]) - return new Promise((resolve, reject) => { - // Sign the bytes with the private key - peerId.privKey.sign(bytes, (err, signature) => { - if (err) return reject(err) + const signature = await peerId.privKey.sign(bytes) - resolve({ - ...message, - signature: signature, - key: peerId.pubKey.bytes - }) - }) - }) + return { + ...message, + signature: signature, + key: peerId.pubKey.bytes + } } /** @@ -50,15 +45,8 @@ async function verifySignature (message) { // Get the public key const pubKey = await messagePublicKey(message) - // Verify the base message - return new Promise((resolve, reject) => { - pubKey.verify(bytes, message.signature, (err, res) => { - if (err) { - return reject(err) - } - resolve(res) - }) - }) + // verify the base message + return pubKey.verify(bytes, message.signature) } /** @@ -68,26 +56,24 @@ async function verifySignature (message) { * @param {Message} message * @returns {Promise} */ -function messagePublicKey (message) { - return new Promise((resolve, reject) => { - if (message.key) { - PeerId.createFromPubKey(message.key, (err, peerId) => { - if (err) return reject(err) - // the key belongs to the sender, return the key - if (peerId.isEqual(message.from)) return resolve(peerId.pubKey) - // We couldn't validate pubkey is from the originator, error - return reject(new Error('Public Key does not match the originator')) - }) +async function messagePublicKey (message) { + if (message.key) { + const peerId = await PeerId.createFromPubKey(message.key) + + // the key belongs to the sender, return the key + if (peerId.isEqual(message.from)) return peerId.pubKey + // We couldn't validate pubkey is from the originator, error + throw new Error('Public Key does not match the originator') + } else { + // should be available in the from property of the message (peer id) + const from = PeerId.createFromBytes(message.from) + + if (from.pubKey) { + return from.pubKey } else { - // should be available in the from property of the message (peer id) - const from = PeerId.createFromBytes(message.from) - if (from.pubKey) { - return resolve(from.pubKey) - } else { - reject(new Error('Could not get the public key from the originator id')) - } + throw new Error('Could not get the public key from the originator id') } - }) + } } module.exports = { diff --git a/src/peer.js b/src/peer.js index c1bb4ab..0939f93 100644 --- a/src/peer.js +++ b/src/peer.js @@ -1,10 +1,11 @@ 'use strict' -const lp = require('pull-length-prefixed') -const Pushable = require('pull-pushable') -const pull = require('pull-stream') const EventEmitter = require('events') +const lp = require('it-length-prefixed') +const pushable = require('it-pushable') +const pipe = require('it-pipe') + const { RPC } = require('./message') /** @@ -33,8 +34,6 @@ class Peer extends EventEmitter { * @type {Pushable} */ this.stream = null - - this._references = 0 } /** @@ -75,21 +74,22 @@ class Peer extends EventEmitter { * Attach the peer to a connection and setup a write stream * * @param {Connection} conn - * @returns {undefined} + * @returns {void} */ attachConnection (conn) { this.conn = conn - this.stream = new Pushable() - - pull( - this.stream, - lp.encode(), - conn, - pull.onEnd(() => { + this.stream = pushable({ + onEnd: () => { this.conn = null this.stream = null this.emit('close') - }) + } + }) + + pipe( + this.stream, + lp.encode(), + conn ) this.emit('connection') @@ -164,9 +164,6 @@ class Peer extends EventEmitter { * @returns {void} */ close () { - // Force removal of peer - this._references = 1 - // End the pushable if (this.stream) { this.stream.end() diff --git a/test/fixtures/test-peer.json b/test/fixtures/test-peer.json deleted file mode 100644 index 105046a..0000000 --- a/test/fixtures/test-peer.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "id": "Qmex1SSsueWFsUfjdkugJ5zhcnjddAt8TxcnDLUXKD9Sx7", - "privKey": "CAASqAkwggSkAgEAAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAECggEAdBUzV/GaQ0nmoQrWvOnUxmFIho7kCjkh1NwnNVPNc+Msa1r7pcI9wJNPwap8j1w4L/cZuYhOJgcg+o2mWFiuULKZ4F9Ro/M89gZ038457g2/2pPu43c/Xoi/2YcAHXg0Gr+OCe2zCIyITBWKAFqyAzL6DubAxrJW2Ezj1LrZ+EZgMyzbh/go/eEGSJaaGkINeAkY144DqDWWWvzyhKhryipsGkZGEkVy9xJgMEI3ipVvuPez2XAvoyyeuinBBLe+Z2vY5G50XXzbIMhIQGLncHf9MwTv6wt1ilyOSLOXK0BoQbB76J3R3is5dSULXXP9r8VocjLBEkmBuf4FXAKzoQKBgQDNNS4F1XE1gxD8LPkL+aB/hi6eVHVPhr+w0I/9ATikcLGeUfBM2Gd6cZRPFtNVrv1p6ZF1D1UyGDknGbDBSQd9wLUgb0fDoo3jKYMGWq6G+VvaP5rzWQeBV8YV2EhSmUk1i6kiYe2ZE8WyrPie7iwpQIY60e2A8Ly0GKZiBZUcHQKBgQC9YDAVsGnEHFVFkTDpvw5HwEzCgTb2A3NgkGY3rTYZ7L6AFjqCYmUwFB8Fmbyc4kdFWNh8wfmq5Qrvl49NtaeukiqWKUUlB8uPdztB1P0IahA2ks0owStZlRifmwfgYyMd4xE17lhaOgQQJZZPxmP0F6mdOvb3YJafNURCdMS51wKBgEvvIM+h0tmFXXSjQ6kNvzlRMtD92ccKysYn9xAdMpOO6/r0wSH+dhQWEVZO0PcE4NsfReb2PIVj90ojtIdhebcr5xpQc1LORQjJJKXmSmzBux6AqNrhl+hhzXfp56FA/Zkly/lgGWaqrV5XqUxOP+Mn8EO1yNgMvRc7g94DyNB1AoGBAKLBuXHalXwDsdHBUB2Eo3xNLGt6bEcRfia+0+sEBdxQGQWylQScFkU09dh1YaIf44sZKa5HdBFJGpYCVxo9hmjFnK5Dt/Z0daHOonIY4INLzLVqg8KECoLKXkhGEIXsDjFQhukn+G1LMVTDSSU055DQiWjlVX4UWD9qo0jOXIkvAoGBAMP50p2X6PsWWZUuuR7i1JOJHRyQZPWdHh9p8SSLnCtEpHYZfJr4INXNmhnSiB/3TUnHix2vVKjosjMTCk/CjfzXV2H41WPOLZ2/Pi3SxCicWIRj4kCcWhkEuIF2jGkg1+jmNiCl/zNMaBOAIP3QbDPtqOWbYlPd2YIzdj6WQ6R4", - "pubKey": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAE=" -} \ No newline at end of file diff --git a/test/instance.spec.js b/test/instance.spec.js new file mode 100644 index 0000000..e57a107 --- /dev/null +++ b/test/instance.spec.js @@ -0,0 +1,72 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) +const expect = chai.expect + +const PubsubBaseProtocol = require('../src') +const { createPeerInfo, mockRegistrar } = require('./utils') + +describe('should validate instance parameters', () => { + let peerInfo + + before(async () => { + peerInfo = await createPeerInfo() + }) + + it('should throw if no debugName is provided', () => { + expect(() => { + new PubsubBaseProtocol() // eslint-disable-line no-new + }).to.throw() + }) + + it('should throw if no multicodec is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub' + }) + }).to.throw() + }) + + it('should throw if no peerInfo is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0' + }) + }).to.throw() + }) + + it('should throw if an invalid peerInfo is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: 'fake-peer-info' + }) + }).to.throw() + }) + + it('should throw if no registrar object is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo + }) + }).to.throw() + }) + + it('should accept valid parameters', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo, + registrar: mockRegistrar + }) + }).not.to.throw() + }) +}) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 44a2bd0..1e68d02 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,347 +6,197 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect const sinon = require('sinon') +const DuplexPair = require('it-pair/duplex') const PubsubBaseProtocol = require('../src') const { randomSeqno } = require('../src/utils') -const utils = require('./utils') -const createNode = utils.createNode - -class PubsubImplementation extends PubsubBaseProtocol { - constructor (libp2p) { - super('libp2p:pubsub', 'libp2p:pubsub-implementation', libp2p) - } - - publish (topics, messages) { - // ... - } - - subscribe (topics) { - // ... - } - - unsubscribe (topics) { - // ... - } - - _processConnection (idB58Str, conn, peer) { - // ... - } -} +const { createPeerInfo, mockRegistrar, PubsubImplementation } = require('./utils') describe('pubsub base protocol', () => { - afterEach(() => { - sinon.restore() - }) + describe('should start and stop properly', () => { + let pubsub + let sinonMockRegistrar + + beforeEach(async () => { + const peerInfo = await createPeerInfo() + sinonMockRegistrar = { + register: sinon.stub(), + unregister: sinon.stub() + } - describe('fresh nodes', () => { - let nodeA - let nodeB - let psA - let psB + pubsub = new PubsubBaseProtocol({ + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo, + registrar: sinonMockRegistrar + }) - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) + expect(pubsub.peers.size).to.be.eql(0) }) - before('mount the pubsub protocol', () => { - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + afterEach(() => { + sinon.restore() + }) - return new Promise((resolve) => { - setTimeout(() => { - expect(psA.peers.size).to.be.eql(0) - expect(psB.peers.size).to.be.eql(0) - resolve() - }, 50) - }) + it('should be able to start and stop', async () => { + await pubsub.start() + expect(sinonMockRegistrar.register.calledOnce).to.be.true() + + await pubsub.stop() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() }) - before('start both Pubsub', () => { - return Promise.all([ - psA.start(), - psB.start() - ]) + it('should not throw to start if already started', async () => { + await pubsub.start() + await pubsub.start() + expect(sinonMockRegistrar.register.calledOnce).to.be.true() + + await pubsub.stop() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() }) - after(() => { - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) + it('should not throw if stop before start', async () => { + await pubsub.stop() + expect(sinonMockRegistrar.register.calledOnce).to.be.false() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.false() }) + }) - it('Dial from nodeA to nodeB', async () => { - await nodeA.dial(nodeB.peerInfo) + describe('should handle messages creating and signing', () => { + let peerInfo + let pubsub - return new Promise((resolve) => { - setTimeout(() => { - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) - resolve() - }, 1000) + before(async () => { + peerInfo = await createPeerInfo() + pubsub = new PubsubBaseProtocol({ + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo, + registrar: mockRegistrar }) }) + afterEach(() => { + sinon.restore() + }) + it('_buildMessage normalizes and signs messages', async () => { const message = { - from: psA.peerId.id, + from: peerInfo.id.id, data: 'hello', seqno: randomSeqno(), topicIDs: ['test-topic'] } - const signedMessage = await psA._buildMessage(message) - const verified = await psA.validate(signedMessage) + const signedMessage = await pubsub._buildMessage(message) + const verified = await pubsub.validate(signedMessage) expect(verified).to.eql(true) }) it('validate with strict signing off will validate a present signature', async () => { const message = { - from: psA.peerId.id, + from: peerInfo.id.id, data: 'hello', seqno: randomSeqno(), topicIDs: ['test-topic'] } - sinon.stub(psA, 'strictSigning').value(false) + sinon.stub(pubsub, 'strictSigning').value(false) - const signedMessage = await psA._buildMessage(message) - const verified = await psA.validate(signedMessage) + const signedMessage = await pubsub._buildMessage(message) + const verified = await pubsub.validate(signedMessage) expect(verified).to.eql(true) }) it('validate with strict signing requires a signature', async () => { const message = { - from: psA.peerId.id, + from: peerInfo.id.id, data: 'hello', seqno: randomSeqno(), topicIDs: ['test-topic'] } - const verified = await psA.validate(message) + const verified = await pubsub.validate(message) expect(verified).to.eql(false) }) }) - describe('dial the pubsub protocol on mount', () => { - let nodeA - let nodeB - let psA - let psB - - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - await nodeA.dial(nodeB.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - }) - - after(() => { - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) - }) - - it('dial on pubsub on mount', async () => { - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - - await Promise.all([ - psA.start(), - psB.start() - ]) - - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) - }) - - it('stop both pubsubs', () => { - psA.stop() - psB.stop() - }) - }) - - describe('prevent concurrent dials', () => { - let sandbox - let nodeA - let nodeB - let psA - let psB - - before(async () => { - // sandbox = chai.spy.sandbox() - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - - sandbox = chai.spy.sandbox() - - return psB.start() + describe('should be able to register two nodes', () => { + const protocol = '/pubsub/1.0.0' + let pubsubA, pubsubB + let peerInfoA, peerInfoB + const registrarRecordA = {} + const registrarRecordB = {} + + const registrar = (registrarRecord) => ({ + register: (multicodecs, handlers) => { + registrarRecord[multicodecs[0]] = handlers + }, + unregister: (multicodecs) => { + delete registrarRecord[multicodecs[0]] + } }) - after(() => { - sandbox.restore() + // mount pubsub + beforeEach(async () => { + peerInfoA = await createPeerInfo() + peerInfoB = await createPeerInfo() - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) + pubsubA = new PubsubImplementation(protocol, peerInfoA, registrar(registrarRecordA)) + pubsubB = new PubsubImplementation(protocol, peerInfoB, registrar(registrarRecordB)) }) - it('does not dial twice to same peer', async () => { - sandbox.on(psA, ['_onDial']) - - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - await psA.start() - - // Simulate a connection coming in from peer B at the same time. This - // causes pubsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - return new Promise((resolve) => { - // Check that only one dial was made - setTimeout(() => { - expect(psA._onDial).to.have.been.called.once() - resolve() - }, 1000) - }) - }) - }) - - describe('allow dials even after error', () => { - let sandbox - let nodeA - let nodeB - let psA - let psB - - before(async () => { - // sandbox = chai.spy.sandbox() - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() + // start pubsub + beforeEach(async () => { + await Promise.all([ + pubsubA.start(), + pubsubB.start() ]) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - sandbox = chai.spy.sandbox() - - return psB.start() + expect(Object.keys(registrarRecordA)).to.have.lengthOf(1) + expect(Object.keys(registrarRecordB)).to.have.lengthOf(1) }) - after(() => { - sandbox.restore() + afterEach(() => { + sinon.restore() return Promise.all([ - nodeA.stop(), - nodeB.stop() + pubsubA.stop(), + pubsubB.stop() ]) }) - it('can dial again after error', async () => { - let firstTime = true - const dialProtocol = psA.libp2p.dialProtocol.bind(psA.libp2p) - sandbox.on(psA.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { - // Return an error for the first dial - if (firstTime) { - firstTime = false - return cb(new Error('dial error')) - } - - // Subsequent dials proceed as normal - dialProtocol(peerInfo, multicodec, cb) - }) + it('should handle onConnect as expected', () => { + const onConnectA = registrarRecordA[protocol].onConnect + const onConnectB = registrarRecordB[protocol].onConnect - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - await psA.start() + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) - // Simulate a connection coming in from peer B. This causes pubsub - // to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - return new Promise((resolve) => { - // Check that both dials were made - setTimeout(() => { - expect(psA.libp2p.dialProtocol).to.have.been.called.twice() - resolve() - }, 1000) - }) - }) - }) - - describe('prevent processing dial after stop', () => { - let sandbox - let nodeA - let nodeB - let psA - let psB - - before(async () => { - // sandbox = chai.spy.sandbox() - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - - sandbox = chai.spy.sandbox() - - return Promise.all([ - psA.start(), - psB.start() - ]) - }) - - after(() => { - sandbox.restore() - - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) + expect(pubsubA.peers.size).to.be.eql(1) + expect(pubsubB.peers.size).to.be.eql(1) }) - it('does not process dial after stop', () => { - sandbox.on(psA, ['_onDial']) + it('should handle onDisconnect as expected', () => { + const onConnectA = registrarRecordA[protocol].onConnect + const onDisconnectA = registrarRecordA[protocol].onDisconnect + const onConnectB = registrarRecordB[protocol].onConnect + const onDisconnectB = registrarRecordB[protocol].onDisconnect - // Simulate a connection coming in from peer B at the same time. This - // causes pubsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) + onDisconnectA(peerInfoB) + onDisconnectB(peerInfoA) - // Stop pubsub before the dial can complete - psA.stop() - - return new Promise((resolve) => { - // Check that the dial was not processed - setTimeout(() => { - expect(psA._onDial).to.not.have.been.called() - resolve() - }, 1000) - }) + expect(pubsubA.peers.size).to.be.eql(0) + expect(pubsubB.peers.size).to.be.eql(0) }) }) }) diff --git a/test/sign.spec.js b/test/sign.spec.js index f9cadf1..573aac0 100644 --- a/test/sign.spec.js +++ b/test/sign.spec.js @@ -18,19 +18,12 @@ const { randomSeqno } = require('../src/utils') describe('message signing', () => { let peerId before(async () => { - peerId = await new Promise((resolve, reject) => { - peerId = PeerId.create({ - bits: 1024 - }, (err, id) => { - if (err) { - reject(err) - } - resolve(id) - }) + peerId = await PeerId.create({ + bits: 1024 }) }) - it('should be able to sign and verify a message', () => { + it('should be able to sign and verify a message', async () => { const message = { from: peerId.id, data: 'hello', @@ -39,60 +32,44 @@ describe('message signing', () => { } const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + const expectedSignature = await peerId.privKey.sign(bytesToSign) - return new Promise((resolve, reject) => { - peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { - if (err) return reject(err) + const signedMessage = await signMessage(peerId, message) - const signedMessage = await signMessage(peerId, message) + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - // Check the signature and public key - expect(signedMessage.signature).to.eql(expectedSignature) - expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - - // Verify the signature - const verified = await verifySignature(signedMessage) - expect(verified).to.eql(true) - - resolve() - }) - }) + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) }) - it('should be able to extract the public key from an inlined key', () => { - return new Promise((resolve, reject) => { - PeerId.create({ keyType: 'secp256k1', bits: 256 }, (err, secPeerId) => { - if (err) return reject(err) - - const message = { - from: secPeerId.id, - data: 'hello', - seqno: randomSeqno(), - topicIDs: ['test-topic'] - } + it('should be able to extract the public key from an inlined key', async () => { + const secPeerId = await PeerId.create({ keyType: 'secp256k1', bits: 256 }) - const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - - secPeerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { - if (err) return reject(err) + const message = { + from: secPeerId.id, + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } - const signedMessage = await signMessage(secPeerId, message) + const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + const expectedSignature = await secPeerId.privKey.sign(bytesToSign) - // Check the signature and public key - expect(signedMessage.signature).to.eql(expectedSignature) - signedMessage.key = undefined + const signedMessage = await signMessage(secPeerId, message) - // Verify the signature - const verified = await verifySignature(signedMessage) - expect(verified).to.eql(true) + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + signedMessage.key = undefined - resolve() - }) - }) - }) + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) }) - it('should be able to extract the public key from the message', () => { + it('should be able to extract the public key from the message', async () => { const message = { from: peerId.id, data: 'hello', @@ -101,23 +78,16 @@ describe('message signing', () => { } const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + const expectedSignature = await peerId.privKey.sign(bytesToSign) - return new Promise((resolve, reject) => { - peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { - if (err) return reject(err) + const signedMessage = await signMessage(peerId, message) - const signedMessage = await signMessage(peerId, message) + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - // Check the signature and public key - expect(signedMessage.signature).to.eql(expectedSignature) - expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - - // Verify the signature - const verified = await verifySignature(signedMessage) - expect(verified).to.eql(true) - - resolve() - }) - }) + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) }) }) diff --git a/test/utils/browser-bundle.js b/test/utils/browser-bundle.js deleted file mode 100644 index 117e0a0..0000000 --- a/test/utils/browser-bundle.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -const WebSocketStar = require('libp2p-websocket-star') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -const { WS_STAR_MULTIADDR } = require('./constants') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const starOpts = { id: peerInfo.id } - const wsStar = new WebSocketStar(starOpts) - - peerInfo.multiaddrs.add(WS_STAR_MULTIADDR) - - const modules = { - transport: [wsStar], - streamMuxer: [spdy], - connEncryption: [secio] - } - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node diff --git a/test/utils/constants.js b/test/utils/constants.js deleted file mode 100644 index 93516b1..0000000 --- a/test/utils/constants.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict' - -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const peerJSON = require('../fixtures/test-peer') -const multiaddr = require('multiaddr') - -let peerRelay = null - -/** - * Creates a `PeerInfo` that can be used across testing. Once the - * relay `PeerInfo` has been requested, it will be reused for each - * additional request. - * - * This is currently being used to create a relay on test bootstrapping - * so that it can be used by browser nodes during their test suite. This - * is necessary for running a TCP node during browser tests. - * @private - * @returns {Promise} - */ -module.exports.getPeerRelay = () => { - if (peerRelay) return peerRelay - - return new Promise((resolve, reject) => { - PeerId.createFromJSON(peerJSON, (err, peerId) => { - if (err) { - return reject(err) - } - peerRelay = new PeerInfo(peerId) - - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') - - resolve(peerRelay) - }) - }) -} - -module.exports.WS_STAR_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/ws/p2p-websocket-star/') -module.exports.WS_RENDEZVOUS_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/wss') diff --git a/test/utils/index.js b/test/utils/index.js index 3baadda..90cc9f0 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -1,26 +1,64 @@ 'use strict' +const lp = require('it-length-prefixed') +const pipe = require('it-pipe') + const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const Node = require('./nodejs-bundle') - -const waterfall = require('async/waterfall') - -exports.createNode = () => { - return new Promise((resolve, reject) => { - waterfall([ - (cb) => PeerId.create({ bits: 1024 }, cb), - (id, cb) => PeerInfo.create(id, cb), - (peerInfo, cb) => { - cb(null, new Node({ peerInfo })) - }, - (node, cb) => node.start((err) => cb(err, node)) - ], (err, node) => { - if (err) { - return reject(err) - } - resolve(node) +const PubsubBaseProtocol = require('../../src') +const { message } = require('../../src') + +exports.createPeerInfo = async () => { + const peerId = await PeerId.create({ bits: 1024 }) + + return PeerInfo.create(peerId) +} + +class PubsubImplementation extends PubsubBaseProtocol { + constructor (protocol, peerInfo, registrar) { + super({ + debugName: 'libp2p:pubsub', + multicodecs: protocol, + peerInfo: peerInfo, + registrar: registrar }) - }) + } + + publish (topics, messages) { + // ... + } + + subscribe (topics) { + // ... + } + + unsubscribe (topics) { + // ... + } + + _processMessages (idB58Str, conn, peer) { + pipe( + conn, + lp.decode(), + async function collect (source) { + for await (const val of source) { + const rpc = message.rpc.RPC.decode(val) + + return rpc + } + } + ) + } +} + +exports.PubsubImplementation = PubsubImplementation + +exports.mockRegistrar = { + register: (multicodec, handlers) => { + + }, + unregister: (multicodec) => { + + } } diff --git a/test/utils/nodejs-bundle.js b/test/utils/nodejs-bundle.js deleted file mode 100644 index ef572d1..0000000 --- a/test/utils/nodejs-bundle.js +++ /dev/null @@ -1,26 +0,0 @@ -'use strict' - -const TCP = require('libp2p-tcp') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const modules = { - transport: [TCP], - streamMuxer: [spdy], - connEncryption: [secio] - } - - peerInfo.multiaddrs.add('/ip4/127.0.0.1/tcp/0') - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node