Skip to content

Commit d0f4ab4

Browse files
committed
support extension messages
1 parent 7a698a3 commit d0f4ab4

File tree

5 files changed

+116
-2
lines changed

5 files changed

+116
-2
lines changed

feed.js

+42
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,32 @@ Feed.prototype.data = function (message) {
6161
return this._send(9, messages.Data, message)
6262
}
6363

64+
Feed.prototype.extension = function (type, message) {
65+
var id = this.stream.extensions.indexOf(type)
66+
if (id === -1) return false
67+
68+
var header = this.header | 15
69+
var len = this.headerLength + varint.encodingLength(id) + message.length
70+
var box = new Buffer(varint.encodingLength(len) + len)
71+
var offset = 0
72+
73+
varint.encode(len, box, offset)
74+
offset += varint.encode.bytes
75+
76+
varint.encode(header, box, offset)
77+
offset += varint.encode.bytes
78+
79+
varint.encode(id, box, offset)
80+
offset += varint.encode.bytes
81+
82+
message.copy(box, offset)
83+
return this.stream._push(box)
84+
}
85+
86+
Feed.prototype.remoteSupports = function (name) {
87+
return this.stream.remoteSupports(name)
88+
}
89+
6490
Feed.prototype.destroy = function (err) {
6591
this.stream.destroy(err)
6692
}
@@ -109,6 +135,22 @@ Feed.prototype._resume = function () {
109135
}
110136
}
111137

138+
Feed.prototype._onextension = function (data, start, end) {
139+
if (end <= start) return
140+
141+
var id = varint.decode(data, start)
142+
var r = this.stream.remoteExtensions
143+
var localId = !r || id >= r.length ? -1 : r[id]
144+
145+
if (localId === -1) return
146+
147+
var message = data.slice(start + varint.decode.bytes, end)
148+
var name = this.stream.extensions[localId]
149+
150+
if (this.peer && this.peer.onextension) this.peer.onextension(name, message)
151+
else this.emit('extension', name, message)
152+
}
153+
112154
Feed.prototype._onmessage = function (type, data, start, end) {
113155
var message = decodeMessage(type, data, start, end)
114156
if (!message || this.closed) return

index.js

+22-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ var stream = require('readable-stream')
22
var inherits = require('inherits')
33
var varint = require('varint')
44
var sodium = require('sodium-universal')
5+
var indexOf = require('sorted-indexof')
56
var feed = require('./feed')
67
var messages = require('./messages')
78

@@ -27,6 +28,8 @@ function Protocol (opts) {
2728
this.remoteDiscoveryKey = null
2829
this.feeds = []
2930
this.expectedFeeds = opts.expectedFeeds || 0
31+
this.extensions = opts.extensions || []
32+
this.remoteExtensions = null
3033

3134
this._localFeeds = []
3235
this._remoteFeeds = []
@@ -81,7 +84,10 @@ Protocol.prototype.feed = function (key, opts) {
8184
var dk = opts.discoveryKey || discoveryKey(key)
8285
var ch = this._feed(dk)
8386

84-
if (ch.id > -1) return ch
87+
if (ch.id > -1) {
88+
if (opts.peer) ch.peer = opts.peer
89+
return ch
90+
}
8591

8692
if (this._localFeeds.length >= 128) {
8793
this._tooManyFeeds()
@@ -131,7 +137,12 @@ Protocol.prototype.feed = function (key, opts) {
131137
if (this.destroyed) return null
132138

133139
if (first) {
134-
ch.handshake({id: this.id, live: this.live, userData: this.userData})
140+
ch.handshake({
141+
id: this.id,
142+
live: this.live,
143+
userData: this.userData,
144+
extensions: this.extensions
145+
})
135146
}
136147

137148
if (ch._buffer.length) ch._resume()
@@ -235,11 +246,19 @@ Protocol.prototype._feed = function (dk) {
235246
return ch
236247
}
237248

249+
Protocol.prototype.remoteSupports = function (name) {
250+
var i = this.extensions.indexOf(name)
251+
return i > -1 && !!this.remoteExtensions && this.remoteExtensions.indexOf(i) > -1
252+
}
253+
238254
Protocol.prototype._onhandshake = function (handshake) {
239255
if (this.remoteId) return
256+
240257
this.remoteId = handshake.id || randomBytes(32)
241258
this.remoteLive = handshake.live
242259
this.remoteUserData = handshake.userData
260+
this.remoteExtensions = indexOf(this.extensions, handshake.extensions)
261+
243262
this.emit('handshake')
244263
}
245264

@@ -293,6 +312,7 @@ Protocol.prototype._onmessage = function (data, start, end) {
293312
}
294313

295314
if (!ch) return this._badFeed()
315+
if (type === 15) return ch._onextension(data, start, end)
296316
ch._onmessage(type, data, start, end)
297317
}
298318

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"protocol-buffers": "^3.2.1",
1010
"readable-stream": "^2.2.6",
1111
"sodium-universal": "^1.0.0",
12+
"sorted-indexof": "^1.0.0",
1213
"varint": "^5.0.0"
1314
},
1415
"devDependencies": {

schema.proto

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ message Handshake {
1212
optional bytes id = 1;
1313
optional bool live = 2; // keep the connection open forever? both ends have to agree
1414
optional bytes userData = 3;
15+
repeated string extensions = 4;
1516
}
1617

1718
// type=2, message indicating state changes etc.
@@ -75,3 +76,6 @@ message Data {
7576
repeated Node nodes = 3;
7677
optional bytes signature = 4;
7778
}
79+
80+
// type=15 (last massage) is an extension message
81+
// that is encoded like this <varint user-type><payload>

test.js

+47
Original file line numberDiff line numberDiff line change
@@ -490,3 +490,50 @@ tape('message after ping', function (t) {
490490

491491
a.pipe(b).pipe(a)
492492
})
493+
494+
tape('extension message', function (t) {
495+
t.plan(10)
496+
497+
var a = protocol({
498+
extensions: ['a', 'b']
499+
})
500+
501+
var b = protocol({
502+
extensions: ['b', 'c']
503+
})
504+
505+
var ch1 = a.feed(KEY)
506+
var ch2 = b.feed(KEY)
507+
508+
ch2.on('extension', function (type, message) {
509+
t.same(type, 'b')
510+
t.same(message, new Buffer('hello ch2'))
511+
})
512+
513+
ch1.on('extension', function (type, message) {
514+
t.same(type, 'b')
515+
t.same(message, new Buffer('hello ch1'))
516+
})
517+
518+
a.once('handshake', function () {
519+
t.same(a.remoteSupports('a'), false)
520+
t.same(a.remoteSupports('b'), true)
521+
t.same(a.remoteSupports('c'), false)
522+
523+
ch1.extension('a', new Buffer('nooo'))
524+
ch1.extension('b', new Buffer('hello ch2'))
525+
ch1.extension('c', new Buffer('nooo'))
526+
})
527+
528+
b.once('handshake', function () {
529+
t.same(b.remoteSupports('a'), false)
530+
t.same(b.remoteSupports('b'), true)
531+
t.same(b.remoteSupports('c'), false)
532+
533+
ch2.extension('a', new Buffer('nooo'))
534+
ch2.extension('b', new Buffer('hello ch1'))
535+
ch2.extension('c', new Buffer('nooo'))
536+
})
537+
538+
a.pipe(b).pipe(a)
539+
})

0 commit comments

Comments
 (0)