Skip to content
This repository was archived by the owner on Jan 19, 2021. It is now read-only.

Commit 9b5bbf8

Browse files
add lock
1 parent 49beeca commit 9b5bbf8

File tree

3 files changed

+44
-24
lines changed

3 files changed

+44
-24
lines changed

src/baseTrie.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ export class Trie {
255255
const childRef = child[1] as Buffer
256256
const childKey = key.concat(keyExtension)
257257
const priority = childKey.length
258-
taskExecutor.execute(priority, async (taskCallback: Function) => {
258+
await taskExecutor.execute(priority, async (taskCallback: Function) => {
259259
const childNode = await self._lookupNode(childRef)
260260
taskCallback()
261261
if (childNode) {
@@ -275,7 +275,7 @@ export class Trie {
275275
const childKey = key.slice()
276276
childKey.push(childIndex)
277277
const priority = childKey.length
278-
taskExecutor.execute(priority, async (taskCallback: Function) => {
278+
await taskExecutor.execute(priority, async (taskCallback: Function) => {
279279
const childNode = await self._lookupNode(childRef)
280280
taskCallback()
281281
if (childNode) {

src/prioritizedTaskExecutor.ts

+31-11
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import Semaphore from 'semaphore-async-await'
2+
13
interface Task {
24
priority: number
35
fn: Function
@@ -10,6 +12,8 @@ export class PrioritizedTaskExecutor {
1012
private currentPoolSize: number
1113
/** The task queue */
1214
private queue: Task[]
15+
/** The Lock */
16+
private lock: Semaphore
1317

1418
/**
1519
* Executes tasks up to maxPoolSize at a time, other items are put in a priority queue.
@@ -21,6 +25,7 @@ export class PrioritizedTaskExecutor {
2125
this.maxPoolSize = maxPoolSize
2226
this.currentPoolSize = 0
2327
this.queue = []
28+
this.lock = new Semaphore(1)
2429
}
2530

2631
/**
@@ -29,17 +34,22 @@ export class PrioritizedTaskExecutor {
2934
* @param priority The priority of the task
3035
* @param fn The function that accepts the callback, which must be called upon the task completion.
3136
*/
32-
execute(priority: number, fn: Function) {
33-
if (this.currentPoolSize < this.maxPoolSize) {
34-
this.currentPoolSize++
35-
fn(() => {
36-
this.currentPoolSize--
37-
if (this.queue.length > 0) {
38-
const item = this.queue.shift()
39-
this.execute(item!.priority, item!.fn)
37+
async execute(priority: number, fn: Function) {
38+
let self = this
39+
function runTask() {
40+
self.currentPoolSize++
41+
fn(async () => {
42+
self.currentPoolSize--
43+
if (self.queue.length > 0) {
44+
const item = self.queue.shift()
45+
await self.execute(item!.priority, item!.fn)
4046
}
4147
})
48+
}
49+
if (this.currentPoolSize < this.maxPoolSize) {
50+
runTask()
4251
} else {
52+
await this.lock.wait()
4353
if (this.queue.length == 0) {
4454
this.queue.push({ priority, fn })
4555
} else {
@@ -50,15 +60,24 @@ export class PrioritizedTaskExecutor {
5060
return Math.floor(left + (right - left) / 2)
5161
}
5262
while (true) {
63+
// note that there is a special case: it could be that during sorting, a Task is finished (reducing currentPoolSize by 1), but this Task was not yet inserted
64+
// therefore, if we want to insert the item we explicitly check that we indeed should Queue it, if not, we execute it and do not insert it.
5365
let index = mid()
5466
let value = this.queue[index].priority
55-
console.log(left, right, index, value)
5667
if (value == priority) {
57-
this.queue.splice(index, 0, { priority, fn })
68+
if (this.currentPoolSize < this.maxPoolSize) {
69+
runTask()
70+
} else {
71+
this.queue.splice(index, 0, { priority, fn })
72+
}
5873
break
5974
}
6075
if (left == right) {
61-
this.queue.splice(left, 0, { priority, fn })
76+
if (this.currentPoolSize < this.maxPoolSize) {
77+
runTask()
78+
} else {
79+
this.queue.splice(left, 0, { priority, fn })
80+
}
6281
break
6382
}
6483

@@ -69,6 +88,7 @@ export class PrioritizedTaskExecutor {
6988
}
7089
}
7190
}
91+
this.lock.signal()
7292
}
7393
}
7494

test/prioritizedTaskExecutor.spec.ts

+11-11
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,33 @@ import * as tape from 'tape'
22
import { PrioritizedTaskExecutor } from '../src/prioritizedTaskExecutor'
33

44
tape('prioritized task executor test', function (t: any) {
5-
t.test('should execute tasks in the right order', (st: any) => {
5+
t.test('should execute tasks in the right order', async (st: any) => {
66
const taskExecutor = new PrioritizedTaskExecutor(2)
77
const tasks = [1, 2, 3, 4]
88
const callbacks = [] as any
99
const executionOrder = [] as any
10-
tasks.forEach(function (task) {
11-
taskExecutor.execute(task, function (cb: Function) {
10+
for (let task of tasks) {
11+
await taskExecutor.execute(task, function (cb: Function) {
1212
executionOrder.push(task)
1313
callbacks.push(cb)
1414
})
15-
})
15+
}
1616

17-
callbacks.forEach(function (callback: Function) {
18-
callback()
19-
})
17+
for (let callback of callbacks) {
18+
await callback()
19+
}
2020

2121
const expectedExecutionOrder = [1, 2, 4, 3]
2222
st.deepEqual(executionOrder, expectedExecutionOrder)
2323
st.end()
2424
})
2525

26-
t.test('should queue tasks in the right order', (st: any) => {
26+
t.test('should queue tasks in the right order', async (st: any) => {
2727
const priorityList = [0, 1, 0, 2, 0, 1, 0, 2, 2, 1]
2828
const PTE = new PrioritizedTaskExecutor(0) // this ensures that no task actually gets executed, so this essentially just checks the sort algorithm
29-
priorityList.map((priority) => {
30-
PTE.execute(priority, () => {})
31-
})
29+
for (let priority of priorityList) {
30+
await PTE.execute(priority, () => {})
31+
}
3232
// have to cast the PTE as <any> to access the private queue
3333
st.deepEqual(
3434
(<any>PTE).queue.map((task: any) => task.priority),

0 commit comments

Comments
 (0)