Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
chore: apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
vasco-santos and jacobheun committed Nov 1, 2019
1 parent f60da79 commit 5b4f41b
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 43 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ js-libp2p-pubsub
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![](https://img.shields.io/badge/pm-waffle-yellow.svg?style=flat-square)](https://waffle.io/libp2p/js-libp2p-pubsub)

> libp2p-pubsub consists on the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol in libp2p, as well as all the logic regarding pubsub connections with other peers.
> libp2p-pubsub is the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol with libp2p, as well as managing the logic regarding pubsub connections with other peers.
## Lead Maintainer

Expand All @@ -34,11 +34,11 @@ js-libp2p-pubsub

## Usage

`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algortithm, instead of also needing to create the setup for it.
`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algorithm, instead of also needing to create the setup for it.

A pubsub implementation **MUST** override the `_processMessages`, `publish`, `subscribe`, `unsubscribe` and `getTopics` functions.

Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for registering the pubsub protocol onto the libp2p node, while the `stop` function is responsible for unregistering the pubsub protocol and shutting down every connection
Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to customize their logic. Implementations overriding `start` and `stop` **MUST** call `super`. The `start` function is responsible for registering the pubsub protocol with libp2p, while the `stop` function is responsible for unregistering the pubsub protocol and closing pubsub connections.

All the remaining functions **MUST NOT** be overwritten.

Expand Down Expand Up @@ -183,7 +183,7 @@ Get a list of the peer-ids that are subscribed to one topic.

| Type | Description |
|------|-------------|
| `Array<string>` | Array of base-58 peer id's |
| `Array<string>` | Array of base-58 PeerId's |

### Validate

Expand Down
59 changes: 47 additions & 12 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {PeerInfo} props.peerInfo peer's peerInfo
* @param {Object} props.registrar registrar for libp2p protocols
* @param {function} props.registrar.handle
* @param {function} props.registrar.register
* @param {function} props.registrar.unregister
* @param {boolean} [props.signMessages] if messages should be signed, defaults to true
Expand All @@ -44,7 +45,8 @@ class PubsubBaseProtocol extends EventEmitter {
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')

// registrar handling
assert(registrar && typeof registrar === 'object', 'a registrar object is required') // TODO: isRegistrar when it's implemented
assert(registrar && typeof registrar === 'object', 'a registrar object is required')
assert(typeof registrar.handle === 'function', 'a handle function must be provided in registrar')
assert(typeof registrar.register === 'function', 'a register function must be provided in registrar')
assert(typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar')

Expand Down Expand Up @@ -84,22 +86,27 @@ class PubsubBaseProtocol extends EventEmitter {
*/
this.strictSigning = strictSigning

this._registrarId = undefined
this._onIncomingStream = this._onIncomingStream.bind(this)
this._onPeerConnected = this._onPeerConnected.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
}

/**
* Register the pubsub protocol onto the libp2p node.
* @returns {Promise}
* @returns {Promise<void>}
*/
async start () {
if (this.started) {
return
}
this.log('starting')

// Incoming streams
this.registrar.handle(this.multicodecs, this._onIncomingStream)

// register protocol with multicodec and handlers
await this.registrar.register(this.multicodecs, {
this._registrarId = await this.registrar.register(this.multicodecs, {
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
})
Expand All @@ -118,7 +125,7 @@ class PubsubBaseProtocol extends EventEmitter {
}

// unregister protocol and handlers
await this.registrar.unregister(this.multicodecs)
await this.registrar.unregister(this._registrarId)

this.log('stopping')
this.peers.forEach((peer) => peer.close())
Expand All @@ -128,20 +135,41 @@ class PubsubBaseProtocol extends EventEmitter {
this.log('stopped')
}

/**
* On an incoming stream event.
* @private
* @param {Object} props
* @param {string} props.protocol
* @param {DuplexStream} props.strean
* @param {PeerId} props.remotePeer remote peer-id
*/
async _onIncomingStream ({ protocol, stream, remotePeer }) {
const peerInfo = await PeerInfo.create(remotePeer)
peerInfo.protocols.add(protocol)

const idB58Str = peerInfo.id.toB58String()

const peer = this._addPeer(new Peer(peerInfo))

peer.attachConnection(stream)
this._processMessages(idB58Str, stream, peer)
}

/**
* Registrar notifies a connection successfully with pubsub protocol.
* @private
* @param {PeerInfo} peerInfo remote peer info
* @param {Connection} conn connection to the peer
*/
_onPeerConnected (peerInfo, conn) {
async _onPeerConnected (peerInfo, conn) {
const idB58Str = peerInfo.id.toB58String()
this.log('connected', idB58Str)

const peer = this._addPeer(new Peer(peerInfo))
peer.attachConnection(conn)
const { stream } = await conn.newStream(this.multicodecs)

this._processMessages(idB58Str, conn, peer)
peer.attachConnection(stream)
this._processMessages(idB58Str, stream, peer)
}

/**
Expand All @@ -166,15 +194,16 @@ class PubsubBaseProtocol extends EventEmitter {
*/
_addPeer (peer) {
const id = peer.info.id.toB58String()

let existing = this.peers.get(id)

if (!existing) {
this.log('new peer', id)
this.peers.set(id, peer)
existing = peer

peer.once('close', () => this._removePeer(peer))
}
++existing._references

return existing
}
Expand All @@ -188,8 +217,14 @@ class PubsubBaseProtocol extends EventEmitter {
_removePeer (peer) {
const id = peer.info.id.toB58String()

this.log('delete peer', id)
this.peers.delete(id)
this.log('remove', id, peer._references)

// Only delete when no one else is referencing this peer.
if (--peer._references === 0) {
this.log('delete peer', id)
this.peers.delete(id)
}

return peer
}

Expand All @@ -202,14 +237,14 @@ class PubsubBaseProtocol extends EventEmitter {
// If strict signing is on and we have no signature, abort
if (this.strictSigning && !message.signature) {
this.log('Signing required and no signature was present, dropping message:', message)
return Promise.resolve(false)
return false
}

// Check the message signature if present
if (message.signature) {
return verifySignature(message)
} else {
return Promise.resolve(true)
return true
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class Peer extends EventEmitter {
* @type {Pushable}
*/
this.stream = null

this._references = 0
}

/**
Expand Down Expand Up @@ -164,6 +166,9 @@ class Peer extends EventEmitter {
* @returns {void}
*/
close () {
// Force removal of peer
this._references = 1

// End the pushable
if (this.stream) {
this.stream.end()
Expand Down
57 changes: 34 additions & 23 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ chai.use(require('dirty-chai'))
chai.use(require('chai-spies'))
const expect = chai.expect
const sinon = require('sinon')
const DuplexPair = require('it-pair/duplex')

const PubsubBaseProtocol = require('../src')
const { randomSeqno } = require('../src/utils')
const { createPeerInfo, mockRegistrar, PubsubImplementation } = require('./utils')
const {
createPeerInfo,
createMockRegistrar,
mockRegistrar,
PubsubImplementation,
ConnectionPair
} = require('./utils')

describe('pubsub base protocol', () => {
describe('should start and stop properly', () => {
Expand All @@ -20,6 +25,7 @@ describe('pubsub base protocol', () => {
beforeEach(async () => {
const peerInfo = await createPeerInfo()
sinonMockRegistrar = {
handle: sinon.stub(),
register: sinon.stub(),
unregister: sinon.stub()
}
Expand All @@ -40,6 +46,7 @@ describe('pubsub base protocol', () => {

it('should be able to start and stop', async () => {
await pubsub.start()
expect(sinonMockRegistrar.handle.calledOnce).to.be.true()
expect(sinonMockRegistrar.register.calledOnce).to.be.true()

await pubsub.stop()
Expand All @@ -49,6 +56,7 @@ describe('pubsub base protocol', () => {
it('should not throw to start if already started', async () => {
await pubsub.start()
await pubsub.start()
expect(sinonMockRegistrar.handle.calledOnce).to.be.true()
expect(sinonMockRegistrar.register.calledOnce).to.be.true()

await pubsub.stop()
Expand Down Expand Up @@ -131,22 +139,13 @@ describe('pubsub base protocol', () => {
const registrarRecordA = {}
const registrarRecordB = {}

const registrar = (registrarRecord) => ({
register: (multicodecs, handlers) => {
registrarRecord[multicodecs[0]] = handlers
},
unregister: (multicodecs) => {
delete registrarRecord[multicodecs[0]]
}
})

// mount pubsub
beforeEach(async () => {
peerInfoA = await createPeerInfo()
peerInfoB = await createPeerInfo()

pubsubA = new PubsubImplementation(protocol, peerInfoA, registrar(registrarRecordA))
pubsubB = new PubsubImplementation(protocol, peerInfoB, registrar(registrarRecordB))
pubsubA = new PubsubImplementation(protocol, peerInfoA, createMockRegistrar(registrarRecordA))
pubsubB = new PubsubImplementation(protocol, peerInfoB, createMockRegistrar(registrarRecordB))
})

// start pubsub
Expand All @@ -169,29 +168,41 @@ describe('pubsub base protocol', () => {
])
})

it('should handle onConnect as expected', () => {
it('should handle onConnect as expected', async () => {
const onConnectA = registrarRecordA[protocol].onConnect
const onConnectB = registrarRecordB[protocol].onConnect
const handlerB = registrarRecordB[protocol].handler

// Notice peers of connection
const [d0, d1] = DuplexPair()
onConnectA(peerInfoB, d0)
onConnectB(peerInfoA, d1)
const [c0, c1] = ConnectionPair()

await onConnectA(peerInfoB, c0)
await handlerB({
protocol,
stream: c1.stream,
remotePeer: peerInfoA.id
})

expect(pubsubA.peers.size).to.be.eql(1)
expect(pubsubB.peers.size).to.be.eql(1)
})

it('should handle onDisconnect as expected', () => {
it('should handle onDisconnect as expected', async () => {
const onConnectA = registrarRecordA[protocol].onConnect
const onDisconnectA = registrarRecordA[protocol].onDisconnect
const onConnectB = registrarRecordB[protocol].onConnect
const handlerB = registrarRecordB[protocol].handler
const onDisconnectB = registrarRecordB[protocol].onDisconnect

// Notice peers of connection
const [d0, d1] = DuplexPair()
onConnectA(peerInfoB, d0)
onConnectB(peerInfoA, d1)
const [c0, c1] = ConnectionPair()

await onConnectA(peerInfoB, c0)
await handlerB({
protocol,
stream: c1.stream,
remotePeer: peerInfoA.id
})

// Notice peers of disconnect
onDisconnectA(peerInfoB)
onDisconnectB(peerInfoA)

Expand Down
2 changes: 1 addition & 1 deletion test/utils.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-env mocha */
'use strict'

const expect = require('chai').expect
const { expect } = require('chai')

const utils = require('../src/utils')

Expand Down
43 changes: 40 additions & 3 deletions test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const lp = require('it-length-prefixed')
const pipe = require('it-pipe')
const DuplexPair = require('it-pair/duplex')

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
Expand Down Expand Up @@ -41,7 +42,7 @@ class PubsubImplementation extends PubsubBaseProtocol {
pipe(
conn,
lp.decode(),
async function collect (source) {
async function (source) {
for await (const val of source) {
const rpc = message.rpc.RPC.decode(val)

Expand All @@ -55,10 +56,46 @@ class PubsubImplementation extends PubsubBaseProtocol {
exports.PubsubImplementation = PubsubImplementation

exports.mockRegistrar = {
register: (multicodec, handlers) => {
handle: () => {},
register: () => {},
unregister: () => {}
}

exports.createMockRegistrar = (registrarRecord) => ({
handle: (multicodecs, handler) => {
const rec = registrarRecord[multicodecs[0]] || {}

registrarRecord[multicodecs[0]] = {
...rec,
handler
}
},
unregister: (multicodec) => {
register: (multicodecs, handlers) => {
const rec = registrarRecord[multicodecs[0]] || {}

registrarRecord[multicodecs[0]] = {
...rec,
...handlers
}

return multicodecs[0]
},
unregister: (id) => {
delete registrarRecord[id]
}
})

exports.ConnectionPair = () => {
const [d0, d1] = DuplexPair()

return [
{
stream: d0,
newStream: () => Promise.resolve({ stream: d0 })
},
{
stream: d1,
newStream: () => Promise.resolve({ stream: d1 })
}
]
}

0 comments on commit 5b4f41b

Please sign in to comment.