From b7341d671fdb42663aeac0cabe0998338f861502 Mon Sep 17 00:00:00 2001 From: Slugalisk Date: Mon, 4 Mar 2019 15:58:12 -0800 Subject: [PATCH] dht peer selection wip --- .babelrc | 4 +- package.json | 3 + src/App.js | 16 ++- src/DhtGraph.js | 192 ++++++++++++++++++++++++++++++--- src/DiagnosticMenu/index.js | 2 +- src/client.js | 15 ++- src/dht.js | 204 +++++++++++++++++++++++++++++------- src/dht.test.js | 31 +++--- src/loopback.js | 48 +++++++-- src/ppspp/index.js | 8 +- src/server.js | 10 +- src/wrtc.js | 10 +- tls/certificate.pem | 21 ++++ tls/key.pem | 28 +++++ tls/regen.sh | 4 + yarn.lock | 14 ++- 16 files changed, 511 insertions(+), 99 deletions(-) create mode 100644 tls/certificate.pem create mode 100644 tls/key.pem create mode 100755 tls/regen.sh diff --git a/.babelrc b/.babelrc index 3ebcf3b..65f8b13 100644 --- a/.babelrc +++ b/.babelrc @@ -1,5 +1,5 @@ { "plugins": [ - "@babel/plugin-transform-modules-commonjs" - ] + "@babel/plugin-transform-modules-commonjs" + ] } diff --git a/package.json b/package.json index f124a87..15c71a4 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,8 @@ "react-scripts": "2.1.5", "reconnecting-websocket": "^3.2.2", "seedrandom": "^2.4.4", + "three": "^0.102.0", + "three-spritetext": "^1.0.6", "urlsafe-base64": "^1.0.0", "uuid": "^3.3.2", "wrtc": "^0.3.5" @@ -41,6 +43,7 @@ "test": "react-scripts test --env=jsdom --watch --verbose false", "eject": "react-scripts eject", "server": "nodemon --delay 2.5s ./node_modules/.bin/babel-node src/server.js", + "_server": "nodemon ./node_modules/.bin/babel-node src/server.js", "predeploy": "yarn run build", "deploy": "gh-pages -d build" }, diff --git a/src/App.js b/src/App.js index 0d868f3..d10f04e 100644 --- a/src/App.js +++ b/src/App.js @@ -1,5 +1,6 @@ import React, {useEffect, useState} from 'react'; import URI from './ppspp/uri'; +// import DiagnosticMenu from './DiagnosticMenu'; import SwarmPlayer from './SwarmPlayer'; import {Client} from './client'; import {ConnManager} from './wrtc'; @@ -7,17 +8,21 @@ import {ConnManager} from './wrtc'; import './App.css'; -const BOOTSTRAP_ADDRESS = process.env.NODE_ENV === 'development' - ? window.location.hostname + ':8080' - : window.location.host; - const App = () => { const [ppsppClient, setPpsppClient] = useState(null); const [swarmUri, setSwarmUri] = useState(''); const [swarm, setSwarm] = useState(null); useEffect(() => { - const connManager = new ConnManager(BOOTSTRAP_ADDRESS); + const proto = window.location.protocol === 'https:' ? 'wss' : 'ws'; + const host = process.env.NODE_ENV === 'development' + ? window.location.hostname + ':8080' + : window.location.host; + const bootstrapAddress = `${proto}://${host}`; + + console.log({bootstrapAddress}); + + const connManager = new ConnManager(bootstrapAddress); Client.create(connManager).then(({ppsppClient, swarmUri}) => { setPpsppClient(ppsppClient); @@ -26,6 +31,7 @@ const App = () => { }, []); if (swarm) { + // return ; return ; } diff --git a/src/DhtGraph.js b/src/DhtGraph.js index 41a28c9..6f25e72 100644 --- a/src/DhtGraph.js +++ b/src/DhtGraph.js @@ -5,6 +5,8 @@ import arrayBufferToHex from 'array-buffer-to-hex'; import ForceGraph3D from 'react-force-graph-3d'; import {schemeCategory10} from 'd3-scale-chromatic'; import {scaleOrdinal} from 'd3-scale'; +// import SpriteText from 'three-spritetext'; +// import {Mesh, SphereBufferGeometry, MeshLambertMaterial} from 'three'; import './App.css'; @@ -17,16 +19,66 @@ const reduceGraph = (graph, {type, ...data}) => { nodes: [...graph.nodes, data], links: graph.links, }; + case 'REMOVE_NODE': + return { + nodes: graph.nodes.filter(node => node.id !== data.id), + links: graph.links.filter(({source, target}) => { + return source.id !== data.id && target.id !== data.id; + }), + }; case 'ADD_LINK': return { nodes: graph.nodes, - links: [...graph.links, data], + links: [...graph.links, {...data, activity: 0}], + }; + case 'UPDATE_LINK': + return { + nodes: graph.nodes, + links: graph.links.map((link) => { + const {source, target} = link; + if (source.id !== data.source || target.id !== data.target) { + return link; + } + return { + ...link, + ...data, + }; + }), + }; + case 'INCR_LINK_ACTIVITY': + return { + nodes: graph.nodes, + links: graph.links.map((link) => { + const {source, target} = link; + if (source.id !== data.source || target.id !== data.target) { + return link; + } + return { + ...link, + activity: link.activity + 1, + }; + }), + }; + case 'DECR_LINK_ACTIVITY': + return { + nodes: graph.nodes, + links: graph.links.map((link) => { + const {source, target} = link; + if (source.id !== data.source || target.id !== data.target) { + return link; + } + return { + ...link, + activity: link.activity - 1, + }; + }), }; case 'REMOVE_LINK': + console.log(data); return { nodes: graph.nodes, links: graph.links.filter(({source, target}) => { - return source !== data.source || target !== data.target; + return source.id !== data.source || target.id !== data.target; }), }; default: @@ -34,7 +86,7 @@ const reduceGraph = (graph, {type, ...data}) => { } }; -const App = () => { +const useGraph = () => { const [server] = useState(new Server()); const [gen, setGen] = useState(1); const [graph, dispatchGraphAction] = useReducer(reduceGraph, {nodes: [], links: []}); @@ -45,27 +97,92 @@ const App = () => { type: 'ADD_NODE', id: source, color: color(0), + dhtClient: server.dhtClient, }); }, []); - const handleAddPeerClick = (n=1) => { + const addNodes = (n = 1, props = {}) => { setGen(gen + 1); Promise.all(new Array(n).fill(0).map(() => Client.create(new ConnManager(server)))) - .then(clients => clients.forEach(({dhtClient: {id, channels}}) => { + .then(clients => clients.forEach(({dhtClient}) => { + // if (Math.random() > 0.5) { + // setTimeout(() => dhtClient.close(), Math.random() * 30000); + // } + + const {id, allChannels} = dhtClient; + console.log(allChannels); + const source = arrayBufferToHex(id); dispatchGraphAction({ type: 'ADD_NODE', id: source, color: color(gen), + dhtClient, + ...props, }); - - channels.on('added', ({id}) => dispatchGraphAction({ - type: 'ADD_LINK', - source, - target: arrayBufferToHex(id), + dhtClient.on('close', () => dispatchGraphAction({ + type: 'REMOVE_NODE', + id: source, })); - channels.on('removed', ({id}) => dispatchGraphAction({ + + allChannels.toArray().forEach(({id, conn}) => { + if (conn != null) { + dispatchGraphAction({ + type: 'ADD_LINK', + source, + target: arrayBufferToHex(id), + color: conn ? '#fff' : '#66f', + }); + } + }); + + // const registerConnObservers = (target, conn) => { + // const handleMessage = () => { + // dispatchGraphAction({type: 'INCR_LINK_ACTIVITY', source, target}); + // setTimeout(() => dispatchGraphAction({type: 'DECR_LINK_ACTIVITY', source, target}), 10000); + // }; + + // conn.on('message', handleMessage); + // conn.remote.on('message', handleMessage); + // }; + + allChannels.on('added', ({id, conn}) => { + if (conn != null) { + const target = arrayBufferToHex(id); + dispatchGraphAction({ + type: 'ADD_LINK', + source, + target, + color: conn ? '#fff' : '#66f', + }); + + + // registerConnObservers(target, conn); + } + }); + allChannels.on('updated', ({conn: oldConn}, {id, conn}) => { + if (conn !== oldConn) { + const target = arrayBufferToHex(id); + // dispatchGraphAction({ + // type: 'UPDATE_LINK', + // source, + // target, + // color: conn ? '#fff' : '#66f', + // }); + + dispatchGraphAction({ + type: 'ADD_LINK', + source, + target, + color: conn ? '#fff' : '#66f', + }); + + + // registerConnObservers(target, conn); + } + }); + allChannels.on('removed', ({id}) => dispatchGraphAction({ type: 'REMOVE_LINK', source, target: arrayBufferToHex(id), @@ -73,17 +190,62 @@ const App = () => { })); }; + return [graph, {addNodes}]; +}; + +const useNodePinger = () => { + const [source, setSource] = useState(null); + + const handleNodeClick = node => { + console.log(node); + if (source === null) { + setSource(node); + console.log('set source', arrayBufferToHex(node.dhtClient.id)); + return; + } + + console.log('pinging %s > %s', arrayBufferToHex(source.dhtClient.id), arrayBufferToHex(node.dhtClient.id)) + source.dhtClient.sendPing(node.dhtClient.id, (data) => { + console.log('received ping response', data); + }); + setSource(null); + }; + + return handleNodeClick; +}; + +const App = () => { + const [graph, {addNodes}] = useGraph(); + const handleNodeClick = useNodePinger(); + + console.log(graph); + + // useEffect(() => { + // let n = 1; + // const ivl = setInterval(() => { + // addNodes(1, {color: color(n)}); + // if (++ n == 50) { + // clearInterval(ivl); + // } + // }, 1000); + // return () => clearInterval(ivl); + // }, []); + return (
- - - - + + +
link.color} + linkWidth={1.5} + nodeRelSize={2} + nodeVal={node => node.dhtClient.allChannels.count()} />
); diff --git a/src/DiagnosticMenu/index.js b/src/DiagnosticMenu/index.js index 27b91db..3df3df1 100644 --- a/src/DiagnosticMenu/index.js +++ b/src/DiagnosticMenu/index.js @@ -160,7 +160,7 @@ class PeerStateTable extends Component { const rows = values.map(({key, value}) => ( {key} - {value} + {String(value)} )); diff --git a/src/client.js b/src/client.js index c3ff2e7..ba7f34d 100644 --- a/src/client.js +++ b/src/client.js @@ -40,10 +40,21 @@ export class Client { this.dhtClient.createChannel(id, client.createDataChannel('dht')); this.ppsppClient.createChannel(client.createDataChannel('ppspp')); - this.dhtClient.send(id, 'connect.request', {channelId: sub.id}, () => client.init()); + const timeout = setTimeout(() => client.close(), 10000); + + const init = () => { + clearTimeout(timeout); + client.init(); + }; + + this.dhtClient.send(id, 'connect.request', {channelId: sub.id}, init); } handleReceiveConnectRequest({data: {channelId, from}, callback}) { + // if (this.dhtClient.channels.count() > 10) { + // return; + // } + // console.log('handleReceiveConnectRequest', {channelId, from, callback}); const id = new hexToUint8Array(from); const client = this.connManager.createClient(new dht.SubChannel(this.dhtClient, id, channelId)); @@ -56,6 +67,6 @@ export class Client { } }); - callback(); + callback({}); } } diff --git a/src/dht.js b/src/dht.js index e2e32d7..37cbdeb 100644 --- a/src/dht.js +++ b/src/dht.js @@ -8,13 +8,11 @@ import hexToUint8Array from './hexToUint8Array'; const SEND_REPLICAS = 2; const MAX_HOPS = 10; -const DEFAULT_PEER_REQUEST_COUNT = 5; -const NUMBER_OF_NODES_PER_BUCKET = 15; +const DEFAULT_PEER_REQUEST_COUNT = 10; +const NUMBER_OF_NODES_PER_BUCKET = 2; -// TODO: replace dropped connections // TODO: implement get/set // TODO: implement connection dump rpc for network debugging -// TODO: update peers with new peer lists periodically export class Client extends EventEmitter { constructor(id) { @@ -23,16 +21,24 @@ export class Client extends EventEmitter { this.id = id; + // managed/unmanaged? this.channels = new KBucket({ numberOfNodesPerKBucket: NUMBER_OF_NODES_PER_BUCKET, localNodeId: this.id, }); + this.allChannels = new KBucket({ + numberOfNodesPerKBucket: 100, + localNodeId: this.id, + }); this.channels.on('ping', this.handlePing.bind(this)); this.channels.on('removed', this.handleRemoved.bind(this)); this.channels.on('updated', this.handleUpdated.bind(this)); this.channels.on('added', this.handleAdded.bind(this)); + this.knownPeerIds = {}; + this.channelMap = {}; + this.seenIds = new LRU({max: 1024}); this.knownRoutes = new LRU({ max: 1024, @@ -42,7 +48,48 @@ export class Client extends EventEmitter { this.on('receive.peers.request', this.handlePeersRequest.bind(this)); this.on('receive.ping.request', this.handlePingRequest.bind(this)); + this.on('receive.trace.request', this.handleTraceRequest.bind(this)); this.on('receive.callback.response', this.handleCallbackResponse.bind(this)); + + this.startPeerRequests(); + } + + close() { + this.stopPeerRequests(); + this.channels.toArray().forEach(({id}) => this.removeChannel(id)); + this.emit('close'); + } + + startPeerRequests() { + let index = 0; + let ids = Object.keys(this.knownPeerIds); + + const next = () => { + for (let retry = 0; retry <= ids.length; retry ++) { + if (index >= ids.length) { + index = 0; + ids = Object.keys(this.knownPeerIds); + } + + const id = ids[index]; + index ++; + + if (id) { + return id; + } + } + }; + + this.peerRequestIvl = setInterval(() => { + const id = next(); + if (id) { + this.sendPeerRequest(hexToUint8Array(id)); + } + }, 5000); + } + + stopPeerRequests() { + clearInterval(this.peerRequestIvl); } handlePing(channels, newChannel) { @@ -50,24 +97,31 @@ export class Client extends EventEmitter { const PING_TIMEOUT = 1000; const CONNECT_TIMEOUT = 1000; + const now = Date.now(); + channels.forEach(channel => { - const {id} = channel; + const {id, lastPing} = channel; + + if (now - lastPing < 30000) { + this.addChannel(channel); + return; + } // console.log('>>> conn exists, pinging'); // TODO: maybe keep track of how recently we pinged? debounce const replaceChannel = () => { - // console.log('>>> ping timeout'); - this.channels.remove(id); - this.channels.add(newChannel); + console.log('ping timeout removing', arrayBufferToHex(id)); + this.removeChannel(id); + this.addChannel(newChannel); }; // TODO: connection up/down getter if (channel.conn == null) { // console.log('>>> channel undefined, waiting to see if it gets replaced'); setTimeout(() => { - const channel = this.channels.get(id); + const channel = this.getChannel(id); if (channel != null && channel.conn != null) { - this.channels.add(channel); + this.addChannel(channel); return; } replaceChannel(); @@ -79,16 +133,31 @@ export class Client extends EventEmitter { // console.log('ping', arrayBufferToHex(id)); const replaceChannelTimeout = setTimeout(replaceChannel, PING_TIMEOUT); const clearReplaceChannelTimeout = () => { - // console.log('>>> clear timeout'); clearTimeout(replaceChannelTimeout); - this.channels.add(channel); + channel.lastPing = now; + this.addChannel(channel); }; - this.send(id, 'ping.request', {}, clearReplaceChannelTimeout); + this.sendPing(id, clearReplaceChannelTimeout); }); } + addChannel(channel) { + this.channels.add(channel); + this.allChannels.add(channel); + } + + removeChannel(id) { + this.channels.remove(id); + this.allChannels.remove(id); + delete this.channelMap[arrayBufferToHex(id)]; + } + + getChannel(id) { + return this.channelMap[arrayBufferToHex(id)]; + } + handleRemoved(channel) { - // console.log('remove', arrayBufferToHex(channel.id)); + console.log('remove', arrayBufferToHex(channel.id)); if (channel.conn) { channel.conn.close(); } @@ -113,25 +182,38 @@ export class Client extends EventEmitter { createChannel(id, conn) { const channel = new Channel(id, conn); - // console.log('create channel'); - // this.candidates.add(channel); + this.channelMap[arrayBufferToHex(id)] = channel; const messages = []; const bufferMessages = event => messages.push(event); + const handleMessage = this.handleMessage.bind(this, channel); + + // let requestPeersIvl = setInterval(() => this.sendPeerRequest(id), 30000); - conn.addEventListener('open', () => { - this.channels.add(channel); + const handleOpen = () => { + // console.log('opened', arrayBufferToHex(channel.id)); + this.addChannel(channel); conn.removeEventListener('message', bufferMessages); - conn.addEventListener('message', this.handleMessage.bind(this, channel)); - messages.forEach(event => this.handleMessage(channel, event)); + conn.addEventListener('message', handleMessage); + messages.forEach(handleMessage); - this.send(id, 'peers.request', {}, this.handlePeersResponse.bind(this)); - }); + this.sendPeerRequest(id); + setTimeout(() => this.sendPeerRequest(id), 1000); + }; + + const handleClose = () => { + // clearInterval(requestPeersIvl); + conn.removeEventListener('message', bufferMessages); + conn.removeEventListener('message', handleMessage); + conn.removeEventListener('open', handleOpen); + conn.removeEventListener('close', handleClose); + this.handleClose(channel); + }; conn.addEventListener('message', bufferMessages); - conn.addEventListener('close', this.handleClose.bind(this, channel)); - conn.addEventListener('error', this.handleError.bind(this, channel)); + conn.addEventListener('open', handleOpen, {once: true}); + conn.addEventListener('close', handleClose, {once: true}); } handleMessage(channel, event) { @@ -140,13 +222,26 @@ export class Client extends EventEmitter { const req = JSON.parse(event.data); const {type, id} = req; + if (req.trace) { + req.trace.push(arrayBufferToHex(this.id)); + // console.log('TRACE', req); + } + if (this.seenIds.get(id)) { // console.log('discarding seen message', id); + // if (req.trace) { + // console.log('DROPPED', req); + // } return; } this.seenIds.set(id, true); + if (!this.getChannel(channel.id)) { + console.warn('receiving channel is not known to dht', arrayBufferToHex(channel.id)); + } + this.knownRoutes.set(req.from, channel.id); + // this.knownRoutes.set(req.from, arrayBufferToHex(channel.id)); const to = hexToUint8Array(req.to); if (!arrayEqual(to, this.id)) { @@ -154,6 +249,10 @@ export class Client extends EventEmitter { return; } + // if (req.trace) { + // console.log('DELIVERED', req); + // } + const resCallback = (res={}, callback=null) => { const from = hexToUint8Array(req.from); const data = {re: id, ...res}; @@ -173,21 +272,26 @@ export class Client extends EventEmitter { } data.hops ++; - this.sendRaw(to, JSON.stringify(data)); + this.sendRaw(to, JSON.stringify(data), data.trace); } handleClose({id}) { - // console.warn('handleClose', arrayBufferToHex(id)); - this.channels.remove(id); - this.candidates.remove(id); + console.warn('handleClose', arrayBufferToHex(id)); + this.removeChannel(id); + delete this.channelMap[arrayBufferToHex(id)]; } - handleError(error) { - // console.log('error', error); + sendPing(to, callback=()=>{}) { + this.send(to, 'ping.request', {}, callback); } - handlePingRequest({callback}) { - callback(); + handlePingRequest({data, callback}) { + // console.log('PING', data); + callback({}); + } + + handleTraceRequest({data, callback}) { + callback(data); } handleCallbackResponse({data, callback}) { @@ -199,10 +303,21 @@ export class Client extends EventEmitter { } } + sendPeerRequest(to, count=DEFAULT_PEER_REQUEST_COUNT) { + const timeout = setTimeout(() => { + delete this.knownPeerIds[arrayBufferToHex(to)]; + }, 5000); + this.send(to, 'peers.request', {count}, res => { + clearTimeout(timeout); + this.handlePeersResponse(res); + }); + } + handlePeersRequest({data: {count=DEFAULT_PEER_REQUEST_COUNT, from}, callback}) { // console.log('handlePeersRequest'); - const ids = this.channels.closest(hexToUint8Array(from), count) + const ids = this.allChannels.closest(hexToUint8Array(from)) + .filter(({conn}) => conn != null) .map(({id}) => arrayBufferToHex(id)); callback({ids}); } @@ -212,8 +327,16 @@ export class Client extends EventEmitter { res.ids .map(id => hexToUint8Array(id)) - .filter(id => this.channels.get(id) == null && !arrayEqual(id, this.id)) - .forEach(id => this.channels.add(new Channel(id))); + .filter(id => !arrayEqual(id, this.id)) + .filter(id => { + const channel = this.getChannel(id); + return channel == null || channel.conn == null; + }) + .forEach(id => { + // TODO: store peer provenance so we can ignore bad actors? + this.knownPeerIds[arrayBufferToHex(id)] = true; + this.addChannel(new Channel(id)); + }); } send(to, type, data={}, callback=null) { @@ -234,6 +357,7 @@ export class Client extends EventEmitter { type, from: arrayBufferToHex(this.id), to: arrayBufferToHex(to), + trace: [arrayBufferToHex(this.id)], hops: 0, ...data, }); @@ -243,14 +367,16 @@ export class Client extends EventEmitter { this.sendRaw(to, message); } - sendRaw(to, message) { - let closest = this.channels.closest(to) + sendRaw(to, message, trace=[]) { + let closest = this.allChannels.closest(to) .filter(({conn}) => conn != null) + .filter(({idHex}) => trace.indexOf(idHex) === -1) .slice(0, SEND_REPLICAS); const knownRoute = this.knownRoutes.get(arrayBufferToHex(to)); if (knownRoute) { - const channel = this.channels.get(knownRoute); + const channel = this.getChannel(knownRoute); + // const channel = this.allChannels.get(knownRoute); if (channel != null && channel.conn != null) { closest.push(channel); } @@ -273,7 +399,9 @@ export class Client extends EventEmitter { export class Channel { constructor(id, conn) { this.id = id; + this.idHex = arrayBufferToHex(id); this.vectorClock = Date.now(); + this.lastPing = Date.now(); this.conn = conn; // console.log('channel', this); diff --git a/src/dht.test.js b/src/dht.test.js index 4b79669..969e012 100644 --- a/src/dht.test.js +++ b/src/dht.test.js @@ -14,10 +14,9 @@ it('dht clients can send and receive messages', async () => { dhtClients.forEach(client => client.on('receive.test', ({callback}) => callback())); await new Promise(resolve => setTimeout(resolve, 1000)) - .then(() => Promise.all(pairs.map(({src, dst}) => Promise.race([ - new Promise(resolve => dhtClients[src].send(dhtClients[dst].id, 'test', {src, dst}, resolve)), - new Promise((_, reject) => setTimeout(() => reject(new Error('callback timeout')), 3000)), - ])))); + .then(() => Promise.all(pairs.map(({src, dst}) => new Promise( + resolve => dhtClients[src].send(dhtClients[dst].id, 'test', {src, dst}, resolve), + )))); }); it('dht clients can process messages in busy clusters', async () => { @@ -31,10 +30,9 @@ it('dht clients can process messages in busy clusters', async () => { dhtClients.forEach(client => client.on('receive.test', ({callback}) => callback())); await new Promise(resolve => setTimeout(resolve, 1000)) - .then(() => Promise.all(pairs.map(({src, dst}) => Promise.race([ - new Promise(resolve => dhtClients[src].send(dhtClients[dst].id, 'test', {src, dst}, resolve)), - new Promise((_, reject) => setTimeout(() => reject(new Error('callback timeout')), 3000)), - ])))); + .then(() => Promise.all(pairs.map(({src, dst}) => new Promise( + resolve => dhtClients[src].send(dhtClients[dst].id, 'test', {src, dst}, resolve), + )))); }); it('dht clients can respond to messages via callbacks', async () => { @@ -48,14 +46,11 @@ it('dht clients can respond to messages via callbacks', async () => { dhtClients.forEach(client => client.on('receive.test', ({data: {src, dst}, callback}) => callback({src, dst}))); await new Promise(resolve => setTimeout(resolve, 1000)) - .then(() => Promise.all(pairs.map(({src, dst}) => Promise.race([ - new Promise((resolve, reject) => dhtClients[src].send(dhtClients[dst].id, 'test', {src, dst}, (data) => { - if (src === data.src && dst === data.dst) { - resolve(); - } else { - reject(new Error(`recv mismatch {src: ${src}, dst: ${dst}} vs {src: ${data.src}, dst: ${data.dst}}`)); - } - })), - new Promise((_, reject) => setTimeout(() => reject(new Error('callback timeout')), 3000)), - ])))); + .then(() => Promise.all(pairs.map(({src, dst}) => new Promise( + (resolve, reject) => dhtClients[src].send(dhtClients[dst].id, 'test', {src, dst}, (data) => { + expect(src).toEqual(data.src); + expect(dst).toEqual(data.dst); + resolve(); + }), + )))); }); diff --git a/src/loopback.js b/src/loopback.js index acf201d..8f02302 100644 --- a/src/loopback.js +++ b/src/loopback.js @@ -11,6 +11,8 @@ export class Server { } } +const queue = []; + export class ConnManager { constructor(server) { this.server = server; @@ -35,6 +37,12 @@ export class ConnManager { } }); + queue.push(client); + if (queue.length > 11) { + queue.shift().close(); + } + // setTimeout(() => client.close(), Math.random() * 30000); + return Promise.resolve({data, conn: conn.remote}); } @@ -54,13 +62,16 @@ export class Conn extends EventEmitter { this.remote = remote || new Conn(this); this.remote.remote = this; this.onmessage = () => {}; + this.closed = false; } send(data) { - setImmediate(() => { - this.remote.emit('message', {data}); - this.remote.onmessage({data}); - }); + if (!this.closed) { + setImmediate(() => { + this.remote.emit('message', {data}); + this.remote.onmessage({data}); + }); + } } addEventListener(...args) { @@ -71,8 +82,11 @@ export class Conn extends EventEmitter { this.removeListener(...args); } - // TODO: this should do something...? - close() {} + close() { + this.closed = true; + this.remote.emit('close'); + this.emit('close'); + } } export class Mediator extends EventEmitter { @@ -123,13 +137,16 @@ export class Client extends EventEmitter { this.mediator = mediator; this.datachannels = {}; + this.conns = []; mediator.on('datachannel', this.handleDataChannel.bind(this)); mediator.once('open', this.handleOpen.bind(this)); } handleDataChannel(label, conn) { - this.emit('datachannel', {label, channel: new ClientDataChannel(this, label, conn)}); + const channel = new ClientDataChannel(this, label, conn); + this.conns.push(channel); + this.emit('datachannel', {label, channel}); } handleOpen() { @@ -137,13 +154,20 @@ export class Client extends EventEmitter { } createDataChannel(label) { - this.datachannels[label] = new ClientDataChannel(this, label); - return this.datachannels[label]; + const channel = new ClientDataChannel(this, label); + this.datachannels[label] = channel; + this.conns.push(channel); + return channel; } init() { this.mediator.sendConnection(this.datachannels); } + + close() { + this.conns.forEach(conn => conn.close()); + this.emit('close'); + } } export class ClientDataChannel extends Conn { @@ -152,7 +176,11 @@ export class ClientDataChannel extends Conn { this.client = client; this.label = label; + this.open = false; - this.client.on('open', () => this.emit('open')); + this.client.on('open', () => { + this.emit('open'); + this.open = true; + }); } } diff --git a/src/ppspp/index.js b/src/ppspp/index.js index 2182140..1df695d 100644 --- a/src/ppspp/index.js +++ b/src/ppspp/index.js @@ -197,7 +197,8 @@ class Peer { this.remoteId = handshake.channelId; - if (this.state === PeerState.CONNECTING) { + console.log('received handshake message while in state', this.state); + if (this.state !== PeerState.READY) { this.sendHandshake(); this.swarm.scheduler.getRecentChunks().forEach(address => this.sendHave(address)); this.flush(); @@ -460,14 +461,17 @@ export class Channel extends EventEmitter { return; } if (handshake === undefined || handshake.type !== MessageTypes.HANDSHAKE) { + console.log('rejected new peer without handshake'); return; } const swarmId = handshake.options.find(({type}) => type === ProtocolOptions.SwarmIdentifier); if (swarmId === undefined) { + console.log('rejecting new peer with invalid swarm id'); return; } const swarm = this.swarms.get(SwarmId.from(swarmId.value)); if (swarm === undefined) { + console.log('rejecting new peer requesting unknown swarm'); return; } @@ -476,11 +480,13 @@ export class Channel extends EventEmitter { data = new peer.swarm.encoding.Datagram(); data.read(event.data); + // console.log('RECEIVED', data.messages.toArray()); peer.handleData(data); } send(data) { try { + // console.log('SENT', data); this.conn.send(data.toBuffer()); } catch (error) { console.log('encountered error while sending', error); diff --git a/src/server.js b/src/server.js index 8809183..4311c2b 100644 --- a/src/server.js +++ b/src/server.js @@ -1,7 +1,8 @@ require('dotenv').config(); -import http from 'http'; +import https from 'https'; import express from 'express'; +import fs from 'fs'; import ws from 'ws'; import hilbert from 'hilbert'; import ip2location from 'ip2location-nodejs'; @@ -14,7 +15,7 @@ import * as dht from './dht'; import * as ppspp from './ppspp'; import * as wrtc from './wrtc'; -ip2location.IP2Location_init(path.join(__dirname, '../vendor/IP2LOCATION-LITE-DB5.BIN')); +ip2location.IP2Location_init(path.join(__dirname, '..', 'vendor', 'IP2LOCATION-LITE-DB5.BIN')); const args = require('minimist')(process.argv.slice(2)); const port = args.p || 8080; @@ -24,7 +25,10 @@ let swarmUri = ''; const app = express(); app.use(express.static('public')); -const server = http.createServer(app); +const server = https.createServer({ + key: fs.readFileSync(path.join(__dirname, '..', 'tls', 'key.pem')), + cert: fs.readFileSync(path.join(__dirname, '..', 'tls', 'certificate.pem')), +}, app); server.listen(port, function(err) { const address = server.address(); diff --git a/src/wrtc.js b/src/wrtc.js index c70afe2..51c3f94 100644 --- a/src/wrtc.js +++ b/src/wrtc.js @@ -12,8 +12,7 @@ export class ConnManager { bootstrap() { return new Promise((resolve, reject) => { - const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'; - const conn = new WebSocket(`${protocol}://${this.bootstrapAddress}`); + const conn = new WebSocket(this.bootstrapAddress); conn.onmessage = (event) => { const data = JSON.parse(event.data); if (data.type === 'bootstrap') { @@ -163,6 +162,7 @@ export class Client extends EventEmitter { resolveWaitingChannel() { if (-- this.waitingChannels === 0) { + console.log('wrtc client opened'); this.emit('open'); } } @@ -176,4 +176,10 @@ export class Client extends EventEmitter { this.mediator.sendOffer(offer); }); } + + close() { + console.log('wrtc client closed'); + this.peerConn.close(); + this.emit('close'); + } } diff --git a/tls/certificate.pem b/tls/certificate.pem new file mode 100644 index 0000000..8608786 --- /dev/null +++ b/tls/certificate.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDYDCCAkigAwIBAgIJALdxmRw6Mn8KMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTkwMzA0MTY1MTU1WhcNMjAwMzAzMTY1MTU1WjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEAvJqGgGJZiGH2Hn86VLdJPVfZQTUoREPaxRYd8AnkGmXq6/ksNw0YjhRm +v2v/LVFc+44FyGyrGs4zkifFzxCyzJG5eKmCi64bcSVfSGAjPsacxHwYweeoVFdm +A9ra7ASzkbh5xFnQZpq2ZlCUzPapdb8jemT2ZHqLhnfo35SyR05iCPTUeWj8H1bh +DqoCwWqglFXtObLZT+HdJeDWPLqmtlhUDvnj9eOaxDplSFFVfx8yaaMHdwLAV8K3 +RuvhuQFr+bNcbZVwx6c421Qd4RFGAhGMyXP+l2OsRFX2zrqIZY7YfNO0qf833h1K +QT2XPJpAb6OANwQLVfOxokMmVMQVkQIDAQABo1MwUTAdBgNVHQ4EFgQUfEXdUdc2 +y1Nf0gKBvDHZpI+ZomEwHwYDVR0jBBgwFoAUfEXdUdc2y1Nf0gKBvDHZpI+ZomEw +DwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEABpIJaPEyMeJkXwuq +FC+evI3fFEA1cv/pfcG8Ey8LcN919OKkY5RL9XT4c88U5xX0fs2WI4lqt34sju9B +zd0DeqZsFMU6ep4ymzr5/vQs+LQ0ln/EGl3XDvKYYGlRuUCrNoPOLo293gifuqQD +nnxu+xfY8Tc3opG5U7b/KVLHS3jlcMCe+JfGT1ppb4nTm3YvmmXMk/mpJUOwyR9m ++n81dAwgd/5efMUkTiJJg6yWl/dn0h/zsvIKTAf8ngH+MNtTuMR07wjQvQL0ldoJ +fZgoHaxX9O2LkNM1pEuDP/7TKot+TcWUi8h8pHZALsLbZ8T4MyUfriffZ+1easCs +i45TwQ== +-----END CERTIFICATE----- diff --git a/tls/key.pem b/tls/key.pem new file mode 100644 index 0000000..2d751d9 --- /dev/null +++ b/tls/key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8moaAYlmIYfYe +fzpUt0k9V9lBNShEQ9rFFh3wCeQaZerr+Sw3DRiOFGa/a/8tUVz7jgXIbKsazjOS +J8XPELLMkbl4qYKLrhtxJV9IYCM+xpzEfBjB56hUV2YD2trsBLORuHnEWdBmmrZm +UJTM9ql1vyN6ZPZkeouGd+jflLJHTmII9NR5aPwfVuEOqgLBaqCUVe05stlP4d0l +4NY8uqa2WFQO+eP145rEOmVIUVV/HzJpowd3AsBXwrdG6+G5AWv5s1xtlXDHpzjb +VB3hEUYCEYzJc/6XY6xEVfbOuohljth807Sp/zfeHUpBPZc8mkBvo4A3BAtV87Gi +QyZUxBWRAgMBAAECggEAbTgPK8nIj3IrVP3YibaacyusZFakOuQQjJGCRL91mNPs +dzqUoEy1K5Ik7jVcUKINdp+IOci0TZGby8e8hWzmHhFFltqS/tCUk7FQvFjwECp0 +HlS/AYtlr3HtYc4beRI/6uMpoMtiJAj0F8wSX594AvftU7Oj/Av7QKOv98nkc+Wx +mubmAaGOYD7SetPyja7RtzbxHcgHSbAB4npiBk1BuIOK2PGWXBHZ+kleI1DVuFVr +7CV/NgjiYKYeKHGA6E5DdujJJHXH4jYuRWZE1t5IOomBhpA6Ew76q5NG4TioASrB +LJ8A1iHM3tFfb61lso+vAcyKSXb0AfWJn1ZZPKtQAQKBgQDmLjMmXR36zVcUC4zO +//j1VJv0YgWD/cpWo3S7sYP0GvF1IGq7vhpG+e2YHbP/dbmIMuzrmC1aOdgZsNmo +wFPDmFah+NPGDO82QH3cZcf090wY2uxrtpVnGOlnmewoneVdKX71Ftyh5Z1Gafn2 +qNi2YNuWSRPkhM5jTJQMBs7xeQKBgQDRwmp4TlHN73tbhF2myyXoudcOSbFmeCP3 +fbLfZVYobE+JQzJ0GwXVOCVaCDF1DrfgKoGUj8b87s2zErpO0G14bSbD3TxnOMoH +0KlKIoziW8/XHXcqKv9kSfsc4ewZdDd/DZwavMbUIb1UIW44wLmKzZ+vEcLW/+vP +tHFrViEW2QKBgQDP7jgilcd5QnlBEFX4wF7jL7VaBBo7a82Wij3vN6KdpqEsT0Zb +ppUSPL+WxG5PfMdG88n14RhHAy3lt05IFMUWUc0gsCiOJi0JcyS/Lep9rc++PDWJ +7/vykBx4jCE7Z7dEezHGsQpJ7aQvcZhua4So68Ixdn5T1myiYxDx9EyI8QKBgELe +5Qx3g3LQnxX869JJv0sjO+EtMyYEtQqogwbfComWhgCvcH09RC3GWXoLVKEJ0Srg +wjC3aWunQeKN3OFVZyDWQ157wE7z9sxpBB9DRoC/XlIo7Z1+2a6CO1Do4Vj4CN0z +XInVltl37CDPC5GCN2R9yyOH2QKr0eSElQfwDFp5AoGACWuFIyBqOiW/tH/58htq +qj1ohkFAMBbqgg/QM1bLFTApGJOb8xPX7R+23nnGfCmnlGK7YbYqDaC497om8cxq +Bsma3s6nba5W3RZUNGIB6+GLkGdewJ3L43gYZkoCHFpRZ+9XEsT/Yz0A4K/1oUKr +o3zlHOvWwfg4tW9KNwQv42g= +-----END PRIVATE KEY----- diff --git a/tls/regen.sh b/tls/regen.sh new file mode 100755 index 0000000..9dcca04 --- /dev/null +++ b/tls/regen.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days 365 -out certificate.pem +openssl x509 -text -noout -in certificate.pem diff --git a/yarn.lock b/yarn.lock index a85aa95..3bf7675 100644 --- a/yarn.lock +++ b/yarn.lock @@ -379,7 +379,7 @@ dependencies: "@babel/helper-plugin-utils" "^7.0.0" -"@babel/plugin-syntax-dynamic-import@7.2.0", "@babel/plugin-syntax-dynamic-import@^7.2.0": +"@babel/plugin-syntax-dynamic-import@7.2.0": version "7.2.0" resolved "https://registry.yarnpkg.com/@babel/plugin-syntax-dynamic-import/-/plugin-syntax-dynamic-import-7.2.0.tgz#69c159ffaf4998122161ad8ebc5e6d1f55df8612" integrity sha512-mVxuJ0YroI/h/tbFTPGZR8cv6ai+STMKNBq0f8hFxsxWjl94qqhsb+wXbpNMDPU3cfR1TIsVFzU3nXyZMqyK4w== @@ -1719,7 +1719,7 @@ babel-messages@^6.23.0: dependencies: babel-runtime "^6.22.0" -babel-plugin-dynamic-import-node@2.2.0, babel-plugin-dynamic-import-node@^2.2.0: +babel-plugin-dynamic-import-node@2.2.0: version "2.2.0" resolved "https://registry.yarnpkg.com/babel-plugin-dynamic-import-node/-/babel-plugin-dynamic-import-node-2.2.0.tgz#c0adfb07d95f4a4495e9aaac6ec386c4d7c2524e" integrity sha512-fP899ELUnTaBcIzmrW7nniyqqdYWrWuJUyPWHxFa/c7r7hS6KC8FscNfLlBNIoPSc55kYMGEEKjPjJGCLbE1qA== @@ -10101,6 +10101,11 @@ three-render-objects@^1.4.1: three-orbit-controls "^82.1.0" three-trackballcontrols "^0.0.7" +three-spritetext@^1.0.6: + version "1.0.6" + resolved "https://registry.yarnpkg.com/three-spritetext/-/three-spritetext-1.0.6.tgz#917cb460ac5fec8d4618f8712f7718da1df1ba87" + integrity sha512-qgVBD3ss14g+2eInq4kGDm3ztMR+XtYuralWAi17ZYo4WHGLlRABcThdjvBjlRLog3Xv/sCzPomCBYaZpsVmCg== + three-trackballcontrols@^0.0.7: version "0.0.7" resolved "https://registry.yarnpkg.com/three-trackballcontrols/-/three-trackballcontrols-0.0.7.tgz#42cae1f6856d5f889eaae3a0e2fca9d214793e91" @@ -10111,6 +10116,11 @@ three@^0.101.1: resolved "https://registry.yarnpkg.com/three/-/three-0.101.1.tgz#e7681ef52f4e572cb84f307f16f540c457030ec6" integrity sha512-8ufimUVmRLtH+BTpEIbDjdGEKQOVWLMLgGynaKin1KbYTE136ZNOepJ8EgByi0tN43dQ7B1YrKLCJgXGy4bLmw== +three@^0.102.0: + version "0.102.0" + resolved "https://registry.yarnpkg.com/three/-/three-0.102.0.tgz#b26a7212d95f1ea043ab94598d55973ebe110cbb" + integrity sha512-ZszcMfos5uPugmf7svZshEIh/oncVAtnxpPqrFFj6hrh6SMyQ1qQBCY8QE0Lah1C5gXTx837rn2ikt0VLGqQ9g== + throat@^4.0.0: version "4.1.0" resolved "https://registry.yarnpkg.com/throat/-/throat-4.1.0.tgz#89037cbc92c56ab18926e6ba4cbb200e15672a6a"