From b4aca42e7f4bddda48b108b08a215573e43f839f Mon Sep 17 00:00:00 2001 From: Alex Miller Date: Thu, 8 Feb 2018 12:08:18 -0800 Subject: [PATCH] Adds loop to 1) check if proposer, 2) package header root, 3) send to peers --- index.js | 4 ++- src/config.js | 35 ++++++++++++++++---- src/lib/Bridge.js | 72 ++++++++++++++++++++++++++++++++--------- src/lib/util/merkle.js | 1 - src/lib/util/staking.js | 3 +- src/lib/util/sync.js | 3 +- 6 files changed, 92 insertions(+), 26 deletions(-) diff --git a/index.js b/index.js index 5bb90b1..1b0055a 100644 --- a/index.js +++ b/index.js @@ -29,6 +29,8 @@ const argv = require('yargs') .command('list-wallets', 'List indices for saved wallets') .command('proposal-threshold', 'Number of blocks that must elapse before you will propose a root. Must be a power of two. Default 512') .alias('t', 'threshold') + .command('wallet', 'Use wallet with supplied index. For a list of wallets, use --list-wallets') + .alias('w', 'wallet') .command('stake', 'Stake a specified number of tokens. Must be coupled with --bridge (address) and may be coupled with --gasprice') .command('bridge', 'Bridge to stake on. This is the address of the bridge contract. Must be part of a bridge you have previously saved.') .command('gasprice', 'Gas price to use when making a transaction') @@ -47,7 +49,7 @@ if (!fs.existsSync(DIR)) { fs.mkdirSync(DIR); } Log.setLogger(DIR); // Numerical parameters/config -let THRESH = 512; +let THRESH = 4; if (argv['proposal-threshold']) { THRESH = util.lastPowTwo(argv['proposal-threshold']); } diff --git a/src/config.js b/src/config.js index 13feb87..1647fea 100644 --- a/src/config.js +++ b/src/config.js @@ -1,13 +1,14 @@ // Connect to and handle networks const fs = require('fs'); +const Peer = require('./lib/Peer.js').Peer; const jsonfile = require('jsonfile'); // Network indexes are formatted `networkA_networkB` where each network is // represented by the address of the deployed bridge contract on the given // blockchain. exports.getNetIndex = function(networks) { - const tmpA = parseInt(networks[0]); - const tmpB = parseInt(networks[1]); + const tmpA = networks[0]; + const tmpB = networks[1]; if (tmpB > tmpA) { return `${tmpA}_${tmpB}`; } else { return `${tmpB}_${tmpA}`; } } @@ -45,13 +46,12 @@ exports.addGroup = function(group, dir, cb) { addrB = group[1]; } _ifExists(fPath, (err, data, exists) => { + console.log('data', data) if (err) { cb(err); } else { const i = `${addrA}_${addrB}`; - if (!exists) { - data = {}; - data[i] = { }; - }; + if (!exists) { data = {}; }; + data[i] = { }; let isSaved = false; Object.keys(data).forEach((k) => { if (data[k].hostA == hostA && data[k].hostB == hostB && data[k].addrA == addrA && data[k].addrB == addrB) { isSaved = true; } @@ -102,7 +102,12 @@ exports.getPeers = function(dir, index, cb) { _ifExists(fPath, (err, data, exists) => { if (err) { cb(err); } else if (!exists) { cb('No peers saved.'); } - else { cb(null, data[index].peers); } + else { + _loadPeers(data[index].peers, (err, peers) => { + if (err) { cb(err); } + else { cb(null, peers); } + }) + } }) } @@ -139,3 +144,19 @@ function _ifExists(path, cb) { else { cb(null, f, true); } }) } + +function _loadPeers(hosts, cb, peers=[]) { + if (hosts.length == 0) { cb(null, peers.reverse()); } + else { + const host = hosts.pop(); + const params = host.split(':'); + const peer = new Peer(params[0], params[1]); + peer.on('connect', () => { console.log('#### peer connected'); }) + peer.on('error', (e) => { console.log('peer error', e)}) + peer.on('data', (d) => { console.log('peer data', d); }) + peer.on('message', (d) => { console.log('peer message', d);}) + peer.connect(); + peers.push(peer); + _loadPeers(hosts, cb, peers); + } +} diff --git a/src/lib/Bridge.js b/src/lib/Bridge.js index 9e6608f..2dd815b 100644 --- a/src/lib/Bridge.js +++ b/src/lib/Bridge.js @@ -4,10 +4,14 @@ const fs = require('fs'); const sync = require('./util/sync.js'); const bridges = require('./util/bridges.js'); const merkle = require('./util/merkle.js'); +const util = require('./util/util.js'); const Log = require('./../log.js'); const Wallet = require('./Wallet.js'); let logger; +// This will later be made dynamic +let NCHAINS = 2; + // Run a bridge client. This has a set of peers and two web3 clients corresponding // to a particular bridge, which corresponds to two specific networks. class Bridge { @@ -24,7 +28,7 @@ class Bridge { this.datadir = opts.datadir || `${process.cwd()}/data`; this.addrs = this.index.split('_'); // Number of blocks to wait to propose - this.proposeThreshold = opts.proposeThreshold || 512; + this.proposeThreshold = opts.proposeThreshold || 4; // Header data (number, timestamp, prevHeader, txRoot, receiptsRoot) is // stored in lines with 100 entries each. The remainder is kept in a cache. this.cache = []; @@ -32,11 +36,8 @@ class Bridge { // [bridgeToQuery][bridgedChain], where these indices are the addesses // of the bridge contracts sitting on those chains. this.bridgeData = {}; - this.bridgeData[this.addrs[0]] = {}; - this.bridgeData[this.addrs[0]][this.addrs[1]] = {}; - this.bridgeData[this.addrs[1]] = {}; - this.bridgeData[this.addrs[1]][this.addrs[0]] = {}; - + this.bridgeData[this.addrs[0]] = { lastBlocks: {}, proposer: null }; + this.bridgeData[this.addrs[1]] = { lastBlocks: {}, proposer: null }; // Create a server and listen to peer messages this.server = net.createServer((socket) => { @@ -44,6 +45,7 @@ class Bridge { logger.log('error', 'Server socket connection ended') }); socket.on('data', (data) => { + console.log('socket data', data) this.handleMsg(data); }); }); @@ -54,9 +56,9 @@ class Bridge { }) // Sync headers from the two networks - for (let i = 0; i < 2; i++) { + for (let i = 0; i < NCHAINS; i++) { sync.checkHeaders(`${this.datadir}/${this.addrs[i]}/headers`, (err, cache) => { - if (err) { log.error('Error getting headers', err, i); } + if (err) { logger.error('Error getting headers', err, i); } else { this.cache[i] = cache; this.sync(this.addrs[i], cache, this.clients[i], (err, newCache) => { @@ -74,15 +76,45 @@ class Bridge { }); } - this.getProposalRoot(this.addrs[0], 1, 4, () => {}) - // Continue syncing periodically setInterval(() => { this.sync(this.addrs[i], this.cache[i], this.clients[i], (err, newCache) => { if (err) { logger.log('warn', `ERROR: ${err}`); } this.cache[i] = newCache; }) - }, opts.queryDelay || 10000); + }, opts.queryDelay || 1000); + + // Do stuff if you're the proposer + setInterval(() => { + const bdata = this.bridgeData[this.addrs[i]]; + if (bdata.proposer == this.wallet.getAddress()) { + for (let j = 0; j < NCHAINS; j++) { + if (i != j) { + const chain = this.addrs[j]; + const lastBlock = bdata.lastBlocks[chain]; + const currentBlock = parseInt(this.cache[j][this.cache[j].length - 2][1]); + if (util.lastPowTwo(currentBlock - lastBlock) > this.proposeThreshold) { + console.log('proposing!') + this.getProposalRoot(chain, lastBlock + 1, currentBlock, (err, hRoot) => { + if (err) { logger.log('warn', `Error getting proposal root: ${err}`); } + else { + const msg = { + type: 'SIGREQ', + data: { + chain: chain, + start: lastBlock + 1, + end: currentBlock, + root: hRoot, + } + }; + this.broadcastMsg(msg) + } + }) + } + } + } + } + }, opts.queryDelay || 1000); }) } }) @@ -118,18 +150,18 @@ class Bridge { bridges.getLastBlock(queryAddr, bridgedAddr, client, (err, lastBlock) => { if (err) { cb(err); } else { - this.bridgeData[queryAddr][bridgedAddr].lastBlock = lastBlock; + this.bridgeData[queryAddr].lastBlocks[bridgedAddr] = lastBlock; bridges.getProposer(queryAddr, client, (err, proposer) => { if (err) { cb(err); } else { - this.bridgeData[queryAddr][bridgedAddr].proposer = proposer; + this.bridgeData[queryAddr].proposer = `0x${proposer.slice(26)}`; } }) } }) } - propose(queryAddr, bridgedAddr, client, cb) { + /*propose(queryAddr, bridgedAddr, client, cb) { const d = this.bridgeData[queryAddr][bridgedAddr]; const currentN = this.cache[this.cache.length - 1].n; if (d.proposer != this.wallet.getAddress() || this.proposeThreshold - 1 > currentN - d.lastBlock) { @@ -138,15 +170,17 @@ class Bridge { } else { // Get the root const range = util.lastPowTwo(currentN - d.lastBlock - 1); + console.log('range', range) getProposalRoot(queryAddr, d.lastBlock + 1, d.lastBlock + 1 + range, (err, headerRoot) => { // Broadcast root with metadata to all known peers }) } - } + }*/ // If this client is elected as the proposer, get the relevant data and form // the block header Merkle root. getProposalRoot(chain, startBlock, endBlock, cb) { + console.log('startBlock', startBlock, 'endBlock', endBlock) sync.loadHeaders(startBlock, endBlock, `${this.datadir}/${chain}/headers`, (err, headers, n) => { if (n < endBlock) { cb('Not synced to that block. Try again later.'); } else { @@ -160,6 +194,7 @@ class Bridge { // Handle an incoming socket message handleMsg(data) { + console.log('got data') const msg = JSON.parse(data.toString('utf8')); switch (msg.type) { case 'SIGREQ': @@ -180,6 +215,13 @@ class Bridge { break; } } + + broadcastMsg(_msg) { + const msg = JSON.stringify(_msg); + this.peers.forEach((peer) => { + peer.send('msg', msg); + }) + } } module.exports = Bridge; diff --git a/src/lib/util/merkle.js b/src/lib/util/merkle.js index ccc0031..f699e0f 100644 --- a/src/lib/util/merkle.js +++ b/src/lib/util/merkle.js @@ -15,7 +15,6 @@ exports.buildMerkleTree = buildMerkleTree; exports.getMerkleRoot = function(leaves) { const tree = buildMerkleTree(leaves); - console.log('tree', tree) return tree[tree.length - 1][0]; } diff --git a/src/lib/util/staking.js b/src/lib/util/staking.js index 5785ba7..25a3bf7 100644 --- a/src/lib/util/staking.js +++ b/src/lib/util/staking.js @@ -1,7 +1,7 @@ // Functions for staking and destaking const leftPad = require('left-pad'); -exports.stake = function(bridge, amount, from, client, wallet, gasPrice=1000000000, gas=200000) { +exports.stake = function(bridge, amount, from, client, wallet, gasPrice=1000000000, gas=500000) { let stakeToken; let tx = { gas: gas, @@ -82,6 +82,7 @@ function _stake(tx, wallet, client, cb) { if (err) { console.log('Error staking', err); } else { client.eth.getTransactionReceipt(h, (err, receipt) => { + console.log('receipt', receipt); if (err) { cb(err); } else if (receipt.logs.length < 1) { cb('Stake did not execute'); } else { cb(null); } diff --git a/src/lib/util/sync.js b/src/lib/util/sync.js index d250c7c..4da1b60 100644 --- a/src/lib/util/sync.js +++ b/src/lib/util/sync.js @@ -47,12 +47,13 @@ lastHeader=`0x${leftPad(0, 64, '0')}`) { }; item.header = _hashHeader(item.n, lastHeader, item.timestamp, item.transactionsRoot, item.receiptsRoot); + console.log('item', item) cache.push(item); if (cache.length > 100) { // Write chunks of 100, but make sure the cache has at least one value // at all times (so it can be referenced in other places) fStream.write(_stringify(cache.slice(0, cache.length - 1))); - cache = cache[cache.length - 1]; + cache = [ cache[cache.length - 1] ]; } } syncData(currentBlockN, lastBlockN + 1, client, fStream, cache, cb);