Skip to content

Commit 6d0e156

Browse files
mpowaganduchak
authored andcommitted
feat(State Channels): Ping every 10 seconds to persist connection (#324)
* Improve channel rpc usage * Fix lint error * Remove unreachable code * Make sure that sign function is correctly called * Improve error handling for update method * Add missing channel tx serializations * Add channel close solo and settle tx serialization * Add channel slash tx serialization * Add proof of inclusion tx serialization * Add basic merkle patricia tree implementation * Add merkle patricia tree serialization and verify function * fix(schema.js): Fix linter error * Improve channel tests and error handling (#276) * Make sure that sign function is correctly called * Improve error handling for update method * Improve state channel params handling. Fixes #299 (#300) * Compiler improvements (#303) * refactor(Chain and Contract): Fix Chain.getAccount. Omprove Compiler Add ability to get account/balance on specific block hash/height. Add test. Add changeCompilerUrl to Compiler stamp #302 * fix(Crypto): Fix name hash function arguments parsing * refactor(Compiler): Remove async for changeCompilerUrl function * Channel contracts (#279) * Add support for contracts in state channels * Remove console.log * Remove console.log * Improve channel rpc usage (#275) * Improve channel rpc usage * Fix lint error * Remove unreachable code * Improve channel tests and error handling (#276) * Make sure that sign function is correctly called * Improve error handling for update method * Improve state channel params handling. Fixes #299 (#300) * Fix channel tests * feat(State Channels): Ping every 10 seconds to persist connection Connection can be manually closed with new "disconnect()" method * Register pong timeout after ping has been sent * Add contract call tx serialization * Add channel tx serialization * Add missing tree tx serializations * Add channel snapshot solo tx serialization * feat(State Channels): Ping every 10 seconds to persist connection Connection can be manually closed with new "disconnect()" method * Register pong timeout after ping has been sent * Handle unexpected messages * Fix lint error
1 parent 2ddeea8 commit 6d0e156

File tree

4 files changed

+94
-17
lines changed

4 files changed

+94
-17
lines changed

es/channel/handlers.js

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,21 @@ import {
2222
changeState,
2323
send,
2424
emit,
25-
channelId
25+
channelId,
26+
disconnect
2627
} from './internal'
2728
import { unpackTx } from '../tx/builder'
2829

30+
function handleUnexpectedMessage (channel, message, state) {
31+
if (state.reject) {
32+
state.reject(Object.assign(
33+
Error(`Unexpected message received:\n\n${JSON.stringify(message)}`),
34+
{ wsMessage: message }
35+
))
36+
}
37+
return { handler: channelOpen }
38+
}
39+
2940
export function awaitingConnection (channel, message, state) {
3041
if (message.method === 'channels.info') {
3142
if (['channel_accept', 'funding_created'].includes(message.params.data.event)) {
@@ -159,6 +170,7 @@ export async function awaitingOffChainTx (channel, message, state) {
159170
}
160171
return { handler: channelOpen }
161172
}
173+
return handleUnexpectedMessage(channel, message, state)
162174
}
163175

164176
export function awaitingOffChainUpdate (channel, message, state) {
@@ -175,6 +187,7 @@ export function awaitingOffChainUpdate (channel, message, state) {
175187
state.reject(new Error(message.error.message))
176188
return { handler: channelOpen }
177189
}
190+
return handleUnexpectedMessage(channel, message, state)
178191
}
179192

180193
export async function awaitingTxSignRequest (channel, message, state) {
@@ -198,6 +211,7 @@ export async function awaitingTxSignRequest (channel, message, state) {
198211
})
199212
return { handler: awaitingUpdateConflict }
200213
}
214+
return handleUnexpectedMessage(channel, message, state)
201215
}
202216

203217
export function awaitingUpdateConflict (channel, message, state) {
@@ -207,6 +221,7 @@ export function awaitingUpdateConflict (channel, message, state) {
207221
if (message.method === 'channels.conflict') {
208222
return { handler: channelOpen }
209223
}
224+
return handleUnexpectedMessage(channel, message, state)
210225
}
211226

212227
export async function awaitingShutdownTx (channel, message, state) {
@@ -215,24 +230,28 @@ export async function awaitingShutdownTx (channel, message, state) {
215230
send(channel, { jsonrpc: '2.0', method: 'channels.shutdown_sign', params: { tx: signedTx } })
216231
return { handler: awaitingShutdownOnChainTx, state }
217232
}
233+
return handleUnexpectedMessage(channel, message, state)
218234
}
219235

220236
export function awaitingShutdownOnChainTx (channel, message, state) {
221237
if (message.method === 'channels.on_chain_tx') {
222-
state.resolveShutdownPromise(message.params.data.tx)
238+
state.resolve(message.params.data.tx)
223239
return { handler: channelClosed }
224240
}
241+
return handleUnexpectedMessage(channel, message, state)
225242
}
226243

227244
export function awaitingLeave (channel, message, state) {
228245
if (message.method === 'channels.leave') {
229246
state.resolve({ channelId: message.params.channel_id, signedTx: message.params.data.state })
247+
disconnect(channel)
230248
return { handler: channelClosed }
231249
}
232250
if (message.method === 'channels.error') {
233251
state.reject(new Error(message.data.message))
234252
return { handler: channelOpen }
235253
}
254+
return handleUnexpectedMessage(channel, message, state)
236255
}
237256

238257
export async function awaitingWithdrawTx (channel, message, state) {
@@ -241,6 +260,7 @@ export async function awaitingWithdrawTx (channel, message, state) {
241260
send(channel, { jsonrpc: '2.0', method: 'channels.withdraw_tx', params: { tx: signedTx } })
242261
return { handler: awaitingWithdrawCompletion, state }
243262
}
263+
return handleUnexpectedMessage(channel, message, state)
244264
}
245265

246266
export function awaitingWithdrawCompletion (channel, message, state) {
@@ -271,6 +291,7 @@ export function awaitingWithdrawCompletion (channel, message, state) {
271291
state.resolve({ accepted: false })
272292
return { handler: channelOpen }
273293
}
294+
return handleUnexpectedMessage(channel, message, state)
274295
}
275296

276297
export async function awaitingDepositTx (channel, message, state) {
@@ -279,6 +300,7 @@ export async function awaitingDepositTx (channel, message, state) {
279300
send(channel, { jsonrpc: '2.0', method: 'channels.deposit_tx', params: { tx: signedTx } })
280301
return { handler: awaitingDepositCompletion, state }
281302
}
303+
return handleUnexpectedMessage(channel, message, state)
282304
}
283305

284306
export function awaitingDepositCompletion (channel, message, state) {
@@ -309,6 +331,7 @@ export function awaitingDepositCompletion (channel, message, state) {
309331
state.resolve({ accepted: false })
310332
return { handler: channelOpen }
311333
}
334+
return handleUnexpectedMessage(channel, message, state)
312335
}
313336

314337
export async function awaitingNewContractTx (channel, message, state) {
@@ -317,6 +340,7 @@ export async function awaitingNewContractTx (channel, message, state) {
317340
send(channel, { jsonrpc: '2.0', method: 'channels.update', params: { tx: signedTx } })
318341
return { handler: awaitingNewContractCompletion, state }
319342
}
343+
return handleUnexpectedMessage(channel, message, state)
320344
}
321345

322346
export function awaitingNewContractCompletion (channel, message, state) {
@@ -339,6 +363,7 @@ export function awaitingNewContractCompletion (channel, message, state) {
339363
state.resolve({ accepted: false })
340364
return { handler: channelOpen }
341365
}
366+
return handleUnexpectedMessage(channel, message, state)
342367
}
343368

344369
export async function awaitingCallContractUpdateTx (channel, message, state) {
@@ -347,6 +372,7 @@ export async function awaitingCallContractUpdateTx (channel, message, state) {
347372
send(channel, { jsonrpc: '2.0', method: 'channels.update', params: { tx: signedTx } })
348373
return { handler: awaitingCallContractCompletion, state }
349374
}
375+
return handleUnexpectedMessage(channel, message, state)
350376
}
351377

352378
export function awaitingCallContractCompletion (channel, message, state) {
@@ -359,6 +385,7 @@ export function awaitingCallContractCompletion (channel, message, state) {
359385
state.resolve({ accepted: false })
360386
return { handler: channelOpen }
361387
}
388+
return handleUnexpectedMessage(channel, message, state)
362389
}
363390

364391
export function awaitingCallsPruned (channels, message, state) {

es/channel/index.js

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ import {
3232
enqueueAction,
3333
send,
3434
channelId,
35-
call
35+
call,
36+
disconnect as channelDisconnect
3637
} from './internal'
3738
import * as R from 'ramda'
3839

@@ -62,6 +63,13 @@ function on (event, callback) {
6263
eventEmitters.get(this).on(event, callback)
6364
}
6465

66+
/**
67+
* Close the connection
68+
*/
69+
function disconnect () {
70+
return channelDisconnect(this)
71+
}
72+
6573
/**
6674
* Get current status
6775
*
@@ -205,15 +213,15 @@ async function balances (accounts) {
205213
* })
206214
*/
207215
function leave () {
208-
return new Promise((resolve) => {
216+
return new Promise((resolve, reject) => {
209217
enqueueAction(
210218
this,
211219
(channel, state) => state.handler === handlers.channelOpen,
212220
(channel, state) => {
213221
send(channel, { jsonrpc: '2.0', method: 'channels.leave', params: {} })
214222
return {
215223
handler: handlers.awaitingLeave,
216-
state: { resolve }
224+
state: { resolve, reject }
217225
}
218226
})
219227
})
@@ -232,7 +240,7 @@ function leave () {
232240
* ).then(tx => console.log('on_chain_tx', tx))
233241
*/
234242
function shutdown (sign) {
235-
return new Promise((resolve) => {
243+
return new Promise((resolve, reject) => {
236244
enqueueAction(
237245
this,
238246
(channel, state) => state.handler === handlers.channelOpen,
@@ -242,7 +250,8 @@ function shutdown (sign) {
242250
handler: handlers.awaitingShutdownTx,
243251
state: {
244252
sign,
245-
resolveShutdownPromise: resolve
253+
resolve,
254+
reject
246255
}
247256
}
248257
}
@@ -297,7 +306,7 @@ function shutdown (sign) {
297306
* })
298307
*/
299308
function withdraw (amount, sign, { onOnChainTx, onOwnWithdrawLocked, onWithdrawLocked } = {}) {
300-
return new Promise((resolve) => {
309+
return new Promise((resolve, reject) => {
301310
enqueueAction(
302311
this,
303312
(channel, state) => state.handler === handlers.channelOpen,
@@ -308,6 +317,7 @@ function withdraw (amount, sign, { onOnChainTx, onOwnWithdrawLocked, onWithdrawL
308317
state: {
309318
sign,
310319
resolve,
320+
reject,
311321
onOnChainTx,
312322
onOwnWithdrawLocked,
313323
onWithdrawLocked
@@ -366,7 +376,7 @@ function withdraw (amount, sign, { onOnChainTx, onOwnWithdrawLocked, onWithdrawL
366376
* })
367377
*/
368378
function deposit (amount, sign, { onOnChainTx, onOwnDepositLocked, onDepositLocked } = {}) {
369-
return new Promise((resolve) => {
379+
return new Promise((resolve, reject) => {
370380
enqueueAction(
371381
this,
372382
(channel, state) => state.handler === handlers.channelOpen,
@@ -377,6 +387,7 @@ function deposit (amount, sign, { onOnChainTx, onOwnDepositLocked, onDepositLock
377387
state: {
378388
sign,
379389
resolve,
390+
reject,
380391
onOnChainTx,
381392
onOwnDepositLocked,
382393
onDepositLocked
@@ -420,7 +431,7 @@ function deposit (amount, sign, { onOnChainTx, onOwnDepositLocked, onDepositLock
420431
* })
421432
*/
422433
function createContract ({ code, callData, deposit, vmVersion, abiVersion }, sign) {
423-
return new Promise((resolve) => {
434+
return new Promise((resolve, reject) => {
424435
enqueueAction(
425436
this,
426437
(channel, state) => state.handler === handlers.channelOpen,
@@ -440,7 +451,8 @@ function createContract ({ code, callData, deposit, vmVersion, abiVersion }, sig
440451
handler: handlers.awaitingNewContractTx,
441452
state: {
442453
sign,
443-
resolve
454+
resolve,
455+
reject
444456
}
445457
}
446458
}
@@ -485,7 +497,7 @@ function createContract ({ code, callData, deposit, vmVersion, abiVersion }, sig
485497
* })
486498
*/
487499
function callContract ({ amount, callData, contract, abiVersion }, sign) {
488-
return new Promise((resolve) => {
500+
return new Promise((resolve, reject) => {
489501
enqueueAction(
490502
this,
491503
(channel, state) => state.handler === handlers.channelOpen,
@@ -502,7 +514,7 @@ function callContract ({ amount, callData, contract, abiVersion }, sign) {
502514
})
503515
return {
504516
handler: handlers.awaitingCallContractUpdateTx,
505-
state: { resolve, sign }
517+
state: { resolve, reject, sign }
506518
}
507519
}
508520
)
@@ -717,6 +729,7 @@ const Channel = AsyncInit.compose({
717729
callContractStatic,
718730
getContractCall,
719731
getContractState,
732+
disconnect,
720733
cleanContractCalls
721734
}
722735
})

es/channel/internal.js

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import * as R from 'ramda'
2121
import { pascalToSnake } from '../utils/string'
2222
import { awaitingConnection } from './handlers'
2323

24+
const PING_TIMEOUT_MS = 10000
25+
const PONG_TIMEOUT_MS = 5000
26+
2427
const options = new WeakMap()
2528
const status = new WeakMap()
2629
const state = new WeakMap()
@@ -34,6 +37,8 @@ const actionQueueLocked = new WeakMap()
3437
const sequence = new WeakMap()
3538
const channelId = new WeakMap()
3639
const rpcCallbacks = new WeakMap()
40+
const pingTimeoutId = new WeakMap()
41+
const pongTimeoutId = new WeakMap()
3742

3843
function channelURL (url, params, endpoint = 'channel') {
3944
const paramString = R.join('&', R.values(R.mapObjIndexed((value, key) =>
@@ -154,6 +159,24 @@ function call (channel, method, params) {
154159
})
155160
}
156161

162+
function ping (channel) {
163+
const ws = websockets.get(channel)
164+
if (ws.readyState === ws.OPEN) {
165+
ws._connection.ping()
166+
clearTimeout(pongTimeoutId.get(channel))
167+
pongTimeoutId.set(channel, setTimeout(() => ws._connection.drop(), PONG_TIMEOUT_MS))
168+
}
169+
}
170+
171+
function disconnect (channel) {
172+
const ws = websockets.get(channel)
173+
if (ws.readyState === ws.OPEN) {
174+
ws._connection.close()
175+
clearTimeout(pongTimeoutId.get(channel))
176+
clearTimeout(pingTimeoutId.get(channel))
177+
}
178+
}
179+
157180
function WebSocket (url, callbacks) {
158181
function fireOnce (target, key, always) {
159182
target[key] = (...args) => {
@@ -185,11 +208,18 @@ async function initialize (channel, channelOptions) {
185208
eventEmitters.set(channel, new EventEmitter())
186209
sequence.set(channel, 0)
187210
rpcCallbacks.set(channel, new Map())
188-
websockets.set(channel, await WebSocket(wsUrl, {
211+
const ws = await WebSocket(wsUrl, {
189212
onopen: () => changeStatus(channel, 'connected'),
190213
onclose: () => changeStatus(channel, 'disconnected'),
191214
onmessage: ({ data }) => onMessage(channel, data)
192-
}))
215+
})
216+
ws._connection.on('pong', () => {
217+
clearTimeout(pongTimeoutId.get(channel))
218+
clearTimeout(pingTimeoutId.get(channel))
219+
pingTimeoutId.set(channel, setTimeout(() => ping(channel), PING_TIMEOUT_MS))
220+
})
221+
pingTimeoutId.set(channel, setTimeout(() => ping(channel), PING_TIMEOUT_MS))
222+
websockets.set(channel, ws)
193223
}
194224

195225
export {
@@ -204,5 +234,6 @@ export {
204234
send,
205235
enqueueAction,
206236
channelId,
207-
call
237+
call,
238+
disconnect
208239
}

test/integration/channel.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ describe('Channel', function () {
8585
await initiator.spend('6000000000000000', await responder.address())
8686
})
8787

88+
after(() => {
89+
initiatorCh.disconnect()
90+
responderCh.disconnect()
91+
})
92+
8893
beforeEach(() => {
8994
responderShouldRejectUpdate = false
9095
})
@@ -403,7 +408,8 @@ describe('Channel', function () {
403408
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)])
404409
sinon.assert.notCalled(initiatorSign)
405410
sinon.assert.notCalled(responderSign)
406-
await initiatorCh.leave()
411+
initiatorCh.disconnect()
412+
responderCh.disconnect()
407413
})
408414

409415
it('can solo close a channel', async () => {

0 commit comments

Comments
 (0)