diff --git a/lib/orderbook/OrderBook.ts b/lib/orderbook/OrderBook.ts index d6e929249..c90911b2c 100644 --- a/lib/orderbook/OrderBook.ts +++ b/lib/orderbook/OrderBook.ts @@ -77,6 +77,11 @@ class OrderBook extends EventEmitter { this.bindSwaps(); } + private static createOutgoingOrder = (order: orders.OwnOrder): orders.OutgoingOrder => { + const { createdAt, localId, initialQuantity, hold, ...outgoingOrder } = order; + return outgoingOrder ; + } + private bindPool = () => { if (this.pool) { this.pool.on('packet.order', this.addPeerOrder); @@ -552,8 +557,8 @@ class OrderBook extends EventEmitter { // send only requested pairIds if (pairIds.includes(tp.pairId)) { const orders = tp.getOwnOrders(); - orders.buy.forEach(order => outgoingOrders.push(this.createOutgoingOrder(order))); - orders.sell.forEach(order => outgoingOrders.push(this.createOutgoingOrder(order))); + orders.buy.forEach(order => outgoingOrders.push(OrderBook.createOutgoingOrder(order))); + orders.sell.forEach(order => outgoingOrders.push(OrderBook.createOutgoingOrder(order))); } }); peer.sendOrders(outgoingOrders, reqId); @@ -565,7 +570,7 @@ class OrderBook extends EventEmitter { private broadcastOrder = (order: orders.OwnOrder) => { if (this.pool) { if (this.swaps && this.swaps.isPairSupported(order.pairId)) { - const outgoingOrder = this.createOutgoingOrder(order); + const outgoingOrder = OrderBook.createOutgoingOrder(order); this.pool.broadcastOrder(outgoingOrder); } } @@ -583,11 +588,6 @@ class OrderBook extends EventEmitter { return { ...order, id, initialQuantity: order.quantity, createdAt: ms() }; } - private createOutgoingOrder = (order: orders.OwnOrder): orders.OutgoingOrder => { - const { createdAt, localId, ...outgoingOrder } = order; - return outgoingOrder; - } - private handleOrderInvalidation = (oi: OrderPortion, peerPubKey: string) => { try { const removeResult = this.removePeerOrder(oi.id, oi.pairId, peerPubKey, oi.quantity); diff --git a/lib/p2p/Peer.ts b/lib/p2p/Peer.ts index 2b46289e9..a5e4172ef 100644 --- a/lib/p2p/Peer.ts +++ b/lib/p2p/Peer.ts @@ -11,7 +11,6 @@ import { Packet, PacketDirection, PacketType } from './packets'; import { HandshakeState, Address, NodeConnectionInfo } from '../types/p2p'; import errors from './errors'; import addressUtils from '../utils/addressUtils'; -import { DisconnectingPacketBody } from './packets/types/DisconnectingPacket'; /** Key info about a peer for display purposes */ type PeerInfo = { @@ -45,7 +44,6 @@ interface Peer { class Peer extends EventEmitter { // TODO: properties documentation public inbound!: boolean; - public connected = false; public recvDisconnectionReason?: DisconnectionReason; public sentDisconnectionReason?: DisconnectionReason; public expectedNodePubKey?: string; @@ -90,6 +88,10 @@ class Peer extends EventEmitter { return this.handshakeState ? this.handshakeState.addresses : undefined; } + public get connected(): boolean { + return this.socket !== undefined && !this.socket.destroyed; + } + public get info(): PeerInfo { return { address: addressUtils.toString(this.address), @@ -119,7 +121,6 @@ class Peer extends EventEmitter { const peer = new Peer(logger, addressUtils.fromSocket(socket)); peer.inbound = true; - peer.connected = true; peer.socket = socket; peer.bindSocket(); @@ -201,7 +202,6 @@ class Peer extends EventEmitter { } this.closed = true; - this.connected = false; if (this.socket) { if (reason !== undefined) { @@ -247,11 +247,8 @@ class Peer extends EventEmitter { public sendPacket = (packet: Packet): void => { this.sendRaw(packet.toRaw()); - if (this.nodePubKey !== undefined) { - this.logger.trace(`Sent packet to ${this.nodePubKey}: ${packet.body ? JSON.stringify(packet.body) : ''}`); - } else { - this.logger.trace(`Sent packet to ${addressUtils.toString(this.address)}: ${packet.body ? JSON.stringify(packet.body) : ''}`); - } + const recipient = this.nodePubKey !== undefined ? this.nodePubKey : addressUtils.toString(this.address); + this.logger.trace(`Sent ${PacketType[packet.type]} packet to ${recipient}: ${JSON.stringify(packet)}`); this.packetCount += 1; if (packet.direction === PacketDirection.Request) { @@ -306,12 +303,14 @@ class Peer extends EventEmitter { } this.socket!.removeListener('error', onError); this.socket!.removeListener('connect', onConnect); - this.retryConnectionTimer = undefined; + if (this.retryConnectionTimer) { + clearTimeout(this.retryConnectionTimer); + this.retryConnectionTimer = undefined; + } }; const onConnect = () => { this.connectTime = Date.now(); - this.connected = true; this.bindSocket(); @@ -391,7 +390,8 @@ class Peer extends EventEmitter { for (const [packetId, entry] of this.responseMap) { if (now > entry.timeout) { - this.emitError(`Peer (${this.nodePubKey}) is stalling (${packetId})`); + this.emitError(`Peer is stalling (${packetId})`); + entry.reject('response timed out'); this.close(DisconnectionReason.ResponseStalling, packetId); return; } @@ -472,16 +472,7 @@ class Peer extends EventEmitter { this.close(); }); - this.socket!.on('data', (data) => { - this.lastRecv = Date.now(); - const dataStr = data.toString(); - if (this.nodePubKey !== undefined) { - this.logger.trace(`Received data from ${this.nodePubKey}: ${dataStr}`); - } else { - this.logger.trace(`Received data from ${addressUtils.toString(this.address)}: ${data.toString()}`); - } - this.parser.feed(data); - }); + this.socket!.on('data', this.parser.feed); this.socket!.setNoDelay(true); } @@ -530,6 +521,10 @@ class Peer extends EventEmitter { } private handlePacket = (packet: Packet): void => { + this.lastRecv = Date.now(); + const sender = this.nodePubKey !== undefined ? this.nodePubKey : addressUtils.toString(this.address); + this.logger.trace(`Received ${PacketType[packet.type]} packet from ${sender}${JSON.stringify(packet)}`); + if (this.isPacketSolicited(packet)) { switch (packet.type) { case PacketType.Hello: { diff --git a/lib/p2p/Pool.ts b/lib/p2p/Pool.ts index 72b7a944a..01bde16b4 100644 --- a/lib/p2p/Pool.ts +++ b/lib/p2p/Pool.ts @@ -370,7 +370,7 @@ class Pool extends EventEmitter { } } - public closePeer = async (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string): Promise => { + public closePeer = (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => { const peer = this.peers.get(nodePubKey); if (peer) { peer.close(reason, reasonPayload); @@ -570,12 +570,21 @@ class Pool extends EventEmitter { return; } + // check to make sure the socket was not destroyed during or immediately after the handshake + if (!peer.connected) { + this.logger.error(`the socket to node ${peer.nodePubKey} was disconnected`); + return; + } + this.logger.verbose(`opened connection to ${peer.nodePubKey} at ${addressUtils.toString(peer.address)}`); this.peers.set(peer.nodePubKey, peer); peer.active = true; - // request peer's orders - peer.sendPacket(new packets.GetOrdersPacket({ pairIds: this.handshakeData.pairs })); + if (this.handshakeData.pairs.length > 0) { + // request peer's orders + peer.sendPacket(new packets.GetOrdersPacket({ pairIds: this.handshakeData.pairs })); + } + if (this.config.discover) { // request peer's known nodes only if p2p.discover option is true peer.sendPacket(new packets.GetNodesPacket()); @@ -648,41 +657,7 @@ class Pool extends EventEmitter { this.pendingOutboundPeers.delete(peer.nodePubKey!); }); - peer.once('close', async () => { - if (!peer.nodePubKey && peer.expectedNodePubKey) { - this.pendingOutboundPeers.delete(peer.expectedNodePubKey); - } - - if (!peer.active) { - return; - } - - if (peer.nodePubKey) { - this.pendingOutboundPeers.delete(peer.nodePubKey); - this.peers.delete(peer.nodePubKey); - } - this.emit('peer.close', peer); - - // if handshake passed and peer disconnected from us for stalling or without specifying any reason - - // reconnect, for that might have been due to a temporary loss in connectivity - const unintentionalDisconnect = - (peer.sentDisconnectionReason === undefined || peer.sentDisconnectionReason === DisconnectionReason.ResponseStalling) && - (peer.recvDisconnectionReason === undefined || peer.recvDisconnectionReason === DisconnectionReason.ResponseStalling); - const addresses = peer.addresses || []; - - let lastAddress; - if (peer.inbound) { - lastAddress = addresses.length > 0 ? addresses[0] : undefined; - } else { - lastAddress = peer.address; - } - - if (peer.nodePubKey && unintentionalDisconnect && (addresses.length || lastAddress)) { - this.logger.debug(`attempting to reconnect to a disconnected peer ${peer.nodePubKey}`); - const node = { lastAddress, addresses, nodePubKey: peer.nodePubKey }; - await this.tryConnectNode(node, true); - } - }); + peer.once('close', () => this.handlePeerClose(peer)); peer.once('reputation', async (event) => { this.logger.debug(`Peer (${peer.nodePubKey || addressUtils.toString(peer.address)}), received reputation event: ${ReputationEvent[event]}`); @@ -692,6 +667,42 @@ class Pool extends EventEmitter { }); } + private handlePeerClose = async (peer: Peer) => { + if (!peer.nodePubKey && peer.expectedNodePubKey) { + this.pendingOutboundPeers.delete(peer.expectedNodePubKey); + } + + if (!peer.active) { + return; + } + + if (peer.nodePubKey) { + this.pendingOutboundPeers.delete(peer.nodePubKey); + this.peers.delete(peer.nodePubKey); + } + this.emit('peer.close', peer); + + // if handshake passed and peer disconnected from us for stalling or without specifying any reason - + // reconnect, for that might have been due to a temporary loss in connectivity + const unintentionalDisconnect = + (peer.sentDisconnectionReason === undefined || peer.sentDisconnectionReason === DisconnectionReason.ResponseStalling) && + (peer.recvDisconnectionReason === undefined || peer.recvDisconnectionReason === DisconnectionReason.ResponseStalling); + const addresses = peer.addresses || []; + + let lastAddress; + if (peer.inbound) { + lastAddress = addresses.length > 0 ? addresses[0] : undefined; + } else { + lastAddress = peer.address; + } + + if (peer.nodePubKey && unintentionalDisconnect && (addresses.length || lastAddress)) { + this.logger.debug(`attempting to reconnect to a disconnected peer ${peer.nodePubKey}`); + const node = { lastAddress, addresses, nodePubKey: peer.nodePubKey }; + await this.tryConnectNode(node, true); + } + } + private closePeers = () => { this.peers.forEach(peer => peer.close(DisconnectionReason.Shutdown)); } diff --git a/lib/p2p/packets/Packet.ts b/lib/p2p/packets/Packet.ts index 119e38933..14291ef91 100644 --- a/lib/p2p/packets/Packet.ts +++ b/lib/p2p/packets/Packet.ts @@ -82,16 +82,17 @@ abstract class Packet implements PacketInterface { if (bodyOrPacket) { this.body = bodyOrPacket; - this.header.hash = this.hash(bodyOrPacket); + this.header.hash = Packet.hash(bodyOrPacket); } } } - public abstract serialize(): Uint8Array; - private hash(value: any): string { + private static hash(value: any): string { return MD5(stringify(value)).toString(CryptoJS.enc.Base64); } + public abstract serialize(): Uint8Array; + /** * Verify the header hash against the packet body. */ @@ -104,7 +105,7 @@ abstract class Packet implements PacketInterface { return false; } - return this.header.hash === this.hash(this.body); + return this.header.hash === Packet.hash(this.body); } /** diff --git a/test/integration/Pool.spec.ts b/test/integration/Pool.spec.ts index 920e7337b..924281863 100644 --- a/test/integration/Pool.spec.ts +++ b/test/integration/Pool.spec.ts @@ -8,6 +8,7 @@ import NodeKey from '../../lib/nodekey/NodeKey'; import Peer from '../../lib/p2p/Peer'; import { Address } from '../../lib/types/p2p'; import { DisconnectionReason } from '../../lib/types/enums'; +import sinon from 'sinon'; chai.use(chaiAsPromised); @@ -16,15 +17,23 @@ describe('P2P Pool Tests', async () => { let pool: Pool; let nodePubKeyOne: string; const loggers = Logger.createLoggers(Level.Warn); + const sandbox = sinon.createSandbox(); const createPeer = (nodePubKey: string, addresses: Address[]) => { - const peer = new Peer(loggers.p2p, addresses[0]); - peer['handshakeState'] = { + const peer = sandbox.createStubInstance(Peer) as any; + peer.address = addresses[0]; + peer.handshakeState = { addresses, nodePubKey, version: 'test', pairs: [], }; + peer.socket = {}; + peer.sendPacket = () => {}; + peer.close = () => { + peer.sentDisconnectionReason = DisconnectionReason.NotAcceptingConnections; + pool['handlePeerClose'](peer); + }; pool['bindPeer'](peer); return peer; @@ -35,6 +44,7 @@ describe('P2P Pool Tests', async () => { const config = new Config(); config.p2p.listen = false; + config.p2p.discover = false; db = new DB(loggers.db); await db.init(); @@ -71,19 +81,17 @@ describe('P2P Pool Tests', async () => { expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host); }); - it('should close a peer', async () => { - const closePromise = pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections); - expect(closePromise).to.be.fulfilled; - await closePromise; + it('should close a peer', () => { + pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections); expect(pool['peers'].has(nodePubKeyOne)).to.be.false; expect(pool['peers'].size).to.equal(0); }); it('should update a node on new handshake', async () => { - /* const addresses = [{ host: '86.75.30.9', port: 8885 }]; + const addresses = [{ host: '86.75.30.9', port: 8885 }]; const peer = createPeer(nodePubKeyOne, addresses); - await pool['handleOpen'](peer); + pool['handleOpen'](peer); const nodeInstance = await db.models.Node.find({ where: { @@ -94,10 +102,7 @@ describe('P2P Pool Tests', async () => { expect(nodeInstance!.addresses!).to.have.length(1); expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host); - const closePromise = pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections); - expect(closePromise).to.be.fulfilled; - await closePromise; - */ + pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections); }); after(async () => { diff --git a/test/p2p/sanity.spec.ts b/test/p2p/sanity.spec.ts index be5eda262..04ae5eafc 100644 --- a/test/p2p/sanity.spec.ts +++ b/test/p2p/sanity.spec.ts @@ -69,10 +69,10 @@ describe('P2P Sanity Tests', () => { .to.be.rejectedWith('already connected'); }); - it('should disconnect successfully', async () => { - await expect(nodeOne['pool']['closePeer'](nodeTwo.nodePubKey, DisconnectionReason.NotAcceptingConnections)).to.be.fulfilled; + it('should disconnect successfully', () => { + nodeOne['pool']['closePeer'](nodeTwo.nodePubKey, DisconnectionReason.NotAcceptingConnections); - const listPeersResult = await nodeOne.service.listPeers(); + const listPeersResult = nodeOne.service.listPeers(); expect(listPeersResult).to.be.empty; });