Skip to content

Commit 437134b

Browse files
author
Balassa Márton
committed
Initial commit
0 parents  commit 437134b

File tree

10 files changed

+613
-0
lines changed

10 files changed

+613
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
node_modules
2+
typings

.vscode/launch.json

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
{
2+
"version": "0.2.0",
3+
"configurations": [
4+
{
5+
"name": "Launch",
6+
"type": "node",
7+
"request": "launch",
8+
"program": "${workspaceRoot}/app.js",
9+
"stopOnEntry": false,
10+
"args": [],
11+
"cwd": "${workspaceRoot}",
12+
"preLaunchTask": null,
13+
"runtimeExecutable": null,
14+
"runtimeArgs": [
15+
"--nolazy"
16+
],
17+
"env": {
18+
"NODE_ENV": "development"
19+
},
20+
"console": "internalConsole",
21+
"sourceMaps": false,
22+
"outDir": null
23+
},
24+
{
25+
"name": "Attach",
26+
"type": "node",
27+
"request": "attach",
28+
"port": 5858,
29+
"address": "localhost",
30+
"restart": false,
31+
"sourceMaps": false,
32+
"outDir": null,
33+
"localRoot": "${workspaceRoot}",
34+
"remoteRoot": null
35+
},
36+
{
37+
"name": "Attach to Process",
38+
"type": "node",
39+
"request": "attach",
40+
"processId": "${command.PickProcess}",
41+
"port": 5858,
42+
"sourceMaps": false,
43+
"outDir": null
44+
},
45+
{
46+
"name": "Run mocha",
47+
"type": "node",
48+
"request": "launch",
49+
"program": "${workspaceRoot}/node_modules/mocha/bin/_mocha",
50+
"stopOnEntry": false,
51+
"args": ["test/**/*.js", "--no-timeouts"],
52+
"cwd": "${workspaceRoot}",
53+
"runtimeExecutable": null,
54+
"env": { "NODE_ENV": "testing"}
55+
}
56+
]
57+
}

.vscode/tasks.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
// See https://go.microsoft.com/fwlink/?LinkId=733558
3+
// for the documentation about the tasks.json format
4+
"version": "0.1.0",
5+
"command": "tsc",
6+
"isShellCommand": true,
7+
"args": ["-p", "."],
8+
"showOutput": "silent",
9+
"problemMatcher": "$tsc"
10+
}

package.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "sequential-task-queue",
3+
"version": "1.0.0",
4+
"description": "Sequential task queue for node and the browser",
5+
"author": {
6+
"name": "BalassaMarton"
7+
},
8+
"devDependencies": {
9+
"mocha": "^3.0.2",
10+
"sinon": "^1.17.5"
11+
}
12+
}

src/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.js

src/sequential-task-queue.ts

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
export interface ITaskQueueOptions {
2+
name?: string;
3+
errorHandler?: (e: any) => any;
4+
timeout?: number;
5+
}
6+
7+
export interface ICancellationToken {
8+
cancelled?: boolean;
9+
reason?: string;
10+
}
11+
12+
export var cancellationTokenReasons = {
13+
timeout: "timeout",
14+
cancel: "cancel"
15+
}
16+
17+
export class TimeoutError extends Error {
18+
}
19+
20+
export class SequentialTaskQueue {
21+
22+
private _errorHandler: (e: any) => void;
23+
private _queue: ITaskEntry[] = [];
24+
private _isClosed: boolean = false;
25+
private _waiters: Function[] = [];
26+
private _timeout: number;
27+
private _currentTask: ITaskEntry;
28+
29+
static defaultTimeout = 0;
30+
31+
name: string;
32+
33+
get isClosed() {
34+
return this._isClosed;
35+
}
36+
37+
constructor(options?: ITaskQueueOptions) {
38+
if (!options)
39+
options = {};
40+
if (options.timeout === undefined || options.timeout === null)
41+
options.timeout = SequentialTaskQueue.defaultTimeout;
42+
this._timeout = options.timeout;
43+
this.name = options.name;
44+
this._errorHandler = options.errorHandler;
45+
}
46+
47+
push(task: Function, timeout?: number): ICancellationToken {
48+
if (this._isClosed)
49+
throw new Error(`${this.name || "SequentialTaskQueue"} has been previously closed`);
50+
var entry: ITaskEntry = {
51+
fn: task,
52+
timeout: timeout === undefined ? this._timeout : timeout,
53+
cancellationToken: new CancellationToken()
54+
};
55+
this._queue.push(entry);
56+
this.schedule(() => this.next());
57+
return entry.cancellationToken;
58+
}
59+
60+
cancel(): PromiseLike<any> {
61+
if (this._currentTask) {
62+
if (this._currentTask.timeoutHandle)
63+
clearTimeout(this._currentTask.timeoutHandle);
64+
this._currentTask.cancellationToken.cancel(cancellationTokenReasons.cancel);
65+
}
66+
this._queue.splice(0);
67+
return this.wait();
68+
}
69+
70+
close(cancel?: boolean): PromiseLike<any> {
71+
if (this._isClosed)
72+
return this.wait();
73+
this._isClosed = true;
74+
if (cancel)
75+
return this.cancel();
76+
return this.wait();
77+
}
78+
79+
wait(): PromiseLike<any> {
80+
if (!this._currentTask && this._queue.length === 0)
81+
return Promise.resolve();
82+
return new Promise(resolve => {
83+
this._waiters.push(resolve);
84+
});
85+
}
86+
87+
onError(handler: (e: any) => void) {
88+
this._errorHandler = handler;
89+
}
90+
91+
private next() {
92+
if (!this._currentTask) {
93+
let task = this._currentTask = this._queue.shift();
94+
if (task) {
95+
try {
96+
if (task.cancellationToken.cancelled) {
97+
this.schedule(() => this.next());
98+
return;
99+
}
100+
if (task.timeout) {
101+
task.timeoutHandle = setTimeout(() => {
102+
task.timeoutHandle = undefined;
103+
task.cancellationToken.cancel(cancellationTokenReasons.timeout);
104+
this.handleError(new TimeoutError());
105+
if (this._currentTask === task)
106+
this.doneTask();
107+
}, task.timeout);
108+
}
109+
let res = task.fn(task.cancellationToken);
110+
if (res && isPromise(res)) {
111+
res.then(() => {
112+
if (this._currentTask === task)
113+
this.doneTask();
114+
},
115+
err => {
116+
this.handleError(err);
117+
if (this._currentTask === task)
118+
this.doneTask();
119+
});
120+
} else
121+
this.doneTask();
122+
123+
} catch (e) {
124+
this.handleError(e);
125+
this.doneTask();
126+
}
127+
} else {
128+
// queue is empty, call waiters
129+
this.callWaiters();
130+
}
131+
}
132+
}
133+
134+
private doneTask() {
135+
if (this._currentTask.timeoutHandle)
136+
clearTimeout(this._currentTask.timeoutHandle);
137+
this._currentTask = undefined;
138+
if (!this._queue.length)
139+
this.callWaiters();
140+
else
141+
this.schedule(() => this.next());
142+
}
143+
144+
private callWaiters() {
145+
let waiters = this._waiters.splice(0);
146+
waiters.forEach(waiter => waiter());
147+
}
148+
149+
private schedule(fn: Function) {
150+
setTimeout(fn, 0);
151+
}
152+
153+
private handleError(e: any) {
154+
try {
155+
if (typeof this._errorHandler === "function")
156+
this._errorHandler(e);
157+
} catch (ie) {
158+
// suppress errors thrown in error handler
159+
}
160+
}
161+
}
162+
163+
class CancellationToken implements ICancellationToken {
164+
cancelled: boolean = false;
165+
reason: string = null;
166+
167+
cancel(reason: string) {
168+
this.cancelled = true;
169+
this.reason = reason;
170+
}
171+
}
172+
173+
interface ITaskEntry {
174+
fn: Function;
175+
timeout: number;
176+
timeoutHandle?: any;
177+
cancellationToken?: CancellationToken;
178+
}
179+
180+
function isPromise(obj: any): obj is PromiseLike<any> {
181+
return (obj && typeof obj.then === "function");
182+
}
183+
184+
185+
186+
187+
188+
189+
190+
191+
192+
193+
194+
195+
196+
197+

test/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.js

0 commit comments

Comments
 (0)