From a1410d629afc543f724e6a6744422ddbaff9b63d Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 28 Oct 2019 10:22:21 +0100 Subject: [PATCH] chore: address review --- LICENSE | 12 +++--- README.md | 9 ++-- package.json | 3 +- src/index.js | 18 +++++--- test/2-nodes.spec.js | 78 +++++++++++++++------------------- test/multiple-nodes.spec.js | 83 ++++++++++++++++--------------------- test/utils/index.js | 45 +++++++++++++++++++- 7 files changed, 137 insertions(+), 111 deletions(-) diff --git a/LICENSE b/LICENSE index 7789aab..58b2056 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -MIT License +The MIT License (MIT) -Copyright (c) 2016 libp2p +Copyright (c) 2019 Protocol Labs, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -9,13 +9,13 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index d96fcfa..99faef0 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,14 @@ js-libp2p-floodsub const FloodSub = require('libp2p-floodsub') const registrar = { - register: (multicodec, handlers) => { + handle: (multicodecs, handle) => { // register multicodec to libp2p + // handle function is called everytime a remote peer opens a stream to the peer. + }, + register: (multicodecs, handlers) => { // handlers will be used to notify pubsub of peer connection establishment or closing }, - unregister: (multicodec) => { + unregister: (id) => { } } @@ -99,4 +102,4 @@ This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/c ## License -MIT © David Dias +Copyright (c) Protocol Labs, Inc. under the **MIT License**. See [LICENSE file](./LICENSE) for details. diff --git a/package.json b/package.json index 26d1ab8..03a762b 100644 --- a/package.json +++ b/package.json @@ -50,13 +50,14 @@ "dirty-chai": "^2.0.1", "it-pair": "^1.0.0", "lodash": "^4.17.15", - "multiaddr": "^6.1.0", + "multiaddr": "^7.1.0", "p-defer": "^3.0.0", "peer-id": "~0.13.3", "peer-info": "~0.17.0", "sinon": "^7.5.0" }, "dependencies": { + "async.nexttick": "^0.5.2", "debug": "^4.1.1", "it-length-prefixed": "^2.0.0", "it-pipe": "^1.0.1", diff --git a/src/index.js b/src/index.js index b274517..62f7dbf 100644 --- a/src/index.js +++ b/src/index.js @@ -10,6 +10,7 @@ const pipe = require('it-pipe') const lp = require('it-length-prefixed') const pMap = require('p-map') const TimeCache = require('time-cache') +const nextTick = require('async.nexttick') const PeerInfo = require('peer-info') const BaseProtocol = require('libp2p-pubsub') @@ -27,6 +28,7 @@ class FloodSub extends BaseProtocol { /** * @param {PeerInfo} peerInfo instance of the peer's PeerInfo * @param {Object} registrar + * @param {function} registrar.handle * @param {function} registrar.register * @param {function} registrar.unregister * @param {Object} [options] @@ -37,6 +39,7 @@ class FloodSub extends BaseProtocol { assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') // registrar handling + assert(registrar && typeof registrar.handle === 'function', 'a handle function must be provided in registrar') assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar') assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') @@ -77,9 +80,10 @@ class FloodSub extends BaseProtocol { * @override * @param {PeerInfo} peerInfo peer info * @param {Connection} conn connection to the peer + * @returns {Promise} */ - _onPeerConnected (peerInfo, conn) { - super._onPeerConnected(peerInfo, conn) + async _onPeerConnected (peerInfo, conn) { + await super._onPeerConnected(peerInfo, conn) const idB58Str = peerInfo.id.toB58String() const peer = this.peers.get(idB58Str) @@ -105,7 +109,7 @@ class FloodSub extends BaseProtocol { await pipe( conn, lp.decode(), - async function collect (source) { + async function (source) { for await (const data of source) { const rpc = Buffer.isBuffer(data) ? data : data.slice() @@ -209,7 +213,7 @@ class FloodSub extends BaseProtocol { /** * Unmounts the floodsub protocol and shuts down every connection * @override - * @returns {Promise} + * @returns {Promise} */ async stop () { await super.stop() @@ -222,7 +226,7 @@ class FloodSub extends BaseProtocol { * @override * @param {Array|string} topics * @param {Array|any} messages - * @returns {Promise} + * @returns {Promise} */ async publish (topics, messages) { assert(this.started, 'FloodSub is not started') @@ -267,10 +271,10 @@ class FloodSub extends BaseProtocol { assert(this.started, 'FloodSub is not started') topics = ensureArray(topics) - topics.forEach((topic) => this.subscriptions.add(topic)) this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer)) + // make sure that FloodSub is already mounted function sendSubscriptionsOnceReady (peer) { if (peer && peer.isWritable) { @@ -303,6 +307,8 @@ class FloodSub extends BaseProtocol { function checkIfReady (peer) { if (peer && peer.isWritable) { peer.sendUnsubscriptions(topics) + } else { + nextTick(checkIfReady.bind(peer)) } } } diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index 34b2f1a..456d0e7 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -9,15 +9,17 @@ const expect = chai.expect const pDefer = require('p-defer') const times = require('lodash/times') -const DuplexPair = require('it-pair/duplex') const FloodSub = require('../src') const { multicodec } = require('../src') -const { first, createPeerInfo, expectSet } = require('./utils') - -const defOptions = { - emitSelf: true -} +const { + defOptions, + first, + createPeerInfo, + createMockRegistrar, + expectSet, + ConnectionPair +} = require('./utils') function shouldNotHappen (_) { expect.fail() @@ -31,15 +33,6 @@ describe('basics between 2 nodes', () => { const registrarRecordA = {} const registrarRecordB = {} - const registrar = (registrarRecord) => ({ - register: (multicodecs, handlers) => { - registrarRecord[multicodecs[0]] = handlers - }, - unregister: (multicodecs) => { - delete registrarRecord[multicodecs[0]] - } - }) - // Mount pubsub protocol before(async () => { [peerInfoA, peerInfoB] = await Promise.all([ @@ -47,8 +40,8 @@ describe('basics between 2 nodes', () => { createPeerInfo() ]) - fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions) - fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions) + fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions) expect(fsA.peers.size).to.be.eql(0) expect(fsA.subscriptions.size).to.eql(0) @@ -63,14 +56,19 @@ describe('basics between 2 nodes', () => { ])) // Connect floodsub nodes - before(() => { + before(async () => { const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect + const handleB = registrarRecordB[multicodec].handler // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [c0, c1] = ConnectionPair() + await onConnectA(peerInfoB, c0) + + await handleB({ + protocol: multicodec, + stream: c1.stream, + remotePeer: peerInfoA.id + }) expect(fsA.peers.size).to.be.eql(1) expect(fsB.peers.size).to.be.eql(1) @@ -236,15 +234,6 @@ describe('basics between 2 nodes', () => { const registrarRecordA = {} const registrarRecordB = {} - const registrar = (registrarRecord) => ({ - register: (multicodec, handlers) => { - registrarRecord[multicodec] = handlers - }, - unregister: (multicodec) => { - delete registrarRecord[multicodec] - } - }) - // Mount pubsub protocol before(async () => { [peerInfoA, peerInfoB] = await Promise.all([ @@ -252,8 +241,8 @@ describe('basics between 2 nodes', () => { createPeerInfo() ]) - fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions) - fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions) + fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions) }) // Start pubsub @@ -281,19 +270,18 @@ describe('basics between 2 nodes', () => { }) it('existing subscriptions are sent upon peer connection', async () => { + const dial = async () => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + await onConnectA(peerInfoB, c0) + await onConnectB(peerInfoA, c1) + } + await Promise.all([ - // nodeA.dial(nodeB.peerInfo), - new Promise((resolve) => { - const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect - - // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) - - resolve() - }), + dial(), new Promise((resolve) => fsA.once('floodsub:subscription-change', resolve)), new Promise((resolve) => fsB.once('floodsub:subscription-change', resolve)) ]) diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index 058cbcb..522c875 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -7,11 +7,16 @@ chai.use(require('dirty-chai')) const expect = chai.expect const pDefer = require('p-defer') -const DuplexPair = require('it-pair/duplex') const FloodSub = require('../src') const { multicodec } = require('../src') -const { createPeerInfo, first, expectSet } = require('./utils') +const { + createPeerInfo, + createMockRegistrar, + first, + expectSet, + ConnectionPair +} = require('./utils') async function spawnPubSubNode (peerInfo, reg) { const ps = new FloodSub(peerInfo, reg, { emitSelf: true }) @@ -33,15 +38,6 @@ describe('multiple nodes (more than 2)', () => { const registrarRecordB = {} const registrarRecordC = {} - const registrar = (registrarRecord) => ({ - register: (multicodec, handlers) => { - registrarRecord[multicodec] = handlers - }, - unregister: (multicodec) => { - delete registrarRecord[multicodec] - } - }) - before(async () => { [peerInfoA, peerInfoB, peerInfoC] = await Promise.all([ createPeerInfo(), @@ -50,26 +46,26 @@ describe('multiple nodes (more than 2)', () => { ]); [psA, psB, psC] = await Promise.all([ - spawnPubSubNode(peerInfoA, registrar(registrarRecordA)), - spawnPubSubNode(peerInfoB, registrar(registrarRecordB)), - spawnPubSubNode(peerInfoC, registrar(registrarRecordC)) + spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC)) ]) }) // connect nodes - before(() => { + before(async () => { const onConnectA = registrarRecordA[multicodec].onConnect const onConnectB = registrarRecordB[multicodec].onConnect const onConnectC = registrarRecordC[multicodec].onConnect // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [d0, d1] = ConnectionPair() + await onConnectA(peerInfoB, d0) + await onConnectB(peerInfoA, d1) - const [d2, d3] = DuplexPair() - onConnectB(peerInfoC, d2) - onConnectC(peerInfoB, d3) + const [d2, d3] = ConnectionPair() + await onConnectB(peerInfoC, d2) + await onConnectC(peerInfoB, d3) }) after(() => Promise.all([ @@ -246,15 +242,6 @@ describe('multiple nodes (more than 2)', () => { const registrarRecordD = {} const registrarRecordE = {} - const registrar = (registrarRecord) => ({ - register: (multicodec, handlers) => { - registrarRecord[multicodec] = handlers - }, - unregister: (multicodec) => { - delete registrarRecord[multicodec] - } - }) - before(async () => { [peerInfoA, peerInfoB, peerInfoC, peerInfoD, peerInfoE] = await Promise.all([ createPeerInfo(), @@ -265,16 +252,16 @@ describe('multiple nodes (more than 2)', () => { ]); [psA, psB, psC, psD, psE] = await Promise.all([ - spawnPubSubNode(peerInfoA, registrar(registrarRecordA)), - spawnPubSubNode(peerInfoB, registrar(registrarRecordB)), - spawnPubSubNode(peerInfoC, registrar(registrarRecordC)), - spawnPubSubNode(peerInfoD, registrar(registrarRecordD)), - spawnPubSubNode(peerInfoE, registrar(registrarRecordE)) + spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC)), + spawnPubSubNode(peerInfoD, createMockRegistrar(registrarRecordD)), + spawnPubSubNode(peerInfoE, createMockRegistrar(registrarRecordE)) ]) }) // connect nodes - before(() => { + before(async () => { const onConnectA = registrarRecordA[multicodec].onConnect const onConnectB = registrarRecordB[multicodec].onConnect const onConnectC = registrarRecordC[multicodec].onConnect @@ -282,21 +269,21 @@ describe('multiple nodes (more than 2)', () => { const onConnectE = registrarRecordE[multicodec].onConnect // Notice peers of connection - const [d0, d1] = DuplexPair() // A <-> B - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [d0, d1] = ConnectionPair() // A <-> B + await onConnectA(peerInfoB, d0) + await onConnectB(peerInfoA, d1) - const [d2, d3] = DuplexPair() // B <-> C - onConnectB(peerInfoC, d2) - onConnectC(peerInfoB, d3) + const [d2, d3] = ConnectionPair() // B <-> C + await onConnectB(peerInfoC, d2) + await onConnectC(peerInfoB, d3) - const [d4, d5] = DuplexPair() // C <-> D - onConnectC(peerInfoD, d4) - onConnectD(peerInfoC, d5) + const [d4, d5] = ConnectionPair() // C <-> D + await onConnectC(peerInfoD, d4) + await onConnectD(peerInfoC, d5) - const [d6, d7] = DuplexPair() // C <-> D - onConnectD(peerInfoE, d6) - onConnectE(peerInfoD, d7) + const [d6, d7] = ConnectionPair() // C <-> D + await onConnectD(peerInfoE, d6) + await onConnectE(peerInfoD, d7) }) after(() => Promise.all([ diff --git a/test/utils/index.js b/test/utils/index.js index 4d98473..fbb6643 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -2,6 +2,7 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') +const DuplexPair = require('it-pair/duplex') const { expect } = require('chai') @@ -18,10 +19,50 @@ exports.createPeerInfo = async () => { } exports.mockRegistrar = { - register: (multicodecs, handlers) => { + handle: () => {}, + register: () => {}, + unregister: () => {} +} + +exports.createMockRegistrar = (registrarRecord) => ({ + handle: (multicodecs, handler) => { + const rec = registrarRecord[multicodecs[0]] || {} + registrarRecord[multicodecs[0]] = { + ...rec, + handler + } }, - unregister: (multicodecs) => { + register: (multicodecs, handlers) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + ...handlers + } + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] } +}) + +exports.ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) + }, + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] +} + +exports.defOptions = { + emitSelf: true }