Skip to content

Commit e757cf6

Browse files
authored
fix: improve connection closing and error handling (libp2p#285)
* fix: improve connection closing and error handling * test: improve identify test * chore: update deps * fix: only emit from connections if there is a listener * test: add more connection tests * chore: update libp2p-mplex
1 parent fe84058 commit e757cf6

File tree

6 files changed

+109
-60
lines changed

6 files changed

+109
-60
lines changed

package.json

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,40 +36,40 @@
3636
"npm": ">=3.0.0"
3737
},
3838
"devDependencies": {
39-
"aegir": "^15.1.0",
40-
"chai": "^4.1.2",
39+
"aegir": "^17.0.1",
40+
"chai": "^4.2.0",
4141
"chai-checkmark": "^1.0.1",
4242
"dirty-chai": "^2.0.1",
43-
"libp2p-mplex": "~0.8.2",
43+
"libp2p-mplex": "~0.8.4",
4444
"libp2p-pnet": "~0.1.0",
45-
"libp2p-secio": "~0.10.0",
46-
"libp2p-spdy": "~0.12.1",
47-
"libp2p-tcp": "~0.12.1",
48-
"libp2p-webrtc-star": "~0.15.4",
45+
"libp2p-secio": "~0.10.1",
46+
"libp2p-spdy": "~0.13.0",
47+
"libp2p-tcp": "~0.13.0",
48+
"libp2p-webrtc-star": "~0.15.5",
4949
"libp2p-websockets": "~0.12.0",
5050
"peer-book": "~0.8.0",
51-
"portfinder": "^1.0.17",
52-
"sinon": "^6.2.0",
51+
"portfinder": "^1.0.19",
52+
"sinon": "^7.1.1",
5353
"webrtcsupport": "^2.2.0"
5454
},
5555
"dependencies": {
5656
"async": "^2.6.1",
57-
"big.js": "^5.1.2",
57+
"big.js": "^5.2.2",
5858
"class-is": "^1.1.0",
59-
"debug": "^3.1.0",
59+
"debug": "^4.1.0",
6060
"err-code": "^1.1.2",
6161
"fsm-event": "^2.1.0",
6262
"hashlru": "^2.2.1",
6363
"interface-connection": "~0.3.2",
6464
"ip-address": "^5.8.9",
65-
"libp2p-circuit": "~0.2.1",
65+
"libp2p-circuit": "~0.3.0",
6666
"libp2p-identify": "~0.7.2",
6767
"lodash.includes": "^4.3.0",
6868
"moving-average": "^1.0.0",
69-
"multiaddr": "^5.0.0",
69+
"multiaddr": "^5.0.2",
7070
"multistream-select": "~0.14.3",
7171
"once": "^1.4.0",
72-
"peer-id": "~0.11.0",
72+
"peer-id": "~0.12.0",
7373
"peer-info": "~0.14.1",
7474
"pull-stream": "^3.6.9"
7575
},

src/connection/base.js

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,29 @@ class BaseConnection extends EventEmitter {
1111
this.switch = _switch
1212
this.ourPeerInfo = this.switch._peerInfo
1313
this.log = debug(`libp2p:conn:${name}`)
14+
this.log.error = debug(`libp2p:conn:${name}:error`)
15+
}
16+
17+
/**
18+
* Puts the state into its disconnecting flow
19+
*
20+
* @param {Error} err Will be emitted if provided
21+
* @returns {void}
22+
*/
23+
close (err) {
24+
this.log(`closing connection to ${this.theirB58Id}`)
25+
if (err && this._events.error) {
26+
this.emit('error', err)
27+
}
28+
this._state('disconnect')
29+
}
30+
31+
emit (eventName, ...args) {
32+
if (eventName === 'error' && !this._events.error) {
33+
this.log.error(...args)
34+
} else {
35+
super.emit(eventName, ...args)
36+
}
1437
}
1538

1639
/**
@@ -86,8 +109,7 @@ class BaseConnection extends EventEmitter {
86109

87110
this.conn = this.switch.protector.protect(this.conn, (err) => {
88111
if (err) {
89-
this.emit('error', err)
90-
return this._state('disconnect')
112+
return this.close(err)
91113
}
92114

93115
this.log(`successfully privatized conn to ${this.theirB58Id}`)

src/connection/incoming.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ class IncomingConnectionFSM extends BaseConnection {
8484
this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => {
8585
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => {
8686
if (err) {
87-
this.emit('error', err)
88-
return this._state('disconnect')
87+
return this.close(err)
8988
}
9089
this.conn.getPeerInfo((_, peerInfo) => {
9190
this.theirPeerInfo = peerInfo

src/connection/index.js

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,6 @@ class ConnectionFSM extends BaseConnection {
126126
this._state.on('error', (err) => this._onStateError(err))
127127
}
128128

129-
/**
130-
* Puts the state into its disconnecting flow
131-
*
132-
* @returns {void}
133-
*/
134-
close () {
135-
this.log(`closing connection to ${this.theirB58Id}`)
136-
this._state('disconnect')
137-
}
138-
139129
/**
140130
* Puts the state into dialing mode
141131
*
@@ -200,8 +190,7 @@ class ConnectionFSM extends BaseConnection {
200190
this.log(`dialing ${this.theirB58Id}`)
201191

202192
if (!this.switch.hasTransports()) {
203-
this.emit('error', Errors.NO_TRANSPORTS_REGISTERED())
204-
return this._state('disconnect')
193+
return this.close(Errors.NO_TRANSPORTS_REGISTERED())
205194
}
206195

207196
const tKeys = this.switch.availableTransports(this.theirPeerInfo)
@@ -213,17 +202,15 @@ class ConnectionFSM extends BaseConnection {
213202
let transport = key
214203
if (!transport) {
215204
if (!circuitEnabled) {
216-
this.emit('error', Errors.CONNECTION_FAILED(
205+
return this.close(Errors.CONNECTION_FAILED(
217206
new Error(`Circuit not enabled and all transports failed to dial peer ${this.theirB58Id}!`)
218207
))
219-
return this._state('disconnect')
220208
}
221209

222210
if (circuitTried) {
223-
this.emit('error', Errors.CONNECTION_FAILED(
211+
return this.close(Errors.CONNECTION_FAILED(
224212
new Error(`No available transports to dial peer ${this.theirB58Id}!`)
225213
))
226-
return this._state('disconnect')
227214
}
228215

229216
this.log(`Falling back to dialing over circuit`)
@@ -300,24 +287,21 @@ class ConnectionFSM extends BaseConnection {
300287
const msDialer = new multistream.Dialer()
301288
msDialer.handle(this.conn, (err) => {
302289
if (err) {
303-
this.emit('error', Errors.maybeUnexpectedEnd(err))
304-
return this._state('disconnect')
290+
return this.close(Errors.maybeUnexpectedEnd(err))
305291
}
306292

307293
this.log('selecting crypto %s to %s', this.switch.crypto.tag, this.theirB58Id)
308294

309295
msDialer.select(this.switch.crypto.tag, (err, _conn) => {
310296
if (err) {
311-
this._state('disconnect')
312-
return this.emit('error', Errors.maybeUnexpectedEnd(err))
297+
return this.close(Errors.maybeUnexpectedEnd(err))
313298
}
314299

315300
const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)
316301

317302
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, conn, this.theirPeerInfo.id, (err) => {
318303
if (err) {
319-
this._state('disconnect')
320-
return this.emit('error', err)
304+
return this.close(err)
321305
}
322306

323307
this.conn.setPeerInfo(this.theirPeerInfo)
@@ -371,8 +355,7 @@ class ConnectionFSM extends BaseConnection {
371355
this.switch.muxedConns[this.theirB58Id] = this
372356

373357
this.muxer.once('close', () => {
374-
delete this.muxer
375-
this._state('disconnect')
358+
this.close()
376359
})
377360

378361
// For incoming streams, in case identify is on

test/connection.node.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,21 @@ describe('ConnectionFSM', () => {
111111
connection.dial()
112112
})
113113

114+
it('should be able to close with an error and not throw', (done) => {
115+
const connection = new ConnectionFSM({
116+
_switch: dialerSwitch,
117+
peerInfo: listenerSwitch._peerInfo
118+
})
119+
120+
connection.once('connected', (conn) => {
121+
expect(conn).to.be.an.instanceof(Connection)
122+
expect(() => connection.close(new Error('shutting down'))).to.not.throw()
123+
done()
124+
})
125+
126+
connection.dial()
127+
})
128+
114129
it('should emit warning on dial failed attempt', (done) => {
115130
const connection = new ConnectionFSM({
116131
_switch: dialerSwitch,
@@ -362,6 +377,10 @@ describe('ConnectionFSM', () => {
362377
})
363378
})
364379

380+
afterEach(() => {
381+
sinon.restore()
382+
})
383+
365384
it('should be able to protect a basic connection', (done) => {
366385
const connection = new ConnectionFSM({
367386
_switch: dialerSwitch,
@@ -381,6 +400,35 @@ describe('ConnectionFSM', () => {
381400
connection.dial()
382401
})
383402

403+
it('should close on failed protection', (done) => {
404+
const connection = new ConnectionFSM({
405+
_switch: dialerSwitch,
406+
peerInfo: listenerSwitch._peerInfo
407+
})
408+
409+
const error = new Error('invalid key')
410+
const stub = sinon.stub(dialerSwitch.protector, 'protect').callsFake((_, cb) => {
411+
cb(error)
412+
})
413+
414+
expect(3).check(done)
415+
416+
connection.once('close', () => {
417+
expect(stub.callCount).to.eql(1).mark()
418+
})
419+
420+
connection.once('error', (err) => {
421+
expect(err).to.eql(error).mark()
422+
})
423+
424+
connection.once('connected', (conn) => {
425+
expect(conn).to.be.an.instanceof(Connection).mark()
426+
connection.protect()
427+
})
428+
429+
connection.dial()
430+
})
431+
384432
it('should be able to encrypt a protected connection', (done) => {
385433
const connection = new ConnectionFSM({
386434
_switch: dialerSwitch,

test/identify.node.js

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
'use strict'
33

44
const chai = require('chai')
5-
const dirtyChai = require('dirty-chai')
5+
chai.use(require('dirty-chai'))
6+
chai.use(require('chai-checkmark'))
67
const expect = chai.expect
7-
chai.use(dirtyChai)
88
const parallel = require('async/parallel')
99
const TCP = require('libp2p-tcp')
1010
const multiplex = require('libp2p-mplex')
@@ -13,6 +13,7 @@ const secio = require('libp2p-secio')
1313
const PeerBook = require('peer-book')
1414
const identify = require('libp2p-identify')
1515
const lp = require('pull-length-prefixed')
16+
const sinon = require('sinon')
1617

1718
const utils = require('./utils')
1819
const createInfos = utils.createInfos
@@ -61,8 +62,7 @@ describe('Identify', () => {
6162
], done)
6263
}))
6364

64-
after(function (done) {
65-
this.timeout(3 * 1000)
65+
after((done) => {
6666
parallel([
6767
(cb) => switchA.stop(cb),
6868
(cb) => switchB.stop(cb),
@@ -71,6 +71,7 @@ describe('Identify', () => {
7171
})
7272

7373
afterEach(function (done) {
74+
sinon.restore()
7475
// Hangup everything
7576
parallel([
7677
(cb) => switchA.hangUp(switchB._peerInfo, cb),
@@ -99,8 +100,8 @@ describe('Identify', () => {
99100
})
100101
})
101102

102-
it('should require crypto and identify to have the same peerId', (done) => {
103-
identify.listener = (conn) => {
103+
it('should close connection when identify fails', (done) => {
104+
const stub = sinon.stub(identify, 'listener').callsFake((conn) => {
104105
conn.getObservedAddrs((err, observedAddrs) => {
105106
if (err) { return }
106107
observedAddrs = observedAddrs[0]
@@ -122,20 +123,16 @@ describe('Identify', () => {
122123
conn
123124
)
124125
})
125-
}
126+
})
127+
128+
expect(2).check(done)
126129

127130
switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
128-
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
129-
expect(err).to.not.exist()
130-
pull(
131-
pull.values([Buffer.from('data that cant be had')]),
132-
conn,
133-
pull.collect((err, values) => {
134-
expect(err).to.exist()
135-
expect(values).to.have.length(0)
136-
done()
137-
})
138-
)
131+
const connFSM = switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err) => {
132+
expect(err).to.not.exist().mark()
133+
})
134+
connFSM.once('close', () => {
135+
expect(stub.called).to.eql(true).mark()
139136
})
140137
})
141138
})

0 commit comments

Comments
 (0)