diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index f5b042b..0000000 --- a/.aegir.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' - -const pull = require('pull-stream') -const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous') - -const Node = require('./test/utils/nodejs-bundle.js') -const { - getPeerRelay, - WS_RENDEZVOUS_MULTIADDR -} = require('./test/utils/constants') - -let wsRendezvous -let node - -const before = async () => { - [wsRendezvous, node] = await Promise.all([ - WebSocketStarRendezvous.start({ - port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, - refreshPeerListIntervalMS: 1000, - strictMultiaddr: false, - cryptoChallenge: true - }), - new Promise(async (resolve) => { - const peerInfo = await getPeerRelay() - const n = new Node({ - peerInfo, - config: { - relay: { - enabled: true, - hop: { - enabled: true, - active: true - } - } - } - }) - - n.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) - await n.start() - - resolve(n) - }) - ]) -} - -const after = () => { - return new Promise((resolve) => { - setTimeout(async () => { - await Promise.all([ - node.stop(), - wsRendezvous.stop() - ]) - resolve() - }, 2000) - }) -} - -module.exports = { - hooks: { - pre: before, - post: after - } -} diff --git a/README.md b/README.md index 6042102..d96fcfa 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ js-libp2p-floodsub - [Install](#install) - [Usage](#usage) - [API](#api) +- [Events](#events) - [Contribute](#contribute) - [License](#license) @@ -31,12 +32,22 @@ js-libp2p-floodsub > npm install libp2p-floodsub ``` -## Examples +## Usage ```JavaScript const FloodSub = require('libp2p-floodsub') -const fsub = new FloodSub(node) +const registrar = { + register: (multicodec, handlers) => { + // register multicodec to libp2p + // handlers will be used to notify pubsub of peer connection establishment or closing + }, + unregister: (multicodec) => { + + } +} + +const fsub = new FloodSub(peerInfo, registrar, options) await fsub.start() @@ -48,6 +59,21 @@ fsub.subscribe('fruit') fsub.publish('fruit', new Buffer('banana')) ``` +## API + +### Create a floodsub implementation + +```js +const options = {…} +const floodsub = new Floodsub(peerInfo, registrar, options) +``` + +Options is an optional object with the following key-value pairs: + +* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **false**). + +For the remaining API, see https://github.com/libp2p/js-libp2p-pubsub + ## Events Floodsub emits two kinds of events: @@ -65,27 +91,11 @@ Floodsub emits two kinds of events: - `changes`: an array of `{ topicID: , subscribe: }` eg `[ { topicID: 'fruit', subscribe: true }, { topicID: 'vegetables': false } ]` - -## API - -### Create a floodsub implementation - -```js -const options = {…} -const floodsub = new Floodsub(libp2pNode, options) -``` - -Options is an optional object with the following key-value pairs: - -* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **false**). - -For more, see https://libp2p.github.io/js-libp2p-floodsub - ## Contribute -PRs are welcome! +Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/js-libp2p-pubsub/issues)! -Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. 
+This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). ## License diff --git a/benchmarks/index.js b/benchmarks/index.js index 475b355..2ad540f 100644 --- a/benchmarks/index.js +++ b/benchmarks/index.js @@ -2,81 +2,78 @@ const Benchmark = require('benchmark') const crypto = require('crypto') -const map = require('async/map') -const parallel = require('async/parallel') -const series = require('async/series') -const PSG = require('../src') -const utils = require('../test/utils') +const DuplexPair = require('it-pair/duplex') + +const Floodsub = require('../src') +const { multicodec } = require('../src') +const { createPeerInfo } = require('../test/utils') const suite = new Benchmark.Suite('pubsub') // Simple benchmark, how many messages can we send from // one node to another. -map([0, 1], (i, cb) => { - utils.createNode((err, node) => { - if (err) { - return cb(err) +;(async () => { + const registrarRecordA = {} + const registrarRecordB = {} + + const registrar = (registrarRecord) => ({ + register: (multicodec, handlers) => { + registrarRecord[multicodec] = handlers + }, + unregister: (multicodec) => { + delete registrarRecord[multicodec] } + }) - const ps = new PSG(node) + const [peerInfoA, peerInfoB] = await Promise.all([ + createPeerInfo(), + createPeerInfo() + ]) - series([ - (cb) => node.start(cb), - (cb) => ps.start(cb) - ], (err) => { - if (err) { - return cb(err) - } + const fsA = new Floodsub(peerInfoA, registrar(registrarRecordA)) + const fsB = new Floodsub(peerInfoB, registrar(registrarRecordB)) - cb(null, { - libp2p: node, - ps - }) - }) - }) -}, (err, peers) => { - if (err) { - throw err - } - - parallel([ - (cb) => peers[0].libp2p.dial(peers[1].libp2p.peerInfo, cb), - (cb) => setTimeout(() => { - peers[0].ps.subscribe('Z', () => {}, () => {}) - peers[1].ps.subscribe('Z', () => {}, () => {}) - cb(null, peers) - }, 200) - ], (err, res) => { - if (err) { - throw err - } + // Start pubsub + await Promise.all([ + fsA.start(), + fsB.start() + ]) - const peers = res[1] + // Connect floodsub nodes + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect - suite.add('publish and receive', (deferred) => { - const onMsg = (msg) => { - deferred.resolve() - peers[1].ps.removeListener('Z', onMsg) - } + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) - peers[1].ps.on('Z', onMsg) + fsA.subscribe('Z') + fsB.subscribe('Z') - peers[0].ps.publish('Z', crypto.randomBytes(1024)) - }, { - defer: true - }) + suite.add('publish and receive', (deferred) => { + const onMsg = (msg) => { + deferred.resolve() + fsB.removeListener('Z', onMsg) + } + + fsB.on('Z', onMsg) - suite - .on('cycle', (event) => { - console.log(String(event.target)) // eslint-disable-line - }) - .on('complete', () => { - process.exit() - }) - .run({ - async: true - }) + fsA.publish('Z', crypto.randomBytes(1024)) + }, { + defer: true }) -}) + + suite + .on('cycle', (event) => { + console.log(String(event.target)) // eslint-disable-line + }) + .on('complete', () => { + process.exit() + }) + .run({ + async: true + }) +})() diff --git a/examples/pub-sub-1-topic/publisher.js b/examples/pub-sub-1-topic/publisher.js deleted file mode 100644 index 8fd5fb1..0000000 --- a/examples/pub-sub-1-topic/publisher.js +++ /dev/null @@ -1,54 +0,0 @@ -'use strict' - -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const multiaddr = 
require('multiaddr') -const Node = require('libp2p-ipfs-nodejs') -const FloodSub = require('../../src') -const series = require('async/series') - -const privateKey = 'CAASpgkwggSiAgEAAoIBAQC2SKo/HMFZeBml1AF3XijzrxrfQXdJzjePBZAbdxqKR1Mc6juRHXij6HXYPjlAk01BhF1S3Ll4Lwi0cAHhggf457sMg55UWyeGKeUv0ucgvCpBwlR5cQ020i0MgzjPWOLWq1rtvSbNcAi2ZEVn6+Q2EcHo3wUvWRtLeKz+DZSZfw2PEDC+DGPJPl7f8g7zl56YymmmzH9liZLNrzg/qidokUv5u1pdGrcpLuPNeTODk0cqKB+OUbuKj9GShYECCEjaybJDl9276oalL9ghBtSeEv20kugatTvYy590wFlJkkvyl+nPxIH0EEYMKK9XRWlu9XYnoSfboiwcv8M3SlsjAgMBAAECggEAZtju/bcKvKFPz0mkHiaJcpycy9STKphorpCT83srBVQi59CdFU6Mj+aL/xt0kCPMVigJw8P3/YCEJ9J+rS8BsoWE+xWUEsJvtXoT7vzPHaAtM3ci1HZd302Mz1+GgS8Epdx+7F5p80XAFLDUnELzOzKftvWGZmWfSeDnslwVONkL/1VAzwKy7Ce6hk4SxRE7l2NE2OklSHOzCGU1f78ZzVYKSnS5Ag9YrGjOAmTOXDbKNKN/qIorAQ1bovzGoCwx3iGIatQKFOxyVCyO1PsJYT7JO+kZbhBWRRE+L7l+ppPER9bdLFxs1t5CrKc078h+wuUr05S1P1JjXk68pk3+kQKBgQDeK8AR11373Mzib6uzpjGzgNRMzdYNuExWjxyxAzz53NAR7zrPHvXvfIqjDScLJ4NcRO2TddhXAfZoOPVH5k4PJHKLBPKuXZpWlookCAyENY7+Pd55S8r+a+MusrMagYNljb5WbVTgN8cgdpim9lbbIFlpN6SZaVjLQL3J8TWH6wKBgQDSChzItkqWX11CNstJ9zJyUE20I7LrpyBJNgG1gtvz3ZMUQCn3PxxHtQzN9n1P0mSSYs+jBKPuoSyYLt1wwe10/lpgL4rkKWU3/m1Myt0tveJ9WcqHh6tzcAbb/fXpUFT/o4SWDimWkPkuCb+8j//2yiXk0a/T2f36zKMuZvujqQKBgC6B7BAQDG2H2B/ijofp12ejJU36nL98gAZyqOfpLJ+FeMz4TlBDQ+phIMhnHXA5UkdDapQ+zA3SrFk+6yGk9Vw4Hf46B+82SvOrSbmnMa+PYqKYIvUzR4gg34rL/7AhwnbEyD5hXq4dHwMNsIDq+l2elPjwm/U9V0gdAl2+r50HAoGALtsKqMvhv8HucAMBPrLikhXP/8um8mMKFMrzfqZ+otxfHzlhI0L08Bo3jQrb0Z7ByNY6M8epOmbCKADsbWcVre/AAY0ZkuSZK/CaOXNX/AhMKmKJh8qAOPRY02LIJRBCpfS4czEdnfUhYV/TYiFNnKRj57PPYZdTzUsxa/yVTmECgYBr7slQEjb5Onn5mZnGDh+72BxLNdgwBkhO0OCdpdISqk0F0Pxby22DFOKXZEpiyI9XYP1C8wPiJsShGm2yEwBPWXnrrZNWczaVuCbXHrZkWQogBDG3HGXNdU4MAWCyiYlyinIBpPpoAJZSzpGLmWbMWh28+RJS6AQX6KHrK1o2uw==' - -let nodePublisher -let psPublisher - -function bootNode (next) { - PeerId.createFromPrivKey(privateKey, (err, idPublisher) => { - if (err) { - throw err - } - const peerPublisher = new PeerInfo(idPublisher) - peerPublisher.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/12345')) - nodePublisher = new Node(peerPublisher) - nodePublisher.start((err) => { - console.log('Publisher listening on:') - - peerPublisher.multiaddrs.forEach((ma) => { - console.log(ma.toString() + '/ipfs/' + idPublisher.toB58String()) - }) - next(err) - }) - }) -} - -function setUpPS (next) { - console.log('attaching pubsub') - psPublisher = new FloodSub(nodePublisher) - psPublisher.start(next) -} - -function publishMsg (err) { - if (err) { - throw err - } - - setInterval(() => { - process.stdout.write('.') - psPublisher.publish('interop', Buffer.from('hey, how is it going?')) - }, 300) -} - -series([ - bootNode, - setUpPS -], publishMsg) diff --git a/examples/pub-sub-1-topic/subscriber.js b/examples/pub-sub-1-topic/subscriber.js deleted file mode 100644 index ccacec3..0000000 --- a/examples/pub-sub-1-topic/subscriber.js +++ /dev/null @@ -1 +0,0 @@ -'use strict' diff --git a/package.json b/package.json index 4a5be86..26d1ab8 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,6 @@ "coverage": "aegir coverage", "coverage-publish": "aegir coverage --provider coveralls" }, - "browser": { - "test/utils/nodejs-bundle": "./test/utils/browser-bundle.js" - }, "files": [ "src", "dist" @@ -45,34 +42,28 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-floodsub#readme", "devDependencies": { - "aegir": "^20.3.1", + "aegir": "^20.4.1", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", - "libp2p": "~0.26.2", - "libp2p-secio": 
"~0.11.1", - "libp2p-spdy": "~0.13.3", - "libp2p-tcp": "~0.13.2", - "libp2p-websocket-star": "~0.10.2", - "libp2p-websocket-star-rendezvous": "~0.4.1", + "it-pair": "^1.0.0", "lodash": "^4.17.15", "multiaddr": "^6.1.0", - "peer-id": "~0.12.5", - "peer-info": "~0.15.1", + "p-defer": "^3.0.0", + "peer-id": "~0.13.3", + "peer-info": "~0.17.0", "sinon": "^7.5.0" }, "dependencies": { - "async": "^2.6.2", "debug": "^4.1.1", - "length-prefixed-stream": "^2.0.0", + "it-length-prefixed": "^2.0.0", + "it-pipe": "^1.0.1", "libp2p-pubsub": "libp2p/js-libp2p-pubsub#refactor/async", "p-map": "^3.0.0", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.3", - "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.9" + "time-cache": "^0.3.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/index.js b/src/index.js index 154488f..d00abe8 100644 --- a/src/index.js +++ b/src/index.js @@ -1,17 +1,22 @@ 'use strict' -const pull = require('pull-stream') -const lp = require('pull-length-prefixed') const assert = require('assert') +const debug = require('debug') +const debugName = 'libp2p:floodsub' +const log = debug(debugName) +log.error = debug(`${debugName}:error`) +const pipe = require('it-pipe') +const lp = require('it-length-prefixed') +const pMap = require('p-map') +const TimeCache = require('time-cache') + +const PeerInfo = require('peer-info') const BaseProtocol = require('libp2p-pubsub') const { message, utils } = require('libp2p-pubsub') -const config = require('./config') +const { multicodec } = require('./config') -const multicodec = config.multicodec const ensureArray = utils.ensureArray -const setImmediate = require('async/setImmediate') -const pMap = require('p-map') /** * FloodSub (aka dumbsub is an implementation of pubsub focused on @@ -20,13 +25,22 @@ const pMap = require('p-map') */ class FloodSub extends BaseProtocol { /** - * @param {Object} libp2p an instance of Libp2p + * @param {PeerInfo} peerInfo instance of the peer's PeerInfo + * @param {Object} registrar + * @param {function} registrar.register + * @param {function} registrar.unregister * @param {Object} [options] * @param {boolean} options.emitSelf if publish should emit to self, if subscribed, defaults to false * @constructor */ - constructor (libp2p, options = {}) { - super('libp2p:floodsub', multicodec, libp2p, options) + constructor (peerInfo, registrar, options = {}) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') + + // registrar handling + assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar') + assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') + + super(debugName, multicodec, peerInfo, registrar, options) /** * List of our subscriptions @@ -34,6 +48,13 @@ class FloodSub extends BaseProtocol { */ this.subscriptions = new Set() + /** + * Cache of seen messages + * + * @type {TimeCache} + */ + this.seenCache = new TimeCache() + /** * Pubsub options */ @@ -41,16 +62,18 @@ class FloodSub extends BaseProtocol { emitSelf: false, ...options } + + this._onRpc = this._onRpc.bind(this) } /** - * Dial a received peer. + * Peer connected successfully with pubsub protocol. 
* @override * @param {PeerInfo} peerInfo peer info * @param {Connection} conn connection to the peer */ - _onDial (peerInfo, conn) { - super._onDial(peerInfo, conn) + _onPeerConnected (peerInfo, conn) { + super._onPeerConnected(peerInfo, conn) const idB58Str = peerInfo.id.toB58String() const peer = this.peers.get(idB58Str) @@ -70,16 +93,24 @@ class FloodSub extends BaseProtocol { * @returns {void} * */ - _processConnection (idB58Str, conn, peer) { - pull( - conn, - lp.decode(), - pull.map((data) => message.rpc.RPC.decode(data)), - pull.drain( - (rpc) => this._onRpc(idB58Str, rpc), - (err) => this._onConnectionEnd(idB58Str, peer, err) + async _processMessages (idB58Str, conn, peer) { + const onRpcFunc = this._onRpc + try { + await pipe( + conn, + lp.decode(), + // paramap(source, data => message.rpc.RPC.decode(data)), + async function collect (source) { + for await (const data of source) { + const rpc = Buffer.isBuffer(data) ? data : data.slice() + + onRpcFunc(idB58Str, message.rpc.RPC.decode(rpc)) + } + } ) - ) + } catch (err) { + this._onPeerDisconnected(peer, err) + } } /** @@ -93,7 +124,7 @@ class FloodSub extends BaseProtocol { return } - this.log('rpc from', idB58Str) + log('rpc from', idB58Str) const subs = rpc.subscriptions const msgs = rpc.msgs @@ -136,7 +167,7 @@ class FloodSub extends BaseProtocol { } if (error || !isValid) { - this.log('Message could not be validated, dropping it. isValid=%s', isValid, error) + log('Message could not be validated, dropping it. isValid=%s', isValid, error) return } @@ -167,7 +198,7 @@ class FloodSub extends BaseProtocol { peer.sendMessages(utils.normalizeOutRpcMessages(messages)) - this.log('publish msgs on topics', topics, peer.info.id.toB58String()) + log('publish msgs on topics', topics, peer.info.id.toB58String()) }) } @@ -175,10 +206,9 @@ class FloodSub extends BaseProtocol { * Unmounts the floodsub protocol and shuts down every connection * @override * @returns {void} - * */ - stop () { - super.stop() + async stop () { + await super.stop() this.subscriptions = new Set() } @@ -189,17 +219,16 @@ class FloodSub extends BaseProtocol { * @param {Array|string} topics * @param {Array|any} messages * @returns {Promise} - * */ async publish (topics, messages) { assert(this.started, 'FloodSub is not started') - this.log('publish', topics, messages) + log('publish', topics, messages) topics = ensureArray(topics) messages = ensureArray(messages) - const from = this.libp2p.peerInfo.id.toB58String() + const from = this.peerInfo.id.toB58String() const buildMessage = (msg) => { const seqno = utils.randomSeqno() @@ -259,10 +288,7 @@ class FloodSub extends BaseProtocol { * @returns {void} */ unsubscribe (topics) { - // Avoid race conditions, by quietly ignoring unsub when shutdown. - if (!this.started) { - return - } + assert(this.started, 'FloodSub is not started') topics = ensureArray(topics) @@ -273,11 +299,20 @@ class FloodSub extends BaseProtocol { function checkIfReady (peer) { if (peer && peer.isWritable) { peer.sendUnsubscriptions(topics) - } else { - setImmediate(checkIfReady.bind(peer)) } } } + + /** + * Get the list of topics which the peer is subscribed to. 
+ * @override + * @returns {Array} + */ + getTopics () { + assert(this.started, 'FloodSub is not started') + + return Array.from(this.subscriptions) + } } module.exports = FloodSub diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index e6590ad..289c30b 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -6,226 +6,264 @@ const chai = require('chai') chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect + +const pDefer = require('p-defer') const times = require('lodash/times') +const DuplexPair = require('it-pair/duplex') const FloodSub = require('../src') -const utils = require('./utils') -const first = utils.first -const createNode = utils.createNode -const expectSet = utils.expectSet +const { multicodec } = require('../src') +const { first, createPeerInfo, expectSet } = require('./utils') + +const defOptions = { + emitSelf: true +} + +function shouldNotHappen (_) { + expect.fail() +} describe('basics between 2 nodes', () => { describe('fresh nodes', () => { - let nodeA - let nodeB - let fsA - let fsB + let peerInfoA, peerInfoB + let fsA, fsB - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) + const registrarRecordA = {} + const registrarRecordB = {} + + const registrar = (registrarRecord) => ({ + register: (multicodec, handlers) => { + registrarRecord[multicodec] = handlers + }, + unregister: (multicodec) => { + delete registrarRecord[multicodec] + } }) - after(() => { - return Promise.all([ - nodeA.stop(), - nodeB.stop() + // Mount pubsub protocol + before(async () => { + [peerInfoA, peerInfoB] = await Promise.all([ + createPeerInfo(), + createPeerInfo() ]) + + fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions) + + expect(fsA.peers.size).to.be.eql(0) + expect(fsA.subscriptions.size).to.eql(0) + expect(fsB.peers.size).to.be.eql(0) + expect(fsB.subscriptions.size).to.eql(0) }) - it('Mount the pubsub protocol', () => { - fsA = new FloodSub(nodeA, { emitSelf: true }) - fsB = new FloodSub(nodeB, { emitSelf: true }) + // Start pubsub + before(() => Promise.all([ + fsA.start(), + fsB.start() + ])) - return new Promise((resolve) => { - setTimeout(() => { - expect(fsA.peers.size).to.be.eql(0) - expect(fsA.subscriptions.size).to.eql(0) - expect(fsB.peers.size).to.be.eql(0) - expect(fsB.subscriptions.size).to.eql(0) - resolve() - }, 50) - }) + // Connect floodsub nodes + before(() => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) + + expect(fsA.peers.size).to.be.eql(1) + expect(fsB.peers.size).to.be.eql(1) }) - it('start both FloodSubs', () => { + after(() => { return Promise.all([ - fsA.start(), - fsB.start() + fsA.started && fsA.stop(), + fsB.started && fsB.stop() ]) }) - it('Dial from nodeA to nodeB', async () => { - await nodeA.dial(nodeB.peerInfo) - - return new Promise((resolve) => { - setTimeout(() => { - expect(fsA.peers.size).to.equal(1) - expect(fsB.peers.size).to.equal(1) - resolve() - }, 1000) - }) - }) - it('Subscribe to a topic:Z in nodeA', () => { - return new Promise((resolve) => { - fsA.subscribe('Z') - fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => { - expectSet(fsA.subscriptions, ['Z']) - expect(fsB.peers.size).to.equal(1) - 
expectSet(first(fsB.peers).topics, ['Z']) - expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String()) - expectSet(changedTopics, ['Z']) - expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: true }]) - resolve() - }) + const defer = pDefer() + + fsA.subscribe('Z') + fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => { + expectSet(fsA.subscriptions, ['Z']) + expect(fsB.peers.size).to.equal(1) + expectSet(first(fsB.peers).topics, ['Z']) + expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String()) + expectSet(changedTopics, ['Z']) + expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: true }]) + defer.resolve() }) + + return defer.promise }) it('Publish to a topic:Z in nodeA', () => { - return new Promise((resolve) => { - fsA.once('Z', (msg) => { - expect(msg.data.toString()).to.equal('hey') - fsB.removeListener('Z', shouldNotHappen) - resolve() - }) + const defer = pDefer() - fsB.once('Z', shouldNotHappen) - - fsA.publish('Z', Buffer.from('hey')) + fsA.once('Z', (msg) => { + expect(msg.data.toString()).to.equal('hey') + fsB.removeListener('Z', shouldNotHappen) + defer.resolve() }) + + fsB.once('Z', shouldNotHappen) + + fsA.publish('Z', Buffer.from('hey')) + + return defer.promise }) it('Publish to a topic:Z in nodeB', () => { - return new Promise((resolve) => { - fsA.once('Z', (msg) => { - fsA.once('Z', shouldNotHappen) - expect(msg.data.toString()).to.equal('banana') + const defer = pDefer() - setTimeout(() => { - fsA.removeListener('Z', shouldNotHappen) - fsB.removeListener('Z', shouldNotHappen) - resolve() - }, 100) - }) + fsA.once('Z', (msg) => { + fsA.once('Z', shouldNotHappen) + expect(msg.data.toString()).to.equal('banana') - fsB.once('Z', shouldNotHappen) + setTimeout(() => { + fsA.removeListener('Z', shouldNotHappen) + fsB.removeListener('Z', shouldNotHappen) - fsB.publish('Z', Buffer.from('banana')) + defer.resolve() + }, 100) }) + + fsB.once('Z', shouldNotHappen) + + fsB.publish('Z', Buffer.from('banana')) + + return defer.promise }) it('Publish 10 msg to a topic:Z in nodeB', () => { + const defer = pDefer() let counter = 0 fsB.once('Z', shouldNotHappen) + fsA.on('Z', receivedMsg) - return new Promise((resolve) => { - fsA.on('Z', receivedMsg) + function receivedMsg (msg) { + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(fsB.peerInfo.id.toB58String()) + expect(Buffer.isBuffer(msg.seqno)).to.be.true() + expect(msg.topicIDs).to.be.eql(['Z']) - function receivedMsg (msg) { - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String()) - expect(Buffer.isBuffer(msg.seqno)).to.be.true() - expect(msg.topicIDs).to.be.eql(['Z']) + if (++counter === 10) { + fsA.removeListener('Z', receivedMsg) + fsB.removeListener('Z', shouldNotHappen) - if (++counter === 10) { - fsA.removeListener('Z', receivedMsg) - fsB.removeListener('Z', shouldNotHappen) - resolve() - } + defer.resolve() } - times(10, () => fsB.publish('Z', Buffer.from('banana'))) - }) + } + times(10, () => fsB.publish('Z', Buffer.from('banana'))) + + return defer.promise }) it('Publish 10 msg to a topic:Z in nodeB as array', () => { + const defer = pDefer() let counter = 0 fsB.once('Z', shouldNotHappen) + fsA.on('Z', receivedMsg) - return new Promise((resolve) => { - fsA.on('Z', receivedMsg) + function receivedMsg (msg) { + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(fsB.peerInfo.id.toB58String()) + 
expect(Buffer.isBuffer(msg.seqno)).to.be.true() + expect(msg.topicIDs).to.be.eql(['Z']) - function receivedMsg (msg) { - expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String()) - expect(Buffer.isBuffer(msg.seqno)).to.be.true() - expect(msg.topicIDs).to.be.eql(['Z']) + if (++counter === 10) { + fsA.removeListener('Z', receivedMsg) + fsB.removeListener('Z', shouldNotHappen) - if (++counter === 10) { - fsA.removeListener('Z', receivedMsg) - fsB.removeListener('Z', shouldNotHappen) - resolve() - } + defer.resolve() } + } - const msgs = [] - times(10, () => msgs.push(Buffer.from('banana'))) - fsB.publish('Z', msgs) - }) + const msgs = [] + times(10, () => msgs.push(Buffer.from('banana'))) + fsB.publish('Z', msgs) + + return defer.promise }) it('Unsubscribe from topic:Z in nodeA', () => { + const defer = pDefer() + fsA.unsubscribe('Z') expect(fsA.subscriptions.size).to.equal(0) - return new Promise((resolve) => { - fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => { - expect(fsB.peers.size).to.equal(1) - expectSet(first(fsB.peers).topics, []) - expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String()) - expectSet(changedTopics, []) - expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: false }]) - resolve() - }) + fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => { + expect(fsB.peers.size).to.equal(1) + expectSet(first(fsB.peers).topics, []) + expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String()) + expectSet(changedTopics, []) + expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: false }]) + + defer.resolve() }) + + return defer.promise }) it('Publish to a topic:Z in nodeA nodeB', () => { + const defer = pDefer() + fsA.once('Z', shouldNotHappen) fsB.once('Z', shouldNotHappen) - return new Promise((resolve) => { - setTimeout(() => { - fsA.removeListener('Z', shouldNotHappen) - fsB.removeListener('Z', shouldNotHappen) - resolve() - }, 100) + setTimeout(() => { + fsA.removeListener('Z', shouldNotHappen) + fsB.removeListener('Z', shouldNotHappen) + defer.resolve() + }, 100) - fsB.publish('Z', Buffer.from('banana')) - fsA.publish('Z', Buffer.from('banana')) - }) - }) + fsB.publish('Z', Buffer.from('banana')) + fsA.publish('Z', Buffer.from('banana')) - it('stop both FloodSubs', () => { - fsA.stop() - fsB.stop() + return defer.promise }) }) describe('nodes send state on connection', () => { - let nodeA - let nodeB - let fsA - let fsB + let peerInfoA, peerInfoB + let fsA, fsB + + const registrarRecordA = {} + const registrarRecordB = {} + const registrar = (registrarRecord) => ({ + register: (multicodec, handlers) => { + registrarRecord[multicodec] = handlers + }, + unregister: (multicodec) => { + delete registrarRecord[multicodec] + } + }) + + // Mount pubsub protocol before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() + [peerInfoA, peerInfoB] = await Promise.all([ + createPeerInfo(), + createPeerInfo() ]) - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) + fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions) + }) - await Promise.all([ - fsA.start(), - fsB.start() - ]) + // Start pubsub + before(() => Promise.all([ + fsA.start(), + fsB.start() + ])) + // Make subscriptions prior to new nodes + before(() => { fsA.subscribe('Za') fsB.subscribe('Zb') @@ 
-237,14 +275,25 @@ describe('basics between 2 nodes', () => { after(() => { return Promise.all([ - nodeA.stop(), - nodeB.stop() + fsA.started && fsA.stop(), + fsB.started && fsB.stop() ]) }) it('existing subscriptions are sent upon peer connection', async () => { await Promise.all([ - nodeA.dial(nodeB.peerInfo), + // nodeA.dial(nodeB.peerInfo), + new Promise((resolve) => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) + + resolve() + }), new Promise((resolve) => fsA.once('floodsub:subscription-change', resolve)), new Promise((resolve) => fsB.once('floodsub:subscription-change', resolve)) ]) @@ -253,299 +302,10 @@ describe('basics between 2 nodes', () => { expect(fsB.peers.size).to.equal(1) expectSet(fsA.subscriptions, ['Za']) - expect(fsB.peers.size).to.equal(1) expectSet(first(fsB.peers).topics, ['Za']) expectSet(fsB.subscriptions, ['Zb']) - expect(fsA.peers.size).to.equal(1) expectSet(first(fsA.peers).topics, ['Zb']) }) - - it('stop both FloodSubs', () => { - fsA.stop() - fsB.stop() - }) - }) - - describe('nodes handle connection errors', () => { - let nodeA - let nodeB - let fsA - let fsB - - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) - - await Promise.all([ - fsA.start(), - fsB.start() - ]) - - fsA.subscribe('Za') - fsB.subscribe('Zb') - - expect(fsA.peers.size).to.equal(0) - expectSet(fsA.subscriptions, ['Za']) - expect(fsB.peers.size).to.equal(0) - expectSet(fsB.subscriptions, ['Zb']) - }) - - it('peer is removed from the state when connection ends', async () => { - await nodeA.dial(nodeB.peerInfo) - - return new Promise((resolve) => { - setTimeout(() => { - expect(first(fsA.peers)._references).to.equal(2) - expect(first(fsB.peers)._references).to.equal(2) - - fsA.stop() - setTimeout(() => { - expect(first(fsB.peers)._references).to.equal(1) - resolve() - }, 1000) - }, 1000) - }) - }) - - it('stop one node', () => { - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) - }) - - it('nodes don\'t have peers in it', () => { - return new Promise((resolve) => { - setTimeout(() => { - expect(fsA.peers.size).to.equal(0) - expect(fsB.peers.size).to.equal(0) - resolve() - }, 1000) - }) - }) - }) - - describe('dial the pubsub protocol on mount', () => { - let nodeA - let nodeB - let fsA - let fsB - - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - await nodeA.dial(nodeB.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - }) - - after(() => { - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) - }) - - it('dial on floodsub on mount', async () => { - fsA = new FloodSub(nodeA, { emitSelf: true }) - fsB = new FloodSub(nodeB, { emitSelf: true }) - - await Promise.all([ - fsA.start(), - fsB.start() - ]) - - expect(fsA.peers.size).to.equal(1) - expect(fsB.peers.size).to.equal(1) - }) - - it('stop both FloodSubs', () => { - fsA.stop() - fsB.stop() - }) - }) - - describe('prevent concurrent dials', () => { - let sandbox - let nodeA - let nodeB - let fsA - let fsB - - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - sandbox = chai.spy.sandbox() - - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - fsA = new FloodSub(nodeA) - fsB = new 
FloodSub(nodeB) - - await fsB.start() - }) - - after(() => { - sandbox.restore() - - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) - }) - - it('does not dial twice to same peer', async () => { - sandbox.on(fsA, ['_onDial']) - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - await fsA.start() - - // Simulate a connection coming in from peer B at the same time. This - // causes floodsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - return new Promise((resolve) => { - // Check that only one dial was made - setTimeout(() => { - expect(fsA._onDial).to.have.been.called.once() - resolve() - }, 1000) - }) - }) - }) - - describe('allow dials even after error', () => { - let sandbox - let nodeA - let nodeB - let fsA - let fsB - - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - sandbox = chai.spy.sandbox() - - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) - - await fsB.start() - }) - - after(() => { - sandbox.restore() - - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) - }) - - it('can dial again after error', async () => { - let firstTime = true - const dialProtocol = fsA.libp2p.dialProtocol.bind(fsA.libp2p) - - sandbox.on(fsA.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { - // Return an error for the first dial - if (firstTime) { - firstTime = false - return cb(new Error('dial error')) - } - - // Subsequent dials proceed as normal - dialProtocol(peerInfo, multicodec, cb) - }) - - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - await fsA.start() - - // Simulate a connection coming in from peer B. This causes floodsub - // to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - return new Promise((resolve) => { - // Check that both dials were made - setTimeout(() => { - expect(fsA.libp2p.dialProtocol).to.have.been.called.twice() - resolve() - }, 1000) - }) - }) - }) - - describe('prevent processing dial after stop', () => { - let sandbox - let nodeA - let nodeB - let fsA - let fsB - - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - sandbox = chai.spy.sandbox() - - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) - - await Promise.all([ - fsA.start(), - fsB.start() - ]) - }) - - after(() => { - sandbox.restore() - - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) - }) - - it('does not process dial after stop', () => { - sandbox.on(fsA, ['_onDial']) - - // Simulate a connection coming in from peer B at the same time. 
This - // causes floodsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - // Stop floodsub before the dial can complete - fsA.stop() - - return new Promise((resolve) => { - // Check that the dial was not processed - setTimeout(() => { - expect(fsA._onDial).to.not.have.been.called() - resolve() - }, 1000) - }) - }) }) }) - -function shouldNotHappen (msg) { - expect.fail() -} diff --git a/test/emit-self.spec.js b/test/emit-self.spec.js index b4dc3a8..e60f5f1 100644 --- a/test/emit-self.spec.js +++ b/test/emit-self.spec.js @@ -10,73 +10,57 @@ const expect = chai.expect const FloodSub = require('../src') const { - createNode + createPeerInfo, mockRegistrar } = require('./utils') const shouldNotHappen = (_) => expect.fail() describe('emit self', () => { + let floodsub + let peerInfo const topic = 'Z' describe('enabled', () => { - let nodeA - let fsA - before(async () => { - nodeA = await createNode() - await nodeA.start() + peerInfo = await createPeerInfo() + floodsub = new FloodSub(peerInfo, mockRegistrar, { emitSelf: true }) }) - before(() => { - fsA = new FloodSub(nodeA, { emitSelf: true }) - return fsA.start() - }) + before(async () => { + await floodsub.start() - before(() => { - fsA.subscribe(topic) + floodsub.subscribe(topic) }) - after(() => { - fsA.stop() - return nodeA.stop() - }) + after(() => floodsub.stop()) it('should emit to self on publish', () => { - const promise = new Promise((resolve) => fsA.once(topic, resolve)) + const promise = new Promise((resolve) => floodsub.once(topic, resolve)) - fsA.publish(topic, Buffer.from('hey')) + floodsub.publish(topic, Buffer.from('hey')) return promise }) }) describe('disabled', () => { - let nodeA - let fsA - before(async () => { - nodeA = await createNode() - await nodeA.start() + peerInfo = await createPeerInfo() + floodsub = new FloodSub(peerInfo, mockRegistrar, { emitSelf: false }) }) - before(() => { - fsA = new FloodSub(nodeA, { emitSelf: false }) - return fsA.start() - }) + before(async () => { + await floodsub.start() - before(() => { - fsA.subscribe(topic) + floodsub.subscribe(topic) }) - after(() => { - fsA.stop() - return nodeA.stop() - }) + after(() => floodsub.stop()) it('should emit to self on publish', () => { - fsA.once(topic, (m) => shouldNotHappen) + floodsub.once(topic, (m) => shouldNotHappen) - fsA.publish(topic, Buffer.from('hey')) + floodsub.publish(topic, Buffer.from('hey')) // Wait 1 second to guarantee that self is not noticed return new Promise((resolve) => setTimeout(() => resolve(), 1000)) diff --git a/test/fixtures/test-peer.json b/test/fixtures/test-peer.json deleted file mode 100644 index 105046a..0000000 --- a/test/fixtures/test-peer.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "id": "Qmex1SSsueWFsUfjdkugJ5zhcnjddAt8TxcnDLUXKD9Sx7", - "privKey": 
"CAASqAkwggSkAgEAAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAECggEAdBUzV/GaQ0nmoQrWvOnUxmFIho7kCjkh1NwnNVPNc+Msa1r7pcI9wJNPwap8j1w4L/cZuYhOJgcg+o2mWFiuULKZ4F9Ro/M89gZ038457g2/2pPu43c/Xoi/2YcAHXg0Gr+OCe2zCIyITBWKAFqyAzL6DubAxrJW2Ezj1LrZ+EZgMyzbh/go/eEGSJaaGkINeAkY144DqDWWWvzyhKhryipsGkZGEkVy9xJgMEI3ipVvuPez2XAvoyyeuinBBLe+Z2vY5G50XXzbIMhIQGLncHf9MwTv6wt1ilyOSLOXK0BoQbB76J3R3is5dSULXXP9r8VocjLBEkmBuf4FXAKzoQKBgQDNNS4F1XE1gxD8LPkL+aB/hi6eVHVPhr+w0I/9ATikcLGeUfBM2Gd6cZRPFtNVrv1p6ZF1D1UyGDknGbDBSQd9wLUgb0fDoo3jKYMGWq6G+VvaP5rzWQeBV8YV2EhSmUk1i6kiYe2ZE8WyrPie7iwpQIY60e2A8Ly0GKZiBZUcHQKBgQC9YDAVsGnEHFVFkTDpvw5HwEzCgTb2A3NgkGY3rTYZ7L6AFjqCYmUwFB8Fmbyc4kdFWNh8wfmq5Qrvl49NtaeukiqWKUUlB8uPdztB1P0IahA2ks0owStZlRifmwfgYyMd4xE17lhaOgQQJZZPxmP0F6mdOvb3YJafNURCdMS51wKBgEvvIM+h0tmFXXSjQ6kNvzlRMtD92ccKysYn9xAdMpOO6/r0wSH+dhQWEVZO0PcE4NsfReb2PIVj90ojtIdhebcr5xpQc1LORQjJJKXmSmzBux6AqNrhl+hhzXfp56FA/Zkly/lgGWaqrV5XqUxOP+Mn8EO1yNgMvRc7g94DyNB1AoGBAKLBuXHalXwDsdHBUB2Eo3xNLGt6bEcRfia+0+sEBdxQGQWylQScFkU09dh1YaIf44sZKa5HdBFJGpYCVxo9hmjFnK5Dt/Z0daHOonIY4INLzLVqg8KECoLKXkhGEIXsDjFQhukn+G1LMVTDSSU055DQiWjlVX4UWD9qo0jOXIkvAoGBAMP50p2X6PsWWZUuuR7i1JOJHRyQZPWdHh9p8SSLnCtEpHYZfJr4INXNmhnSiB/3TUnHix2vVKjosjMTCk/CjfzXV2H41WPOLZ2/Pi3SxCicWIRj4kCcWhkEuIF2jGkg1+jmNiCl/zNMaBOAIP3QbDPtqOWbYlPd2YIzdj6WQ6R4", - "pubKey": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAE=" -} \ No newline at end of file diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index 3fa7302..058cbcb 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -6,13 +6,19 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const isNode = require('detect-node') +const pDefer = require('p-defer') +const DuplexPair = require('it-pair/duplex') const FloodSub = require('../src') -const utils = require('./utils') -const first = utils.first -const createNode = utils.createNode -const expectSet = utils.expectSet +const { multicodec } = require('../src') +const { createPeerInfo, first, expectSet } = require('./utils') + +async function spawnPubSubNode (peerInfo, reg) { + const ps = new FloodSub(peerInfo, reg, { emitSelf: true }) + + await ps.start() + return ps +} describe('multiple nodes (more than 2)', () => { describe('every peer subscribes to the topic', () => { @@ -20,147 +26,169 @@ describe('multiple nodes (more than 2)', () => { // line // ◉────◉────◉ // a b c - let a - let b - let c + let psA, psB, psC + let peerInfoA, peerInfoB, peerInfoC + + const registrarRecordA = {} + const registrarRecordB = {} + const registrarRecordC = {} + + const registrar = (registrarRecord) => ({ + register: (multicodec, handlers) => { + registrarRecord[multicodec] = handlers + }, + unregister: (multicodec) => { + delete registrarRecord[multicodec] + } + }) before(async () => { - [a, b, c] = await Promise.all([ - spawnPubSubNode(), - spawnPubSubNode(), - 
spawnPubSubNode() + [peerInfoA, peerInfoB, peerInfoC] = await Promise.all([ + createPeerInfo(), + createPeerInfo(), + createPeerInfo() + ]); + + [psA, psB, psC] = await Promise.all([ + spawnPubSubNode(peerInfoA, registrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, registrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, registrar(registrarRecordC)) ]) }) - after(() => { - // note: setTimeout to avoid the tests finishing - // before swarm does its dials - return new Promise((resolve) => { - setTimeout(async () => { - await Promise.all([ - a.libp2p.stop(), - b.libp2p.stop(), - c.libp2p.stop() - ]) - resolve() - }, 1000) - }) - }) + // connect nodes + before(() => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + const onConnectC = registrarRecordC[multicodec].onConnect - it('establish the connections', async () => { - await Promise.all([ - a.libp2p.dial(b.libp2p.peerInfo), - b.libp2p.dial(c.libp2p.peerInfo) - ]) + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) - // wait for the pubsub pipes to be established - return new Promise((resolve) => setTimeout(resolve, 1000)) + const [d2, d3] = DuplexPair() + onConnectB(peerInfoC, d2) + onConnectC(peerInfoB, d3) }) + after(() => Promise.all([ + psA.stop(), + psB.stop(), + psC.stop() + ])) + it('subscribe to the topic on node a', () => { - a.ps.subscribe('Z') - expectSet(a.ps.subscriptions, ['Z']) + const defer = pDefer() - return new Promise((resolve) => { - b.ps.once('floodsub:subscription-change', () => { - expect(b.ps.peers.size).to.equal(2) - const aPeerId = a.libp2p.peerInfo.id.toB58String() - const topics = b.ps.peers.get(aPeerId).topics - expectSet(topics, ['Z']) + psA.subscribe('Z') + expectSet(psA.subscriptions, ['Z']) - expect(c.ps.peers.size).to.equal(1) - expectSet(first(c.ps.peers).topics, []) + psB.once('floodsub:subscription-change', () => { + expect(psB.peers.size).to.equal(2) + const aPeerId = psA.peerInfo.id.toB58String() + const topics = psB.peers.get(aPeerId).topics + expectSet(topics, ['Z']) - resolve() - }) + expect(psC.peers.size).to.equal(1) + expectSet(first(psC.peers).topics, []) + + defer.resolve() }) + + return defer.promise }) it('subscribe to the topic on node b', async () => { - b.ps.subscribe('Z') - expectSet(b.ps.subscriptions, ['Z']) + psB.subscribe('Z') + expectSet(psB.subscriptions, ['Z']) await Promise.all([ - new Promise((resolve) => a.ps.once('floodsub:subscription-change', resolve)), - new Promise((resolve) => c.ps.once('floodsub:subscription-change', resolve)) + new Promise((resolve) => psA.once('floodsub:subscription-change', resolve)), + new Promise((resolve) => psC.once('floodsub:subscription-change', resolve)) ]) - expect(a.ps.peers.size).to.equal(1) - expectSet(first(a.ps.peers).topics, ['Z']) + expect(psA.peers.size).to.equal(1) + expectSet(first(psA.peers).topics, ['Z']) - expect(c.ps.peers.size).to.equal(1) - expectSet(first(c.ps.peers).topics, ['Z']) + expect(psC.peers.size).to.equal(1) + expectSet(first(psC.peers).topics, ['Z']) }) it('subscribe to the topic on node c', () => { - c.ps.subscribe('Z') - expectSet(c.ps.subscriptions, ['Z']) + const defer = pDefer() - return new Promise((resolve) => { - b.ps.once('floodsub:subscription-change', () => { - expect(a.ps.peers.size).to.equal(1) - expectSet(first(a.ps.peers).topics, ['Z']) + psC.subscribe('Z') + expectSet(psC.subscriptions, ['Z']) - expect(b.ps.peers.size).to.equal(2) - 
b.ps.peers.forEach((peer) => { - expectSet(peer.topics, ['Z']) - }) + psB.once('floodsub:subscription-change', () => { + expect(psA.peers.size).to.equal(1) + expectSet(first(psA.peers).topics, ['Z']) - resolve() + expect(psB.peers.size).to.equal(2) + psB.peers.forEach((peer) => { + expectSet(peer.topics, ['Z']) }) + + defer.resolve() }) + + return defer.promise }) it('publish on node a', () => { + const defer = pDefer() + let counter = 0 - return new Promise((resolve) => { - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) - a.ps.publish('Z', Buffer.from('hey')) + psA.publish('Z', Buffer.from('hey')) - function incMsg (msg) { - expect(msg.data.toString()).to.equal('hey') - check() - } + function incMsg (msg) { + expect(msg.data.toString()).to.equal('hey') + check() + } - function check () { - if (++counter === 3) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - resolve() - } + function check () { + if (++counter === 3) { + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + defer.resolve() } - }) + } + + return defer.promise }) it('publish array on node a', () => { + const defer = pDefer() let counter = 0 - return new Promise((resolve) => { - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) - a.ps.publish('Z', [Buffer.from('hey'), Buffer.from('hey')]) + psA.publish('Z', [Buffer.from('hey'), Buffer.from('hey')]) - function incMsg (msg) { - expect(msg.data.toString()).to.equal('hey') - check() - } + function incMsg (msg) { + expect(msg.data.toString()).to.equal('hey') + check() + } - function check () { - if (++counter === 6) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - resolve() - } + function check () { + if (++counter === 6) { + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + defer.resolve() } - }) + } + + return defer.promise }) // since the topology is the same, just the publish @@ -173,137 +201,157 @@ describe('multiple nodes (more than 2)', () => { // a c it('publish on node b', () => { + const defer = pDefer() let counter = 0 - return new Promise((resolve) => { - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) - b.ps.publish('Z', Buffer.from('hey')) + psB.publish('Z', Buffer.from('hey')) - function incMsg (msg) { - expect(msg.data.toString()).to.equal('hey') - check() - } + function incMsg (msg) { + expect(msg.data.toString()).to.equal('hey') + check() + } - function check () { - if (++counter === 3) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - resolve() - } + function check () { + if (++counter === 3) { + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + defer.resolve() } - }) + } + + return defer.promise }) }) }) - if (isNode) { - // TODO enable for browser - describe('2 level tree', () => { - // 2 levels tree - // ┌◉┐ - // │c│ - // ┌◉─┘ └─◉┐ - // │b d│ - // ◉─┘ └─◉ - // a e - - let a - let b - let c - let d - let e - - before(async () => { - [a, b, c, d, e] = await Promise.all([ - spawnPubSubNode(), - spawnPubSubNode(), - spawnPubSubNode(), - spawnPubSubNode(), - spawnPubSubNode() - ]) - }) - - 
after(() => { - // note: setTimeout to avoid the tests finishing - // before swarm does its dials - return new Promise((resolve) => { - setTimeout(async () => { - await Promise.all([ - a.libp2p.stop(), - b.libp2p.stop(), - c.libp2p.stop(), - d.libp2p.stop(), - e.libp2p.stop() - ]) - - resolve() - }, 1000) - }) - }) - - it('establish the connections', async function () { - this.timeout(30 * 1000) - - await Promise.all([ - a.libp2p.dial(b.libp2p.peerInfo), - b.libp2p.dial(c.libp2p.peerInfo), - c.libp2p.dial(d.libp2p.peerInfo), - d.libp2p.dial(e.libp2p.peerInfo) - ]) - - // wait for the pubsub pipes to be established - return new Promise((resolve) => setTimeout(resolve, 10000)) - }) + describe('2 level tree', () => { + // 2 levels tree + // ┌◉┐ + // │c│ + // ┌◉─┘ └─◉┐ + // │b d│ + // ◉─┘ └─◉ + // a + let psA, psB, psC, psD, psE + let peerInfoA, peerInfoB, peerInfoC, peerInfoD, peerInfoE + + const registrarRecordA = {} + const registrarRecordB = {} + const registrarRecordC = {} + const registrarRecordD = {} + const registrarRecordE = {} + + const registrar = (registrarRecord) => ({ + register: (multicodec, handlers) => { + registrarRecord[multicodec] = handlers + }, + unregister: (multicodec) => { + delete registrarRecord[multicodec] + } + }) - it('subscribes', () => { - a.ps.subscribe('Z') - expectSet(a.ps.subscriptions, ['Z']) - b.ps.subscribe('Z') - expectSet(b.ps.subscriptions, ['Z']) - c.ps.subscribe('Z') - expectSet(c.ps.subscriptions, ['Z']) - d.ps.subscribe('Z') - expectSet(d.ps.subscriptions, ['Z']) - e.ps.subscribe('Z') - expectSet(e.ps.subscriptions, ['Z']) - }) + before(async () => { + [peerInfoA, peerInfoB, peerInfoC, peerInfoD, peerInfoE] = await Promise.all([ + createPeerInfo(), + createPeerInfo(), + createPeerInfo(), + createPeerInfo(), + createPeerInfo() + ]); + + [psA, psB, psC, psD, psE] = await Promise.all([ + spawnPubSubNode(peerInfoA, registrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, registrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, registrar(registrarRecordC)), + spawnPubSubNode(peerInfoD, registrar(registrarRecordD)), + spawnPubSubNode(peerInfoE, registrar(registrarRecordE)) + ]) + }) - it('publishes from c', function () { - this.timeout(30 * 1000) - let counter = 0 + // connect nodes + before(() => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + const onConnectC = registrarRecordC[multicodec].onConnect + const onConnectD = registrarRecordD[multicodec].onConnect + const onConnectE = registrarRecordE[multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = DuplexPair() // A <-> B + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) + + const [d2, d3] = DuplexPair() // B <-> C + onConnectB(peerInfoC, d2) + onConnectC(peerInfoB, d3) + + const [d4, d5] = DuplexPair() // C <-> D + onConnectC(peerInfoD, d4) + onConnectD(peerInfoC, d5) + + const [d6, d7] = DuplexPair() // C <-> D + onConnectD(peerInfoE, d6) + onConnectE(peerInfoD, d7) + }) - return new Promise((resolve) => { - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) - d.ps.on('Z', incMsg) - e.ps.on('Z', incMsg) + after(() => Promise.all([ + psA.stop(), + psB.stop(), + psC.stop(), + psD.stop(), + psE.stop() + ])) + + it('subscribes', () => { + psA.subscribe('Z') + expectSet(psA.subscriptions, ['Z']) + psB.subscribe('Z') + expectSet(psB.subscriptions, ['Z']) + psC.subscribe('Z') + expectSet(psC.subscriptions, ['Z']) + psD.subscribe('Z') + expectSet(psD.subscriptions, ['Z']) + 
psE.subscribe('Z') + expectSet(psE.subscriptions, ['Z']) + }) - c.ps.publish('Z', Buffer.from('hey from c')) + it('publishes from c', function () { + this.timeout(30 * 1000) + const defer = pDefer() + let counter = 0 - function incMsg (msg) { - expect(msg.data.toString()).to.equal('hey from c') - check() - } + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) + psD.on('Z', incMsg) + psE.on('Z', incMsg) + + psC.publish('Z', Buffer.from('hey from c')) + + function incMsg (msg) { + expect(msg.data.toString()).to.equal('hey from c') + check() + } + + function check () { + if (++counter === 5) { + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + psD.removeListener('Z', incMsg) + psE.removeListener('Z', incMsg) + defer.resolve() + } + } - function check () { - if (++counter === 5) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - d.ps.removeListener('Z', incMsg) - e.ps.removeListener('Z', incMsg) - resolve() - } - } - }) - }) + return defer.promise }) - } + }) }) describe('only some nodes subscribe the networks', () => { @@ -312,8 +360,8 @@ describe('multiple nodes (more than 2)', () => { // ◉────◎────◉ // a b c - before((done) => {}) - after((done) => {}) + before(() => { }) + after(() => { }) }) describe('1 level tree', () => { @@ -323,8 +371,8 @@ describe('multiple nodes (more than 2)', () => { // ◎─┘ └─◉ // a c - before((done) => {}) - after((done) => {}) + before(() => { }) + after(() => { }) }) describe('2 level tree', () => { @@ -336,19 +384,8 @@ describe('multiple nodes (more than 2)', () => { // ◉─┘ └─◎ // a e - before((done) => {}) - after((done) => {}) + before(() => { }) + after(() => { }) }) }) }) - -async function spawnPubSubNode () { - const node = await createNode() - const ps = new FloodSub(node, { emitSelf: true }) - - await ps.start() - return { - libp2p: node, - ps: ps - } -} diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 2c8add2..3c10460 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -8,18 +8,22 @@ const expect = chai.expect const sinon = require('sinon') const Floodsub = require('../src') -const { createNode } = require('./utils') +const { createPeerInfo, mockRegistrar } = require('./utils') const { utils } = require('libp2p-pubsub') +const defOptions = { + emitSelf: true +} + describe('pubsub', () => { let floodsub - let libp2p + let peerInfo before(async () => { expect(Floodsub.multicodec).to.exist() - libp2p = await createNode() - floodsub = new Floodsub(libp2p, { emitSelf: true }) + peerInfo = await createPeerInfo() + floodsub = new Floodsub(peerInfo, mockRegistrar, defOptions) }) beforeEach(() => { @@ -28,7 +32,7 @@ describe('pubsub', () => { afterEach(() => { sinon.restore() - floodsub.stop() + return floodsub.stop() }) describe('publish', () => { @@ -45,7 +49,7 @@ describe('pubsub', () => { const [topics, messages] = floodsub._emitMessages.getCall(0).args expect(topics).to.eql([topic]) expect(messages).to.eql([{ - from: libp2p.peerInfo.id.toB58String(), + from: peerInfo.id.toB58String(), data: message, seqno: utils.randomSeqno.getCall(0).returnValue, topicIDs: topics @@ -64,7 +68,7 @@ describe('pubsub', () => { const [topics, messages] = floodsub._forwardMessages.getCall(0).args const expected = await floodsub._buildMessage({ - from: libp2p.peerInfo.id.toB58String(), + from: peerInfo.id.toB58String(), data: message, seqno: utils.randomSeqno.getCall(0).returnValue, topicIDs: topics @@ -87,7 +91,7 @@ 
describe('pubsub', () => { const rpc = { subscriptions: [], msgs: [{ - from: libp2p.peerInfo.id.id, + from: peerInfo.id.id, data: Buffer.from('an unsigned message'), seqno: utils.randomSeqno(), topicIDs: [topic] @@ -117,7 +121,7 @@ describe('pubsub', () => { const rpc = { subscriptions: [], msgs: [{ - from: libp2p.peerInfo.id.id, + from: peerInfo.id.id, data: Buffer.from('an unsigned message'), seqno: utils.randomSeqno(), topicIDs: [topic] diff --git a/test/utils/browser-bundle.js b/test/utils/browser-bundle.js deleted file mode 100644 index 117e0a0..0000000 --- a/test/utils/browser-bundle.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -const WebSocketStar = require('libp2p-websocket-star') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -const { WS_STAR_MULTIADDR } = require('./constants') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const starOpts = { id: peerInfo.id } - const wsStar = new WebSocketStar(starOpts) - - peerInfo.multiaddrs.add(WS_STAR_MULTIADDR) - - const modules = { - transport: [wsStar], - streamMuxer: [spdy], - connEncryption: [secio] - } - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node diff --git a/test/utils/constants.js b/test/utils/constants.js deleted file mode 100644 index 93516b1..0000000 --- a/test/utils/constants.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict' - -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const peerJSON = require('../fixtures/test-peer') -const multiaddr = require('multiaddr') - -let peerRelay = null - -/** - * Creates a `PeerInfo` that can be used across testing. Once the - * relay `PeerInfo` has been requested, it will be reused for each - * additional request. - * - * This is currently being used to create a relay on test bootstrapping - * so that it can be used by browser nodes during their test suite. This - * is necessary for running a TCP node during browser tests. 
- * @private - * @returns {Promise} - */ -module.exports.getPeerRelay = () => { - if (peerRelay) return peerRelay - - return new Promise((resolve, reject) => { - PeerId.createFromJSON(peerJSON, (err, peerId) => { - if (err) { - return reject(err) - } - peerRelay = new PeerInfo(peerId) - - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') - - resolve(peerRelay) - }) - }) -} - -module.exports.WS_STAR_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/ws/p2p-websocket-star/') -module.exports.WS_RENDEZVOUS_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/wss') diff --git a/test/utils/index.js b/test/utils/index.js index 054d3a5..c82ba08 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -2,10 +2,8 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const Node = require('./nodejs-bundle') -const waterfall = require('async/waterfall') -const expect = require('chai').expect +const { expect } = require('chai') exports.first = (map) => map.values().next().value @@ -13,21 +11,17 @@ exports.expectSet = (set, subs) => { expect(Array.from(set.values())).to.eql(subs) } -exports.createNode = () => { - return new Promise((resolve, reject) => { - waterfall([ - (cb) => PeerId.create({ bits: 1024 }, cb), - (id, cb) => PeerInfo.create(id, cb), - (peerInfo, cb) => { - cb(null, new Node({ peerInfo })) - }, - (node, cb) => node.start((err) => cb(err, node)) - ], (err, node) => { - if (err) { - return reject(err) - } - - resolve(node) - }) - }) +exports.createPeerInfo = async () => { + const peerId = await PeerId.create({ bits: 1024 }) + + return PeerInfo.create(peerId) +} + +exports.mockRegistrar = { + register: (multicodec, handlers) => { + + }, + unregister: (multicodec) => { + + } } diff --git a/test/utils/nodejs-bundle.js b/test/utils/nodejs-bundle.js deleted file mode 100644 index ef572d1..0000000 --- a/test/utils/nodejs-bundle.js +++ /dev/null @@ -1,26 +0,0 @@ -'use strict' - -const TCP = require('libp2p-tcp') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const modules = { - transport: [TCP], - streamMuxer: [spdy], - connEncryption: [secio] - } - - peerInfo.multiaddrs.add('/ip4/127.0.0.1/tcp/0') - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node
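Reviewer note (not part of the patch): the new constructor signature `new FloodSub(peerInfo, registrar, options)` and the registrar/`onConnect` wiring are exercised above only inside the benchmark and tests. Below is a minimal, self-contained sketch of the same pattern for anyone trying the branch locally. It assumes the in-memory registrar and duplex-pair trick used by the tests in this diff; the helper name `createRegistrar` and the topic `fruit` are illustrative only. In a real application libp2p supplies the registrar and the connection comes from an actual dial rather than `it-pair/duplex`.

```js
'use strict'

const DuplexPair = require('it-pair/duplex')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')

const FloodSub = require('libp2p-floodsub')
const { multicodec } = require('libp2p-floodsub')

// Illustrative in-memory registrar: it records the handlers floodsub
// registers for its multicodec, mirroring the mock used in the tests above.
const createRegistrar = (record) => ({
  register: (multicodec, handlers) => { record[multicodec] = handlers },
  unregister: (multicodec) => { delete record[multicodec] }
})

;(async () => {
  const recordA = {}
  const recordB = {}

  const [peerInfoA, peerInfoB] = await Promise.all([
    PeerId.create({ bits: 1024 }).then((id) => PeerInfo.create(id)),
    PeerId.create({ bits: 1024 }).then((id) => PeerInfo.create(id))
  ])

  const fsA = new FloodSub(peerInfoA, createRegistrar(recordA))
  const fsB = new FloodSub(peerInfoB, createRegistrar(recordB))

  await Promise.all([fsA.start(), fsB.start()])

  // Simulate an established connection by handing each side one end of an
  // in-memory duplex pair and invoking the onConnect handler it registered.
  const [d0, d1] = DuplexPair()
  recordA[multicodec].onConnect(peerInfoB, d0)
  recordB[multicodec].onConnect(peerInfoA, d1)

  // Wait until A learns about B's subscription before publishing.
  const aSawSubscription = new Promise((resolve) => {
    fsA.once('floodsub:subscription-change', resolve)
  })

  fsB.subscribe('fruit')
  const received = new Promise((resolve) => fsB.once('fruit', resolve))

  await aSawSubscription
  await fsA.publish('fruit', Buffer.from('banana'))

  const msg = await received
  console.log('B received:', msg.data.toString())

  await Promise.all([fsA.stop(), fsB.stop()])
})()
```

The only difference from the benchmark above is that the sketch waits for `floodsub:subscription-change` before publishing instead of relying on a timeout, so the subscription has definitely propagated over the duplex pair when the message is flooded.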