Skip to content

Commit b810928

Browse files
committed
Enabled timeouts for lock acquisition
1 parent 738c46f commit b810928

13 files changed

+594
-204
lines changed

package-lock.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
},
2121
"dependencies": {
2222
"@matrixai/resources": "^1.0.0",
23-
"async-mutex": "^0.3.2"
23+
"async-mutex": "^0.3.2",
24+
"ts-custom-error": "^3.2.0"
2425
},
2526
"devDependencies": {
2627
"@types/jest": "^27.0.2",

src/Lock.ts

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,64 @@
11
import type { MutexInterface } from 'async-mutex';
22
import type { ResourceAcquire } from '@matrixai/resources';
3-
import { Mutex } from 'async-mutex';
3+
import { Mutex, withTimeout } from 'async-mutex';
44
import { withF, withG } from '@matrixai/resources';
5+
import { yieldMicro } from './utils';
6+
import { ErrorAsyncLocksTimeout } from './errors';
57

68
class Lock {
7-
protected lock: Mutex = new Mutex();
8-
protected release: MutexInterface.Releaser;
9+
protected _lock: Mutex = new Mutex();
910
protected _count: number = 0;
1011

11-
public acquire: ResourceAcquire<Lock> = async () => {
12-
++this._count;
13-
this.release = await this.lock.acquire();
14-
return [
15-
async () => {
12+
public lock(timeout?: number): ResourceAcquire<Lock> {
13+
return async () => {
14+
++this._count;
15+
let lock: MutexInterface = this._lock;
16+
if (timeout != null) {
17+
lock = withTimeout(this._lock, timeout, new ErrorAsyncLocksTimeout());
18+
}
19+
let release: MutexInterface.Releaser;
20+
try {
21+
release = await lock.acquire();
22+
} catch (e) {
1623
--this._count;
17-
this.release();
18-
},
19-
this,
20-
];
21-
};
24+
throw e;
25+
}
26+
return [
27+
async () => {
28+
--this._count;
29+
release();
30+
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54
31+
await yieldMicro();
32+
},
33+
this,
34+
];
35+
};
36+
}
2237

2338
public get count(): number {
2439
return this._count;
2540
}
2641

2742
public isLocked(): boolean {
28-
return this.lock.isLocked();
43+
return this._lock.isLocked();
2944
}
3045

3146
public async waitForUnlock(): Promise<void> {
32-
return this.lock.waitForUnlock();
47+
return this._lock.waitForUnlock();
3348
}
3449

35-
public async withF<T>(f: (resources: [Lock]) => Promise<T>): Promise<T> {
36-
return withF([this.acquire], f);
50+
public async withF<T>(
51+
f: (resources: [Lock]) => Promise<T>,
52+
timeout?: number,
53+
): Promise<T> {
54+
return withF([this.lock(timeout)], f);
3755
}
3856

3957
public withG<T, TReturn, TNext>(
4058
g: (resources: [Lock]) => AsyncGenerator<T, TReturn, TNext>,
59+
timeout?: number,
4160
): AsyncGenerator<T, TReturn, TNext> {
42-
return withG([this.acquire], g);
61+
return withG([this.lock(timeout)], g);
4362
}
4463
}
4564

src/RWLockReader.ts

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import type { MutexInterface } from 'async-mutex';
22
import type { ResourceAcquire } from '@matrixai/resources';
3-
import { Mutex } from 'async-mutex';
3+
import { Mutex, withTimeout } from 'async-mutex';
44
import { withF, withG } from '@matrixai/resources';
5+
import { yieldMicro } from './utils';
6+
import { ErrorAsyncLocksTimeout } from './errors';
57

68
/**
79
* Read-preferring read write lock
@@ -12,35 +14,62 @@ class RWLockReader {
1214
protected lock: Mutex = new Mutex();
1315
protected release: MutexInterface.Releaser;
1416

15-
public acquireRead: ResourceAcquire<RWLockReader> = async () => {
16-
const readerCount = ++this._readerCount;
17-
// The first reader locks
18-
if (readerCount === 1) {
19-
this.release = await this.lock.acquire();
20-
}
21-
return [
22-
async () => {
23-
const readerCount = --this._readerCount;
24-
// The last reader unlocks
25-
if (readerCount === 0) {
26-
this.release();
17+
public read(timeout?: number): ResourceAcquire<RWLockReader> {
18+
return async () => {
19+
const readerCount = ++this._readerCount;
20+
// The first reader locks
21+
if (readerCount === 1) {
22+
let lock: MutexInterface = this.lock;
23+
if (timeout != null) {
24+
lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout());
2725
}
28-
},
29-
this,
30-
];
31-
};
26+
try {
27+
this.release = await lock.acquire();
28+
} catch (e) {
29+
--this._readerCount;
30+
throw e;
31+
}
32+
}
33+
return [
34+
async () => {
35+
const readerCount = --this._readerCount;
36+
// The last reader unlocks
37+
if (readerCount === 0) {
38+
this.release();
39+
}
40+
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54
41+
await yieldMicro();
42+
},
43+
this,
44+
];
45+
};
46+
}
3247

33-
public acquireWrite: ResourceAcquire<RWLockReader> = async () => {
34-
++this._writerCount;
35-
this.release = await this.lock.acquire();
36-
return [
37-
async () => {
48+
public write(timeout?: number): ResourceAcquire<RWLockReader> {
49+
return async () => {
50+
++this._writerCount;
51+
let lock: MutexInterface = this.lock;
52+
if (timeout != null) {
53+
lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout());
54+
}
55+
let release: MutexInterface.Releaser;
56+
try {
57+
release = await lock.acquire();
58+
} catch (e) {
3859
--this._writerCount;
39-
this.release();
40-
},
41-
this,
42-
];
43-
};
60+
throw e;
61+
}
62+
return [
63+
async () => {
64+
release();
65+
--this._writerCount;
66+
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54
67+
await yieldMicro();
68+
},
69+
this,
70+
];
71+
};
72+
}
4473

4574
public get readerCount(): number {
4675
return this._readerCount;
@@ -60,26 +89,30 @@ class RWLockReader {
6089

6190
public async withReadF<T>(
6291
f: (resources: [RWLockReader]) => Promise<T>,
92+
timeout?: number,
6393
): Promise<T> {
64-
return withF([this.acquireRead], f);
94+
return withF([this.read(timeout)], f);
6595
}
6696

6797
public async withWriteF<T>(
6898
f: (resources: [RWLockReader]) => Promise<T>,
99+
timeout?: number,
69100
): Promise<T> {
70-
return withF([this.acquireWrite], f);
101+
return withF([this.write(timeout)], f);
71102
}
72103

73104
public withReadG<T, TReturn, TNext>(
74105
g: (resources: [RWLockReader]) => AsyncGenerator<T, TReturn, TNext>,
106+
timeout?: number,
75107
): AsyncGenerator<T, TReturn, TNext> {
76-
return withG([this.acquireRead], g);
108+
return withG([this.read(timeout)], g);
77109
}
78110

79111
public withWriteG<T, TReturn, TNext>(
80112
g: (resources: [RWLockReader]) => AsyncGenerator<T, TReturn, TNext>,
113+
timeout?: number,
81114
): AsyncGenerator<T, TReturn, TNext> {
82-
return withG([this.acquireWrite], g);
115+
return withG([this.write(timeout)], g);
83116
}
84117
}
85118

0 commit comments

Comments
 (0)