Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit aa86307

Browse files
authored
fix: improve connection tracking and closing (#291)
* chore: update deps * fix: check we have a proper transport before filtering addresses * fix: improve connection close on stop * fix: improve stat stopping * test: fix stats test * fix: improve tracking of open connections * chore: remove log * fix: stats stop in browser chore: fix linting and browser tests * fix: remove uneeded set peer info * fix: abort the base connection on close * fix: catch edge cases of dialTimeout calling back twice * fix: close all connections instead of checking peerbook peers * test: update dial fsm test waits * test: make parallel dial tests deterministic fix: improve logic around disconnecting fix: remove duplicate event handling logic * chore: fix lint * test: improve test reliability
1 parent f43084b commit aa86307

18 files changed

+298
-130
lines changed

package.json

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,20 @@
4040
"npm": ">=3.0.0"
4141
},
4242
"devDependencies": {
43-
"aegir": "^17.0.1",
43+
"aegir": "^17.1.1",
4444
"chai": "^4.2.0",
4545
"chai-checkmark": "^1.0.1",
4646
"dirty-chai": "^2.0.1",
4747
"libp2p-mplex": "~0.8.4",
4848
"libp2p-pnet": "~0.1.0",
4949
"libp2p-secio": "~0.10.1",
50-
"libp2p-spdy": "~0.13.0",
50+
"libp2p-spdy": "~0.13.1",
5151
"libp2p-tcp": "~0.13.0",
52-
"libp2p-webrtc-star": "~0.15.5",
52+
"libp2p-webrtc-star": "~0.15.6",
5353
"libp2p-websockets": "~0.12.0",
54-
"peer-book": "~0.8.0",
55-
"portfinder": "^1.0.19",
56-
"sinon": "^7.1.1",
54+
"peer-book": "~0.9.0",
55+
"portfinder": "^1.0.20",
56+
"sinon": "^7.2.0",
5757
"webrtcsupport": "^2.2.0"
5858
},
5959
"dependencies": {
@@ -63,18 +63,18 @@
6363
"debug": "^4.1.0",
6464
"err-code": "^1.1.2",
6565
"fsm-event": "^2.1.0",
66-
"hashlru": "^2.2.1",
67-
"interface-connection": "~0.3.2",
66+
"hashlru": "^2.3.0",
67+
"interface-connection": "~0.3.3",
6868
"ip-address": "^5.8.9",
69-
"libp2p-circuit": "~0.3.0",
69+
"libp2p-circuit": "~0.3.1",
7070
"libp2p-identify": "~0.7.2",
7171
"lodash.includes": "^4.3.0",
7272
"moving-average": "^1.0.0",
73-
"multiaddr": "^5.0.2",
73+
"multiaddr": "^6.0.0",
7474
"multistream-select": "~0.14.3",
7575
"once": "^1.4.0",
7676
"peer-id": "~0.12.0",
77-
"peer-info": "~0.14.1",
77+
"peer-info": "~0.15.0",
7878
"pull-stream": "^3.6.9",
7979
"retimer": "^2.0.0"
8080
},

src/connection/incoming.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ class IncomingConnectionFSM extends BaseConnection {
2020
this.msListener = new multistream.Listener()
2121

2222
this._state = FSM('DIALED', {
23-
DISCONNECTED: { },
23+
DISCONNECTED: {
24+
disconnect: 'DISCONNECTED'
25+
},
2426
DIALED: { // Base connection to peer established
2527
privatize: 'PRIVATIZING',
2628
encrypt: 'ENCRYPTING'

src/connection/index.js

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

33
const FSM = require('fsm-event')
4-
const setImmediate = require('async/setImmediate')
54
const Circuit = require('libp2p-circuit')
65
const multistream = require('multistream-select')
76
const withIs = require('class-is')
@@ -15,6 +14,8 @@ const Errors = require('../errors')
1514
* @property {Switch} _switch Our switch instance
1615
* @property {PeerInfo} peerInfo The PeerInfo of the peer to dial
1716
* @property {Muxer} muxer Optional - A muxed connection
17+
* @property {Connection} conn Optional - The base connection
18+
* @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out.
1819
*/
1920

2021
/**
@@ -29,16 +30,16 @@ class ConnectionFSM extends BaseConnection {
2930
* @param {ConnectionOptions} param0
3031
* @constructor
3132
*/
32-
constructor ({ _switch, peerInfo, muxer }) {
33+
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
3334
super({
3435
_switch,
35-
name: `out:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
36+
name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
3637
})
3738

3839
this.theirPeerInfo = peerInfo
3940
this.theirB58Id = this.theirPeerInfo.id.toB58String()
4041

41-
this.conn = null // The base connection
42+
this.conn = conn // The base connection
4243
this.muxer = muxer // The upgraded/muxed connection
4344

4445
let startState = 'DISCONNECTED'
@@ -114,6 +115,7 @@ class ConnectionFSM extends BaseConnection {
114115
this._state.on('UPGRADING', () => this._onUpgrading())
115116
this._state.on('MUXED', () => {
116117
this.log(`successfully muxed connection to ${this.theirB58Id}`)
118+
delete this.switch.conns[this.theirB58Id]
117119
this.emit('muxed', this.muxer)
118120
})
119121
this._state.on('CONNECTED', () => {
@@ -166,7 +168,6 @@ class ConnectionFSM extends BaseConnection {
166168
})
167169
}
168170

169-
this.conn.setPeerInfo(this.theirPeerInfo)
170171
this._protocolHandshake(protocol, this.conn, callback)
171172
}
172173

@@ -266,14 +267,22 @@ class ConnectionFSM extends BaseConnection {
266267
this.muxer.end()
267268
}
268269

269-
delete this.switch.muxedConns[this.theirB58Id]
270+
this.switch.connection.remove(this)
271+
270272
delete this.switch.conns[this.theirB58Id]
271273
delete this.muxer
272-
delete this.conn
273-
274-
this._state('done')
275274

276-
setImmediate(() => this.switch.emit('peer-mux-closed', this.theirPeerInfo))
275+
// If we have the base connection, abort it
276+
if (this.conn) {
277+
this.conn.source(true, () => {
278+
this._state('done')
279+
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
280+
delete this.conn
281+
})
282+
} else {
283+
this._state('done')
284+
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
285+
}
277286
}
278287

279288
/**
@@ -352,7 +361,8 @@ class ConnectionFSM extends BaseConnection {
352361
const conn = observeConnection(null, key, _conn, this.switch.observer)
353362

354363
this.muxer = this.switch.muxers[key].dialer(conn)
355-
this.switch.muxedConns[this.theirB58Id] = this
364+
// this.switch.muxedConns[this.theirB58Id] = this
365+
this.switch.connection.add(this)
356366

357367
this.muxer.once('close', () => {
358368
this.close()
@@ -365,7 +375,7 @@ class ConnectionFSM extends BaseConnection {
365375
this.switch.protocolMuxer(null)(conn)
366376
})
367377

368-
setImmediate(() => this.switch.emit('peer-mux-established', this.theirPeerInfo))
378+
this.switch.emit('peer-mux-established', this.theirPeerInfo)
369379

370380
this._didUpgrade(null)
371381
})

src/connection/manager.js

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const waterfall = require('async/waterfall')
66
const debug = require('debug')
77
const log = debug('libp2p:switch:conn-manager')
88
const once = require('once')
9-
const setImmediate = require('async/setImmediate')
109
const ConnectionFSM = require('../connection')
1110

1211
const Circuit = require('libp2p-circuit')
@@ -20,6 +19,92 @@ const plaintext = require('../plaintext')
2019
class ConnectionManager {
2120
constructor (_switch) {
2221
this.switch = _switch
22+
this.connections = {}
23+
}
24+
25+
/**
26+
* Adds the connection for tracking if it's not already added
27+
* @private
28+
* @param {ConnectionFSM} connection
29+
* @returns {void}
30+
*/
31+
add (connection) {
32+
this.connections[connection.theirB58Id] = this.connections[connection.theirB58Id] || []
33+
// Only add it if it's not there
34+
if (!this.get(connection)) {
35+
this.connections[connection.theirB58Id].push(connection)
36+
}
37+
}
38+
39+
/**
40+
* Gets the connection from the list if it exists
41+
* @private
42+
* @param {ConnectionFSM} connection
43+
* @returns {ConnectionFSM|null} The found connection or null
44+
*/
45+
get (connection) {
46+
if (!this.connections[connection.theirB58Id]) return null
47+
48+
for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
49+
if (this.connections[connection.theirB58Id][i] === connection) {
50+
return this.connections[connection.theirB58Id][i]
51+
}
52+
}
53+
return null
54+
}
55+
56+
/**
57+
* Gets a connection associated with the given peer
58+
* @private
59+
* @param {string} peerId The peers id
60+
* @returns {ConnectionFSM|null} The found connection or null
61+
*/
62+
getOne (peerId) {
63+
if (this.connections[peerId]) {
64+
// TODO: Maybe select the best?
65+
return this.connections[peerId][0]
66+
}
67+
return null
68+
}
69+
70+
/**
71+
* Removes the connection from tracking
72+
* @private
73+
* @param {ConnectionFSM} connection The connection to remove
74+
* @returns {void}
75+
*/
76+
remove (connection) {
77+
if (!this.connections[connection.theirB58Id]) return
78+
79+
for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
80+
if (this.connections[connection.theirB58Id][i] === connection) {
81+
this.connections[connection.theirB58Id].splice(i, 1)
82+
return
83+
}
84+
}
85+
}
86+
87+
/**
88+
* Returns all connections being tracked
89+
* @private
90+
* @returns {ConnectionFSM[]}
91+
*/
92+
getAll () {
93+
let connections = []
94+
for (const conns of Object.values(this.connections)) {
95+
connections = [...connections, ...conns]
96+
}
97+
return connections
98+
}
99+
100+
/**
101+
* Returns all connections being tracked for a given peer id
102+
* @private
103+
* @param {string} peerId Stringified peer id
104+
* @returns {ConnectionFSM[]}
105+
*/
106+
getAllById (peerId) {
107+
return this.connections[peerId] || []
23108
}
24109

25110
/**
@@ -70,9 +155,6 @@ class ConnectionManager {
70155
], (err, peerInfo) => {
71156
if (err) {
72157
return muxedConn.end(() => {
73-
if (peerInfo) {
74-
setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo))
75-
}
76158
callback(err, null)
77159
})
78160
}
@@ -91,11 +173,14 @@ class ConnectionManager {
91173
}
92174
const b58Str = peerInfo.id.toB58String()
93175

94-
this.switch.muxedConns[b58Str] = new ConnectionFSM({
176+
const connection = new ConnectionFSM({
95177
_switch: this.switch,
96178
peerInfo,
97-
muxer: muxedConn
179+
muxer: muxedConn,
180+
conn: conn,
181+
type: 'inc'
98182
})
183+
this.switch.connection.add(connection)
99184

100185
if (peerInfo.multiaddrs.size > 0) {
101186
// with incomming conn and through identify, going to pick one
@@ -111,14 +196,10 @@ class ConnectionManager {
111196
peerInfo = this.switch._peerBook.put(peerInfo)
112197

113198
muxedConn.once('close', () => {
114-
delete this.switch.muxedConns[b58Str]
115-
peerInfo.disconnect()
116-
peerInfo = this.switch._peerBook.put(peerInfo)
117-
log(`closed connection to ${b58Str}`)
118-
setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo))
199+
connection.close()
119200
})
120201

121-
setImmediate(() => this.switch.emit('peer-mux-established', peerInfo))
202+
this.switch.emit('peer-mux-established', peerInfo)
122203
})
123204
})
124205
}

src/dialer.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,14 @@ function dial (_switch, returnFSM) {
5454

5555
log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`)
5656

57-
let connection = _switch.muxedConns[b58Id] || _switch.conns[b58Id]
57+
let connection = _switch.connection.getOne(b58Id)
5858

5959
if (!ConnectionFSM.isConnectionFSM(connection)) {
6060
connection = new ConnectionFSM({
6161
_switch,
6262
peerInfo,
63-
muxer: _switch.muxedConns[b58Id] || null
63+
muxer: null,
64+
conn: null
6465
})
6566
connection.once('error', (err) => callback(err))
6667
connection.once('connected', () => connection.protect())

0 commit comments

Comments
 (0)