Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
chore: apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
vasco-santos and jacobheun committed Sep 30, 2019
1 parent 310730b commit 3d1d689
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 72 deletions.
11 changes: 6 additions & 5 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ const mockUpgrader = {
upgradeOutbound: maConn => maConn
}

function boot (done) {
function boot () {
const wd = new WebRTCDirect({ upgrader: mockUpgrader })

listener = wd.createListener((conn) => pipe(conn, conn))
listener.on('listening', () => {
console.log('gulp listener started on:', ma.toString())
console.log('listener started on:', ma.toString())
})
listener.listen(ma).then(() => done()).catch(done)
listener.on('error', console.error)
return listener.listen(ma)
}

function shutdown (done) {
listener.close().then(done).catch(done)
function shutdown () {
return listener.close()
}

module.exports = {
Expand Down
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ stages:

node_js:
- '10'
- '12'

os:
- linux
Expand Down
31 changes: 0 additions & 31 deletions gulpfile.js

This file was deleted.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"aegir": "^20.3.1",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"gulp": "^4.0.2",
"multiaddr": "^7.1.0",
"webrtcsupport": "^2.2.0"
},
Expand Down
12 changes: 8 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const assert = require('debug')
const debug = require('debug')
const log = debug('libp2p:webrtcdirect')
log.error = debug('libp2p:webrtcdirect:error')
const errcode = require('err-code')

const wrtc = require('wrtc')
Expand Down Expand Up @@ -63,10 +64,10 @@ class WebRTCDirect {
}

options = {
...options,
initiator: true,
trickle: false,
wrtc: isNode ? wrtc : undefined
wrtc: isNode ? wrtc : undefined,
...options
}

return new Promise((resolve, reject) => {
Expand All @@ -80,7 +81,10 @@ class WebRTCDirect {

const onError = (err) => {
if (!connected) {
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
const msg = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`

log.error(msg)
err.message = msg
done(err)
}
}
Expand All @@ -100,7 +104,7 @@ class WebRTCDirect {
}

const onAbort = () => {
log('connection aborted %s:%s', cOpts.host, cOpts.port)
log.error('connection aborted %s:%s', cOpts.host, cOpts.port)
channel.destroy()
done(new AbortError())
}
Expand Down
31 changes: 17 additions & 14 deletions src/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const http = require('http')
const EventEmitter = require('events')
const log = require('debug')('libp2p:libp2p:webrtcdirect:listener')
const log = require('debug')('libp2p:webrtcdirect:listener')

const isNode = require('detect-node')
const wrtc = require('wrtc')
Expand All @@ -19,7 +19,7 @@ module.exports = ({ handler, upgrader }, options = {}) => {
let maSelf

// Keep track of open connections to destroy in case of timeout
server.__connections = []
listener.__connections = []

server.on('request', async (req, res) => {
res.setHeader('Content-Type', 'text/plain')
Expand All @@ -31,22 +31,19 @@ module.exports = ({ handler, upgrader }, options = {}) => {
const incSignal = JSON.parse(incSignalBuf.toString())

options = {
...options,
trickle: false
}

if (isNode) {
options.wrtc = wrtc
trickle: false,
wrtc: isNode ? wrtc : undefined,
...options
}

const channel = new SimplePeer(options)
const maConn = toConnection(channel)
const maConn = toConnection(channel, listener.__connections)
log('new inbound connection %s', maConn.remoteAddr)

const conn = await upgrader.upgradeInbound(maConn)
log('inbound connection %s upgraded', maConn.remoteAddr)

trackConn(server, maConn)
trackConn(listener, maConn)

channel.on('connect', () => {
listener.emit('connection', conn)
Expand All @@ -67,7 +64,7 @@ module.exports = ({ handler, upgrader }, options = {}) => {

listener.listen = (ma) => {
maSelf = ma
const lOpts = ma.decapsulate('/p2p-webrtc-direct').toOptions()
const lOpts = ma.toOptions()

return new Promise((resolve, reject) => {
server.on('listening', (err) => {
Expand All @@ -90,7 +87,7 @@ module.exports = ({ handler, upgrader }, options = {}) => {
}

return new Promise((resolve, reject) => {
server.__connections.forEach(maConn => maConn.close())
listener.__connections.forEach(maConn => maConn.close())
server.close((err) => err ? reject(err) : resolve())
})
}
Expand All @@ -102,6 +99,12 @@ module.exports = ({ handler, upgrader }, options = {}) => {
return listener
}

function trackConn (server, maConn) {
server.__connections.push(maConn)
function trackConn (listener, maConn) {
listener.__connections.push(maConn)

const untrackConn = () => {
listener.__connections = listener.__connections.filter(c => c !== maConn)
}

maConn.conn.on('close', untrackConn)
}
20 changes: 7 additions & 13 deletions src/socket-to-conn.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ const { CLOSE_TIMEOUT } = require('./constants')
const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr')

const debug = require('debug')
const log = debug('libp2p:libp2p:webrtcdirect:socket')
log.error = debug('libp2p:libp2p:webrtcdirect:socket:error')
const log = debug('libp2p:webrtcdirect:socket')
log.error = debug('libp2p:webrtcdirect:socket:error')

// Convert a socket into a MultiaddrConnection
// https://github.com/libp2p/interface-transport#multiaddrconnection
Expand Down Expand Up @@ -74,28 +74,22 @@ module.exports = (socket, options = {}) => {
}, CLOSE_TIMEOUT)

socket.once('close', () => {
clearTimeout(timeout)
maConn.timeline.close = Date.now()
socket.emit('closed')
resolve()
})

socket.end(err => {
socket.end((err) => {
clearTimeout(timeout)

if (err) return reject(err)
maConn.timeline.close = Date.now()
socket.emit('closed')
resolve()
})
})
}
}

socket.once('close', () => {
// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
if (!maConn.timeline.close) {
maConn.timeline.close = Date.now()
}
})

return maConn
}
29 changes: 25 additions & 4 deletions test/listen.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)

const multiaddr = require('multiaddr')
const pipe = require('it-pipe')

const WebRTCDirect = require('../src')

const mockUpgrader = {
Expand Down Expand Up @@ -57,10 +60,6 @@ describe('listen', () => {
// TODO
})

it.skip('close listener with connections, through timeout', (done) => {
// TODO
})

it.skip('listen on IPv6 addr', (done) => {
// TODO IPv6 not supported yet
})
Expand All @@ -75,4 +74,26 @@ describe('listen', () => {

await listener.close()
})

it('should untrack conn after being closed', async () => {
const ma1 = multiaddr('/ip4/127.0.0.1/tcp/12346/http/p2p-webrtc-direct')

const wd1 = new WebRTCDirect({ upgrader: mockUpgrader })
const listener1 = wd1.createListener((conn) => pipe(conn, conn))

await listener1.listen(ma1)
expect(listener1.__connections).to.have.lengthOf(0)

const conn = await wd.dial(ma1)
expect(listener1.__connections).to.have.lengthOf(1)

await conn.close()

// wait for listener to know of the disconnect
await new Promise((resolve) => {
setTimeout(resolve, 2500)
})

expect(listener1.__connections).to.have.lengthOf(0)
})
})

0 comments on commit 3d1d689

Please sign in to comment.