Skip to content

Commit

Permalink
refactor(core-transaction-pool): better locking (#3802)
Browse files Browse the repository at this point in the history
  • Loading branch information
rainydio authored Jun 17, 2020
1 parent bb8cb12 commit f0acce8
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 248 deletions.
75 changes: 75 additions & 0 deletions __tests__/unit/core-kernel/utils/lock.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { Lock } from "../../../../packages/core-kernel/src/utils/lock";

describe("Lock", () => {
it("should run exclusive executions in series", async () => {
let resolve: () => void;
const promise = new Promise((r) => (resolve = r));

let executions = 0;
const fn = async () => {
executions++;
await promise;
return executions;
};

const lock = new Lock();
const promises = [lock.runExclusive(fn), lock.runExclusive(fn), lock.runExclusive(fn)];
resolve();

expect(await Promise.all(promises)).toEqual([1, 2, 3]);
});

it("should run non-exclusive executions in parallel", async () => {
let resolve: () => void;
const promise = new Promise((r) => (resolve = r));

let executions = 0;
const fn = async () => {
executions++;
await promise;
return executions;
};

const lock = new Lock();
const promises = [lock.runNonExclusive(fn), lock.runNonExclusive(fn), lock.runNonExclusive(fn)];
resolve();

expect(await Promise.all(promises)).toEqual([3, 3, 3]);
});

it("should run exclusive execution after non-exclusive had finished", async () => {
let resolve: () => void;
const promise = new Promise((r) => (resolve = r));

let executions = 0;
const fn = async () => {
executions++;
await promise;
return executions;
};

const lock = new Lock();
const promises = [lock.runNonExclusive(fn), lock.runNonExclusive(fn), lock.runExclusive(fn)];
resolve();

expect(await Promise.all(promises)).toEqual([2, 2, 3]);
});

it("should run non-exclusive execution after exclusive had finished", async () => {
let resolve: () => void;
const promise = new Promise((r) => (resolve = r));

let executions = 0;
const fn = async () => {
executions++;
await promise;
return executions;
};

const lock = new Lock();
const promises = [lock.runExclusive(fn), lock.runNonExclusive(fn), lock.runNonExclusive(fn)];
resolve();

expect(await Promise.all(promises)).toEqual([1, 3, 3]);
});
});
38 changes: 21 additions & 17 deletions __tests__/unit/core-transaction-pool/mempool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ beforeEach(() => {
describe("Mempool.getSize", () => {
it("should return sum of transaction counts of sender states", async () => {
createSenderMempool
.mockReturnValueOnce({ addTransaction: jest.fn(), getSize: () => 10, isEmpty: () => false })
.mockReturnValueOnce({ addTransaction: jest.fn(), getSize: () => 20, isEmpty: () => false });
.mockReturnValueOnce({ addTransaction: jest.fn(), getSize: () => 10, isDisposable: () => false })
.mockReturnValueOnce({ addTransaction: jest.fn(), getSize: () => 20, isDisposable: () => false });

const transaction1 = {
data: { senderPublicKey: Identities.PublicKey.fromPassphrase("sender1") },
Expand All @@ -39,7 +39,7 @@ describe("Mempool.getSize", () => {

describe("Mempool.hasSenderMempool", () => {
it("should return true if sender's transaction was added previously", async () => {
createSenderMempool.mockReturnValueOnce({ addTransaction: jest.fn(), isEmpty: () => false });
createSenderMempool.mockReturnValueOnce({ addTransaction: jest.fn(), isDisposable: () => false });

const transaction = {
data: { senderPublicKey: Identities.PublicKey.fromPassphrase("sender's key") },
Expand All @@ -53,7 +53,7 @@ describe("Mempool.hasSenderMempool", () => {
});

it("should return false if sender's transaction wasn't added previously", async () => {
createSenderMempool.mockReturnValueOnce({ addTransaction: jest.fn(), isEmpty: () => false });
createSenderMempool.mockReturnValueOnce({ addTransaction: jest.fn(), isDisposable: () => false });

const transaction = {
data: { senderPublicKey: Identities.PublicKey.fromPassphrase("sender's key") },
Expand All @@ -69,7 +69,7 @@ describe("Mempool.hasSenderMempool", () => {

describe("Mempool.getSenderMempool", () => {
it("should return sender state if sender's transaction was added previously", async () => {
const expectedSenderMempool = { addTransaction: jest.fn(), isEmpty: () => false };
const expectedSenderMempool = { addTransaction: jest.fn(), isDisposable: () => false };
createSenderMempool.mockReturnValueOnce(expectedSenderMempool);

const transaction = {
Expand All @@ -84,7 +84,7 @@ describe("Mempool.getSenderMempool", () => {
});

it("should throw if sender's transaction wasn't added previously", async () => {
createSenderMempool.mockReturnValueOnce({ addTransaction: jest.fn(), isEmpty: () => false });
createSenderMempool.mockReturnValueOnce({ addTransaction: jest.fn(), isDisposable: () => false });

const transaction = {
data: { senderPublicKey: Identities.PublicKey.fromPassphrase("sender's key") },
Expand All @@ -100,8 +100,8 @@ describe("Mempool.getSenderMempool", () => {

describe("Mempool.getSenderMempools", () => {
it("should return all sender states", async () => {
const SenderMempool1 = { addTransaction: jest.fn(), isEmpty: () => false };
const SenderMempool2 = { addTransaction: jest.fn(), isEmpty: () => false };
const SenderMempool1 = { addTransaction: jest.fn(), isDisposable: () => false };
const SenderMempool2 = { addTransaction: jest.fn(), isDisposable: () => false };
createSenderMempool.mockReturnValueOnce(SenderMempool1).mockReturnValueOnce(SenderMempool2);

const transaction1 = {
Expand All @@ -122,7 +122,7 @@ describe("Mempool.getSenderMempools", () => {

describe("Mempool.addTransaction", () => {
it("should add transaction to sender state", async () => {
const SenderMempool = { addTransaction: jest.fn(), isEmpty: () => false };
const SenderMempool = { addTransaction: jest.fn(), isDisposable: () => false };
createSenderMempool.mockReturnValueOnce(SenderMempool);

const transaction = {
Expand All @@ -137,7 +137,7 @@ describe("Mempool.addTransaction", () => {

it("should forget sender state if it's empty even if error was thrown", async () => {
const error = new Error("Something went horribly wrong");
const SenderMempool = { addTransaction: jest.fn(), isEmpty: () => true };
const SenderMempool = { addTransaction: jest.fn(), isDisposable: () => true };
SenderMempool.addTransaction.mockRejectedValueOnce(error);
createSenderMempool.mockReturnValueOnce(SenderMempool);

Expand Down Expand Up @@ -173,7 +173,7 @@ describe("Mempool.removeTransaction", () => {
const SenderMempool = {
addTransaction: jest.fn(),
removeTransaction: jest.fn(() => expectedRemovedTransactions),
isEmpty: () => false,
isDisposable: () => false,
};
createSenderMempool.mockReturnValueOnce(SenderMempool);

Expand All @@ -188,9 +188,9 @@ describe("Mempool.removeTransaction", () => {

it("should forget sender state if it's empty even if error was thrown", async () => {
const error = new Error("Something went horribly wrong");
const SenderMempool = { addTransaction: jest.fn(), removeTransaction: jest.fn(), isEmpty: jest.fn() };
const SenderMempool = { addTransaction: jest.fn(), removeTransaction: jest.fn(), isDisposable: jest.fn() };
SenderMempool.removeTransaction.mockRejectedValueOnce(error);
SenderMempool.isEmpty.mockReturnValueOnce(false).mockReturnValueOnce(true);
SenderMempool.isDisposable.mockReturnValueOnce(false).mockReturnValueOnce(true);
createSenderMempool.mockReturnValueOnce(SenderMempool);

const transaction = {
Expand Down Expand Up @@ -226,7 +226,7 @@ describe("Mempool.acceptForgedTransaction", () => {
const SenderMempool = {
addTransaction: jest.fn(),
acceptForgedTransaction: jest.fn(() => expectedRemovedTransactions),
isEmpty: () => false,
isDisposable: () => false,
};
createSenderMempool.mockReturnValueOnce(SenderMempool);

Expand All @@ -241,9 +241,13 @@ describe("Mempool.acceptForgedTransaction", () => {

it("should forget sender state if it's empty even if error was thrown", async () => {
const error = new Error("Something went horribly wrong");
const SenderMempool = { addTransaction: jest.fn(), acceptForgedTransaction: jest.fn(), isEmpty: jest.fn() };
const SenderMempool = {
addTransaction: jest.fn(),
acceptForgedTransaction: jest.fn(),
isDisposable: jest.fn(),
};
SenderMempool.acceptForgedTransaction.mockRejectedValueOnce(error);
SenderMempool.isEmpty.mockReturnValueOnce(false).mockReturnValueOnce(true);
SenderMempool.isDisposable.mockReturnValueOnce(false).mockReturnValueOnce(true);
createSenderMempool.mockReturnValueOnce(SenderMempool);

const transaction = {
Expand All @@ -262,7 +266,7 @@ describe("Mempool.acceptForgedTransaction", () => {

describe("Mempool.flush", () => {
it("should remove all sender states", async () => {
const SenderMempool = { addTransaction: jest.fn(), isEmpty: () => false };
const SenderMempool = { addTransaction: jest.fn(), isDisposable: () => false };
createSenderMempool.mockReturnValueOnce(SenderMempool);

const transaction = {
Expand Down
8 changes: 4 additions & 4 deletions __tests__/unit/core-transaction-pool/sender-mempool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ const transaction3 = Transactions.BuilderFactory.transfer()
.sign("sender's secret")
.build();

describe("SenderMempool.isEmpty", () => {
it("should return true intially", () => {
describe("SenderMempool.isDisposable", () => {
it("should return true initially", () => {
const senderMempool = container.resolve(SenderMempool);
const empty = senderMempool.isEmpty();
const empty = senderMempool.isDisposable();

expect(empty).toBe(true);
});
Expand All @@ -56,7 +56,7 @@ describe("SenderMempool.isEmpty", () => {

const senderMempool = container.resolve(SenderMempool);
await senderMempool.addTransaction(transaction1);
const empty = senderMempool.isEmpty();
const empty = senderMempool.isDisposable();

expect(empty).toBe(false);
});
Expand Down
62 changes: 59 additions & 3 deletions __tests__/unit/core-transaction-pool/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,41 +329,97 @@ describe("Service.readdTransactions", () => {
const service = container.resolve(Service);
await service.readdTransactions();

expect(mempool.addTransaction).toBeCalledTimes(3);
expect(mempool.addTransaction).toBeCalledWith(transaction1);
expect(mempool.addTransaction).toBeCalledWith(transaction2);
expect(mempool.addTransaction).toBeCalledWith(transaction3);

expect(storage.removeTransaction).toBeCalledTimes(0);
});

it("should remove transaction from storage that failed adding to mempool", async () => {
it("should remove transaction from storage that cannot be added to mempool", async () => {
storage.getAllTransactions.mockReturnValueOnce([transaction1]);
mempool.addTransaction.mockRejectedValueOnce(new Error("Something wrong"));

const service = container.resolve(Service);
await service.readdTransactions();

expect(mempool.addTransaction).toBeCalledTimes(1);
expect(mempool.addTransaction).toBeCalledWith(transaction1);

expect(storage.removeTransaction).toBeCalledTimes(1);
expect(storage.removeTransaction).toBeCalledWith(transaction1.id);
});

it("should first add transactions passed in argument", async () => {
it("should remove all transactions from storage that cannot be added to mempool", async () => {
storage.getAllTransactions.mockReturnValueOnce([transaction1, transaction2]);
mempool.addTransaction.mockRejectedValueOnce(new Error("Something wrong"));
mempool.addTransaction.mockRejectedValueOnce(new Error("Something wrong"));

const service = container.resolve(Service);
await service.readdTransactions();

expect(mempool.addTransaction).toBeCalledTimes(2);
expect(mempool.addTransaction).toBeCalledWith(transaction1);
expect(mempool.addTransaction).toBeCalledWith(transaction2);

expect(storage.removeTransaction).toBeCalledTimes(2);
expect(storage.removeTransaction).toBeCalledWith(transaction1.id);
expect(storage.removeTransaction).toBeCalledWith(transaction2.id);
});

it("should add preceding transactions first", async () => {
storage.getAllTransactions.mockReturnValueOnce([transaction3]);

const service = container.resolve(Service);
await service.readdTransactions([transaction1, transaction2]);

expect(mempool.addTransaction).toBeCalledTimes(3);
expect(mempool.addTransaction).toBeCalledWith(transaction1);
expect(mempool.addTransaction).toBeCalledWith(transaction2);
expect(mempool.addTransaction).toBeCalledWith(transaction3);

expect(storage.addTransaction).toBeCalledTimes(2);
expect(storage.addTransaction).toBeCalledWith(transaction1.id, transaction1.serialized);
expect(storage.addTransaction).toBeCalledWith(transaction2.id, transaction2.serialized);

expect(storage.removeTransaction).toBeCalledTimes(0);
});

it("should ignore error when adding preceding transactions", async () => {
storage.getAllTransactions.mockReturnValueOnce([transaction3]);
mempool.addTransaction.mockRejectedValueOnce(new Error("Something wrong"));

const service = container.resolve(Service);
await service.readdTransactions([transaction1, transaction2]);

expect(mempool.addTransaction).toBeCalledTimes(3);
expect(mempool.addTransaction).toBeCalledWith(transaction1);
expect(mempool.addTransaction).toBeCalledWith(transaction2);
expect(mempool.addTransaction).toBeCalledWith(transaction3);

expect(storage.addTransaction).toBeCalledTimes(1);
expect(storage.addTransaction).toBeCalledWith(transaction2.id, transaction2.serialized);

expect(storage.removeTransaction).toBeCalledTimes(0);
});

it("should ignore errors when adding transaction that was passed in argument", async () => {
it("should ignore all errors when adding preceding transactions", async () => {
storage.getAllTransactions.mockReturnValueOnce([transaction3]);
mempool.addTransaction.mockRejectedValueOnce(new Error("Something wrong"));
mempool.addTransaction.mockRejectedValueOnce(new Error("Something wrong"));

const service = container.resolve(Service);
await service.readdTransactions([transaction1, transaction2]);

expect(mempool.addTransaction).toBeCalledTimes(3);
expect(mempool.addTransaction).toBeCalledWith(transaction1);
expect(mempool.addTransaction).toBeCalledWith(transaction2);
expect(mempool.addTransaction).toBeCalledWith(transaction3);

expect(storage.addTransaction).toBeCalledTimes(0);

expect(storage.removeTransaction).toBeCalledTimes(0);
});
});

Expand Down
35 changes: 1 addition & 34 deletions __tests__/unit/core-transaction-pool/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createLock, IteratorMany } from "../../../packages/core-transaction-pool/src/utils";
import { IteratorMany } from "../../../packages/core-transaction-pool/src/utils";

describe("IteratorMany", () => {
it("should choose next item based on comparator", () => {
Expand Down Expand Up @@ -30,36 +30,3 @@ describe("IteratorMany", () => {
expect(iteratorMany.next()).toStrictEqual({ done: true, value: undefined });
});
});

describe("createLock", () => {
it("should synchronize parallel async executions", async () => {
let counter = 0;
const lock = createLock();

const promise1 = lock(async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
return ++counter;
});
const promise2 = lock(async () => {
return ++counter;
});
const results = await Promise.all([promise1, promise2]);

expect(results).toStrictEqual([1, 2]);
});

it("should synchronize parallel async executions even if error was thrown", async () => {
let counter = 0;
const lock = createLock();

const promise1 = lock(async () => {
await new Promise((_, reject) => setTimeout(() => reject(++counter), 100));
}).catch((value) => value);
const promise2 = lock(async () => {
return ++counter;
});
const results = await Promise.all([promise1, promise2]);

expect(results).toStrictEqual([1, 2]);
});
});
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Interfaces } from "@arkecosystem/crypto";

export interface SenderMempool {
isEmpty(): boolean;
isDisposable(): boolean;
getSize(): number;

getFromEarliest(): Iterable<Interfaces.ITransaction>;
Expand Down
1 change: 1 addition & 0 deletions packages/core-kernel/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export * from "./expiration-calculator";
export * from "./assert";
export * from "./ipc-handler";
export * from "./ipc-subprocess";
export * from "./lock";

export const delegateCalculator = { calculateApproval, calculateForgedTotal };
export const expirationCalculator = { calculateTransactionExpiration, calculateLockExpirationStatus };
Expand Down
Loading

0 comments on commit f0acce8

Please sign in to comment.