From adfbd40c991ab787aaaecc4f8731808fcd8db9cf Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 21 Aug 2019 11:37:02 +0200 Subject: [PATCH] refactor: switch to async iterators BREAKING CHANGE: Switch to using async/await and async iterators. The transport and connection interfaces have changed. --- .aegir.js | 8 +- .travis.yml | 1 - README.md | 76 +++++-------- gulpfile.js | 2 + package.json | 22 ++-- src/adapter.js | 17 +++ src/constants.js | 5 + src/index.js | 199 ++++++++++++++++++++-------------- src/socket.js | 88 +++++++++++++++ test/dial.spec.js | 47 ++++---- test/listen.js | 36 +++--- test/valid-connection.spec.js | 41 ++----- 12 files changed, 325 insertions(+), 217 deletions(-) create mode 100644 src/adapter.js create mode 100644 src/constants.js create mode 100644 src/socket.js diff --git a/.aegir.js b/.aegir.js index f9106e2..276063e 100644 --- a/.aegir.js +++ b/.aegir.js @@ -2,6 +2,7 @@ const WebRTCDirect = require('./src') const pull = require('pull-stream') +const pipe = require('it-pipe') const multiaddr = require('multiaddr') const ma = multiaddr('/ip4/127.0.0.1/tcp/12345/http/p2p-webrtc-direct') @@ -9,15 +10,16 @@ let listener function boot (done) { const wd = new WebRTCDirect() - listener = wd.createListener((conn) => pull(conn, conn)) - listener.listen(ma, done) + listener = wd.createListener((conn) => pipe(conn, conn)) listener.on('listening', () => { console.log('gulp listener started on:', ma.toString()) }) + listener.listen(ma).then(() => done()).catch(done) + listener.on('error', console.error) } function shutdown (done) { - listener.close(done) + listener.close().then(done).catch(done) } module.exports = { diff --git a/.travis.yml b/.travis.yml index aa8cf09..030d1e5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,6 @@ jobs: include: - stage: check script: - - npx aegir commitlint --travis - npx aegir dep-check -- -i wrtc -i electron-webrtc - npm run lint diff --git a/README.md b/README.md index 9049b1d..ddd9017 100644 --- a/README.md +++ b/README.md @@ -39,43 +39,33 @@ ```js const WebRTCDirect = require('libp2p-webrtc-direct') const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('pull-stream') +const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') +const addr = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') const webRTCDirect = new WebRTCDirect() const listener = webRTCDirect.createListener((socket) => { console.log('new connection opened') - pull( - pull.values(['hello']), + pipe( + ['hello'], socket ) }) -listener.listen(mh, () => { - console.log('listening') - - webRTCDirect.dial(mh, (err, conn) => { - if (!err) { - pull( - conn, - pull.collect((err, values) => { - if (!err) { - console.log(`Value: ${values.toString()}`) - } else { - console.log(`Error: ${err}`) - } - - // Close connection after reading - listener.close() - }), - ) - } else { - console.log(`Error: ${err}`) - } - }) -}) +await listener.listen(addr) +console.log('listening') + +const conn = await webRTCDirect.dial(addr) +const values = await pipe( + conn, + collect +) +console.log(`Value: ${values.toString()}`) + +// Close connection after reading +await listener.close() ``` Outputs: @@ -89,32 +79,22 @@ Note that it may take some time for the connection to be established. ## API -Follows the interface defined by `interface-transport` - -[![](https://raw.githubusercontent.com/diasdavid/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) +### Transport -## Pull-streams +[![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) -### This module uses `pull-streams` +### Connection -We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). +[![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)](https://github.com/libp2p/interface-connection) -You can learn more about pull-streams at: +## Contribute -- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) -- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) -- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) -- [pull-streams documentation](https://pull-stream.github.io/) +The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out: -#### Converting `pull-streams` to Node.js Streams + - Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically. + - **Perform code reviews**. + - **Add tests**. There can never be enough tests. -If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/pull-stream/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: - -```JavaScript -const pullToStream = require('pull-stream-to-stream') - -const nodeStreamInstance = pullToStream(pullStreamInstance) -// nodeStreamInstance is an instance of a Node.js Stream -``` +## License -To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. +[MIT](LICENSE) © Protocol Labs diff --git a/gulpfile.js b/gulpfile.js index 027e74d..c1d1248 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -18,7 +18,9 @@ function boot (done) { listener = wd.createListener((conn) => pull(conn, conn)) listener.listen(ma, done) listener.on('listening', () => { + /* eslint-disable no-console */ console.log('gulp listener started on:', ma.toString()) + /* eslint-enable no-console */ }) } diff --git a/package.json b/package.json index 5caa6d3..d9c7315 100644 --- a/package.json +++ b/package.json @@ -12,18 +12,17 @@ "scripts": { "lint": "aegir lint", "build": "aegir build", + "docs": "aegir docs", "test": "aegir test --target node --target browser", "test:node": "aegir test --target node", "test:browser": "aegir test --target browser", "release": "aegir release --target node --target browser", "release-minor": "aegir release --type minor --target node --target browser", "release-major": "aegir release --type major --target node --target browser", - "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" }, "pre-push": [ - "lint", - "test" + "lint" ], "repository": { "type": "git", @@ -44,25 +43,28 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-webrtc-direct#readme", "devDependencies": { - "aegir": "^18.2.1", + "aegir": "^20.0.0", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "gulp": "^4.0.0", + "gulp": "^4.0.2", "multiaddr": "^6.0.6", "webrtcsupport": "^2.2.0" }, "dependencies": { + "abortable-iterator": "^2.1.0", "class-is": "^1.1.0", "concat-stream": "^2.0.0", "detect-node": "^2.0.4", + "err-code": "^2.0.0", "interface-connection": "~0.3.3", - "mafmt": "^6.0.7", + "interface-transport": "^0.5.2", + "lodash.includes": "^4.3.0", + "mafmt": "^6.0.8", "multibase": "~0.6.0", "once": "^1.4.0", - "pull-stream": "^3.6.9", "request": "^2.88.0", - "simple-peer": "9.3.0", - "stream-to-pull-stream": "^1.7.3", + "simple-peer": "9.5.0", + "stream-to-it": "^0.1.0", "wrtc": "~0.2.1", "xhr": "^2.5.0" }, diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..6367a63 --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,17 @@ +'use strict' + +const { Adapter } = require('interface-transport') +const withIs = require('class-is') +const WebRTCDirect = require('.') + +// Legacy adapter to old transport & connection interface +class WebRTCDirectAdapter extends Adapter { + constructor () { + super(new WebRTCDirect()) + } +} + +module.exports = withIs(WebRTCDirectAdapter, { + className: 'WebRTCDirect', + symbolName: '@libp2p/js-libp2p-webrtc-direct/webrtcdirect' +}) diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..7c9048c --- /dev/null +++ b/src/constants.js @@ -0,0 +1,5 @@ +'use strict' + +// Time to wait for a connection to close gracefully before destroying it +// manually +module.exports.CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js index 20cc241..269eb59 100644 --- a/src/index.js +++ b/src/index.js @@ -1,85 +1,115 @@ 'use strict' +const debug = require('debug') +const log = debug('libp2p:webrtcdirect:dial') +const errcode = require('err-code') + +const includes = require('lodash.includes') const wrtc = require('wrtc') const SimplePeer = require('simple-peer') const isNode = require('detect-node') const http = require('http') -const toPull = require('stream-to-pull-stream') -const Connection = require('interface-connection').Connection -const EE = require('events').EventEmitter +const { EventEmitter } = require('events') const mafmt = require('mafmt') const multibase = require('multibase') -const once = require('once') const request = require('request') const withIs = require('class-is') +const { AbortError } = require('interface-transport') -function noop () {} +const Libp2pSocket = require('./socket') -function cleanMultiaddr (ma) { - return ma.decapsulate('/p2p-webrtc-direct') -} +function noop () {} class WebRTCDirect { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback = once(callback || noop) - - Object.assign(options, { + async dial (ma, options) { + options = { + ...options, initiator: true, trickle: false - }) + } if (isNode) { options.wrtc = wrtc } - const channel = new SimplePeer(options) - const conn = new Connection(toPull.duplex(channel)) - - let connected = false - - channel.on('signal', (signal) => { - const signalStr = JSON.stringify(signal) - const cma = cleanMultiaddr(ma) - const url = 'http://' + cma.toOptions().host + ':' + cma.toOptions().port - const path = '/?signal=' + multibase.encode('base58btc', Buffer.from(signalStr)) - const uri = url + path - - request.get(uri, (err, res) => { - if (err) { - return callback(err) - } - const incSignalBuf = multibase.decode(res.body) - const incSignalStr = incSignalBuf.toString() - const incSignal = JSON.parse(incSignalStr) - channel.signal(incSignal) - }) - }) + const cma = ma.decapsulate('/p2p-webrtc-direct') + const cOpts = cma.toOptions() + log('Dialing %s:%s', cOpts.host, cOpts.port) - channel.on('connect', () => { - connected = true - callback(null, conn) - }) + const rawConn = await this._connect(cOpts, options) + return new Libp2pSocket(rawConn, ma, options) + } + + _connect (cOpts, options) { + return new Promise((resolve, reject) => { + if ((options.signal || {}).aborted) { + return reject(new AbortError()) + } + + const start = Date.now() + const channel = new SimplePeer(options) + + const onError = (err) => { + const msg = `Error dialing ${cOpts.host}:${cOpts.port}: ${err.message}` + done(errcode(msg, err.code)) + } + + const onTimeout = () => { + log('Timeout dialing %s:%s', cOpts.host, cOpts.port) + const err = errcode(`Timeout after ${Date.now() - start}ms`, 'ETIMEDOUT') + // Note: this will result in onError() being called + channel.emit('error', err) + } + + const onConnect = () => { + log('Connected to %s:%s', cOpts.host, cOpts.port) + done(null, channel) + } + + const onAbort = () => { + log('Dial to %s:%s aborted', cOpts.host, cOpts.port) + channel.destroy() + done(new AbortError()) + } + + const done = (err, res) => { + channel.removeListener('error', onError) + channel.removeListener('timeout', onTimeout) + channel.removeListener('connect', onConnect) - conn.destroy = channel.destroy.bind(channel) - conn.getObservedAddrs = (callback) => callback(null, [ma]) + options.signal && options.signal.removeEventListener('abort', onAbort) - channel.on('timeout', () => callback(new Error('timeout'))) - channel.on('close', () => conn.destroy()) - channel.on('error', (err) => { - if (!connected) { - callback(err) + err ? reject(err) : resolve(res) } + + channel.once('error', onError) + channel.once('timeout', onTimeout) + channel.once('connect', onConnect) + channel.on('close', () => channel.destroy()) + options.signal && options.signal.addEventListener('abort', onAbort) + + channel.on('signal', (signal) => { + const signalStr = JSON.stringify(signal) + const url = 'http://' + cOpts.host + ':' + cOpts.port + const path = '/?signal=' + multibase.encode('base58btc', Buffer.from(signalStr)) + const uri = url + path + + request.get(uri, (err, res) => { + if (err) { + return reject(err) + } + const incSignalBuf = multibase.decode(res.body) + const incSignalStr = incSignalBuf.toString() + const incSignal = JSON.parse(incSignalStr) + channel.signal(incSignal) + }) + }) }) } createListener (options, handler) { if (!isNode) { - throw new Error(`Can't listen if run from the Browser`) + throw errcode(new Error(`Can't listen if run from the Browser`), 'ERR_CANNOT_LISTEN_FROM_BROWSER') } if (typeof options === 'function') { @@ -87,7 +117,9 @@ class WebRTCDirect { options = {} } - const listener = new EE() + handler = handler || noop + + const listener = new EventEmitter() const server = http.createServer() let maSelf @@ -100,19 +132,20 @@ class WebRTCDirect { const incSignalBuf = multibase.decode(Buffer.from(incSignalStr)) const incSignal = JSON.parse(incSignalBuf.toString()) - Object.assign(options, { + options = { + ...options, trickle: false - }) + } if (isNode) { options.wrtc = wrtc } const channel = new SimplePeer(options) - const conn = new Connection(toPull.duplex(channel)) + // TODO get multiaddr + const conn = new Libp2pSocket(channel) channel.on('connect', () => { - conn.getObservedAddrs = (callback) => callback(null, []) listener.emit('connection', conn) handler(conn) }) @@ -126,37 +159,41 @@ class WebRTCDirect { channel.signal(incSignal) }) - listener.listen = (ma, callback) => { - callback = callback || noop + server.on('listening', () => listener.emit('listening')) + server.on('error', (err) => listener.emit('error', err)) + server.on('close', () => listener.emit('close')) + listener.listen = (ma) => { maSelf = ma - server.on('listening', () => { - listener.emit('listening') - callback() - }) + const lOpts = ma.decapsulate('/p2p-webrtc-direct').toOptions() + + return new Promise((resolve, reject) => { + server.on('listening', (err) => { + if (err) { + return reject(err) + } + + listener.emit('listening') + log('Listening on %s %s', lOpts.port, lOpts.host) + resolve() + }) - const cma = cleanMultiaddr(ma) - server.listen(cma.toOptions()) + server.listen(lOpts) + }) } - listener.close = (options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} + listener.close = () => { + if (!server.listening) { + return } - callback = callback || noop - - server.close(() => { - listener.emit('close') - callback() + return new Promise((resolve, reject) => { + server.close((err) => err ? reject(err) : resolve()) }) } - listener.getAddrs = (callback) => { - setImmediate(() => { - callback(null, [maSelf]) - }) + listener.getAddrs = () => { + return [maSelf] } return listener @@ -168,10 +205,14 @@ class WebRTCDirect { } return multiaddrs.filter((ma) => { - if (ma.protoNames().indexOf('p2p-circuit') > -1) { + if (includes(ma.protoNames(), 'p2p-circuit')) { return false } + if (includes(ma.protoNames(), 'ipfs')) { + ma = ma.decapsulate('ipfs') + } + return mafmt.WebRTCDirect.matches(ma) }) } diff --git a/src/socket.js b/src/socket.js new file mode 100644 index 0000000..9e41a69 --- /dev/null +++ b/src/socket.js @@ -0,0 +1,88 @@ +'use strict' + +const abortable = require('abortable-iterator') +// const toIterable = require('stream-to-it') + +const debug = require('debug') +const log = debug('libp2p:tcp:socket') + +const c = require('./constants') + +class Libp2pSocket { + constructor (rawSocket, ma, opts = {}) { + this._rawSocket = rawSocket + this._ma = ma + + this.sink = this._sink(opts) + this.source = opts.signal + ? abortable(this._rawSocket, opts.signal) : this._rawSocket + } + + _sink (opts) { + // By default, close when the source is exhausted + const closeOnEnd = opts.closeOnEnd !== false + + return async (source) => { + try { + const src = opts.signal ? abortable(source, opts.signal) : source + await this._write(src, closeOnEnd) + } catch (err) { + // If the connection is aborted just close the socket + if (err.type === 'aborted') { + return this.close() + } + + throw err + } + } + } + + async _write (source, closeOnEnd) { + for await (const data of source) { + if (this._rawSocket.destroyed) { + const cOpts = this._ma.toOptions() + throw new Error('Cannot write %d bytes to destroyed socket %s:%s', + data.length, cOpts.host, cOpts.port) + } + + const flushed = this._rawSocket.write(data) + if (!flushed) { + await new Promise((resolve) => this._rawSocket.once('drain', resolve)) + } + } + + if (closeOnEnd) { + await this.close() + } + } + + close (opts = {}) { + if (this._rawSocket.pending || this._rawSocket.destroyed) { + return + } + + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + const cOpts = this._ma.toOptions() + log('Timeout closing socket to %s:%s after %dms, destroying it manually', + cOpts.host, cOpts.port, Date.now() - start) + this._rawSocket.destroy() + resolve() + }, opts.timeout || c.CLOSE_TIMEOUT) + + this._rawSocket.once('close', () => clearTimeout(timeout)) + + this._rawSocket.end((err) => err ? reject(err) : resolve()) + }) + } + + getObservedAddrs () { + return [this._ma] + } +} + +module.exports = Libp2pSocket diff --git a/test/dial.spec.js b/test/dial.spec.js index ac45861..f2d7e22 100644 --- a/test/dial.spec.js +++ b/test/dial.spec.js @@ -8,7 +8,9 @@ const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') + +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') const WebRTCDirect = require('../src') @@ -22,34 +24,33 @@ describe('dial', function () { wd = new WebRTCDirect() }) - it('dial on IPv4, check callback', (done) => { - wd.dial(ma, { config: {} }, (err, conn) => { - expect(err).to.not.exist() - - const data = Buffer.from('some data') - - pull( - pull.values([data]), - conn, - pull.collect((err, values) => { - expect(err).to.not.exist() - expect(values).to.eql([data]) - done() - }) - ) - }) + it('dial on IPv4', async () => { + const conn = await wd.dial(ma) + const data = Buffer.from('some data') + + const values = await pipe( + [data], + conn, + collect + ) + + expect(values).to.eql([data]) }) - it('dial offline / non-existent node on IPv4, check callback', (done) => { - let maOffline = multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') + it('dial offline / non-existent node on IPv4, check callback', async () => { + const maOffline = multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') - wd.dial(maOffline, { config: {} }, (err, conn) => { + try { + await wd.dial(maOffline, { config: {} }) + } catch (err) { expect(err).to.exist() - done() - }) + return + } + + throw new Error('dial did not fail') }) - it.skip('dial on IPv6', (done) => { + it.skip('dial on IPv6', () => { // TODO IPv6 not supported yet }) }) diff --git a/test/listen.js b/test/listen.js index 09331f3..faf5581 100644 --- a/test/listen.js +++ b/test/listen.js @@ -18,29 +18,28 @@ describe('listen', () => { wd = new WebRTCDirect() }) - it('listen, check for callback', (done) => { - const listener = wd.createListener({ config: {} }, (conn) => {}) + it('listen, check for promise', async () => { + const listener = wd.createListener({ config: {} }, (_) => { }) - listener.listen(ma, (err) => { - expect(err).to.not.exist() - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = wd.createListener({ config: {} }, (conn) => {}) - listener.once('listening', () => { - listener.close(done) + listener.once('listening', async () => { + await listener.close() + done() }) listener.listen(ma) }) it('listen, check for the close event', (done) => { const listener = wd.createListener({ config: {} }, (conn) => {}) - listener.listen(ma, (err) => { - expect(err).to.not.exist() + listener.listen(ma).then(() => { listener.once('close', done) + listener.close() }) }) @@ -61,15 +60,14 @@ describe('listen', () => { // TODO IPv6 not supported yet }) - it('getAddrs', (done) => { + it('getAddrs', async () => { const listener = wd.createListener({ config: {} }, (conn) => {}) - listener.listen(ma, (err) => { - expect(err).to.not.exist() - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + + await listener.listen(ma) + + const addrs = listener.getAddrs() + expect(addrs[0]).to.deep.equal(ma) + + await listener.close() }) }) diff --git a/test/valid-connection.spec.js b/test/valid-connection.spec.js index 51b765b..466a8cc 100644 --- a/test/valid-connection.spec.js +++ b/test/valid-connection.spec.js @@ -7,7 +7,6 @@ const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') const WebRTCDirect = require('../src') @@ -17,45 +16,19 @@ describe('valid Connection', function () { let wd let conn - before((done) => { + before(async () => { wd = new WebRTCDirect() - wd.dial(ma, { config: {} }, (err, _conn) => { - expect(err).to.not.exist() - conn = _conn - done() - }) + conn = await wd.dial(ma, { config: {} }) }) - after((done) => { - pull( - pull.empty(), - conn, - pull.onEnd(done) - ) + after(async () => { + conn && await conn.close() }) - it('get observed addrs', (done) => { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs[0].toString()).to.equal(ma.toString()) - done() - }) - }) - - it('get Peer Info', (done) => { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - done() - }) - }) + it('get observed addrs', () => { + const addrs = conn.getObservedAddrs() - it('set Peer Info', (done) => { - conn.setPeerInfo('info') - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('info') - done() - }) + expect(addrs[0].toString()).to.equal(ma.toString()) }) })