Skip to content

Commit 7d38702

Browse files
committed
✨ implemented
1 parent 003b448 commit 7d38702

File tree

11 files changed

+425
-0
lines changed

11 files changed

+425
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
# async-lib
2+
23
便利な非同期関数置き場

delay.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export const delay = (milliseconds: number): Promise<void> =>
2+
new Promise((resolve) => setTimeout(resolve, milliseconds));

deps_test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "https://deno.land/std@0.129.0/testing/asserts.ts";

mod.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export * from "./promisify.ts";
2+
export * from "./pool.ts";
3+
export * from "./sort.ts";
4+
export * from "./types.ts";

pool.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { delay } from "./delay.ts";
2+
import { pool } from "./pool.ts";
3+
import { assert, assertEquals, assertStringIncludes } from "./deps_test.ts";
4+
5+
Deno.test("pool()", async (t) => {
6+
await t.step("order", async () => {
7+
const start = new Date();
8+
const results = pool(
9+
2,
10+
[1, 2, 3],
11+
(i, id) => delay(1000).then(() => [i, id]),
12+
);
13+
const list = await Promise.all(results);
14+
15+
const diff = new Date().getTime() - start.getTime();
16+
assert(diff >= 2000);
17+
assert(diff < 3000);
18+
19+
assertEquals(
20+
list.flatMap((result) => result.success ? [result.value] : []),
21+
[
22+
[1, 0],
23+
[2, 1],
24+
[3, 0],
25+
],
26+
);
27+
});
28+
29+
await t.step("error", async () => {
30+
async function mapNumber(n: number): Promise<number> {
31+
if (n <= 2) throw new Error(`Bad number: ${n}`);
32+
await delay(100);
33+
return n;
34+
}
35+
const mappedNumbers: number[] = [];
36+
const errors: unknown[] = [];
37+
for (const promise of pool(3, [1, 2, 3, 4], mapNumber)) {
38+
const m = await promise;
39+
if (m.success) {
40+
mappedNumbers.push(m.value);
41+
} else {
42+
errors.push(m.reason);
43+
}
44+
}
45+
46+
assertEquals(errors.length, 2);
47+
assert(errors[0] instanceof Error);
48+
assert(errors[1] instanceof Error);
49+
assertStringIncludes(errors[0].stack ?? "", "Error: Bad number: 1");
50+
assertStringIncludes(errors[1].stack ?? "", "Error: Bad number: 2");
51+
assertEquals(mappedNumbers, [3, 4]);
52+
});
53+
});

pool.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import type { Result } from "./types.ts";
2+
3+
export function* pool<T, U, E = unknown>(
4+
threshold: number,
5+
args: Iterable<T>,
6+
func: (value: T, id: number) => Promise<U>,
7+
): Generator<Promise<Result<U, E>>, void, unknown> {
8+
let running = 0;
9+
const waitings = [] as ((value: number | PromiseLike<number>) => void)[];
10+
const waitForReady = async () => {
11+
running++;
12+
if (running <= threshold) return running - 1;
13+
return await new Promise<number>(
14+
(resolve) => waitings.push(resolve),
15+
);
16+
};
17+
for (const arg of args) {
18+
yield (async () => {
19+
const id = await waitForReady();
20+
try {
21+
return {
22+
success: true,
23+
value: await func(arg, id),
24+
};
25+
} catch (e: unknown) {
26+
return {
27+
success: false,
28+
reason: e as E,
29+
};
30+
} finally {
31+
running--;
32+
waitings.shift()?.(id);
33+
}
34+
})();
35+
}
36+
// 実行しきれない可能性はある?
37+
}

promisify.test.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { promisify } from "./promisify.ts";
2+
import { assert } from "./deps_test.ts";
3+
4+
Deno.test("promisify()", async (t) => {
5+
await t.step("keep all results", async () => {
6+
const [get, resolve] = promisify<number>();
7+
8+
const pending = get();
9+
resolve(0);
10+
resolve(1);
11+
assert(await pending === 0);
12+
assert(await get() === 1);
13+
resolve(2);
14+
assert(await get() === 2);
15+
resolve(3);
16+
resolve(4);
17+
assert(await get() === 3);
18+
assert(await get() === 4);
19+
resolve(5);
20+
resolve(6);
21+
resolve(7);
22+
assert(await get() === 5);
23+
resolve(8);
24+
assert(await get() === 6);
25+
assert(await get() === 7);
26+
assert(await get() === 8);
27+
});
28+
29+
await t.step("keep nothing", async () => {
30+
const [get, resolve] = promisify<number>({ maxQueued: 0 });
31+
32+
let pending = get();
33+
resolve(1);
34+
assert(await pending === 1);
35+
resolve(2);
36+
resolve(3);
37+
pending = get();
38+
resolve(4);
39+
assert(await pending === 4);
40+
pending = get();
41+
resolve(5);
42+
resolve(6);
43+
assert(await pending === 5);
44+
});
45+
46+
await t.step("keep the latest result", async () => {
47+
const [get, resolve] = promisify<number>({ maxQueued: 1 });
48+
49+
let pending = get();
50+
resolve(1);
51+
assert(await pending === 1);
52+
resolve(2);
53+
resolve(3);
54+
assert(await get() === 3);
55+
resolve(4);
56+
resolve(5);
57+
resolve(6);
58+
assert(await get() === 6);
59+
pending = get();
60+
resolve(7);
61+
resolve(8);
62+
resolve(9);
63+
assert(await pending === 7);
64+
assert(await get() === 9);
65+
});
66+
67+
await t.step("keep the latest and second latest results", async () => {
68+
const [get, resolve] = promisify<number>({ maxQueued: 2 });
69+
70+
let pending = get();
71+
resolve(1);
72+
assert(await pending === 1);
73+
resolve(2);
74+
resolve(3);
75+
assert(await get() === 2);
76+
assert(await get() === 3);
77+
resolve(4);
78+
resolve(5);
79+
resolve(6);
80+
assert(await get() === 5);
81+
resolve(7);
82+
resolve(8);
83+
assert(await get() === 7);
84+
assert(await get() === 8);
85+
pending = get();
86+
resolve(9);
87+
resolve(10);
88+
resolve(11);
89+
resolve(12);
90+
assert(await pending === 9);
91+
assert(await get() === 11);
92+
assert(await get() === 12);
93+
});
94+
});

promisify.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import type { Result } from "./types.ts";
2+
3+
export interface PromisifyOptions {
4+
/** callbackから次の値を取り出すまでに実行されたcallbackの結果を最新でいくつまで保持するかを表す値
5+
*
6+
* - 最後に実行された値だけを残したいときは`1`を指定する
7+
* - 一つも残したくないときは`0`を指定する
8+
* - 全部残したいときはundefinedを指定する
9+
* - これが既定の動作
10+
*
11+
* @default undefined
12+
*/
13+
maxQueued?: number | undefined;
14+
}
15+
/** callbackをPromiseに変換するやつ
16+
*
17+
* @return 左から順に、Promiseを返すやつ、正常値を受け取るcallback、異常値を受け取るcallback
18+
*/
19+
export function promisify<T, E = unknown>(
20+
options?: PromisifyOptions,
21+
): readonly [
22+
() => Promise<T>,
23+
(value: T) => void,
24+
(reason: E) => void,
25+
] {
26+
const maxQueued = options?.maxQueued === undefined
27+
? undefined
28+
: Math.max(0, options.maxQueued);
29+
if (maxQueued === 0) return promisifyWithoutQueue();
30+
31+
const queue = [] as Result<T, E>[];
32+
let _resolve: ((value: T) => void) | undefined;
33+
let _reject: ((value: E) => void) | undefined;
34+
35+
/** queueから一つ取り出す。空なら_resolveをセットする */
36+
const waitForSettled = async () => {
37+
if (maxQueued !== undefined) queue.splice(0, queue.length - maxQueued);
38+
const value = queue.shift();
39+
if (value) {
40+
if (value.success) return value.value;
41+
throw value.reason;
42+
}
43+
44+
return await new Promise<T>(
45+
(res, rej) => {
46+
_resolve = res;
47+
_reject = rej;
48+
},
49+
);
50+
};
51+
const resolve = (value: T) => {
52+
if (!_resolve) {
53+
queue.push({ success: true, value });
54+
return;
55+
}
56+
_resolve(value);
57+
_resolve = _reject = undefined;
58+
};
59+
const reject = (value: E) => {
60+
if (!_reject) {
61+
queue.push({ success: false, reason: value });
62+
return;
63+
}
64+
_reject(value);
65+
_resolve = _reject = undefined;
66+
};
67+
68+
return [waitForSettled, resolve, reject] as const;
69+
}
70+
71+
function promisifyWithoutQueue<T, E = unknown>(): readonly [
72+
() => Promise<T>,
73+
(value: T) => void,
74+
(reason: E) => void,
75+
] {
76+
let _resolve: ((value: T) => void) | undefined;
77+
let _reject: ((value: E) => void) | undefined;
78+
79+
const waitForSettled = () =>
80+
new Promise<T>(
81+
(res, rej) => {
82+
_resolve = res;
83+
_reject = rej;
84+
},
85+
);
86+
const resolve = (value: T) => _resolve?.(value);
87+
const reject = (value: E) => _reject?.(value);
88+
89+
return [waitForSettled, resolve, reject] as const;
90+
}

sort.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import { delay } from "./delay.ts";
2+
import type { Result } from "./types.ts";
3+
import { sort, sortSettled } from "./sort.ts";
4+
import { assertEquals } from "./deps_test.ts";
5+
6+
Deno.test("sort()", async (t) => {
7+
await t.step("return in order of settled", async () => {
8+
const results: number[] = [];
9+
10+
for await (
11+
const waited of sort(
12+
[1000, 2000, 500, 1500].map((n, index) => delay(n).then(() => index)),
13+
)
14+
) {
15+
results.push(waited);
16+
}
17+
18+
assertEquals(results, [2, 0, 3, 1]);
19+
});
20+
21+
await t.step("ignore errors", async () => {
22+
const results: number[] = [];
23+
24+
for await (
25+
const waited of sort(
26+
[1000, 2000, 1500, 500].map(async (n, index) => {
27+
if (index < 2) throw Error(`Error: ${index}`);
28+
await delay(n);
29+
return index;
30+
}),
31+
)
32+
) {
33+
results.push(waited);
34+
}
35+
36+
assertEquals(results, [3, 2]);
37+
});
38+
});
39+
40+
Deno.test("sortSettled()", async (t) => {
41+
await t.step("return in order of settled", async () => {
42+
const results: Result<number>[] = [];
43+
44+
for await (
45+
const waited of sortSettled(
46+
[1000, 2000, 500, 1500].map((n, index) => delay(n).then(() => index)),
47+
)
48+
) {
49+
results.push(waited);
50+
}
51+
52+
assertEquals(results, [
53+
{ success: true, value: 2 },
54+
{ success: true, value: 0 },
55+
{ success: true, value: 3 },
56+
{ success: true, value: 1 },
57+
]);
58+
});
59+
60+
await t.step("catch errors", async () => {
61+
const results: Result<number>[] = [];
62+
63+
for await (
64+
const waited of sortSettled(
65+
[1000, 2000, 1500, 500].map(async (n, index) => {
66+
if (index < 2) throw `Error: ${index}`;
67+
await delay(n);
68+
return index;
69+
}),
70+
)
71+
) {
72+
results.push(waited);
73+
}
74+
75+
assertEquals(results, [
76+
{ success: false, reason: "Error: 0" },
77+
{ success: false, reason: "Error: 1" },
78+
{ success: true, value: 3 },
79+
{ success: true, value: 2 },
80+
]);
81+
});
82+
});

0 commit comments

Comments
 (0)