Skip to content

Commit

Permalink
refactor(core-transaction-pool): pool processor to be injected direct…
Browse files Browse the repository at this point in the history
…ly (#4278)
  • Loading branch information
air1one authored Jan 25, 2021
1 parent da71564 commit 147c4de
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 101 deletions.
2 changes: 2 additions & 0 deletions __tests__/unit/core-api/service-provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ beforeEach(() => {

app.bind(Container.Identifiers.TransactionPoolProcessorFactory).toConstantValue({});

app.bind(Container.Identifiers.TransactionPoolProcessor).toConstantValue({});

app.bind(Container.Identifiers.EventDispatcherService).toConstantValue({});

app.bind(Container.Identifiers.BlockHistoryService).toConstantValue({});
Expand Down
2 changes: 2 additions & 0 deletions __tests__/unit/core-magistrate-api/service-provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ beforeEach(() => {

app.bind(Container.Identifiers.TransactionPoolProcessorFactory).toConstantValue({});

app.bind(Container.Identifiers.TransactionPoolProcessor).toConstantValue({});

app.bind(Container.Identifiers.BlockHistoryService).toConstantValue({});

app.bind(Container.Identifiers.TransactionHistoryService).toConstantValue({});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ describe("TransactionsController", () => {
pingBlock: jest.fn(),
getLastDownloadedBlock: jest.fn(),
};
const processor = {
process: jest.fn().mockReturnValue({ accept: []}),
};
const createProcessor = jest.fn();
const appPlugins = [{ package: "@arkecosystem/core-api", options: {} }];
const coreApiServiceProvider = {
Expand Down Expand Up @@ -60,6 +63,7 @@ describe("TransactionsController", () => {
container.bind(Container.Identifiers.PeerStorage).toConstantValue(peerStorage);
container.bind(Container.Identifiers.DatabaseService).toConstantValue(database);
container.bind(Container.Identifiers.Application).toConstantValue(app);
container.bind(Container.Identifiers.TransactionPoolProcessor).toConstantValue(processor);
});

beforeEach(() => {
Expand All @@ -69,8 +73,7 @@ describe("TransactionsController", () => {
describe("postTransactions", () => {
it("should create transaction processor and use it to process the transactions", async () => {
const transactions = Networks.testnet.genesisBlock.transactions;
const processor = { process: jest.fn(), accept: [transactions[0].id] };
createProcessor.mockReturnValueOnce(processor);
processor.process.mockReturnValueOnce({ accept: [transactions[0].id] });

expect(await transactionsController.postTransactions({ payload: { transactions } }, {})).toEqual([
transactions[0].id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ const clear = () => {
describe("TransactionPoolProcessor", () => {
describe("default values", () => {
it("accept should be empty array", async () => {
expect(TransactionPoolProcessor.instance.accept).toEqual([]);
expect((await TransactionPoolProcessor.instance.process([])).accept).toEqual([]);
});

it("broadcast should be empty array", async () => {
expect(TransactionPoolProcessor.instance.broadcast).toEqual([]);
expect((await TransactionPoolProcessor.instance.process([])).broadcast).toEqual([]);
});

it("invalid should be empty array", async () => {
expect(TransactionPoolProcessor.instance.invalid).toEqual([]);
expect((await TransactionPoolProcessor.instance.process([])).invalid).toEqual([]);
});

it("excess should be empty array", async () => {
expect(TransactionPoolProcessor.instance.excess).toEqual([]);
expect((await TransactionPoolProcessor.instance.process([])).excess).toEqual([]);
});

it("errors should be empty array", async () => {
expect(TransactionPoolProcessor.instance.errors).toBeUndefined();
expect((await TransactionPoolProcessor.instance.process([])).errors).toBeUndefined();
});
});

Expand All @@ -50,23 +50,23 @@ describe("TransactionPoolProcessor", () => {
});

it("accept should be mocked value", async () => {
expect(TransactionPoolProcessor.instance.accept).toEqual(processorState.accept);
expect((await TransactionPoolProcessor.instance.process([])).accept).toEqual(processorState.accept);
});

it("broadcast should be mocked value", async () => {
expect(TransactionPoolProcessor.instance.broadcast).toEqual(processorState.broadcast);
expect((await TransactionPoolProcessor.instance.process([])).accept).toEqual(processorState.accept);
});

it("invalid should be mocked value", async () => {
expect(TransactionPoolProcessor.instance.invalid).toEqual(processorState.invalid);
expect((await TransactionPoolProcessor.instance.process([])).invalid).toEqual(processorState.invalid);
});

it("excess should be mocked value", async () => {
expect(TransactionPoolProcessor.instance.excess).toEqual(processorState.excess);
expect((await TransactionPoolProcessor.instance.process([])).excess).toEqual(processorState.excess);
});

it("errors should be mocked value", async () => {
expect(TransactionPoolProcessor.instance.errors).toEqual(processorState.errors);
expect((await TransactionPoolProcessor.instance.process([])).errors).toEqual(processorState.errors);
});
});

Expand Down
64 changes: 29 additions & 35 deletions __tests__/unit/core-transaction-pool/processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ describe("Processor.process", () => {
});

const processor = container.resolve(Processor);
await processor.process([transaction1.data, transaction2.data]);
const result = await processor.process([transaction1.data, transaction2.data]);

expect(pool.addTransaction).toBeCalledTimes(2);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(2);
expect(spyBroadcastTransactions).not.toBeCalled();

expect(processor.accept).toEqual([transaction1.id, transaction2.id]);
expect(processor.broadcast).toEqual([]);
expect(processor.invalid).toEqual([]);
expect(processor.excess).toEqual([]);
expect(processor.errors).toBeUndefined();
expect(result.accept).toEqual([transaction1.id, transaction2.id]);
expect(result.broadcast).toEqual([]);
expect(result.invalid).toEqual([]);
expect(result.excess).toEqual([]);
expect(result.errors).toBeUndefined();
});

it("should wrap deserialize errors into BAD_DATA pool error", async () => {
Expand All @@ -76,13 +76,13 @@ describe("Processor.process", () => {
workerPool.isTypeGroupSupported.mockReturnValueOnce(true);
workerPool.getTransactionFromData.mockRejectedValueOnce(new Error("Version 1 not supported"));

await processor.process([transaction1.data]);
const result = await processor.process([transaction1.data]);

expect(workerPool.isTypeGroupSupported).toBeCalledWith(transaction1.data.typeGroup);
expect(workerPool.getTransactionFromData).toBeCalledWith(transaction1.data);

expect(processor.invalid).toEqual([transaction1.id]);
expect(processor.errors).toEqual({
expect(result.invalid).toEqual([transaction1.id]);
expect(result.errors).toEqual({
[transaction1.data.id]: {
type: "ERR_BAD_DATA",
message: "Invalid transaction data: Version 1 not supported",
Expand All @@ -100,18 +100,18 @@ describe("Processor.process", () => {
});

const processor = container.resolve(Processor);
await processor.process([transaction1.data, transaction2.data]);
const result = await processor.process([transaction1.data, transaction2.data]);

expect(pool.addTransaction).toBeCalledTimes(2);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(1);
expect(spyBroadcastTransactions).toBeCalledTimes(1);

expect(processor.accept).toEqual([transaction1.id]);
expect(processor.broadcast).toEqual([transaction1.id]);
expect(processor.invalid).toEqual([transaction2.id]);
expect(processor.excess).toEqual([]);
expect(processor.errors[transaction2.id]).toBeTruthy();
expect(processor.errors[transaction2.id].type).toBe("ERR_LOW_FEE");
expect(result.accept).toEqual([transaction1.id]);
expect(result.broadcast).toEqual([transaction1.id]);
expect(result.invalid).toEqual([transaction2.id]);
expect(result.excess).toEqual([]);
expect(result.errors[transaction2.id]).toBeTruthy();
expect(result.errors[transaction2.id].type).toBe("ERR_LOW_FEE");
});

it("should add broadcast eligible transaction", async () => {
Expand All @@ -124,17 +124,17 @@ describe("Processor.process", () => {
});

const processor = container.resolve(Processor);
await processor.process([transaction1.data, transaction2.data]);
const result = await processor.process([transaction1.data, transaction2.data]);

expect(pool.addTransaction).toBeCalledTimes(2);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(2);
expect(spyBroadcastTransactions).toBeCalled();

expect(processor.accept).toEqual([transaction1.id, transaction2.id]);
expect(processor.broadcast).toEqual([transaction1.id]);
expect(processor.invalid).toEqual([]);
expect(processor.excess).toEqual([]);
expect(processor.errors).toEqual(undefined);
expect(result.accept).toEqual([transaction1.id, transaction2.id]);
expect(result.broadcast).toEqual([transaction1.id]);
expect(result.invalid).toEqual([]);
expect(result.excess).toEqual([]);
expect(result.errors).toEqual(undefined);
});

it("should rethrow unexpected error", async () => {
Expand All @@ -149,12 +149,6 @@ describe("Processor.process", () => {
expect(pool.addTransaction).toBeCalledTimes(1);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(0);
expect(spyBroadcastTransactions).not.toBeCalled();

expect(processor.accept).toEqual([]);
expect(processor.broadcast).toEqual([]);
expect(processor.invalid).toEqual([transaction1.id]);
expect(processor.excess).toEqual([]);
expect(processor.errors).toEqual(undefined);
});

it("should track excess transactions", async () => {
Expand All @@ -164,17 +158,17 @@ describe("Processor.process", () => {
pool.addTransaction.mockRejectedValueOnce(exceedsError);

const processor = container.resolve(Processor);
await processor.process([transaction1.data]);
const result = await processor.process([transaction1.data]);

expect(pool.addTransaction).toBeCalledTimes(1);
expect(dynamicFeeMatcher.throwIfCannotBroadcast).toBeCalledTimes(0);
expect(spyBroadcastTransactions).not.toBeCalled();

expect(processor.accept).toEqual([]);
expect(processor.broadcast).toEqual([]);
expect(processor.invalid).toEqual([transaction1.id]);
expect(processor.excess).toEqual([transaction1.id]);
expect(processor.errors[transaction1.id]).toBeTruthy();
expect(processor.errors[transaction1.id].type).toBe("ERR_EXCEEDS_MAX_COUNT");
expect(result.accept).toEqual([]);
expect(result.broadcast).toEqual([]);
expect(result.invalid).toEqual([transaction1.id]);
expect(result.excess).toEqual([transaction1.id]);
expect(result.errors[transaction1.id]).toBeTruthy();
expect(result.errors[transaction1.id].type).toBe("ERR_EXCEEDS_MAX_COUNT");
});
});
2 changes: 2 additions & 0 deletions __tests__/unit/core-webhooks/service-provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ beforeEach(() => {

app.bind(Container.Identifiers.TransactionPoolProcessorFactory).toConstantValue({});

app.bind(Container.Identifiers.TransactionPoolProcessor).toConstantValue({});

app.bind(Container.Identifiers.BlockHistoryService).toConstantValue({});

app.bind(Container.Identifiers.TransactionHistoryService).toConstantValue({});
Expand Down
17 changes: 8 additions & 9 deletions packages/core-api/src/controllers/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export class TransactionsController extends Controller {
@Container.inject(Container.Identifiers.BlockHistoryService)
private readonly blockHistoryService!: Contracts.Shared.BlockHistoryService;

@Container.inject(Container.Identifiers.TransactionPoolProcessorFactory)
private readonly createProcessor!: Contracts.TransactionPool.ProcessorFactory;
@Container.inject(Container.Identifiers.TransactionPoolProcessor)
private readonly processor!: Contracts.TransactionPool.Processor;

public async index(request: Hapi.Request, h: Hapi.ResponseToolkit) {
const criteria: Contracts.Shared.TransactionCriteria = request.query;
Expand Down Expand Up @@ -55,16 +55,15 @@ export class TransactionsController extends Controller {
}

public async store(request: Hapi.Request, h: Hapi.ResponseToolkit) {
const processor: Contracts.TransactionPool.Processor = this.createProcessor();
await processor.process(request.payload.transactions);
const result = await this.processor.process(request.payload.transactions);
return {
data: {
accept: processor.accept,
broadcast: processor.broadcast,
excess: processor.excess,
invalid: processor.invalid,
accept: result.accept,
broadcast: result.broadcast,
excess: result.excess,
invalid: result.invalid,
},
errors: processor.errors,
errors: result.errors,
};
}

Expand Down
14 changes: 7 additions & 7 deletions packages/core-kernel/src/contracts/transaction-pool/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ export type ProcessorError = {
};

export interface Processor {
accept: string[];
broadcast: string[];
excess: string[];
invalid: string[];
errors?: { [id: string]: ProcessorError };

process(data: Interfaces.ITransactionData[] | Buffer[]): Promise<void>;
process(data: Interfaces.ITransactionData[] | Buffer[]): Promise<{
accept: string[],
broadcast: string[],
invalid: string[],
excess: string[],
errors?: { [id: string]: ProcessorError },
}>;
}

export type ProcessorFactory = () => Processor;
11 changes: 5 additions & 6 deletions packages/core-p2p/src/socket-server/controllers/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import Hapi from "@hapi/hapi";
import { Controller } from "./controller";

export class TransactionsController extends Controller {
@Container.inject(Container.Identifiers.TransactionPoolProcessor)
private readonly processor!: Contracts.TransactionPool.Processor;

public async postTransactions(request: Hapi.Request, h: Hapi.ResponseToolkit): Promise<string[]> {
const createProcessor: Contracts.TransactionPool.ProcessorFactory = this.app.get(
Container.Identifiers.TransactionPoolProcessorFactory,
);
const processor: Contracts.TransactionPool.Processor = createProcessor();
await processor.process((request.payload as any).transactions as Buffer[]);
return processor.accept;
const result = await this.processor.process((request.payload as any).transactions as Buffer[]);
return result.accept;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,20 @@ export const setProcessorState = (state: any): void => {
};

class TransactionPoolProcessorMock implements Partial<Contracts.TransactionPool.Processor> {
public async process(data: Interfaces.ITransactionData[] | Buffer[]): Promise<void> {}

public get accept(): string[] {
return accept;
}

public get broadcast(): string[] {
return broadcast;
}

public get invalid(): string[] {
return invalid;
}

public get excess(): string[] {
return excess;
}

public get errors(): any {
return errors;
public async process(data: Interfaces.ITransactionData[] | Buffer[]): Promise<{
accept: string[],
broadcast: string[],
invalid: string[],
excess: string[],
errors?: { [id: string]: Contracts.TransactionPool.ProcessorError },
}> {
return {
accept,
broadcast,
invalid,
excess,
errors,
}
}
}

Expand Down
Loading

0 comments on commit 147c4de

Please sign in to comment.