Skip to content

Commit

Permalink
feat(core-transaction-pool): processor extension point (#4522)
Browse files Browse the repository at this point in the history
  • Loading branch information
rainydio authored Oct 5, 2021
1 parent f85b927 commit 8e9474e
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Container } from "@packages/core-kernel";
import { ProcessorDynamicFeeExtension } from "@packages/core-transaction-pool/src/processor-dynamic-fee-extension";
import { Identities, Managers, Transactions } from "@packages/crypto";

Managers.configManager.getMilestone().aip11 = true;
const transaction1 = Transactions.BuilderFactory.transfer()
.version(2)
.amount("100")
.recipientId(Identities.Address.fromPassphrase("recipient's secret"))
.nonce("1")
.sign("sender's secret")
.build();

const dynamicFeeMatcher = {
throwIfCannotBroadcast: jest.fn(),
};

const container = new Container.Container();
container.bind(Container.Identifiers.TransactionPoolDynamicFeeMatcher).toConstantValue(dynamicFeeMatcher);

describe("ProcessorDynamicFeeExtension.throwIfCannotBroadcast", () => {
it("should call dynamicFeeMatcher.throwIfCannotBroadcast", async () => {
const processorDynamicFeeExtension = container.resolve(ProcessorDynamicFeeExtension);
await processorDynamicFeeExtension.throwIfCannotBroadcast(transaction1);

expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(1);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toHaveBeenCalledWith(transaction1);
});
});
24 changes: 15 additions & 9 deletions __tests__/unit/core-transaction-pool/processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const transaction2 = Transactions.BuilderFactory.transfer()
transaction2.data.typeGroup = undefined;

const pool = { addTransaction: jest.fn() };
const dynamicFeeMatcher = { throwIfCannotBroadcast: jest.fn() };
const extensions = [{ throwIfCannotBroadcast: jest.fn() }, { throwIfCannotBroadcast: jest.fn() }];
const spyBroadcastTransactions = jest.fn();
const transactionBroadcaster = {
broadcastTransactions: () => {
Expand All @@ -34,8 +34,9 @@ const workerPool = { isTypeGroupSupported: jest.fn(), getTransactionFromData: je
const logger = { error: jest.fn() };

const container = new Container.Container();
container.bind(Container.Identifiers.TransactionPoolProcessorExtension).toConstantValue(extensions[0]);
container.bind(Container.Identifiers.TransactionPoolProcessorExtension).toConstantValue(extensions[1]);
container.bind(Container.Identifiers.TransactionPoolService).toConstantValue(pool);
container.bind(Container.Identifiers.TransactionPoolDynamicFeeMatcher).toConstantValue(dynamicFeeMatcher);
container.bind(Container.Identifiers.PeerTransactionBroadcaster).toConstantValue(transactionBroadcaster);
container.bind(Container.Identifiers.TransactionPoolWorkerPool).toConstantValue(workerPool);
container.bind(Container.Identifiers.LogService).toConstantValue(logger);
Expand All @@ -49,7 +50,7 @@ describe("Processor.process", () => {
workerPool.isTypeGroupSupported.mockReturnValue(true);
workerPool.getTransactionFromData.mockResolvedValueOnce(transaction1).mockResolvedValueOnce(transaction2);

dynamicFeeMatcher.throwIfCannotBroadcast
extensions[0].throwIfCannotBroadcast
.mockImplementationOnce(async (transaction) => {
throw new TransactionFeeToLowError(transaction);
})
Expand All @@ -61,7 +62,8 @@ describe("Processor.process", () => {
const result = await processor.process([transaction1.data, transaction2.data]);

expect(pool.addTransaction).toBeCalledTimes(2);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(2);
expect(extensions[0].throwIfCannotBroadcast).toBeCalledTimes(2);
expect(extensions[1].throwIfCannotBroadcast).toBeCalledTimes(2);
expect(spyBroadcastTransactions).not.toBeCalled();

expect(result.accept).toEqual([transaction1.id, transaction2.id]);
Expand Down Expand Up @@ -104,7 +106,8 @@ describe("Processor.process", () => {
const result = await processor.process([transaction1.data, transaction2.data]);

expect(pool.addTransaction).toBeCalledTimes(2);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(1);
expect(extensions[0].throwIfCannotBroadcast).toBeCalledTimes(1);
expect(extensions[1].throwIfCannotBroadcast).toBeCalledTimes(1);
expect(spyBroadcastTransactions).toBeCalledTimes(1);

expect(result.accept).toEqual([transaction1.id]);
Expand All @@ -118,7 +121,7 @@ describe("Processor.process", () => {
it("should add broadcast eligible transaction", async () => {
workerPool.isTypeGroupSupported.mockReturnValue(false);

dynamicFeeMatcher.throwIfCannotBroadcast
extensions[0].throwIfCannotBroadcast
.mockImplementationOnce(async (transaction) => {})
.mockImplementationOnce(async (transaction) => {
throw new TransactionFeeToLowError(transaction);
Expand All @@ -128,7 +131,8 @@ describe("Processor.process", () => {
const result = await processor.process([transaction1.data, transaction2.data]);

expect(pool.addTransaction).toBeCalledTimes(2);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(2);
expect(extensions[0].throwIfCannotBroadcast).toBeCalledTimes(2);
expect(extensions[1].throwIfCannotBroadcast).toBeCalledTimes(2);
expect(spyBroadcastTransactions).toBeCalled();

expect(result.accept).toEqual([transaction1.id, transaction2.id]);
Expand All @@ -148,7 +152,8 @@ describe("Processor.process", () => {
await expect(promise).rejects.toThrow();

expect(pool.addTransaction).toBeCalledTimes(1);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(0);
expect(extensions[0].throwIfCannotBroadcast).toBeCalledTimes(0);
expect(extensions[1].throwIfCannotBroadcast).toBeCalledTimes(0);
expect(spyBroadcastTransactions).not.toBeCalled();
});

Expand All @@ -162,7 +167,8 @@ describe("Processor.process", () => {
const result = await processor.process([transaction1.data]);

expect(pool.addTransaction).toBeCalledTimes(1);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(0);
expect(extensions[0].throwIfCannotBroadcast).toBeCalledTimes(0);
expect(extensions[1].throwIfCannotBroadcast).toBeCalledTimes(0);
expect(spyBroadcastTransactions).not.toBeCalled();

expect(result.accept).toEqual([]);
Expand Down
24 changes: 17 additions & 7 deletions packages/core-kernel/src/contracts/transaction-pool/processor.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
import { Interfaces } from "@arkecosystem/crypto";
import { injectable } from "../../ioc";

export type ProcessorError = {
type: string;
message: string;
};

export type ProcessorResult = {
accept: string[];
broadcast: string[];
invalid: string[];
excess: string[];
errors?: { [id: string]: ProcessorError };
};

@injectable()
export abstract class ProcessorExtension {
public async throwIfCannotBroadcast(transaction: Interfaces.ITransaction): Promise<void> {
// override me
}
}

export interface Processor {
process(data: Interfaces.ITransactionData[] | Buffer[]): Promise<{
accept: string[],
broadcast: string[],
invalid: string[],
excess: string[],
errors?: { [id: string]: ProcessorError },
}>;
process(data: Interfaces.ITransactionData[] | Buffer[]): Promise<ProcessorResult>;
}

export type ProcessorFactory = () => Processor;
1 change: 1 addition & 0 deletions packages/core-kernel/src/ioc/identifiers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export const Identifiers = {
TransactionPoolCollator: Symbol.for("TransactionPool<Collator>"),
TransactionPoolQuery: Symbol.for("TransactionPool<Query>"),
TransactionPoolDynamicFeeMatcher: Symbol.for("TransactionPool<DynamicFeeMatcher>"),
TransactionPoolProcessorExtension: Symbol.for("TransactionPool<ProcessorExtension>"),
TransactionPoolProcessor: Symbol.for("TransactionPool<Processor>"),
TransactionPoolProcessorFactory: Symbol.for("TransactionPool<ProcessorFactory>"),
TransactionPoolSenderMempool: Symbol.for("TransactionPool<SenderMempool>"),
Expand Down
16 changes: 11 additions & 5 deletions packages/core-transaction-pool/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
export * from "./service-provider";
export * from "./service";
export * from "./storage";
export * from "./collator";
export * from "./mempool";
export * from "./actions";
export * from "./collator";
export * from "./dynamic-fee-matcher";
export * from "./errors";
export * from "./expiration-service";
export * from "./mempool";
export * from "./processor-dynamic-fee-extension";
export * from "./processor";
export * from "./query";
export * from "./sender-mempool";
export * from "./sender-state";
export * from "./service-provider";
export * from "./service";
export * from "./storage";
export * from "./utils";
export * from "./worker-pool";
export * from "./worker";
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Container, Contracts } from "@arkecosystem/core-kernel";
import { Interfaces } from "@arkecosystem/crypto";

@Container.injectable()
export class ProcessorDynamicFeeExtension extends Contracts.TransactionPool.ProcessorExtension {
@Container.inject(Container.Identifiers.TransactionPoolDynamicFeeMatcher)
private readonly dynamicFeeMatcher!: Contracts.TransactionPool.DynamicFeeMatcher;

public async throwIfCannotBroadcast(transaction: Interfaces.ITransaction): Promise<void> {
await this.dynamicFeeMatcher.throwIfCannotBroadcast(transaction);
}
}
17 changes: 6 additions & 11 deletions packages/core-transaction-pool/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import { InvalidTransactionDataError } from "./errors";

@Container.injectable()
export class Processor implements Contracts.TransactionPool.Processor {
@Container.multiInject(Container.Identifiers.TransactionPoolProcessorExtension)
@Container.optional()
private readonly extensions: Contracts.TransactionPool.ProcessorExtension[] = [];

@Container.inject(Container.Identifiers.TransactionPoolService)
private readonly pool!: Contracts.TransactionPool.Service;

@Container.inject(Container.Identifiers.TransactionPoolDynamicFeeMatcher)
private readonly dynamicFeeMatcher!: Contracts.TransactionPool.DynamicFeeMatcher;

@Container.inject(Container.Identifiers.TransactionPoolWorkerPool)
private readonly workerPool!: Contracts.TransactionPool.WorkerPool;

Expand All @@ -24,13 +25,7 @@ export class Processor implements Contracts.TransactionPool.Processor {

public async process(
data: Interfaces.ITransactionData[] | Buffer[],
): Promise<{
accept: string[];
broadcast: string[];
invalid: string[];
excess: string[];
errors?: { [id: string]: Contracts.TransactionPool.ProcessorError };
}> {
): Promise<Contracts.TransactionPool.ProcessorResult> {
const accept: string[] = [];
const broadcast: string[] = [];
const invalid: string[] = [];
Expand All @@ -53,7 +48,7 @@ export class Processor implements Contracts.TransactionPool.Processor {
accept.push(entryId);

try {
await this.dynamicFeeMatcher.throwIfCannotBroadcast(transaction);
await Promise.all(this.extensions.map((e) => e.throwIfCannotBroadcast(transaction)));
broadcastTransactions.push(transaction);
broadcast.push(entryId);
} catch {}
Expand Down
3 changes: 3 additions & 0 deletions packages/core-transaction-pool/src/service-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Service } from "./service";
import { Storage } from "./storage";
import { Worker } from "./worker";
import { WorkerPool } from "./worker-pool";
import { ProcessorDynamicFeeExtension } from "./processor-dynamic-fee-extension";

/**
* @export
Expand Down Expand Up @@ -123,6 +124,8 @@ export class ServiceProvider extends Providers.ServiceProvider {
return new AppUtils.IpcSubprocess<Contracts.TransactionPool.WorkerScriptHandler>(subprocess);
};
});

this.app.bind(Container.Identifiers.TransactionPoolProcessorExtension).to(ProcessorDynamicFeeExtension);
}

private registerActions(): void {
Expand Down

0 comments on commit 8e9474e

Please sign in to comment.