Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactory: async with multiaddr conn
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Sep 20, 2019
1 parent 20358d3 commit 8991a3b
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 176 deletions.
6 changes: 5 additions & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ const multiaddr = require('multiaddr')
const pipe = require('it-pipe')
const WS = require('./src')

const mockUpgrader = {
upgradeInbound: maConn => maConn,
upgradeOutbound: maConn => maConn
}
let listener

function boot (done) {
const ws = new WS()
const ws = new WS({ upgrader: mockUpgrader })
const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
listener = ws.createListener(conn => pipe(conn, conn))
listener.listen(ma).then(() => done()).catch(done)
Expand Down
41 changes: 1 addition & 40 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,43 +1,4 @@
docs
node_modules
package-lock.json
yarn.lock

# Logs
logs
*.log
npm-debug.log*

# Runtime data
pids
*.pid
*.seed

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directory
node_modules

# Optional npm cache directory
.npm

# Optional REPL history
.node_repl_history

# Vim editor swap files
*.swp

dist
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:

- stage: check
script:
- npx aegir commitlint --travis
- npx aegir build --bundlesize
- npx aegir dep-check -- -i wrtc -i electron-webrtc
- npm run lint

Expand Down
40 changes: 18 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,33 @@
```js
const WS = require('libp2p-websockets')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const pipe = require('it-pipe')
const { collect } = require('streaming-iterables')

const mh = multiaddr('/ip4/0.0.0.0/tcp/9090/ws')
const addr = multiaddr('/ip4/0.0.0.0/tcp/9090/ws')

const ws = new WS()
const ws = new WS({ upgrader })

const listener = ws.createListener((socket) => {
console.log('new connection opened')
pull(
pull.values(['hello']),
pipe(
['hello'],
socket
)
})

listener.listen(mh, () => {
console.log('listening')

pull(
ws.dial(mh),
pull.collect((err, values) => {
if (!err) {
console.log(`Value: ${values.toString()}`)
} else {
console.log(`Error: ${err}`)
}

// Close connection after reading
listener.close()
}),
)
})
await listener.listen(addr)
console.log('listening')

const socket = await ws.dial(addr)
const values = await pipe(
socket,
collect
)
console.log(`Value: ${values.toString()}`)

// Close connection after reading
await listener.close()
```

## API
Expand Down
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
"dist"
],
"pre-push": [
"lint",
"test"
"lint"
],
"repository": {
"type": "git",
Expand All @@ -39,23 +38,24 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"abortable-iterator": "^2.0.0",
"abortable-iterator": "^2.1.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"interface-connection": "~0.3.3",
"it-ws": "^2.1.0",
"mafmt": "^6.0.7",
"err-code": "^2.0.0",
"ip-address": "^6.1.0",
"it-ws": "vasco-santos/it-ws#feat/add-properties-and-functions-to-client-and-server",
"mafmt": "^6.0.9",
"multiaddr": "^7.1.0",
"multiaddr-to-uri": "^5.0.0"
},
"devDependencies": {
"abort-controller": "^3.0.0",
"aegir": "^20.0.0",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"interface-transport": "~0.6.1",
"interface-transport": "^0.7.0",
"it-goodbye": "^2.0.0",
"it-pipe": "^1.0.0",
"multiaddr": "^6.0.6",
"streaming-iterables": "^4.0.2"
},
"contributors": [
Expand Down
17 changes: 0 additions & 17 deletions src/adapter.js

This file was deleted.

8 changes: 8 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

// p2p multi-address code
exports.CODE_P2P = 421
exports.CODE_CIRCUIT = 290

// Time to wait for a connection to close gracefully before destroying it manually
exports.CLOSE_TIMEOUT = 2000
111 changes: 80 additions & 31 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,78 @@ const connect = require('it-ws/client')
const mafmt = require('mafmt')
const withIs = require('class-is')
const toUri = require('multiaddr-to-uri')
const log = require('debug')('libp2p:websockets:transport')
const abortable = require('abortable-iterator')
const { AbortError } = require('interface-transport')
const { AbortError } = require('abortable-iterator')

const log = require('debug')('libp2p:websockets')
const assert = require('assert')

const createListener = require('./listener')
const toConnection = require('./socket-to-conn')
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')

/**
* @class WebSockets
*/
class WebSockets {
/**
* @constructor
* @param {object} options
* @param {Upgrader} options.upgrader
*/
constructor ({ upgrader }) {
assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.')
this._upgrader = upgrader
}

/**
* @async
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal Used to abort dial requests
* @returns {Connection} An upgraded Connection
*/
async dial (ma, options) {
options = options || {}
log('dialing %s', ma)

const socket = connect(toUri(ma), Object.assign({ binary: true }, options))
const getObservedAddrs = () => [ma]
const stream = await this._connect(ma, options)
const maConn = toConnection(stream, { socket: stream.socket, remoteAddr: ma, signal: options.signal })
log('new outbound connection %s', maConn.remoteAddr)

const conn = await this._upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}

/**
* @private
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal Used to abort dial requests
* @returns {Promise<Socket>} Resolves a TCP Socket
*/
async _connect (ma, options = {}) {
if (options.signal && options.signal.aborted) {
throw new AbortError()
}
const cOpts = ma.toOptions()
log('dialing %s:%s', cOpts.host, cOpts.port)

const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options))

if (!options.signal) {
socket.getObservedAddrs = getObservedAddrs
await socket.connected()
await rawSocket.connected()

log('connected %s', ma)
return socket
return rawSocket
}

// Allow abort via signal during connect
let onAbort
const abort = new Promise((resolve, reject) => {
onAbort = () => {
reject(new AbortError())
socket.close()
rawSocket.close()
}

// Already aborted?
Expand All @@ -38,45 +84,48 @@ class WebSockets {
})

try {
await Promise.race([abort, socket.connected()])
await Promise.race([abort, rawSocket.connected()])
} finally {
options.signal.removeEventListener('abort', onAbort)
}

log('connected %s', ma)
return {
sink: async source => {
try {
await socket.sink(abortable(source, options.signal))
} catch (err) {
// Re-throw non-aborted errors
if (err.type !== 'aborted') throw err
// Otherwise, this is fine...
await socket.close()
}
},
source: abortable(socket.source, options.signal),
getObservedAddrs
}
return rawSocket
}

/**
* Creates a Websockets listener. The provided `handler` function will be called
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
* @param {object} [options]
* @param {function (Connection)} handler
* @returns {Listener} A Websockets listener
*/
createListener (options, handler) {
return createListener(options, handler)
if (typeof options === 'function') {
handler = options
options = {}
}
options = options || {}

return createListener({ handler, upgrader: this._upgrader }, options)
}

/**
* Takes a list of `Multiaddr`s and returns only valid Websockets addresses
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]} Valid Websockets multiaddrs
*/
filter (multiaddrs) {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

return multiaddrs.filter((ma) => {
if (ma.protoNames().includes('p2p-circuit')) {
if (ma.protoNames().includes(CODE_CIRCUIT)) {
return false
}

if (ma.protoNames().includes('ipfs')) {
ma = ma.decapsulate('ipfs')
}

return mafmt.WebSockets.matches(ma) || mafmt.WebSocketsSecure.matches(ma)
return mafmt.WebSockets.matches(ma.decapsulateCode(CODE_P2P)) ||
mafmt.WebSocketsSecure.matches(ma.decapsulateCode(CODE_P2P))
})
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/ip-port-to-multiaddr.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const multiaddr = require('multiaddr')
const { Address4, Address6 } = require('ip-address')

module.exports = (ip, port) => {
if (typeof ip !== 'string') {
throw new Error('invalid ip')
}

port = parseInt(port)

if (isNaN(port)) {
throw new Error('invalid port')
}

if (new Address4(ip).isValid()) {
return multiaddr(`/ip4/${ip}/tcp/${port}`)
}

const ip6 = new Address6(ip)

if (ip6.isValid()) {
return ip6.is4()
? multiaddr(`/ip4/${ip6.to4().correctForm()}/tcp/${port}`)
: multiaddr(`/ip6/${ip}/tcp/${port}`)
}

throw new Error('invalid ip')
}
Loading

0 comments on commit 8991a3b

Please sign in to comment.