Skip to content
This repository has been archived by the owner on Mar 6, 2023. It is now read-only.

Commit

Permalink
Adds blockchain query functions and logger
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-miller-0 committed Feb 6, 2018
1 parent 04106af commit 801b5de
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 34 deletions.
17 changes: 12 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const config = require('./src/config.js');
const Bridge = require('./src/lib/Bridge.js');
const Peers = require('./src/peers.js');
const Clients = require('./src/clients.js');
const Log = require('./src/log.js');

const argv = require('yargs')
.usage('Usage: $0 <cmd> [options]')
Expand All @@ -29,6 +30,9 @@ if (argv.datadir) {
DIR = argv.datadir;
}
if (!fs.existsSync(DIR)) { fs.mkdirSync(DIR); }
// Setup the logger
Log.setLogger(DIR);


if(argv.network) {
// Specify two networks being bridged
Expand Down Expand Up @@ -74,18 +78,21 @@ if (argv.start) {
// Start listening to peers and blockchains
let peers;
let clients;
console.log('getting peers')
config.getPeers(DIR, INDEX, (err, _peers) => {
peers = _peers;
console.log('getting hosts')
config.getHosts(DIR, INDEX, (err, _hosts) => {
console.log('connecting to clients')
Clients.connectToClients(_hosts, (err, _clients) => {
console.log('creating bridge')
const _port = isNaN(parseInt(argv.start)) ? null : parseInt(argv.start);
// Start a new Bridge client. This consists of a server listening to
// a given port and handling socket messages from peers. The client
// also checks linked web3 hosts for updated blockchain data.
const b = new Bridge({ index: INDEX, peers: _peers, clients: _clients, datadir: DIR });
const b = new Bridge({
index: INDEX,
peers: _peers,
clients: _clients,
datadir: DIR,
port: _port,
});
})
})
})
Expand Down
10 changes: 10 additions & 0 deletions server.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:46:41.257Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:46:41.264Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:47:41.137Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:47:41.143Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:49:17.417Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:49:17.423Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:49:28.448Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:49:28.454Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:49:31.997Z"}
{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{"0x880248D134a1481b051AfB4B765758F9b9152991":{}},"0x880248D134a1481b051AfB4B765758F9b9152991":{"0x263d4d7fffc17149ae3c436e4bd6aa4cb20303e9":{}},"level":"info","message":"","timestamp":"2018-02-06T22:49:32.002Z"}
6 changes: 4 additions & 2 deletions src/clients.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ const Web3 = require('web3');

// Load a list of connections to web3 clients
function connectToClients(hosts, cb, clients=[]) {
if (hosts.length == 0) { cb(null, clients); }
else {
if (hosts.length == 0) {
// Clients will be in reverse order
cb(null, clients.reverse());
} else {
try {
const host = hosts.pop();
const web3 = new Web3(new Web3.providers.HttpProvider(host));
Expand Down
66 changes: 52 additions & 14 deletions src/lib/Bridge.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
const net = require('net');
const fs = require('fs');
const sync = require('./sync');
const bridges = require('./bridgesUtil.js');
const Log = require('./../log.js');
let logger;

// Run a bridge client. This has a set of peers and two web3 clients corresponding
// to a particular bridge, which corresponds to two specific networks.
class Bridge {
constructor(opts) {
logger = Log.getLogger();

if (!opts) { opts = {}; }
this.port = opts.port || 8000;
this.peers = opts.peers || [];
this.clients = opts.clients || [];
this.index = opts.index || '';
this.datadir = opts.datadir || `${process.cwd()}/data`;
this.addrs = this.index.split('_');
// Header data (number, timestamp, prevHeader, txRoot, receiptsRoot) is
// stored in lines with 100 entries each. The remainder is kept in a cache.
this.cache = [];
// Data for the bridges are kept in memory. It is indexed based on
// [bridgeToQuery][bridgedChain], where these indices are the addesses
// of the bridge contracts sitting on those chains.
this.bridgeData = {};
this.bridgeData[this.addrs[0]] = {};
this.bridgeData[this.addrs[0]][this.addrs[1]] = {};
this.bridgeData[this.addrs[1]] = {};
this.bridgeData[this.addrs[1]][this.addrs[0]] = {};


// Create a server and listen to peer messages
this.server = net.createServer((socket) => {
socket.on('end', () => {
console.log('client disconnected');
logger.log('error', 'Server socket connection ended')
});
socket.on('data', (data) => {
this.handleMsg(data);
Expand All @@ -27,27 +43,33 @@ class Bridge {

// Listen on port
this.server.listen(this.port, () => {
console.log(`Server listening on ${this.port}`)
logger.log('info', `Listening on port ${this.port}`)
})


// TODO: This is writing a second set of header data after it has been written. Looks like it's starting from block 1...


// Sync headers from the two networks
for (let i = 0; i < 1; i++) {
for (let i = 0; i < 2; i++) {
sync.checkHeaders(`${this.datadir}/${this.addrs[i]}/headers`, (err, cache) => {
if (err) { console.log('Error getting headers', err, i); }
if (err) { log.error('Error getting headers', err, i); }
else {
this.cache[i] = cache;
this.sync(this.addrs[i], cache, this.clients[i], (err, newCache) => {
if (err) { console.log(`ERROR: ${err}`); }
this.cache[i] = newCache;

if (err) { logger.log('warn', `ERROR: ${err}`); }
else { this.cache[i] = newCache; }
// Get the bridge data. This will be updated periodically (when we get new
// messages)
if (i == 0) {
this.getBridgeData(this.addrs[0], this.addrs[1], this.clients[0], (err) => {
if (err) { logger.log('warn', `ERROR: ${err}`); }
});
} else {
this.getBridgeData(this.addrs[1], this.addrs[0], this.clients[1], (err) => {
if (err) { logger.log('warn', `ERROR: ${err}`); }
});
}
// Continue syncing periodically
setInterval(() => {
this.sync(this.addrs[i], this.cache[i], this.clients[i], (err, newCache) => {
if (err) { console.log(`ERROR: ${err}`); }
if (err) { logger.log('warn', `ERROR: ${err}`); }
this.cache[i] = newCache;
})
}, opts.queryDelay || 10000);
Expand All @@ -57,6 +79,7 @@ class Bridge {
}
}


// Sync a given client. Headers are persisted in sets of 100 along with their
// corresponding block numbers
sync(addr, cache, client, cb) {
Expand All @@ -67,7 +90,7 @@ class Bridge {
client.eth.getBlockNumber((err, currentBlock) => {
let cacheBlock = 0;
if (cache[cache.length - 1] != undefined) { cacheBlock = parseInt(cache[cache.length - 1][0]); }
if (err) { console.log(`ERROR: ${err}`); }
if (err) { cb(err); }
else if (currentBlock > cacheBlock) {
// Create a write stream so we can write to the header file
const stream = fs.createWriteStream(fPath, { flags: 'a' });
Expand All @@ -80,10 +103,25 @@ class Bridge {
})
}

// Get current data on the bridges
getBridgeData(queryAddr, bridgedAddr, client, cb) {
bridges.getLastBlock(queryAddr, bridgedAddr, client, (err, lastBlock) => {
if (err) { cb(err); }
else {
this.bridgeData[queryAddr][bridgedAddr].lastBlock = lastBlock;
bridges.getProposer(queryAddr, client, (err, proposer) => {
if (err) { cb(err); }
else {
this.bridgeData[queryAddr][bridgedAddr].proposer = proposer;
}
})
}
})
}

// Handle an incoming socket message
handleMsg(data) {
const msg = JSON.parse(data.toString('utf8'));
console.log('this', this)
switch (msg.type) {
case 'SIGREQ':
console.log('signature request', msg);
Expand Down
27 changes: 27 additions & 0 deletions src/lib/bridgesUtil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Formatted calls to the bridge contracts
const leftPad = require('left-pad');

// Get the block corresponding to the last block header Merkle root committed
// to the chain. queryAddr is the address of the bridge contract on the chain
// being queried and bridgedAddr is the address of the bridge on the other chain.
exports.getLastBlock = function(queryAddr, bridgedAddr, client, cb) {
const data = `${LAST_BLOCK_ABI}${leftPad(bridgedAddr.slice(2), 64, '0')}`;
client.eth.call({ to: queryAddr, data: data }, (err, ret) => {
if (err) { cb(err); }
else { cb(null, parseInt(ret, 16)); }
})
}

// Get the current proposer for the chain being queried
exports.getProposer = function(queryAddr, client, cb) {
const data = GET_PROPOSER_ABI;
client.eth.call({ to: queryAddr, data: data }, (err, ret) => {
if (err) { cb(err); }
else { cb(null, ret); }
})
}

// getLastBlock(address)
const LAST_BLOCK_ABI = '0x4929dfa1';
// getProposer()
const GET_PROPOSER_ABI = '0xe9790d02';
11 changes: 6 additions & 5 deletions src/lib/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ exports.checkHeaders = function(fPath, cb) {
else {
let line;
const reader = readline.createInterface({ input: fs.createReadStream(fPath) })
reader.on('error', (err) => { console.log('got dur err', err); cb(err); reader.close(); })
reader.on('error', (err) => { cb(err); reader.close(); })
reader.on('line', (_line) => {
line = _line.split(',').splice(1); // Every line has a leading comma
for (let i = 0; i < line.length; i += 4) {
Expand All @@ -30,7 +30,6 @@ exports.checkHeaders = function(fPath, cb) {

// Sync up to the current block and save to a file. Only header data is stored.
function syncData(currentBlockN, lastBlockN, client, fStream, cache=[], cb) {
// console.log('currentBlockN', currentBlockN, 'lastBlockN', lastBlockN)
if (currentBlockN == lastBlockN) {
fStream.end(_stringify(cache));
cb(null, _zipCache(cache));
Expand All @@ -46,9 +45,11 @@ function syncData(currentBlockN, lastBlockN, client, fStream, cache=[], cb) {
receiptsRoot: block.receiptsRoot,
};
cache.push(item);
if (cache.length > 99) {
fStream.write(_stringify(cache));
cache = [];
if (cache.length > 100) {
// Write chunks of 100, but make sure the cache has at least one value
// at all times (so it can be referenced in other places)
fStream.write(_stringify(cache.slice(0, cache.length - 1)));
cache = cache[cache.length - 1];
}
}
syncData(currentBlockN, lastBlockN + 1, client, fStream, cache, cb);
Expand Down
19 changes: 19 additions & 0 deletions src/log.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Logging
const winston = require('winston');

// This will be overwritten when we know where the log file should be;
let logger = null;
exports.logger = logger;

// Setup the winston logger
exports.setLogger = function(fPath) {
logger = new (winston.Logger)({
transports: [
new (winston.transports.File)({ filename: `${fPath}/log`, json: false, timestamp: true })
]
})
}

exports.getLogger = function() {
return logger;
}
14 changes: 6 additions & 8 deletions src/peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
const fs = require('fs');
const Peer = require('./lib/Peer.js').Peer;
const Promise = require('bluebird').Promise;
const log = require('winston');
log.add(
log.transports.File, { filename: 'tmp.log', json: true, timestamp: true, prettyPrint: true}
)
const Log = require('./log');

// Given a list of hosts, form p2p connections with them.
function connectToPeers(peers, cb, connections=[]) {
let logger = Log.getLogger();
if (peers.length == 0) { cb(connections); }
else {
// Peer stored as e.g. localhost:7545
Expand All @@ -17,18 +15,18 @@ function connectToPeers(peers, cb, connections=[]) {
const peer = new Peer(loc[0], parseInt(loc[1]));
const name = `${loc[0]}:${loc[1]}`;
peer.on('connect', (c) => {
log.info(`Connected to peer ${name}`);
logger.log('info', `Connected to peer ${name}`);
})
peer.on('end', (d) => {
log.info(`Disconnected from ${name}`);
logger.log('info', `Disconnected from ${name}`);
})
peer.on('error', (e) => {
log.warn(`Error from ${name}: ${e.error}`);
logger.log('warn', `Error from ${name}: ${e.error}`);
peer.disconnect();
disconnected = true;
})
peer.on('message', (m) => {
log.info(`Message from ${name} : ${m}`);
logger.log('info', `Message from ${name} : ${m}`);
})
peer.connect();
setTimeout(() => {
Expand Down

0 comments on commit 801b5de

Please sign in to comment.