This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

refactor: switch to async iterators
BREAKING CHANGE: Switch to using async/await and async iterators.
vasco-santos committed Oct 4, 2019
1 parent 253f63a commit c66cf31
Showing 10 changed files with 663 additions and 746 deletions.
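
For consumers, the breaking change means the callback-based API is replaced with promises: `start()` and `publish()` now return promises instead of accepting callbacks. A minimal sketch of the difference, assuming an already-constructed `fsub` instance (see the README diff below):

```js
// Before: callback style
function oldUsage (fsub, callback) {
  fsub.start((err) => {
    if (err) return callback(err)
    fsub.publish('fruit', Buffer.from('banana'), callback)
  })
}

// After: promise style
async function newUsage (fsub) {
  await fsub.start()
  await fsub.publish('fruit', Buffer.from('banana'))
}
```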
78 changes: 35 additions & 43 deletions .aegir.js
@@ -1,7 +1,6 @@
'use strict'

const pull = require('pull-stream')
const parallel = require('async/parallel')
const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous')

const Node = require('./test/utils/nodejs-bundle.js')
@@ -13,54 +12,47 @@ const {
let wsRendezvous
let node

const before = (done) => {
  parallel([
    (cb) => {
      WebSocketStarRendezvous.start({
        port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port,
        refreshPeerListIntervalMS: 1000,
        strictMultiaddr: false,
        cryptoChallenge: true
      }, (err, _server) => {
        if (err) {
          return cb(err)
        }
        wsRendezvous = _server
        cb()
      })
    },
    (cb) => {
      getPeerRelay((err, peerInfo) => {
        if (err) {
          return done(err)
        }

        node = new Node({
          peerInfo,
          config: {
            relay: {
              enabled: true,
              hop: {
                enabled: true,
                active: true
              }
            }
          }
        })

        node.handle('/echo/1.0.0', (_, conn) => pull(conn, conn))
        node.start(cb)
      })
    }
  ], done)
}

const before = async () => {
  [wsRendezvous, node] = await Promise.all([
    WebSocketStarRendezvous.start({
      port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port,
      refreshPeerListIntervalMS: 1000,
      strictMultiaddr: false,
      cryptoChallenge: true
    }),
    new Promise(async (resolve) => {
      const peerInfo = await getPeerRelay()
      const n = new Node({
        peerInfo,
        config: {
          relay: {
            enabled: true,
            hop: {
              enabled: true,
              active: true
            }
          }
        }
      })

      n.handle('/echo/1.0.0', (_, conn) => pull(conn, conn))
      await n.start()

      resolve(n)
    })
  ])
}

const after = (done) => {
  setTimeout(() =>
    parallel(
      [node, wsRendezvous].map((s) => (cb) => s.stop(cb)),
      done),
    2000)
}

const after = () => {
  return new Promise((resolve) => {
    setTimeout(async () => {
      await Promise.all([
        node.stop(),
        wsRendezvous.stop()
      ])
      resolve()
    }, 2000)
  })
}

module.exports = {
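The test hooks above follow the conversion pattern used throughout this commit: `async/parallel` over callback tasks becomes `Promise.all` over promise-returning tasks. A generic sketch of the pattern, with illustrative task names that are not part of this repository:

```js
const parallel = require('async/parallel')

// Before: two callback-style tasks run in parallel (startServer/startClient are illustrative)
function setupOld (done) {
  parallel([
    (cb) => startServer(cb),
    (cb) => startClient(cb)
  ], done)
}

// After: the same tasks return promises and run under Promise.all
async function setupNew () {
  const [server, client] = await Promise.all([
    startServer(),
    startClient()
  ])
  return { server, client }
}
```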
17 changes: 7 additions & 10 deletions README.md
@@ -38,17 +38,14 @@ const FloodSub = require('libp2p-floodsub')

const fsub = new FloodSub(node)

fsub.start((err) => {
if (err) {
console.log('Upsy', err)
}
fsub.on('fruit', (data) => {
console.log(data)
})
fsub.subscribe('fruit')

fsub.publish('fruit', new Buffer('banana'))
await fsub.start()

fsub.on('fruit', (data) => {
console.log(data)
})
fsub.subscribe('fruit')

fsub.publish('fruit', new Buffer('banana'))
```

## Events
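Because `start()` no longer takes a callback, the error handling from the old example (`console.log('Upsy', err)`) moves into a try/catch around the awaited call. A minimal sketch, assuming the same `fsub` instance as in the README:

```js
async function run (fsub) {
  try {
    await fsub.start()
  } catch (err) {
    console.log('Upsy', err)
    return
  }

  fsub.on('fruit', (data) => console.log(data))
  fsub.subscribe('fruit')

  await fsub.publish('fruit', Buffer.from('banana'))
}
```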
23 changes: 11 additions & 12 deletions package.json
@@ -45,33 +45,32 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-floodsub#readme",
"devDependencies": {
"aegir": "^18.2.1",
"aegir": "^20.3.1",
"benchmark": "^2.1.4",
"chai": "^4.2.0",
"chai-spies": "^1.0.0",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"libp2p": "~0.24.4",
"libp2p": "~0.26.2",
"libp2p-secio": "~0.11.1",
"libp2p-spdy": "~0.13.3",
"libp2p-tcp": "~0.13.0",
"libp2p-tcp": "~0.13.2",
"libp2p-websocket-star": "~0.10.2",
"libp2p-websocket-star-rendezvous": "~0.3.0",
"lodash": "^4.17.11",
"multiaddr": "^6.0.6",
"peer-id": "~0.12.2",
"libp2p-websocket-star-rendezvous": "~0.4.1",
"lodash": "^4.17.15",
"multiaddr": "^6.1.0",
"peer-id": "~0.12.5",
"peer-info": "~0.15.1",
"sinon": "^7.3.2"
"sinon": "^7.5.0"
},
"dependencies": {
"async": "^2.6.2",
"bs58": "^4.0.1",
"debug": "^4.1.1",
"length-prefixed-stream": "^2.0.0",
"libp2p-crypto": "~0.16.1",
"libp2p-pubsub": "~0.2.0",
"libp2p-pubsub": "libp2p/js-libp2p-pubsub#refactor/async",
"p-map": "^3.0.0",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.2",
"pull-length-prefixed": "^1.3.3",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.9"
},
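On the dependency side, the callback-based `async` utilities are dropped and `p-map` takes over async mapping (see the `publish` changes in src/index.js below). A small sketch of the replacement, with illustrative values:

```js
const asyncMap = require('async/map')
const pMap = require('p-map')

// Before: callback-style mapping with async/map
asyncMap([1, 2, 3], (n, cb) => cb(null, n * 2), (err, results) => {
  if (err) throw err
  console.log(results) // [2, 4, 6]
})

// After: promise-based mapping with p-map
pMap([1, 2, 3], async (n) => n * 2)
  .then((results) => console.log(results)) // [2, 4, 6]
```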
93 changes: 44 additions & 49 deletions src/index.js
@@ -11,8 +11,7 @@ const config = require('./config')
const multicodec = config.multicodec
const ensureArray = utils.ensureArray
const setImmediate = require('async/setImmediate')
const asyncMap = require('async/map')
const noop = () => {}
const pMap = require('p-map')

/**
* FloodSub (aka dumbsub is an implementation of pubsub focused on
@@ -49,19 +48,16 @@ class FloodSub extends BaseProtocol {
* @override
* @param {PeerInfo} peerInfo peer info
* @param {Connection} conn connection to the peer
* @param {function} callback
*/
_onDial (peerInfo, conn, callback) {
super._onDial(peerInfo, conn, (err) => {
if (err) return callback(err)
const idB58Str = peerInfo.id.toB58String()
const peer = this.peers.get(idB58Str)
if (peer && peer.isWritable) {
// Immediately send my own subscriptions to the newly established conn
peer.sendSubscriptions(this.subscriptions)
}
setImmediate(() => callback())
})
_onDial (peerInfo, conn) {
super._onDial(peerInfo, conn)
const idB58Str = peerInfo.id.toB58String()
const peer = this.peers.get(idB58Str)

if (peer && peer.isWritable) {
// Immediately send my own subscriptions to the newly established conn
peer.sendSubscriptions(this.subscriptions)
}
}

/**
@@ -71,7 +67,7 @@ class FloodSub extends BaseProtocol {
* @param {string} idB58Str peer id string in base58
* @param {Connection} conn connection
* @param {PeerInfo} peer peer info
* @returns {undefined}
* @returns {void}
*
*/
_processConnection (idB58Str, conn, peer) {
@@ -119,7 +115,7 @@
* @param {rpc.RPC.Message} message The message to process
* @returns {void}
*/
_processRpcMessage (message) {
async _processRpcMessage (message) {
const msg = utils.normalizeInRpcMessage(message)
const seqno = utils.msgId(msg.from, msg.seqno)
// 1. check if I've seen the message, if yes, ignore
@@ -128,19 +124,27 @@
    }

    this.seenCache.put(seqno)

    // 2. validate the message (signature verification)
    this.validate(message, (err, isValid) => {
      if (err || !isValid) {
        this.log('Message could not be validated, dropping it. isValid=%s', isValid, err)
        return
      }

      // 3. if message is valid, emit to self
      this._emitMessages(msg.topicIDs, [msg])

      // 4. if message is valid, propagate msg to others
      this._forwardMessages(msg.topicIDs, [msg])
    })

    // 2. validate the message (signature verification)
    let isValid
    let error

    try {
      isValid = await this.validate(message)
    } catch (err) {
      error = err
    }

    if (error || !isValid) {
      this.log('Message could not be validated, dropping it. isValid=%s', isValid, error)
      return
    }

    // 3. if message is valid, emit to self
    this._emitMessages(msg.topicIDs, [msg])

    // 4. if message is valid, propagate msg to others
    this._forwardMessages(msg.topicIDs, [msg])
  }

_emitMessages (topics, messages) {
@@ -170,30 +174,25 @@
/**
* Unmounts the floodsub protocol and shuts down every connection
* @override
* @param {Function} callback
* @returns {undefined}
* @returns {void}
*
*/
stop (callback) {
super.stop((err) => {
if (err) return callback(err)
this.subscriptions = new Set()
callback()
})
stop () {
super.stop()

this.subscriptions = new Set()
}

/**
* Publish messages to the given topics.
* @override
* @param {Array<string>|string} topics
* @param {Array<any>|any} messages
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise}
*
*/
publish (topics, messages, callback) {
async publish (topics, messages) {
assert(this.started, 'FloodSub is not started')
callback = callback || noop

this.log('publish', topics, messages)

@@ -202,7 +201,7 @@

const from = this.libp2p.peerInfo.id.toB58String()

const buildMessage = (msg, cb) => {
const buildMessage = (msg) => {
const seqno = utils.randomSeqno()
this.seenCache.put(utils.msgId(from, seqno))

@@ -216,24 +215,20 @@
// Emit to self if I'm interested and it is enabled
this._options.emitSelf && this._emitMessages(topics, [message])

this._buildMessage(message, cb)
return this._buildMessage(message)
}

    asyncMap(messages, buildMessage, (err, msgObjects) => {
      if (err) return callback(err)

      // send to all the other peers
      this._forwardMessages(topics, msgObjects)

      callback(null)
    })

    const msgObjects = await pMap(messages, buildMessage)

    // send to all the other peers
    this._forwardMessages(topics, msgObjects)
  }

/**
* Subscribe to the given topic(s).
* @override
* @param {Array<string>|string} topics
* @returns {undefined}
* @returns {void}
*/
subscribe (topics) {
assert(this.started, 'FloodSub is not started')
@@ -261,7 +256,7 @@
* Unsubscribe from the given topic(s).
* @override
* @param {Array<string>|string} topics
* @returns {undefined}
* @returns {void}
*/
unsubscribe (topics) {
// Avoid race conditions, by quietly ignoring unsub when shutdown.
