Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
refactor: async
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Switch to using async/await and async iterators.
  • Loading branch information
vasco-santos committed Sep 2, 2019
1 parent 65fe4a5 commit 8c85540
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 134 deletions.
60 changes: 29 additions & 31 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const pull = require('pull-stream')
const parallel = require('async/parallel')
const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous')

const Node = require('./test/utils/nodejs-bundle.js')
Expand All @@ -13,29 +12,22 @@ const {
let wsRendezvous
let node

const before = (done) => {
parallel([
(cb) => {
WebSocketStarRendezvous.start({
port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port,
refreshPeerListIntervalMS: 1000,
strictMultiaddr: false,
cryptoChallenge: true
}, (err, _server) => {
const before = async () => {
[wsRendezvous, node] = await Promise.all([
WebSocketStarRendezvous.start({
port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port,
refreshPeerListIntervalMS: 1000,
strictMultiaddr: false,
cryptoChallenge: true
}),
new Promise(async (resolve, reject) => {
// TODO PROMISIFY THIS
getPeerRelay(async (err, peerInfo) => {
if (err) {
return cb(err)
}
wsRendezvous = _server
cb()
})
},
(cb) => {
getPeerRelay((err, peerInfo) => {
if (err) {
return done(err)
return reject(err)
}

node = new Node({
const n = new Node({
peerInfo,
config: {
relay: {
Expand All @@ -48,19 +40,25 @@ const before = (done) => {
}
})

node.handle('/echo/1.0.0', (_, conn) => pull(conn, conn))
node.start(cb)
n.handle('/echo/1.0.0', (_, conn) => pull(conn, conn))
await n.start()

resolve(n)
})
}
], done)
})
])
}

const after = (done) => {
setTimeout(() =>
parallel(
[node, wsRendezvous].map((s) => (cb) => s.stop(cb)),
done),
2000)
const after = () => {
return new Promise((resolve) => {
setTimeout(async () => {
await Promise.all([
node.stop(),
wsRendezvous.stop()
])
resolve()
}, 2000)
})
}

module.exports = {
Expand Down
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,19 @@ class PubsubImplementation extends Pubsub {

Validates the signature of a message.

#### `pubsub.validate(message, callback)`
#### `pubsub.validate(message)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| message | `Message` | a pubsub message |
| callback | `function(Error, Boolean)` | calls back with true if the message is valid |

#### Returns

| Type | Description |
|------|-------------|
| `Promise<Boolean>` | resolves to true if the message is valid |

## Implementations using this base protocol

Expand Down
20 changes: 10 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,33 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-pubsub#readme",
"devDependencies": {
"aegir": "^18.2.1",
"aegir": "^20.0.0",
"benchmark": "^2.1.4",
"chai": "^4.2.0",
"chai-spies": "^1.0.0",
"dirty-chai": "^2.0.1",
"libp2p": "~0.24.4",
"libp2p": "~0.26.1",
"libp2p-secio": "~0.11.1",
"libp2p-spdy": "~0.13.3",
"libp2p-tcp": "~0.13.0",
"libp2p-tcp": "~0.13.1",
"libp2p-websocket-star": "~0.10.2",
"libp2p-websocket-star-rendezvous": "~0.3.0",
"lodash": "^4.17.11",
"multiaddr": "^6.0.6",
"libp2p-websocket-star-rendezvous": "~0.4.1",
"lodash": "^4.17.15",
"multiaddr": "^6.1.0",
"peer-id": "~0.12.2",
"peer-info": "~0.15.1"
},
"dependencies": {
"async": "^2.6.2",
"bs58": "^4.0.1",
"debug": "^4.1.1",
"err-code": "^1.1.2",
"err-code": "^2.0.0",
"length-prefixed-stream": "^2.0.0",
"libp2p-crypto": "~0.16.1",
"libp2p-crypto": "~0.17.0",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.1",
"pull-length-prefixed": "^1.3.3",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.9",
"pull-stream": "^3.6.14",
"sinon": "^7.3.2",
"time-cache": "~0.3.0"
},
Expand Down
2 changes: 1 addition & 1 deletion src/message/sign.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function signMessage (peerId, message, callback) {
*/
function verifySignature (message, callback) {
// Get message sans the signature
let baseMessage = { ...message }
const baseMessage = { ...message }
delete baseMessage.signature
delete baseMessage.key
const bytes = Buffer.concat([
Expand Down
2 changes: 1 addition & 1 deletion src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ exports.anyMatch = (a, b) => {
bHas = (val) => b.has(val)
}

for (let val of a) {
for (const val of a) {
if (bHas(val)) {
return true
}
Expand Down
135 changes: 59 additions & 76 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ chai.use(require('dirty-chai'))
chai.use(require('chai-spies'))
const expect = chai.expect
const sinon = require('sinon')
const series = require('async/series')
const parallel = require('async/parallel')

const PubsubBaseProtocol = require('../src')
Expand Down Expand Up @@ -47,29 +46,24 @@ describe('pubsub base protocol', () => {
let psA
let psB

before((done) => {
series([
(cb) => createNode(cb),
(cb) => createNode(cb)
], (err, nodes) => {
if (err) {
return done(err)
}
nodeA = nodes[0]
nodeB = nodes[1]
done()
})
before(async () => {
[nodeA, nodeB] = await Promise.all([
createNode(),
createNode()
])
})

before('mount the pubsub protocol', (done) => {
before('mount the pubsub protocol', () => {
psA = new PubsubImplementation(nodeA)
psB = new PubsubImplementation(nodeB)

setTimeout(() => {
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
done()
}, 50)
return new Promise((resolve) => {
setTimeout(() => {
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
resolve()
}, 50)
})
})

before('start both Pubsub', (done) => {
Expand All @@ -86,15 +80,16 @@ describe('pubsub base protocol', () => {
], done)
})

it('Dial from nodeA to nodeB', (done) => {
series([
(cb) => nodeA.dial(nodeB.peerInfo, cb),
(cb) => setTimeout(() => {
it('Dial from nodeA to nodeB', async () => {
await nodeA.dial(nodeB.peerInfo)

return new Promise((resolve) => {
setTimeout(() => {
expect(psA.peers.size).to.equal(1)
expect(psB.peers.size).to.equal(1)
cb()
resolve()
}, 1000)
], done)
})
})

it('_buildMessage normalizes and signs messages', (done) => {
Expand Down Expand Up @@ -156,15 +151,14 @@ describe('pubsub base protocol', () => {
let psA
let psB

before((done) => {
series([
(cb) => createNode(cb),
(cb) => createNode(cb)
], (cb, nodes) => {
nodeA = nodes[0]
nodeB = nodes[1]
nodeA.dial(nodeB.peerInfo, () => setTimeout(done, 1000))
})
before(async () => {
[nodeA, nodeB] = await Promise.all([
createNode(),
createNode()
])

await nodeA.dial(nodeB.peerInfo)
await new Promise((resolve) => setTimeout(resolve, 1000))
})

after((done) => {
Expand Down Expand Up @@ -205,25 +199,21 @@ describe('pubsub base protocol', () => {
let psA
let psB

before((done) => {
sandbox = chai.spy.sandbox()

series([
(cb) => createNode(cb),
(cb) => createNode(cb)
], (err, nodes) => {
if (err) return done(err)

nodeA = nodes[0]
nodeB = nodes[1]

// Put node B in node A's peer book
nodeA.peerBook.put(nodeB.peerInfo)
before(async () => {
// sandbox = chai.spy.sandbox()
[nodeA, nodeB] = await Promise.all([
createNode(),
createNode()
])
// Put node B in node A's peer book
nodeA.peerBook.put(nodeB.peerInfo)

return new Promise((resolve) => {
psA = new PubsubImplementation(nodeA)
psB = new PubsubImplementation(nodeB)

psB.start(done)
sandbox = chai.spy.sandbox()
psB.start(resolve)
})
})

Expand Down Expand Up @@ -266,25 +256,21 @@ describe('pubsub base protocol', () => {
let psA
let psB

before((done) => {
sandbox = chai.spy.sandbox()

series([
(cb) => createNode(cb),
(cb) => createNode(cb)
], (err, nodes) => {
if (err) return done(err)

nodeA = nodes[0]
nodeB = nodes[1]

// Put node B in node A's peer book
nodeA.peerBook.put(nodeB.peerInfo)
before(async () => {
// sandbox = chai.spy.sandbox()
[nodeA, nodeB] = await Promise.all([
createNode(),
createNode()
])
// Put node B in node A's peer book
nodeA.peerBook.put(nodeB.peerInfo)

return new Promise((resolve) => {
psA = new PubsubImplementation(nodeA)
psB = new PubsubImplementation(nodeB)

psB.start(done)
sandbox = chai.spy.sandbox()
psB.start(resolve)
})
})

Expand Down Expand Up @@ -338,25 +324,22 @@ describe('pubsub base protocol', () => {
let psA
let psB

before((done) => {
sandbox = chai.spy.sandbox()

series([
(cb) => createNode(cb),
(cb) => createNode(cb)
], (err, nodes) => {
if (err) return done(err)

nodeA = nodes[0]
nodeB = nodes[1]
before(async () => {
// sandbox = chai.spy.sandbox()
[nodeA, nodeB] = await Promise.all([
createNode(),
createNode()
])

return new Promise((resolve) => {
psA = new PubsubImplementation(nodeA)
psB = new PubsubImplementation(nodeB)

sandbox = chai.spy.sandbox()
parallel([
(cb) => psA.start(cb),
(cb) => psB.start(cb)
], done)
], resolve)
})
})

Expand Down
8 changes: 4 additions & 4 deletions test/utils.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ describe('utils', () => {
})

it('converts an IN msg.from to b58', () => {
let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex')
let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM'
const binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex')
const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM'
const m = [
{ from: binaryId },
{ from: stringId }
Expand All @@ -63,8 +63,8 @@ describe('utils', () => {
})

it('converts an OUT msg.from to binary', () => {
let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex')
let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM'
const binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex')
const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM'
const m = [
{ from: binaryId },
{ from: stringId }
Expand Down
Loading

0 comments on commit 8c85540

Please sign in to comment.