Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit 169be19

Browse files
committed
feat: handle active/passive relays
1 parent ad93dfc commit 169be19

File tree

3 files changed

+143
-69
lines changed

3 files changed

+143
-69
lines changed

src/circuit/dialer.js

Lines changed: 39 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class Dialer {
5757
log.err(err)
5858
return cb(err)
5959
}
60+
6061
dstConn.setInnerConn(conn)
6162
cb(null, dstConn)
6263
})
@@ -75,37 +76,29 @@ class Dialer {
7576
cb = once(cb || (() => {}))
7677

7778
if (!this.relayPeers.get(this.utils.getB58String(peer))) {
78-
return this._dialRelay(peer, (err, streamHandler) => {
79-
if (err) {
80-
return log.err(err)
81-
}
82-
83-
streamHandler.write(proto.CircuitRelay.encode({
79+
let streamHandler
80+
waterfall([
81+
(wCb) => this._dialRelay(peer, wCb),
82+
(sh, wCb) => {
83+
streamHandler = sh
84+
wCb()
85+
},
86+
(wCb) => streamHandler.write(proto.CircuitRelay.encode({
8487
type: proto.CircuitRelay.Type.CAN_HOP
85-
}), (err) => {
86-
if (err) {
87-
log.err(err)
88-
return cb(err)
89-
}
88+
}), wCb),
89+
(wCb) => streamHandler.read(wCb),
90+
(msg, wCb) => {
91+
const response = proto.CircuitRelay.decode(msg)
9092

91-
streamHandler.read((err, msg) => {
92-
if (err) {
93-
log.err(err)
94-
return cb(err)
95-
}
96-
97-
const response = proto.CircuitRelay.decode(msg)
98-
99-
if (response.code !== proto.CircuitRelay.Status.SUCCESS) {
100-
return log(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`)
101-
}
93+
if (response.code !== proto.CircuitRelay.Status.SUCCESS) {
94+
return log(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`)
95+
}
10296

103-
log(`HOP supported adding as relay - ${this.utils.getB58String(peer)}`)
104-
this.relayPeers.set(this.utils.getB58String(peer), peer)
105-
cb(null)
106-
})
107-
})
108-
})
97+
log(`HOP supported adding as relay - ${this.utils.getB58String(peer)}`)
98+
this.relayPeers.set(this.utils.getB58String(peer), peer)
99+
wCb(null)
100+
}
101+
], cb)
109102
}
110103

111104
return cb(null)
@@ -177,14 +170,19 @@ class Dialer {
177170
dstMa = multiaddr(dstMa)
178171

179172
const srcMas = this.swarm._peerInfo.multiaddrs.toArray()
173+
let streamHandler
180174
waterfall([
181175
(cb) => {
182176
if (relay instanceof Connection) {
183177
return cb(null, new StreamHandler(relay))
184178
}
185179
return this._dialRelay(this.utils.peerInfoFromMa(relay), cb)
186180
},
187-
(streamHandler, cb) => {
181+
(sh, cb) => {
182+
streamHandler = sh
183+
cb(null)
184+
},
185+
(cb) => {
188186
log(`negotiating relay for peer ${dstMa.getPeerId()}`)
189187
streamHandler.write(
190188
proto.CircuitRelay.encode({
@@ -197,35 +195,21 @@ class Dialer {
197195
id: PeerId.createFromB58String(dstMa.getPeerId()).id,
198196
addrs: [dstMa.buffer]
199197
}
200-
}),
201-
(err) => {
202-
if (err) {
203-
log.err(err)
204-
return cb(err)
205-
}
206-
207-
cb(null, streamHandler)
208-
})
198+
}), cb)
209199
},
210-
(streamHandler, cb) => {
211-
streamHandler.read((err, msg) => {
212-
if (err) {
213-
log.err(err)
214-
return cb(err)
215-
}
216-
217-
const message = proto.CircuitRelay.decode(msg)
218-
if (message.type !== proto.CircuitRelay.Type.STATUS) {
219-
return cb(new Error(`Got invalid message type - ` +
220-
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
221-
}
200+
(cb) => streamHandler.read(cb),
201+
(msg, cb) => {
202+
const message = proto.CircuitRelay.decode(msg)
203+
if (message.type !== proto.CircuitRelay.Type.STATUS) {
204+
return cb(new Error(`Got invalid message type - ` +
205+
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
206+
}
222207

223-
if (message.code !== proto.CircuitRelay.Status.SUCCESS) {
224-
return cb(new Error(`Got ${message.code} error code trying to dial over relay`))
225-
}
208+
if (message.code !== proto.CircuitRelay.Status.SUCCESS) {
209+
return cb(new Error(`Got ${message.code} error code trying to dial over relay`))
210+
}
226211

227-
cb(null, new Connection(streamHandler.rest()))
228-
})
212+
cb(null, new Connection(streamHandler.rest()))
229213
}
230214
], callback)
231215
}

src/circuit/hop.js

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,15 @@ class Hop extends EE {
6565
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.SUCCESS)
6666
}
6767

68-
if (message.dstPeer.id.toString() === this.peerInfo.id.toB58String()) {
68+
const srcPeerId = PeerId.createFromBytes(message.dstPeer.id)
69+
if (srcPeerId.toB58String() === this.peerInfo.id.toB58String()) {
6970
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_CANT_RELAY_TO_SELF)
7071
}
7172

73+
const dstPeerId = PeerId.createFromBytes(message.dstPeer.id).toB58String()
7274
if (!message.dstPeer.addrs.length) {
7375
// TODO: use encapsulate here
74-
const addr = multiaddr(`/p2p-circuit/ipfs/${PeerId.createFromBytes(message.dstPeer.id).toB58String()}`).buffer
76+
const addr = multiaddr(`/p2p-circuit/ipfs/${dstPeerId}`).buffer
7577
message.dstPeer.addrs.push(addr)
7678
}
7779

@@ -80,6 +82,20 @@ class Hop extends EE {
8082
return log(err)
8183
}
8284

85+
let dstPeer
86+
try {
87+
dstPeer = this.swarm._peerBook.get(dstPeerId)
88+
if (!dstPeer.isConnected() && !this.active) {
89+
throw new Error('No Connection to peer')
90+
}
91+
} catch (err) {
92+
if (!this.active) {
93+
log.err(err)
94+
setImmediate(() => this.emit('circuit:error', err))
95+
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST)
96+
}
97+
}
98+
8399
return this._circuit(streamHandler.rest(), message, (err) => {
84100
if (err) {
85101
log.err(err)

test/hop.spec.js

Lines changed: 86 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,30 @@ const sinon = require('sinon')
1717
const expect = require('chai').expect
1818

1919
describe('relay', function () {
20-
describe(`handle circuit requests`, function () {
20+
describe(`should handle circuit requests`, function () {
2121
let relay
2222
let swarm
2323
let fromConn
2424
let stream
2525
let shake
2626

2727
beforeEach(function (done) {
28-
stream = handshake({timeout: 1000 * 60})
28+
stream = handshake({ timeout: 1000 * 60 })
2929
shake = stream.handshake
3030
fromConn = new Connection(stream)
3131
fromConn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA')))
3232

33+
let peers = {
34+
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE:
35+
new PeerInfo(PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`)),
36+
QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA:
37+
new PeerInfo(PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`)),
38+
QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy:
39+
new PeerInfo(PeerId.createFromB58String(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`))
40+
}
41+
42+
Object.keys(peers).forEach((key) => { peers[key]._connectedMultiaddr = true }) // make it truthy
43+
3344
waterfall([
3445
(cb) => PeerId.createFromJSON(nodes.node4, cb),
3546
(peerId, cb) => PeerInfo.create(peerId, cb),
@@ -38,14 +49,25 @@ describe('relay', function () {
3849
swarm = {
3950
_peerInfo: peer,
4051
conns: {
41-
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection()
52+
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection(),
53+
QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA: new Connection(),
54+
QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy: new Connection()
55+
},
56+
_peerBook: {
57+
get: (peer) => {
58+
if (!peers[peer]) {
59+
throw new Error()
60+
}
61+
62+
return peers[peer]
63+
}
4264
}
4365
}
4466

4567
cb()
4668
}
4769
], () => {
48-
relay = new Hop(swarm, {enabled: true})
70+
relay = new Hop(swarm, { enabled: true })
4971
relay._circuit = sinon.stub()
5072
relay._circuit.callsArg(2, null, new Connection())
5173
done()
@@ -56,15 +78,15 @@ describe('relay', function () {
5678
relay._circuit.reset()
5779
})
5880

59-
it(`handle a valid circuit request`, function (done) {
81+
it(`should handle a valid circuit request`, function (done) {
6082
let relayMsg = {
6183
type: proto.CircuitRelay.Type.HOP,
6284
srcPeer: {
63-
id: Buffer.from(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`),
85+
id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id,
6486
addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer]
6587
},
6688
dstPeer: {
67-
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
89+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
6890
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
6991
}
7092
}
@@ -77,15 +99,67 @@ describe('relay', function () {
7799
relay.handle(relayMsg, new StreamHandler(fromConn))
78100
})
79101

102+
it(`should handle a request to passive circuit`, function (done) {
103+
let relayMsg = {
104+
type: proto.CircuitRelay.Type.HOP,
105+
srcPeer: {
106+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
107+
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
108+
},
109+
dstPeer: {
110+
id: PeerId.createFromB58String(`QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).id,
111+
addrs: [multiaddr(`/ipfs/QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).buffer]
112+
}
113+
}
114+
115+
relay.active = false
116+
lp.decodeFromReader(shake, (err, msg) => {
117+
expect(err).to.be.null
118+
119+
const response = proto.CircuitRelay.decode(msg)
120+
expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST)
121+
expect(response.type).to.equal(proto.CircuitRelay.Type.STATUS)
122+
done()
123+
})
124+
125+
relay.handle(relayMsg, new StreamHandler(fromConn))
126+
})
127+
128+
it(`should handle a request to active circuit`, function (done) {
129+
let relayMsg = {
130+
type: proto.CircuitRelay.Type.HOP,
131+
srcPeer: {
132+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
133+
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
134+
},
135+
dstPeer: {
136+
id: PeerId.createFromB58String(`QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).id,
137+
addrs: [multiaddr(`/ipfs/QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).buffer]
138+
}
139+
}
140+
141+
relay.active = true
142+
relay.on('circuit:success', () => {
143+
expect(relay._circuit.calledWith(sinon.match.any, relayMsg)).to.be.ok
144+
done()
145+
})
146+
147+
relay.on('circuit:error', (err) => {
148+
done(err)
149+
})
150+
151+
relay.handle(relayMsg, new StreamHandler(fromConn))
152+
})
153+
80154
it(`not dial to self`, function (done) {
81155
let relayMsg = {
82156
type: proto.CircuitRelay.Type.HOP,
83157
srcPeer: {
84-
id: Buffer.from(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`),
158+
id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id,
85159
addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer]
86160
},
87161
dstPeer: {
88-
id: Buffer.from(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`),
162+
id: PeerId.createFromB58String(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).id,
89163
addrs: [multiaddr(`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).buffer]
90164
}
91165
}
@@ -110,7 +184,7 @@ describe('relay', function () {
110184
addrs: [`sdfkjsdnfkjdsb`]
111185
},
112186
dstPeer: {
113-
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
187+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
114188
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
115189
}
116190
}
@@ -131,11 +205,11 @@ describe('relay', function () {
131205
let relayMsg = {
132206
type: proto.CircuitRelay.Type.HOP,
133207
srcPeer: {
134-
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
208+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
135209
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
136210
},
137211
dstPeer: {
138-
id: `sdfkjsdnfkjdsb`,
212+
id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id,
139213
addrs: [`sdfkjsdnfkjdsb`]
140214
}
141215
}

0 commit comments

Comments
 (0)