Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
chore: address review
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Sep 24, 2019
1 parent c97a0f3 commit d884ea6
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 62 deletions.
7 changes: 3 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class WebSockets {
async dial (ma, options = {}) {
log('dialing %s', ma)

const stream = await this._connect(ma, options)
const maConn = toConnection(stream, { socket: stream.socket, remoteAddr: ma, signal: options.signal })
const socket = await this._connect(ma, options)
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
log('new outbound connection %s', maConn.remoteAddr)

const conn = await this._upgrader.upgradeOutbound(maConn)
Expand Down Expand Up @@ -101,12 +101,11 @@ class WebSockets {
* @param {function (Connection)} handler
* @returns {Listener} A Websockets listener
*/
createListener (options, handler) {
createListener (options = {}, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
options = options || {}

return createListener({ handler, upgrader: this._upgrader }, options)
}
Expand Down
16 changes: 2 additions & 14 deletions src/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ const toConnection = require('./socket-to-conn')
module.exports = ({ handler, upgrader }, options = {}) => {
const listener = new EventEmitter()

const server = createServer(options, async (stream, req) => {
const maConn = toConnection(stream, {
socket: req.socket
})
const server = createServer(options, async (stream) => {
const maConn = toConnection(stream)

log('new inbound connection %s', maConn.remoteAddr)

Expand Down Expand Up @@ -48,10 +46,6 @@ module.exports = ({ handler, upgrader }, options = {}) => {
listeningMultiaddr = ma
peerId = listeningMultiaddr.getPeerId()

if (peerId) {
ma = ma.decapsulateCode(CODE_P2P)
}

return server.listen(ma.toOptions())
}

Expand Down Expand Up @@ -96,10 +90,4 @@ module.exports = ({ handler, upgrader }, options = {}) => {

function trackConn (server, maConn) {
server.__connections.push(maConn)

const untrackConn = () => {
server.__connections = server.__connections.filter(c => c !== maConn)
}

maConn.conn.once('close', untrackConn)
}
21 changes: 9 additions & 12 deletions src/socket-to-conn.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,30 @@ const log = require('debug')('libp2p:websockets:socket')

// Convert a stream into a MultiaddrConnection
// https://github.com/libp2p/interface-transport#multiaddrconnection
module.exports = (stream, options = {}) => {
const socket = options.socket

module.exports = (socket, options = {}) => {
const maConn = {
async sink (source) {
if (options.signal) {
source = abortable(source, options.signal)
}

try {
await stream.sink(source)
await socket.sink(source)
} catch (err) {
// Re-throw non-aborted errors
if (err.type !== 'aborted') throw err
// Otherwise, this is fine...
await stream.close()
if (err.type !== 'aborted') {
log(err)
}
}
},

source: options.signal ? abortable(stream.source, options.signal) : stream.source,
source: options.signal ? abortable(socket.source, options.signal) : socket.source,

conn: socket,

localAddr: undefined,

// If the remote address was passed, use it - it may have the peer ID encapsulated
remoteAddr: options.remoteAddr || toMultiaddr(stream.remoteAddress, stream.remotePort),
remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort),

timeline: { open: Date.now() },

Expand All @@ -49,12 +46,12 @@ module.exports = (stream, options = {}) => {
log('timeout closing socket to %s:%s after %dms, destroying it manually',
host, port, Date.now() - start)

socket.terminate()
socket.destroy()
maConn.timeline.close = Date.now()
return resolve()
}, CLOSE_TIMEOUT)

await stream.close()
await socket.close()

clearTimeout(timeout)
maConn.timeline.close = Date.now()
Expand Down
2 changes: 1 addition & 1 deletion test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('libp2p-websockets', () => {
})

it('many writes', async function () {
this.timeout(100000)
this.timeout(10000)
const s = goodbye({
source: pipe(
{
Expand Down
32 changes: 1 addition & 31 deletions test/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ const expect = chai.expect
chai.use(dirtyChai)
const multiaddr = require('multiaddr')
const goodbye = require('it-goodbye')
const { collect, consume } = require('streaming-iterables')
const { collect } = require('streaming-iterables')
const pipe = require('it-pipe')
const AbortController = require('abort-controller')

const WS = require('../src')

Expand Down Expand Up @@ -216,35 +215,6 @@ describe('dial', () => {

expect(result).to.be.eql([Buffer.from('hey')])
})

it('should be abortable after connect', async () => {
const controller = new AbortController()
const conn = await ws.dial(ma, { signal: controller.signal })
const s = goodbye({
source: {
[Symbol.asyncIterator] () {
return this
},
next () {
return new Promise(resolve => {
setTimeout(() => resolve(Math.random()), 1000)
})
}
},
sink: consume
})

setTimeout(() => controller.abort(), 500)

try {
await pipe(s, conn, s)
} catch (err) {
expect(err.type).to.equal('aborted')
return
}

throw new Error('connection was not aborted')
})
})

describe('ip4 with wss', () => {
Expand Down

0 comments on commit d884ea6

Please sign in to comment.