Skip to content

Commit

Permalink
refactor(core-p2p): transaction broadcaster (#3516)
Browse files Browse the repository at this point in the history
  • Loading branch information
rainydio authored Feb 20, 2020
1 parent acf3423 commit 8fc18c1
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"parser": "@typescript-eslint/parser",
"parserOptions": {
"project": "./tsconfig.json"
"project": "./tsconfig.eslint.json"
},
"plugins": ["@typescript-eslint", "jest", "prettier", "simple-import-sort"],
"extends": [
Expand Down
60 changes: 60 additions & 0 deletions __tests__/unit/core-p2p/transaction-broadcaster.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Container } from "@arkecosystem/core-kernel";
import { Interfaces } from "@arkecosystem/crypto";

import { TransactionBroadcaster } from "../../../packages/core-p2p/src/transaction-broadcaster";

describe("TransactionBroadcaster", () => {
const container = new Container.Container();

describe("broadcastTransactions", () => {
const logger = { warning: jest.fn(), debug: jest.fn() };
const configuration = { getRequired: jest.fn() };
const storage = { getPeers: jest.fn() };
const communicator = { postTransactions: jest.fn() };

beforeAll(() => {
container.unbindAll();
container.bind(Container.Identifiers.LogService).toConstantValue(logger);
container.bind(Container.Identifiers.PluginConfiguration).toConstantValue(configuration);
container.bind(Container.Identifiers.PeerStorage).toConstantValue(storage);
container.bind(Container.Identifiers.PeerCommunicator).toConstantValue(communicator);
});

beforeEach(() => {
logger.warning.mockReset();
logger.debug.mockReset();
configuration.getRequired.mockReset();
storage.getPeers.mockReset();
communicator.postTransactions.mockReset();
});

it("should warn when attempting to broadcast empty array", async () => {
const broadcaster = container.resolve(TransactionBroadcaster);

await broadcaster.broadcastTransactions([]);

expect(logger.warning).toBeCalledWith("Broadcasting 0 transactions");
expect(configuration.getRequired).not.toBeCalled();
expect(storage.getPeers).not.toBeCalled();
expect(communicator.postTransactions).not.toBeCalled();
});

it("should broadcast transaction to peers", async () => {
const peers = [{}, {}, {}];
configuration.getRequired.mockReturnValue(3);
storage.getPeers.mockReturnValue(peers);
const jsons = [{}];
const transactions: any[] = jsons.map(j => ({ toJson: jest.fn().mockReturnValue(j) }));

const broadcaster = container.resolve(TransactionBroadcaster);
await broadcaster.broadcastTransactions(transactions as Interfaces.ITransaction[]);

expect(configuration.getRequired).toBeCalledWith("maxPeersBroadcast");
expect(storage.getPeers).toBeCalled();
expect(logger.debug).toBeCalledWith("Broadcasting 1 transaction to 3 peers");
expect(transactions[0].toJson).toBeCalled();
expect(communicator.postTransactions).toBeCalledWith(peers[0], jsons);
expect(communicator.postTransactions).toBeCalledWith(peers[1], jsons);
});
});
});
54 changes: 54 additions & 0 deletions __tests__/unit/core-transaction-pool/processor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Container } from "@arkecosystem/core-kernel";
import { Interfaces, Transactions } from "@arkecosystem/crypto";

import { Processor } from "../../../packages/core-transaction-pool/src/processor";

jest.mock("@arkecosystem/crypto");

describe("Processor", () => {
const container = new Container.Container();

describe("process", () => {
const logger = { warning: jest.fn() };
const pool = { addTransaction: jest.fn() };
const dynamicFeeMatcher = { canEnterPool: jest.fn(), canBroadcast: jest.fn() };
const transactionBroadcaster = { broadcastTransactions: jest.fn() };

beforeAll(() => {
container.unbindAll();
container.bind(Container.Identifiers.LogService).toConstantValue(logger);
container.bind(Container.Identifiers.TransactionPoolService).toConstantValue(pool);
container.bind(Container.Identifiers.TransactionPoolDynamicFeeMatcher).toConstantValue(dynamicFeeMatcher);
container.bind(Container.Identifiers.PeerTransactionBroadcaster).toConstantValue(transactionBroadcaster);
});

beforeEach(() => {
(Transactions.TransactionFactory.fromData as jest.Mock).mockReset();

logger.warning.mockReset();
pool.addTransaction.mockReset();
dynamicFeeMatcher.canEnterPool.mockReset();
dynamicFeeMatcher.canBroadcast.mockReset();
transactionBroadcaster.broadcastTransactions.mockReset();
});

it("should add eligible transactions to pool", async () => {
(Transactions.TransactionFactory.fromData as jest.Mock).mockImplementation(d => ({ id: d.id }));

dynamicFeeMatcher.canEnterPool.mockReturnValueOnce(true).mockReturnValueOnce(false);
const data: any[] = [{ id: "id_eligible" }, { id: "id_non_eligible" }];

const processor = container.resolve(Processor);
await processor.process(data as Interfaces.ITransactionData[]);

expect(processor.accept).toEqual(["id_eligible"]);
expect(processor.invalid).toEqual(["id_non_eligible"]);
expect(processor.errors).toEqual({
id_non_eligible: {
type: "ERR_LOW_FEE",
message: "Transaction id_non_eligible fee is to low to include in pool",
},
});
});
});
});
1 change: 1 addition & 0 deletions packages/core-kernel/src/contracts/p2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./peer-storage";
export * from "./peer-verifier";
export * from "./peer";
export * from "./server";
export * from "./transaction-broadcaster";
1 change: 0 additions & 1 deletion packages/core-kernel/src/contracts/p2p/network-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export interface NetworkMonitor {
checkNetworkHealth(): Promise<NetworkStatus>;
downloadBlocksFromHeight(fromBlockHeight: number, maxParallelDownloads?: number): Promise<Interfaces.IBlockData[]>;
broadcastBlock(block: Interfaces.IBlock): Promise<void>;
broadcastTransactions(transactions: Interfaces.ITransaction[]): Promise<void>;
getServer(): SocketCluster; // remove this
setServer(server: SocketCluster): void; // remove this
isColdStart(): boolean;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Interfaces } from "@arkecosystem/crypto";

export interface TransactionBroadcaster {
broadcastTransactions(transactions: Interfaces.ITransaction[]): Promise<void>;
}
1 change: 1 addition & 0 deletions packages/core-kernel/src/ioc/identifiers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export const Identifiers = {
PeerNetworkMonitor: Symbol.for("Peer<NetworkMonitor>"),
PeerProcessor: Symbol.for("Peer<Processor>"),
PeerStorage: Symbol.for("Peer<Storage>"),
PeerTransactionBroadcaster: Symbol.for("Peer<TransactionBroadcaster>"),
// Transaction Pool
TransactionPoolCleaner: Symbol.for("TransactionPool<Cleaner>"),
TransactionPoolMemory: Symbol.for("TransactionPool<Memory>"),
Expand Down
23 changes: 0 additions & 23 deletions packages/core-p2p/src/network-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,29 +513,6 @@ export class NetworkMonitor implements Contracts.P2P.NetworkMonitor {
await Promise.all(peers.map(peer => this.communicator.postBlock(peer, block.toJson())));
}

public async broadcastTransactions(transactions: Interfaces.ITransaction[]): Promise<any> {
const peers: Contracts.P2P.Peer[] = Utils.take(
Utils.shuffle(this.storage.getPeers()),
this.config.maxPeersBroadcast,
);

this.logger.debug(
`Broadcasting ${Utils.pluralize("transaction", transactions.length, true)} to ${Utils.pluralize(
"peer",
peers.length,
true,
)}`,
);

const transactionsBroadcast: Interfaces.ITransactionJson[] = transactions.map(transaction =>
transaction.toJson(),
);

return Promise.all(
peers.map((peer: Contracts.P2P.Peer) => this.communicator.postTransactions(peer, transactionsBroadcast)),
);
}

private async pingPeerPorts(initialRun?: boolean): Promise<void> {
let peers = this.storage.getPeers();
if (!initialRun) {
Expand Down
3 changes: 3 additions & 0 deletions packages/core-p2p/src/service-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { PeerProcessor } from "./peer-processor";
import { PeerStorage } from "./peer-storage";
import { startSocketServer } from "./socket-server";
import { payloadProcessor } from "./socket-server/payload-processor";
import { TransactionBroadcaster } from "./transaction-broadcaster";

export class ServiceProvider extends Providers.ServiceProvider {
public async register(): Promise<void> {
Expand Down Expand Up @@ -87,5 +88,7 @@ export class ServiceProvider extends Providers.ServiceProvider {
.bind("p2p.event-listener")
.to(EventListener)
.inSingletonScope();

this.app.bind(Container.Identifiers.PeerTransactionBroadcaster).to(TransactionBroadcaster);
}
}
39 changes: 39 additions & 0 deletions packages/core-p2p/src/transaction-broadcaster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Container, Contracts, Providers, Utils } from "@arkecosystem/core-kernel";
import { Interfaces } from "@arkecosystem/crypto";

import { PeerCommunicator } from "./peer-communicator";

@Container.injectable()
export class TransactionBroadcaster implements Contracts.P2P.TransactionBroadcaster {
@Container.inject(Container.Identifiers.LogService)
private readonly logger!: Contracts.Kernel.Logger;

@Container.inject(Container.Identifiers.PluginConfiguration)
@Container.tagged("plugin", "@arkecosystem/core-p2p")
private readonly configuration!: Providers.PluginConfiguration;

@Container.inject(Container.Identifiers.PeerStorage)
private readonly storage!: Contracts.P2P.PeerStorage;

@Container.inject(Container.Identifiers.PeerCommunicator)
private readonly communicator!: PeerCommunicator;

public async broadcastTransactions(transactions: Interfaces.ITransaction[]): Promise<void> {
if (transactions.length === 0) {
this.logger.warning("Broadcasting 0 transactions");
return;
}

const maxPeersBroadcast: number = this.configuration.getRequired<number>("maxPeersBroadcast");
const peers: Contracts.P2P.Peer[] = Utils.take(Utils.shuffle(this.storage.getPeers()), maxPeersBroadcast);

const transactionsStr = Utils.pluralize("transaction", transactions.length, true);
const peersStr = Utils.pluralize("peer", peers.length, true);
this.logger.debug(`Broadcasting ${transactionsStr} to ${peersStr}`);

const transactionsBroadcast: Interfaces.ITransactionJson[] = transactions.map(t => t.toJson());
const promises = peers.map(p => this.communicator.postTransactions(p, transactionsBroadcast));

await Promise.all(promises);
}
}
1 change: 0 additions & 1 deletion packages/core-transaction-pool/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ export class Connection implements Contracts.TransactionPool.Connection {
this.memory.remember(transaction);
this.storage.add(transaction);
} catch (error) {
this.logger.debug(error.stack);
throw new TransactionFailedToApplyError(transaction, error);
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core-transaction-pool/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class TransactionHasExpiredError extends Contracts.TransactionPool.PoolEr

export class TransactionFeeToLowError extends Contracts.TransactionPool.PoolError {
public constructor(transaction: Interfaces.ITransaction) {
super(`Transaction ${transaction.id} fee is to low include in pool`, "ERR_LOW_FEE", transaction);
super(`Transaction ${transaction.id} fee is to low to include in pool`, "ERR_LOW_FEE", transaction);
}
}

Expand Down
21 changes: 11 additions & 10 deletions packages/core-transaction-pool/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ export class Processor implements Contracts.TransactionPool.Processor {
public excess: string[] = [];
public errors?: { [id: string]: Contracts.TransactionPool.ProcessorError };

@Container.inject(Container.Identifiers.LogService)
private readonly logger!: Contracts.Kernel.Logger;

@Container.inject(Container.Identifiers.TransactionPoolService)
private readonly pool!: Contracts.TransactionPool.Connection;

@Container.inject(Container.Identifiers.PeerNetworkMonitor)
private readonly networkMonitor!: Contracts.P2P.NetworkMonitor;

@Container.inject(Container.Identifiers.TransactionPoolDynamicFeeMatcher)
private readonly dynamicFeeMatcher!: Contracts.TransactionPool.DynamicFeeMatcher;

@Container.inject(Container.Identifiers.LogService)
private readonly logger!: Contracts.Kernel.Logger;
@Container.inject(Container.Identifiers.PeerTransactionBroadcaster)
@Container.optional()
private readonly transactionBroadcaster!: Contracts.P2P.TransactionBroadcaster | undefined;

public async process(data: Interfaces.ITransactionData[]): Promise<void> {
const broadcastable: Interfaces.ITransaction[] = [];
const broadcastableTransactions: Interfaces.ITransaction[] = [];
const transactions = data.map(d => Transactions.TransactionFactory.fromData(d));

try {
Expand All @@ -36,7 +37,7 @@ export class Processor implements Contracts.TransactionPool.Processor {
await this.pool.addTransaction(transaction);
this.accept.push(transaction.id);
if (await this.dynamicFeeMatcher.canBroadcast(transaction)) {
broadcastable.push(transaction);
broadcastableTransactions.push(transaction);
}
} else {
throw new TransactionFeeToLowError(transaction);
Expand All @@ -62,9 +63,9 @@ export class Processor implements Contracts.TransactionPool.Processor {
}
}
} finally {
if (broadcastable.length !== 0) {
await this.networkMonitor.broadcastTransactions(broadcastable);
for (const transaction of broadcastable) {
if (this.transactionBroadcaster && broadcastableTransactions.length !== 0) {
await this.transactionBroadcaster.broadcastTransactions(broadcastableTransactions);
for (const transaction of broadcastableTransactions) {
AppUtils.assert.defined<string>(transaction.id);
this.broadcast.push(transaction.id);
}
Expand Down
4 changes: 4 additions & 0 deletions tsconfig.eslint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"extends": "./tsconfig.json",
"exclude": ["node_modules"]
}

0 comments on commit 8fc18c1

Please sign in to comment.