Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
chore: apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
vasco-santos and jacobheun committed Oct 29, 2019
1 parent f60da79 commit 0b67775
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 16 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ js-libp2p-pubsub
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![](https://img.shields.io/badge/pm-waffle-yellow.svg?style=flat-square)](https://waffle.io/libp2p/js-libp2p-pubsub)

> libp2p-pubsub consists on the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol in libp2p, as well as all the logic regarding pubsub connections with other peers.
> libp2p-pubsub is the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol with libp2p, as well as managing the logic regarding pubsub connections with other peers.
## Lead Maintainer

Expand All @@ -34,11 +34,11 @@ js-libp2p-pubsub

## Usage

`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algortithm, instead of also needing to create the setup for it.
`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algorithm, instead of also needing to create the setup for it.

A pubsub implementation **MUST** override the `_processMessages`, `publish`, `subscribe`, `unsubscribe` and `getTopics` functions.

Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for registering the pubsub protocol onto the libp2p node, while the `stop` function is responsible for unregistering the pubsub protocol and shutting down every connection
Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to customize their logic. Implementations overriding `start` and `stop` **MUST** call `super`. The `start` function is responsible for registering the pubsub protocol with libp2p, while the `stop` function is responsible for unregistering the pubsub protocol and closing pubsub connections.

All the remaining functions **MUST NOT** be overwritten.

Expand Down Expand Up @@ -183,7 +183,7 @@ Get a list of the peer-ids that are subscribed to one topic.

| Type | Description |
|------|-------------|
| `Array<string>` | Array of base-58 peer id's |
| `Array<string>` | Array of base-58 PeerId's |

### Validate

Expand Down
32 changes: 25 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,26 @@ class PubsubBaseProtocol extends EventEmitter {
*/
this.strictSigning = strictSigning

this._registrarId = undefined
this._onPeerConnected = this._onPeerConnected.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
}

/**
* Register the pubsub protocol onto the libp2p node.
* @returns {Promise}
* @returns {void}
*/
async start () {
start () {
if (this.started) {
return
}
this.log('starting')

// Incoming streams
this.registrar.handle(this.multicodecs, this._onIncomingConnection)

// register protocol with multicodec and handlers
await this.registrar.register(this.multicodecs, {
this._registrarId = this.registrar.register(this.multicodecs, {
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
})
Expand All @@ -118,7 +122,7 @@ class PubsubBaseProtocol extends EventEmitter {
}

// unregister protocol and handlers
await this.registrar.unregister(this.multicodecs)
await this.registrar.unregister(this._registrarId)

this.log('stopping')
this.peers.forEach((peer) => peer.close())
Expand All @@ -128,6 +132,20 @@ class PubsubBaseProtocol extends EventEmitter {
this.log('stopped')
}

/**
* On an incoming connection event.
* @private
* @param {String} protocol connection protocol
* @param {Connection} conn connection to the peer
*/
_onIncomingConnection (protocol, conn) {
const peerInfo = conn.remotePeer
const idB58Str = peerInfo.id.toB58String()
const peer = this._addPeer(new Peer(peerInfo))

this._processMessages(idB58Str, conn, peer)
}

/**
* Registrar notifies a connection successfully with pubsub protocol.
* @private
Expand All @@ -141,7 +159,7 @@ class PubsubBaseProtocol extends EventEmitter {
const peer = this._addPeer(new Peer(peerInfo))
peer.attachConnection(conn)

this._processMessages(idB58Str, conn, peer)
// this._processMessages(idB58Str, conn, peer)
}

/**
Expand Down Expand Up @@ -202,14 +220,14 @@ class PubsubBaseProtocol extends EventEmitter {
// If strict signing is on and we have no signature, abort
if (this.strictSigning && !message.signature) {
this.log('Signing required and no signature was present, dropping message:', message)
return Promise.resolve(false)
return false
}

// Check the message signature if present
if (message.signature) {
return verifySignature(message)
} else {
return Promise.resolve(true)
return true
}
}

Expand Down
12 changes: 10 additions & 2 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe('pubsub base protocol', () => {
beforeEach(async () => {
const peerInfo = await createPeerInfo()
sinonMockRegistrar = {
handle: sinon.stub(),
register: sinon.stub(),
unregister: sinon.stub()
}
Expand All @@ -40,6 +41,7 @@ describe('pubsub base protocol', () => {

it('should be able to start and stop', async () => {
await pubsub.start()
expect(sinonMockRegistrar.handle.calledOnce).to.be.true()
expect(sinonMockRegistrar.register.calledOnce).to.be.true()

await pubsub.stop()
Expand All @@ -49,6 +51,7 @@ describe('pubsub base protocol', () => {
it('should not throw to start if already started', async () => {
await pubsub.start()
await pubsub.start()
expect(sinonMockRegistrar.handle.calledOnce).to.be.true()
expect(sinonMockRegistrar.register.calledOnce).to.be.true()

await pubsub.stop()
Expand Down Expand Up @@ -132,11 +135,16 @@ describe('pubsub base protocol', () => {
const registrarRecordB = {}

const registrar = (registrarRecord) => ({
handle: (multicodecs, handlers) => {

},
register: (multicodecs, handlers) => {
registrarRecord[multicodecs[0]] = handlers

return multicodecs[0]
},
unregister: (multicodecs) => {
delete registrarRecord[multicodecs[0]]
unregister: (id) => {
delete registrarRecord[id]
}
})

Expand Down
9 changes: 6 additions & 3 deletions test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class PubsubImplementation extends PubsubBaseProtocol {
pipe(
conn,
lp.decode(),
async function collect (source) {
async function (source) {
for await (const val of source) {
const rpc = message.rpc.RPC.decode(val)

Expand All @@ -55,10 +55,13 @@ class PubsubImplementation extends PubsubBaseProtocol {
exports.PubsubImplementation = PubsubImplementation

exports.mockRegistrar = {
register: (multicodec, handlers) => {
handle: (multicodecs, handlers) => {

},
unregister: (multicodec) => {
register: (multicodecs, handlers) => {

},
unregister: (id) => {

}
}

0 comments on commit 0b67775

Please sign in to comment.