diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 127de24..52ce5f3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,8 +4,6 @@ env: DENO_VERSION: 1.x on: - schedule: - - cron: "0 7 * * 0" push: branches: - main @@ -36,8 +34,23 @@ jobs: deno-version: ${{ env.DENO_VERSION }} - name: Test run: | - deno task test + deno task test:coverage timeout-minutes: 5 - - name: JSR publish (dry-run) + - run: | + deno task coverage --lcov > coverage.lcov + - uses: codecov/codecov-action@v4 + with: + os: ${{ runner.os }} + files: ./coverage.lcov + token: ${{ secrets.CODECOV_TOKEN }} + + jsr-publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: denoland/setup-deno@v1 + with: + deno-version: ${{ env.DENO_VERSION }} + - name: Publish (dry-run) run: | deno publish --dry-run diff --git a/.gitmessage b/.gitmessage new file mode 100644 index 0000000..71ddb20 --- /dev/null +++ b/.gitmessage @@ -0,0 +1,21 @@ + +# **Conventional Commits** +# +# [optional scope]: +# +# feat: feature (minor) +# deps: dependencies (minor/patch) +# fix: bug fix (patch) +# refactor: refactoring code +# test: test fix; no code change +# docs: documentation fix; no code change +# style: formatting, missing semi colons, etc; no code change +# chore: updating build tasks, package manager configs, etc; no code change +# +# **Install** +# +# git config commit.template .gitmessage +# +# **Reference** +# +# - https://www.conventionalcommits.org/en/v1.0.0/ diff --git a/LICENSE b/LICENSE index 5bee15b..c4b69db 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2021 Alisue +Copyright 2024 jsr-core Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 647f2db..bcbf1b5 100644 --- a/README.md +++ b/README.md @@ -1,62 +1,45 @@ -# async +# asyncutil -[![jsr](https://img.shields.io/jsr/v/%40lambdalisue/async?logo=javascript&logoColor=white)](https://jsr.io/@lambdalisue/async) -[![denoland](https://img.shields.io/github/v/release/lambdalisue/deno-async?logo=deno&label=denoland)](https://github.com/lambdalisue/deno-async/releases) -[![deno doc](https://doc.deno.land/badge.svg)](https://doc.deno.land/https/deno.land/x/async/mod.ts) -[![Test](https://github.com/lambdalisue/deno-async/workflows/Test/badge.svg)](https://github.com/lambdalisue/deno-async/actions?query=workflow%3ATest) +[![Test](https://github.com/jsr-core/asyncutil/actions/workflows/test.yml/badge.svg)](https://github.com/jsr-core/asyncutil/actions/workflows/test.yml) -Asynchronous primitive modules for [Deno][deno]. - -[python's asyncio]: https://docs.python.org/3/library/asyncio.html -[deno]: https://deno.land/ +Asynchronous primitive utility pack. ## Usage -### Barrier +### AsyncValue -`Barrier` is a synchronization primitive that allows multiple tasks to wait -until all of them have reached a certain point of execution before continuing. +`AsyncValue` is a class that wraps a value and allows it to be set +asynchronously. ```ts -import { Barrier } from "https://deno.land/x/async@$MODULE_VERSION/barrier.ts"; - -const barrier = new Barrier(3); - -async function worker(id: number) { - console.log(`worker ${id} is waiting`); - await barrier.wait(); - console.log(`worker ${id} is done`); -} +import { assertEquals } from "@std/assert"; +import { AsyncValue } from "@core/asyncutil/async-value"; -worker(1); -worker(2); -worker(3); +const v = new AsyncValue(0); +assertEquals(await v.get(), 0); +await v.set(1); +assertEquals(await v.get(), 1); ``` -### WaitGroup +### Barrier -`WaitGroup` is a synchronization primitive that enables promises to coordinate -and synchronize their execution. It is particularly useful in scenarios where a -specific number of tasks must complete before the program can proceed. +`Barrier` is a synchronization primitive that allows multiple tasks to wait +until all of them have reached a certain point of execution before continuing. ```ts -import { delay } from "https://deno.land/std@0.211.0/async/delay.ts"; -import { WaitGroup } from "https://deno.land/x/async@$MODULE_VERSION/wait_group.ts"; +import { Barrier } from "@core/asyncutil/barrier"; -const wg = new WaitGroup(); +const barrier = new Barrier(3); async function worker(id: number) { - wg.add(1); console.log(`worker ${id} is waiting`); - await delay(100); + await barrier.wait(); console.log(`worker ${id} is done`); - wg.done(); } worker(1); worker(2); worker(3); -await wg.wait(); ``` ### Lock/RwLock @@ -65,8 +48,8 @@ await wg.wait(); shared value. ```ts -import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; -import { Lock } from "https://deno.land/x/async@$MODULE_VERSION/lock.ts"; +import { AsyncValue } from "@core/asyncutil/async-value"; +import { Lock } from "@core/asyncutil/lock"; // Critical section const count = new Lock(new AsyncValue(0)); @@ -82,8 +65,8 @@ as long as there are no writers holding the lock. Writers block all other readers and writers until the write operation completes. ```ts -import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; -import { RwLock } from "https://deno.land/x/async@$MODULE_VERSION/rw_lock.ts"; +import { AsyncValue } from "@core/asyncutil/async-value"; +import { RwLock } from "@core/asyncutil/rw-lock"; const count = new RwLock(new AsyncValue(0)); @@ -117,8 +100,8 @@ This is a low-level primitive. Use `Lock` instead of `Mutex` if you need to access a shared value concurrently. ```ts -import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; -import { Mutex } from "https://deno.land/x/async@$MODULE_VERSION/mutex.ts"; +import { AsyncValue } from "@core/asyncutil/async-value"; +import { Mutex } from "@core/asyncutil/mutex"; const count = new AsyncValue(0); @@ -127,13 +110,12 @@ async function doSomething() { await count.set(v + 1); } -// Critical section const mu = new Mutex(); -await mu.acquire(); -try { + +// Critical section +{ + using _lock = await mu.acquire(); await doSomething(); -} finally { - mu.release(); } ``` @@ -143,9 +125,9 @@ try { notification. ```ts -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { promiseState } from "https://deno.land/x/async@$MODULE_VERSION/state.ts"; -import { Notify } from "https://deno.land/x/async@$MODULE_VERSION/notify.ts"; +import { assertEquals } from "@std/assert"; +import { promiseState } from "@core/asyncutil/promise-state"; +import { Notify } from "@core/asyncutil/notify"; const notify = new Notify(); const waiter1 = notify.notified(); @@ -158,14 +140,32 @@ assertEquals(await promiseState(waiter1), "fulfilled"); assertEquals(await promiseState(waiter2), "fulfilled"); ``` +### promiseState + +`promiseState` is used to determine the state of the promise. Mainly for testing +purpose. + +```typescript +import { promiseState } from "@core/asyncutil/promise-state"; + +const p1 = Promise.resolve("Resolved promise"); +console.log(await promiseState(p1)); // fulfilled + +const p2 = Promise.reject("Rejected promise").catch(() => undefined); +console.log(await promiseState(p2)); // rejected + +const p3 = new Promise(() => undefined); +console.log(await promiseState(p3)); // pending +``` + ### Queue/Stack `Queue` is a queue implementation that allows for adding and removing elements, with optional waiting when popping elements from an empty queue. ```ts -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { Queue } from "https://deno.land/x/async@$MODULE_VERSION/queue.ts"; +import { assertEquals } from "@std/assert"; +import { Queue } from "@core/asyncutil/queue"; const queue = new Queue(); queue.push(1); @@ -180,8 +180,8 @@ assertEquals(await queue.pop(), 3); with optional waiting when popping elements from an empty stack. ```ts -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { Stack } from "https://deno.land/x/async@$MODULE_VERSION/stack.ts"; +import { assertEquals } from "@std/assert"; +import { Stack } from "@core/asyncutil/stack"; const stack = new Stack(); stack.push(1); @@ -198,7 +198,7 @@ A semaphore that allows a limited number of concurrent executions of an operation. ```ts -import { Semaphore } from "https://deno.land/x/async@$MODULE_VERSION/semaphore.ts"; +import { Semaphore } from "@core/asyncutil/semaphore"; const sem = new Semaphore(5); const worker = () => { @@ -209,37 +209,30 @@ const worker = () => { await Promise.all([...Array(10)].map(() => worker())); ``` -### promiseState - -`promiseState` is used to determine the state of the promise. Mainly for testing -purpose. - -```typescript -import { promiseState } from "https://deno.land/x/async@$MODULE_VERSION/mod.ts"; - -const p1 = Promise.resolve("Resolved promise"); -console.log(await promiseState(p1)); // fulfilled - -const p2 = Promise.reject("Rejected promise").catch(() => undefined); -console.log(await promiseState(p2)); // rejected +### WaitGroup -const p3 = new Promise(() => undefined); -console.log(await promiseState(p3)); // pending -``` +`WaitGroup` is a synchronization primitive that enables promises to coordinate +and synchronize their execution. It is particularly useful in scenarios where a +specific number of tasks must complete before the program can proceed. -### AsyncValue +```ts +import { delay } from "@std/async/delay"; +import { WaitGroup } from "@core/asyncutil/wait-group"; -`AsyncValue` is a class that wraps a value and allows it to be set -asynchronously. +const wg = new WaitGroup(); -```ts -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; +async function worker(id: number) { + wg.add(1); + console.log(`worker ${id} is waiting`); + await delay(100); + console.log(`worker ${id} is done`); + wg.done(); +} -const v = new AsyncValue(0); -assertEquals(await v.get(), 0); -await v.set(1); -assertEquals(await v.get(), 1); +worker(1); +worker(2); +worker(3); +await wg.wait(); ``` ## License diff --git a/testutil.ts b/async_value.ts similarity index 73% rename from testutil.ts rename to async_value.ts index 6252f21..d922a95 100644 --- a/testutil.ts +++ b/async_value.ts @@ -2,16 +2,14 @@ * A class that wraps a value and allows it to be set asynchronously. * * ```ts - * import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; - * import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; + * import { assertEquals } from "@std/assert"; + * import { AsyncValue } from "@core/asyncutil/async-value"; * * const v = new AsyncValue(0); * assertEquals(await v.get(), 0); * await v.set(1); * assertEquals(await v.get(), 1); * ``` - * - * @typeParam T - The type of the value. */ export class AsyncValue { #value: T; @@ -19,7 +17,7 @@ export class AsyncValue { /** * Constructs a new AsyncValue with the given initial value. * - * @param value - The initial value. + * @param value The initial value. */ constructor(value: T) { this.#value = value; diff --git a/testutil_test.ts b/async_value_test.ts similarity index 71% rename from testutil_test.ts rename to async_value_test.ts index 3321dde..327c614 100644 --- a/testutil_test.ts +++ b/async_value_test.ts @@ -1,5 +1,5 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { AsyncValue } from "./testutil.ts"; +import { assertEquals } from "@std/assert"; +import { AsyncValue } from "./async_value.ts"; Deno.test("AsyncValue", async (t) => { await t.step( diff --git a/barrier.ts b/barrier.ts index b7d35be..321c167 100644 --- a/barrier.ts +++ b/barrier.ts @@ -10,7 +10,7 @@ import { Notify } from "./notify.ts"; * unblock and continue executing. * * ```ts - * import { Barrier } from "https://deno.land/x/async@$MODULE_VERSION/barrier.ts"; + * import { Barrier } from "@core/asyncutil/barrier"; * * const barrier = new Barrier(3); * @@ -32,8 +32,8 @@ export class Barrier { /** * Creates a new `Barrier` that blocks until `size` threads have called `wait`. * - * @param size - The number of threads that must reach the barrier before it unblocks. - * @throws Error if size is negative. + * @param size The number of threads that must reach the barrier before it unblocks. + * @throws Error if the size is negative. */ constructor(size: number) { if (size < 0) { @@ -46,15 +46,16 @@ export class Barrier { * Wait for all threads to reach the barrier. * Blocks until all threads reach the barrier. */ - async wait(): Promise { + async wait({ signal }: { signal?: AbortSignal } = {}): Promise { + signal?.throwIfAborted(); this.#rest -= 1; if (this.#rest === 0) { await Promise.all([ - this.#notify.notified(), + this.#notify.notified({ signal }), this.#notify.notifyAll(), ]); } else { - await this.#notify.notified(); + await this.#notify.notified({ signal }); } } } diff --git a/barrier_test.ts b/barrier_test.ts index c02c4f5..acb3b0b 100644 --- a/barrier_test.ts +++ b/barrier_test.ts @@ -1,4 +1,5 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; +import { assertEquals, assertRejects } from "@std/assert"; +import { deadline, delay } from "@std/async"; import { Barrier } from "./barrier.ts"; Deno.test("Barrier", async (t) => { @@ -30,4 +31,52 @@ Deno.test("Barrier", async (t) => { ]); }, ); + + await t.step( + "'wait' with non-aborted signal", + async () => { + const controller = new AbortController(); + const barrier = new Barrier(2); + + await assertRejects( + () => deadline(barrier.wait({ signal: controller.signal }), 100), + DOMException, + "Signal timed out.", + ); + }, + ); + + await t.step( + "'wait' with signal aborted after delay", + async () => { + const controller = new AbortController(); + const barrier = new Barrier(2); + const reason = new Error("Aborted"); + + delay(50).then(() => controller.abort(reason)); + + await assertRejects( + () => deadline(barrier.wait({ signal: controller.signal }), 100), + Error, + "Aborted", + ); + }, + ); + + await t.step( + "'wait' with already aborted signal", + async () => { + const controller = new AbortController(); + const barrier = new Barrier(2); + const reason = new Error("Aborted"); + + controller.abort(reason); + + await assertRejects( + () => deadline(barrier.wait({ signal: controller.signal }), 100), + Error, + "Aborted", + ); + }, + ); }); diff --git a/deno.jsonc b/deno.jsonc index 264e08b..e841ebb 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -1,17 +1,57 @@ { "lock": false, - "name": "@lambdalisue/async", + "name": "@core/asyncutil", "version": "0.0.0", - "exports": "./mod.ts", + "exports": { + ".": "./mod.ts", + "./async-value": "./async_value.ts", + "./barrier": "./barrier.ts", + "./lock": "./lock.ts", + "./mutex": "./mutex.ts", + "./notify": "./notify.ts", + "./promise-state": "./promise_state.ts", + "./queue": "./queue.ts", + "./rw-lock": "./rw_lock.ts", + "./semaphore": "./semaphore.ts", + "./stack": "./stack.ts", + "./wait-group": "./wait_group.ts" + }, + "exclude": [ + ".coverage/**" + ], + "publish": { + "include": [ + "**/*.ts", + "README.md", + "LICENSE" + ], + "exclude": [ + "**/*_test.ts", + ".*" + ] + }, "imports": { - "https://deno.land/x/async@$MODULE_VERSION/": "./" + "@core/asyncutil": "./mod.ts", + "@core/asyncutil/async-value": "./async_value.ts", + "@core/asyncutil/barrier": "./barrier.ts", + "@core/asyncutil/lock": "./lock.ts", + "@core/asyncutil/mutex": "./mutex.ts", + "@core/asyncutil/notify": "./notify.ts", + "@core/asyncutil/promise-state": "./promise_state.ts", + "@core/asyncutil/queue": "./queue.ts", + "@core/asyncutil/rw-lock": "./rw_lock.ts", + "@core/asyncutil/semaphore": "./semaphore.ts", + "@core/asyncutil/stack": "./stack.ts", + "@core/asyncutil/wait-group": "./wait_group.ts", + "@std/assert": "jsr:@std/assert@^1.0.2", + "@std/async": "jsr:@std/async@^1.0.2" }, "tasks": { + "check": "deno check ./**/*.ts", "test": "deno test -A --parallel --shuffle --doc", "test:coverage": "deno task test --coverage=.coverage", - "check": "deno check ./**/*.ts", "coverage": "deno coverage .coverage", - "upgrade": "deno run -q -A https://deno.land/x/molt@0.14.2/cli.ts ./**/*.ts", - "upgrade:commit": "deno task -q upgrade --commit --prefix :package: --pre-commit=fmt" + "update": "deno run --allow-env --allow-read --allow-write=. --allow-run=git,deno --allow-net=jsr.io,registry.npmjs.org jsr:@molt/cli ./*.ts", + "update:commit": "deno task -q update --commit --prefix :package: --pre-commit=fmt,lint" } } diff --git a/lock.ts b/lock.ts index 2984cb5..bf49c90 100644 --- a/lock.ts +++ b/lock.ts @@ -4,8 +4,8 @@ import { Mutex } from "./mutex.ts"; * A mutual exclusion lock that provides safe concurrent access to a shared value. * * ```ts - * import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; - * import { Lock } from "https://deno.land/x/async@$MODULE_VERSION/lock.ts"; + * import { AsyncValue } from "@core/asyncutil/async-value"; + * import { Lock } from "@core/asyncutil/lock"; * * // Critical section * const count = new Lock(new AsyncValue(0)); @@ -14,8 +14,6 @@ import { Mutex } from "./mutex.ts"; * count.set(v + 1); * }); * ``` - * - * @typeParam T - The type of the shared value. */ export class Lock { #mu = new Mutex(); @@ -24,7 +22,7 @@ export class Lock { /** * Constructs a new lock with the given initial value. * - * @param value - The initial value of the lock. + * @param value The initial value of the lock. */ constructor(value: T) { this.#value = value; @@ -41,16 +39,11 @@ export class Lock { * Acquires the lock and applies the given function to the shared value, * returning the result. * - * @typeParam R - The return type of the function. - * @param f - The function to apply to the shared value. + * @param fn The function to apply to the shared value. * @returns A Promise that resolves with the result of the function. */ - async lock(f: (value: T) => R | PromiseLike): Promise { - await this.#mu.acquire(); - try { - return await f(this.#value); - } finally { - this.#mu.release(); - } + async lock(fn: (value: T) => R | PromiseLike): Promise { + using _lock = await this.#mu.acquire(); + return await fn(this.#value); } } diff --git a/lock_test.ts b/lock_test.ts index 5734e46..3f2f0d4 100644 --- a/lock_test.ts +++ b/lock_test.ts @@ -1,5 +1,5 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { AsyncValue } from "./testutil.ts"; +import { assertEquals } from "@std/assert"; +import { AsyncValue } from "./async_value.ts"; import { Lock } from "./lock.ts"; Deno.test("Lock", async (t) => { diff --git a/mod.ts b/mod.ts index 1c5eadd..69c649a 100644 --- a/mod.ts +++ b/mod.ts @@ -1,11 +1,11 @@ +export * from "./async_value.ts"; export * from "./barrier.ts"; export * from "./lock.ts"; export * from "./mutex.ts"; export * from "./notify.ts"; +export * from "./promise_state.ts"; export * from "./queue.ts"; export * from "./rw_lock.ts"; export * from "./semaphore.ts"; export * from "./stack.ts"; -export * from "./state.ts"; -export * from "./testutil.ts"; export * from "./wait_group.ts"; diff --git a/mutex.ts b/mutex.ts index f6e48ff..4851dbb 100644 --- a/mutex.ts +++ b/mutex.ts @@ -6,8 +6,8 @@ * concurrently. * * ```ts - * import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; - * import { Mutex } from "https://deno.land/x/async@$MODULE_VERSION/mutex.ts"; + * import { AsyncValue } from "@core/asyncutil/async-value"; + * import { Mutex } from "@core/asyncutil/mutex"; * * const count = new AsyncValue(0); * @@ -16,18 +16,17 @@ * await count.set(v + 1); * } * - * // Critical section * const mu = new Mutex(); - * await mu.acquire(); - * try { + * + * // Critical section + * { + * using _lock = await mu.acquire(); * await doSomething(); - * } finally { - * mu.release(); * } * ``` */ export class Mutex { - #waiters: { promise: Promise; resolve: () => void }[] = []; + #waiters: Promise[] = []; /** * Returns true if the mutex is locked, false otherwise. @@ -37,25 +36,22 @@ export class Mutex { } /** - * Acquire the mutex, waiting if necessary for it to become available. - * @returns A Promise that resolves when the mutex is acquired. + * Acquire the mutex and return a promise with disposable that releases the mutex when disposed. + * + * @returns A Promise with Disposable that releases the mutex when disposed. */ - async acquire(): Promise { + acquire(): Promise & Disposable { const waiters = [...this.#waiters]; const { promise, resolve } = Promise.withResolvers(); - this.#waiters.push({ promise, resolve }); - if (waiters.length) { - await Promise.all(waiters.map(({ promise }) => promise)); - } - } - - /** - * Release the mutex, allowing the next pending acquirer to proceed. - */ - release(): void { - const waiter = this.#waiters.shift(); - if (waiter) { - waiter.resolve(); - } + this.#waiters.push(promise); + const disposable = { + [Symbol.dispose]: () => { + resolve(); + }, + }; + return Object.assign( + Promise.all(waiters).then(() => disposable), + disposable, + ); } } diff --git a/mutex_test.ts b/mutex_test.ts index 7293599..c4ce0ee 100644 --- a/mutex_test.ts +++ b/mutex_test.ts @@ -1,5 +1,5 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { AsyncValue } from "./testutil.ts"; +import { assertEquals } from "@std/assert"; +import { AsyncValue } from "./async_value.ts"; import { Mutex } from "./mutex.ts"; Deno.test("Mutex", async (t) => { @@ -22,13 +22,9 @@ Deno.test("Mutex", async (t) => { const mu = new Mutex(); const count = new AsyncValue(0); const operation = async () => { - await mu.acquire(); - try { - const v = await count.get(); - await count.set(v + 1); - } finally { - mu.release(); - } + using _lock = await mu.acquire(); + const v = await count.get(); + await count.set(v + 1); }; await Promise.all([...Array(10)].map(() => operation())); assertEquals(await count.get(), 10); diff --git a/notify.ts b/notify.ts index d248a89..a44b8d9 100644 --- a/notify.ts +++ b/notify.ts @@ -1,21 +1,19 @@ -export interface WaitOptions { - signal?: AbortSignal; -} - /** * Async notifier that allows one or more "waiters" to wait for a notification. * * ```ts - * import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; - * import { promiseState } from "https://deno.land/x/async@$MODULE_VERSION/state.ts"; - * import { Notify } from "https://deno.land/x/async@$MODULE_VERSION/notify.ts"; + * import { assertEquals } from "@std/assert"; + * import { promiseState } from "@core/asyncutil/promise-state"; + * import { Notify } from "@core/asyncutil/notify"; * * const notify = new Notify(); * const waiter1 = notify.notified(); * const waiter2 = notify.notified(); + * * notify.notify(); * assertEquals(await promiseState(waiter1), "fulfilled"); * assertEquals(await promiseState(waiter2), "pending"); + * * notify.notify(); * assertEquals(await promiseState(waiter1), "fulfilled"); * assertEquals(await promiseState(waiter2), "fulfilled"); @@ -31,7 +29,7 @@ export class Notify { /** * Returns the number of waiters that are waiting for notification. */ - get waiters(): number { + get waiterCount(): number { return this.#waiters.length; } @@ -62,19 +60,15 @@ export class Notify { * Asynchronously waits for notification. The caller's execution is suspended until * the `notify` method is called. The method returns a Promise that resolves when the caller is notified. * Optionally takes an AbortSignal to abort the waiting if the signal is aborted. - * - * @param options Optional parameters. - * @param options.signal An optional AbortSignal to abort the waiting if the signal is aborted. - * @throws {DOMException} If the signal is aborted. */ - async notified({ signal }: WaitOptions = {}): Promise { + async notified({ signal }: { signal?: AbortSignal } = {}): Promise { if (signal?.aborted) { - throw new DOMException("Aborted", "AbortError"); + throw signal.reason; } const waiter = Promise.withResolvers(); const abort = () => { removeItem(this.#waiters, waiter); - waiter.reject(new DOMException("Aborted", "AbortError")); + waiter.reject(signal!.reason); }; signal?.addEventListener("abort", abort, { once: true }); this.#waiters.push(waiter); diff --git a/notify_test.ts b/notify_test.ts index 960d1c0..f20a539 100644 --- a/notify_test.ts +++ b/notify_test.ts @@ -1,9 +1,6 @@ -import { delay } from "https://deno.land/std@0.211.0/async/delay.ts"; -import { - assertEquals, - assertRejects, -} from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { promiseState } from "./state.ts"; +import { delay } from "@std/async/delay"; +import { assertEquals, assertRejects } from "@std/assert"; +import { promiseState } from "./promise_state.ts"; import { Notify } from "./notify.ts"; Deno.test("Notify", async (t) => { @@ -11,9 +8,11 @@ Deno.test("Notify", async (t) => { const notify = new Notify(); const waiter1 = notify.notified(); const waiter2 = notify.notified(); + notify.notify(); assertEquals(await promiseState(waiter1), "fulfilled"); assertEquals(await promiseState(waiter2), "pending"); + notify.notify(); assertEquals(await promiseState(waiter1), "fulfilled"); assertEquals(await promiseState(waiter2), "fulfilled"); @@ -28,21 +27,6 @@ Deno.test("Notify", async (t) => { assertEquals(await promiseState(waiter2), "fulfilled"); }); - await t.step( - "'notified' with already aborted signal throws DOMException", - async () => { - const controller = new AbortController(); - const notify = new Notify(); - - controller.abort(); - await assertRejects( - () => notify.notified({ signal: controller.signal }), - DOMException, - "Aborted", - ); - }, - ); - await t.step( "'notified' with non-aborted signal", async () => { @@ -59,11 +43,12 @@ Deno.test("Notify", async (t) => { async () => { const controller = new AbortController(); const notify = new Notify(); + const reason = new Error("Aborted"); - delay(100).then(() => controller.abort()); + delay(100).then(() => controller.abort(reason)); await assertRejects( () => notify.notified({ signal: controller.signal }), - DOMException, + Error, "Aborted", ); }, @@ -74,11 +59,12 @@ Deno.test("Notify", async (t) => { async () => { const controller = new AbortController(); const notify = new Notify(); + const reason = new Error("Aborted"); - controller.abort(); + controller.abort(reason); await assertRejects( () => notify.notified({ signal: controller.signal }), - DOMException, + Error, "Aborted", ); }, diff --git a/state.ts b/promise_state.ts similarity index 60% rename from state.ts rename to promise_state.ts index d9eede1..4be0408 100644 --- a/state.ts +++ b/promise_state.ts @@ -5,6 +5,15 @@ export type PromiseState = "fulfilled" | "rejected" | "pending"; /** * Return state (fulfilled/rejected/pending) of a promise + * + * ```ts + * import { assertEquals } from "@std/assert"; + * import { promiseState } from "@core/asyncutil/promise-state"; + * + * assertEquals(await promiseState(Promise.resolve("value")), "fulfilled"); + * assertEquals(await promiseState(Promise.reject("error")), "rejected"); + * assertEquals(await promiseState(new Promise(() => {})), "pending"); + * ``` */ export async function promiseState(p: Promise): Promise { // NOTE: diff --git a/state_test.ts b/promise_state_test.ts similarity index 89% rename from state_test.ts rename to promise_state_test.ts index d391932..2129fc0 100644 --- a/state_test.ts +++ b/promise_state_test.ts @@ -1,5 +1,5 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { promiseState } from "./state.ts"; +import { assertEquals } from "@std/assert"; +import { promiseState } from "./promise_state.ts"; Deno.test( "promiseState() returns 'fulfilled' for resolved promise", diff --git a/queue.ts b/queue.ts index 68360c0..c655a94 100644 --- a/queue.ts +++ b/queue.ts @@ -1,12 +1,12 @@ -import { Notify, type WaitOptions } from "./notify.ts"; +import { Notify } from "./notify.ts"; /** * A queue implementation that allows for adding and removing elements, with optional waiting when * popping elements from an empty queue. * * ```ts - * import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; - * import { Queue } from "https://deno.land/x/async@$MODULE_VERSION/queue.ts"; + * import { assertEquals } from "@std/assert"; + * import { Queue } from "@core/asyncutil/queue"; * * const queue = new Queue(); * queue.push(1); @@ -16,8 +16,6 @@ import { Notify, type WaitOptions } from "./notify.ts"; * assertEquals(await queue.pop(), 2); * assertEquals(await queue.pop(), 3); * ``` - * - * @template T The type of items in the queue. */ export class Queue { #notify = new Notify(); @@ -34,13 +32,11 @@ export class Queue { * Returns true if the queue is currently locked. */ get locked(): boolean { - return this.#notify.waiters > 0; + return this.#notify.waiterCount > 0; } /** * Adds an item to the end of the queue and notifies any waiting consumers. - * - * @param {T} value The item to add to the queue. */ push(value: T): void { this.#items.push(value); @@ -50,19 +46,16 @@ export class Queue { /** * Removes the next item from the queue, optionally waiting if the queue is currently empty. * - * @param {WaitOptions} [options] Optional parameters to pass to the wait operation. - * @param {AbortSignal} [options.signal] An optional AbortSignal used to abort the wait operation if the signal is aborted. - * @returns {Promise} A promise that resolves to the next item in the queue. - * @throws {DOMException} Throws a DOMException with "Aborted" and "AbortError" codes if the wait operation was aborted. + * @returns A promise that resolves to the next item in the queue. */ - async pop({ signal }: WaitOptions = {}): Promise { - while (!signal?.aborted) { + async pop({ signal }: { signal?: AbortSignal } = {}): Promise { + while (true) { + signal?.throwIfAborted(); const value = this.#items.shift(); if (value) { return value; } await this.#notify.notified({ signal }); } - throw new DOMException("Aborted", "AbortError"); } } diff --git a/queue_test.ts b/queue_test.ts index aa0f661..c69d0a4 100644 --- a/queue_test.ts +++ b/queue_test.ts @@ -1,9 +1,6 @@ -import { delay } from "https://deno.land/std@0.211.0/async/delay.ts"; -import { - assertEquals, - assertRejects, -} from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { promiseState } from "./state.ts"; +import { delay } from "@std/async/delay"; +import { assertEquals, assertRejects } from "@std/assert"; +import { promiseState } from "./promise_state.ts"; import { Queue } from "./queue.ts"; Deno.test("Queue", async (t) => { @@ -36,12 +33,13 @@ Deno.test("Queue", async (t) => { await t.step("'pop' with signal aborted after delay", async () => { const controller = new AbortController(); const q = new Queue(); + const reason = new Error("Aborted"); - delay(100).then(() => controller.abort()); + delay(100).then(() => controller.abort(reason)); await assertRejects( () => q.pop({ signal: controller.signal }), - DOMException, + Error, "Aborted", ); }); @@ -49,12 +47,13 @@ Deno.test("Queue", async (t) => { await t.step("'pop' with signal already aborted", async () => { const controller = new AbortController(); const q = new Queue(); + const reason = new Error("Aborted"); - controller.abort(); + controller.abort(reason); await assertRejects( () => q.pop({ signal: controller.signal }), - DOMException, + Error, "Aborted", ); }); diff --git a/rw_lock.ts b/rw_lock.ts index 5a62616..073c881 100644 --- a/rw_lock.ts +++ b/rw_lock.ts @@ -6,8 +6,8 @@ import { Mutex } from "./mutex.ts"; * Writers block all other readers and writers until the write operation completes. * * ```ts - * import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts"; - * import { RwLock } from "https://deno.land/x/async@$MODULE_VERSION/rw_lock.ts"; + * import { AsyncValue } from "@core/asyncutil/async-value"; + * import { RwLock } from "@core/asyncutil/rw-lock"; * * const count = new RwLock(new AsyncValue(0)); * @@ -36,7 +36,7 @@ export class RwLock { /** * Creates a new `RwLock` with the specified initial value. * - * @param value - The initial value of the lock. + * @param value The initial value of the lock. */ constructor(value: T) { this.#value = value; @@ -46,39 +46,28 @@ export class RwLock { * Acquires the lock for both reading and writing, and invokes the specified function with the current * value of the lock. All other readers and writers will be blocked until the function completes. * - * @param f - The function to invoke. + * @param fn The function to invoke. * @returns A promise that resolves to the return value of the specified function. */ - async lock(f: (value: T) => R | PromiseLike): Promise { - await Promise.all([ - this.#write.acquire(), - this.#read.acquire(), - ]); - try { - return await f(this.#value); - } finally { - this.#read.release(); - this.#write.release(); - } + async lock(fn: (value: T) => R | PromiseLike): Promise { + using _wlock = await this.#write.acquire(); + using _rlock = await this.#read.acquire(); + return await fn(this.#value); } /** * Acquires the lock for reading, and invokes the specified function with the current value of the lock. * Other readers can acquire the lock simultaneously, but any writers will be blocked until the function completes. * - * @param f - The function to invoke. + * @param fn The function to invoke. * @returns A promise that resolves to the return value of the specified function. */ - async rlock(f: (value: T) => R | PromiseLike): Promise { - if (this.#write.locked) { - await this.#write.acquire(); - } - this.#read.acquire(); - try { - return await f(this.#value); - } finally { - this.#read.release(); - this.#write.release(); - } + async rlock(fn: (value: T) => R | PromiseLike): Promise { + using _wlock = this.#write.locked + ? await this.#write.acquire() + : { [Symbol.dispose]: () => {} }; + // Acquire the read lock without waiting to allow multiple readers to access the lock. + using _rlock = this.#read.acquire(); + return await fn(this.#value); } } diff --git a/rw_lock_test.ts b/rw_lock_test.ts index e05daba..a59a4a1 100644 --- a/rw_lock_test.ts +++ b/rw_lock_test.ts @@ -1,6 +1,6 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { promiseState } from "./state.ts"; -import { AsyncValue } from "./testutil.ts"; +import { assertEquals } from "@std/assert"; +import { promiseState } from "./promise_state.ts"; +import { AsyncValue } from "./async_value.ts"; import { RwLock } from "./rw_lock.ts"; Deno.test("RwLock", async (t) => { diff --git a/semaphore.ts b/semaphore.ts index 8f66500..0c90f93 100644 --- a/semaphore.ts +++ b/semaphore.ts @@ -4,7 +4,7 @@ import { Notify } from "./notify.ts"; * A semaphore that allows a limited number of concurrent executions of an operation. * * ```ts - * import { Semaphore } from "https://deno.land/x/async@$MODULE_VERSION/semaphore.ts"; + * import { Semaphore } from "@core/asyncutil/semaphore"; * * const sem = new Semaphore(5); * const worker = () => { @@ -22,8 +22,8 @@ export class Semaphore { /** * Creates a new semaphore with the specified limit. * - * @param size - The maximum number of times the semaphore can be acquired before blocking. - * @throws Error if size is less than 1. + * @param size The maximum number of times the semaphore can be acquired before blocking. + * @throws Error if the size is less than 1. */ constructor(size: number) { if (size < 0) { @@ -42,13 +42,13 @@ export class Semaphore { /** * Acquires a lock on the semaphore, and invokes the specified function. * - * @param f - The function to invoke. + * @param fn The function to invoke. * @returns A promise that resolves to the return value of the specified function. */ - async lock(f: () => R | PromiseLike): Promise { + async lock(fn: () => R | PromiseLike): Promise { await this.#acquire(); try { - return await f(); + return await fn(); } finally { this.#release(); } @@ -64,10 +64,10 @@ export class Semaphore { } #release(): void { - if (this.#notify.waiters > 0) { + if (this.#notify.waiterCount > 0) { this.#notify.notify(); } - if (this.#notify.waiters === 0) { + if (this.#notify.waiterCount === 0) { this.#rest += 1; } } diff --git a/semaphore_test.ts b/semaphore_test.ts index 8a22950..9b21457 100644 --- a/semaphore_test.ts +++ b/semaphore_test.ts @@ -1,4 +1,4 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; +import { assertEquals } from "@std/assert"; import { Semaphore } from "./semaphore.ts"; Deno.test("Semaphore", async (t) => { diff --git a/stack.ts b/stack.ts index 57a6464..a519a06 100644 --- a/stack.ts +++ b/stack.ts @@ -1,12 +1,12 @@ -import { Notify, type WaitOptions } from "./notify.ts"; +import { Notify } from "./notify.ts"; /** * A stack implementation that allows for adding and removing elements, with optional waiting when * popping elements from an empty stack. * * ```ts - * import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; - * import { Stack } from "https://deno.land/x/async@$MODULE_VERSION/stack.ts"; + * import { assertEquals } from "@std/assert"; + * import { Stack } from "@core/asyncutil/stack"; * * const stack = new Stack(); * stack.push(1); @@ -34,13 +34,13 @@ export class Stack { * Returns true if the stack is currently locked. */ get locked(): boolean { - return this.#notify.waiters > 0; + return this.#notify.waiterCount > 0; } /** * Adds an item to the top of the stack and notifies any waiting consumers. * - * @param {T} value The item to add to the stack. + * @param value The item to add to the stack. */ push(value: T): void { this.#items.push(value); @@ -50,19 +50,16 @@ export class Stack { /** * Removes the next item from the stack, optionally waiting if the stack is currently empty. * - * @param {WaitOptions} [options] Optional parameters to pass to the wait operation. - * @param {AbortSignal} [options.signal] An optional AbortSignal used to abort the wait operation if the signal is aborted. - * @returns {Promise} A promise that resolves to the next item in the stack. - * @throws {DOMException} Throws a DOMException with "Aborted" and "AbortError" codes if the wait operation was aborted. + * @returns A promise that resolves to the next item in the stack. */ - async pop({ signal }: WaitOptions = {}): Promise { - while (!signal?.aborted) { + async pop({ signal }: { signal?: AbortSignal } = {}): Promise { + while (true) { + signal?.throwIfAborted(); const value = this.#items.pop(); if (value) { return value; } await this.#notify.notified({ signal }); } - throw new DOMException("Aborted", "AbortError"); } } diff --git a/stack_test.ts b/stack_test.ts index 58eebe5..347225d 100644 --- a/stack_test.ts +++ b/stack_test.ts @@ -1,9 +1,6 @@ -import { delay } from "https://deno.land/std@0.211.0/async/delay.ts"; -import { - assertEquals, - assertRejects, -} from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { promiseState } from "./state.ts"; +import { delay } from "@std/async/delay"; +import { assertEquals, assertRejects } from "@std/assert"; +import { promiseState } from "./promise_state.ts"; import { Stack } from "./stack.ts"; Deno.test("Stack", async (t) => { @@ -36,12 +33,13 @@ Deno.test("Stack", async (t) => { await t.step("'pop' with signal aborted after delay", async () => { const controller = new AbortController(); const q = new Stack(); + const reason = new Error("Aborted"); - delay(100).then(() => controller.abort()); + delay(100).then(() => controller.abort(reason)); await assertRejects( () => q.pop({ signal: controller.signal }), - DOMException, + Error, "Aborted", ); }); @@ -49,12 +47,13 @@ Deno.test("Stack", async (t) => { await t.step("'pop' with signal already aborted", async () => { const controller = new AbortController(); const q = new Stack(); + const reason = new Error("Aborted"); - controller.abort(); + controller.abort(reason); await assertRejects( () => q.pop({ signal: controller.signal }), - DOMException, + Error, "Aborted", ); }); diff --git a/wait_group.ts b/wait_group.ts index c4bc4c9..b98d165 100644 --- a/wait_group.ts +++ b/wait_group.ts @@ -6,8 +6,8 @@ import { Notify } from "./notify.ts"; * a specific number of tasks must complete before the program can proceed. * * ```ts - * import { delay } from "https://deno.land/std@0.211.0/async/delay.ts"; - * import { WaitGroup } from "https://deno.land/x/async@$MODULE_VERSION/wait_group.ts"; + * import { delay } from "@std/async/delay"; + * import { WaitGroup } from "@core/asyncutil/wait-group"; * * const wg = new WaitGroup(); * @@ -32,6 +32,7 @@ export class WaitGroup { /** * Adds the specified `delta` to the WaitGroup counter. If the counter becomes * zero, it signals all waiting promises to proceed. + * * @param delta The number to add to the counter. It can be positive or negative. */ add(delta: number): void { @@ -50,9 +51,10 @@ export class WaitGroup { /** * Returns a promise that waits for the WaitGroup counter to reach zero. + * * @returns A Promise that resolves when the counter becomes zero. */ - wait(): Promise { - return this.#notify.notified(); + wait({ signal }: { signal?: AbortSignal } = {}): Promise { + return this.#notify.notified({ signal }); } } diff --git a/wait_group_test.ts b/wait_group_test.ts index 51966f7..786cb20 100644 --- a/wait_group_test.ts +++ b/wait_group_test.ts @@ -1,5 +1,5 @@ -import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts"; -import { delay } from "https://deno.land/std@0.211.0/async/delay.ts"; +import { assertEquals, assertRejects } from "@std/assert"; +import { deadline, delay } from "@std/async"; import { WaitGroup } from "./wait_group.ts"; Deno.test("WaitGroup", async (t) => { @@ -33,4 +33,54 @@ Deno.test("WaitGroup", async (t) => { ]); }, ); + + await t.step( + "'wait' with non-aborted signal", + async () => { + const controller = new AbortController(); + const wg = new WaitGroup(); + wg.add(1); + await assertRejects( + () => deadline(wg.wait({ signal: controller.signal }), 100), + DOMException, + "Signal timed out.", + ); + }, + ); + + await t.step( + "'wait' with signal aborted after delay", + async () => { + const controller = new AbortController(); + const wg = new WaitGroup(); + wg.add(1); + + const reason = new Error("Aborted"); + delay(50).then(() => controller.abort(reason)); + + await assertRejects( + () => deadline(wg.wait({ signal: controller.signal }), 100), + Error, + "Aborted", + ); + }, + ); + + await t.step( + "'wait' with already aborted signal", + async () => { + const controller = new AbortController(); + const wg = new WaitGroup(); + wg.add(1); + + const reason = new Error("Aborted"); + controller.abort(reason); + + await assertRejects( + () => deadline(wg.wait({ signal: controller.signal }), 100), + Error, + "Aborted", + ); + }, + ); });