Skip to content

Commit

Permalink
Merge pull request #24 from jsr-core/refine-asyncutil
Browse files Browse the repository at this point in the history
Refine for `@core/asyncutil`
  • Loading branch information
lambdalisue authored Aug 4, 2024
2 parents c86ef00 + 66f62fb commit 0b3dd81
Show file tree
Hide file tree
Showing 28 changed files with 409 additions and 291 deletions.
21 changes: 17 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ env:
DENO_VERSION: 1.x

on:
schedule:
- cron: "0 7 * * 0"
push:
branches:
- main
Expand Down Expand Up @@ -36,8 +34,23 @@ jobs:
deno-version: ${{ env.DENO_VERSION }}
- name: Test
run: |
deno task test
deno task test:coverage
timeout-minutes: 5
- name: JSR publish (dry-run)
- run: |
deno task coverage --lcov > coverage.lcov
- uses: codecov/codecov-action@v4
with:
os: ${{ runner.os }}
files: ./coverage.lcov
token: ${{ secrets.CODECOV_TOKEN }}

jsr-publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: denoland/setup-deno@v1
with:
deno-version: ${{ env.DENO_VERSION }}
- name: Publish (dry-run)
run: |
deno publish --dry-run
21 changes: 21 additions & 0 deletions .gitmessage
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

# **Conventional Commits**
#
# <type>[optional scope]: <description>
#
# feat: feature (minor)
# deps: dependencies (minor/patch)
# fix: bug fix (patch)
# refactor: refactoring code
# test: test fix; no code change
# docs: documentation fix; no code change
# style: formatting, missing semi colons, etc; no code change
# chore: updating build tasks, package manager configs, etc; no code change
#
# **Install**
#
# git config commit.template .gitmessage
#
# **Reference**
#
# - https://www.conventionalcommits.org/en/v1.0.0/
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright 2021 Alisue <lambdalisue@gmail.com>
Copyright 2024 jsr-core

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
153 changes: 73 additions & 80 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,62 +1,45 @@
# async
# asyncutil

[![jsr](https://img.shields.io/jsr/v/%40lambdalisue/async?logo=javascript&logoColor=white)](https://jsr.io/@lambdalisue/async)
[![denoland](https://img.shields.io/github/v/release/lambdalisue/deno-async?logo=deno&label=denoland)](https://github.com/lambdalisue/deno-async/releases)
[![deno doc](https://doc.deno.land/badge.svg)](https://doc.deno.land/https/deno.land/x/async/mod.ts)
[![Test](https://github.com/lambdalisue/deno-async/workflows/Test/badge.svg)](https://github.com/lambdalisue/deno-async/actions?query=workflow%3ATest)
[![Test](https://github.com/jsr-core/asyncutil/actions/workflows/test.yml/badge.svg)](https://github.com/jsr-core/asyncutil/actions/workflows/test.yml)

Asynchronous primitive modules for [Deno][deno].

[python's asyncio]: https://docs.python.org/3/library/asyncio.html
[deno]: https://deno.land/
Asynchronous primitive utility pack.

## Usage

### Barrier
### AsyncValue

`Barrier` is a synchronization primitive that allows multiple tasks to wait
until all of them have reached a certain point of execution before continuing.
`AsyncValue` is a class that wraps a value and allows it to be set
asynchronously.

```ts
import { Barrier } from "https://deno.land/x/async@$MODULE_VERSION/barrier.ts";

const barrier = new Barrier(3);

async function worker(id: number) {
console.log(`worker ${id} is waiting`);
await barrier.wait();
console.log(`worker ${id} is done`);
}
import { assertEquals } from "@std/assert";
import { AsyncValue } from "@core/asyncutil/async-value";

worker(1);
worker(2);
worker(3);
const v = new AsyncValue(0);
assertEquals(await v.get(), 0);
await v.set(1);
assertEquals(await v.get(), 1);
```

### WaitGroup
### Barrier

`WaitGroup` is a synchronization primitive that enables promises to coordinate
and synchronize their execution. It is particularly useful in scenarios where a
specific number of tasks must complete before the program can proceed.
`Barrier` is a synchronization primitive that allows multiple tasks to wait
until all of them have reached a certain point of execution before continuing.

```ts
import { delay } from "https://deno.land/std@0.211.0/async/delay.ts";
import { WaitGroup } from "https://deno.land/x/async@$MODULE_VERSION/wait_group.ts";
import { Barrier } from "@core/asyncutil/barrier";

const wg = new WaitGroup();
const barrier = new Barrier(3);

async function worker(id: number) {
wg.add(1);
console.log(`worker ${id} is waiting`);
await delay(100);
await barrier.wait();
console.log(`worker ${id} is done`);
wg.done();
}

worker(1);
worker(2);
worker(3);
await wg.wait();
```

### Lock/RwLock
Expand All @@ -65,8 +48,8 @@ await wg.wait();
shared value.

```ts
import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts";
import { Lock } from "https://deno.land/x/async@$MODULE_VERSION/lock.ts";
import { AsyncValue } from "@core/asyncutil/async-value";
import { Lock } from "@core/asyncutil/lock";

// Critical section
const count = new Lock(new AsyncValue(0));
Expand All @@ -82,8 +65,8 @@ as long as there are no writers holding the lock. Writers block all other
readers and writers until the write operation completes.

```ts
import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts";
import { RwLock } from "https://deno.land/x/async@$MODULE_VERSION/rw_lock.ts";
import { AsyncValue } from "@core/asyncutil/async-value";
import { RwLock } from "@core/asyncutil/rw-lock";

const count = new RwLock(new AsyncValue(0));

Expand Down Expand Up @@ -117,8 +100,8 @@ This is a low-level primitive. Use `Lock` instead of `Mutex` if you need to
access a shared value concurrently.

```ts
import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts";
import { Mutex } from "https://deno.land/x/async@$MODULE_VERSION/mutex.ts";
import { AsyncValue } from "@core/asyncutil/async-value";
import { Mutex } from "@core/asyncutil/mutex";

const count = new AsyncValue(0);

Expand All @@ -127,13 +110,12 @@ async function doSomething() {
await count.set(v + 1);
}

// Critical section
const mu = new Mutex();
await mu.acquire();
try {

// Critical section
{
using _lock = await mu.acquire();
await doSomething();
} finally {
mu.release();
}
```

Expand All @@ -143,9 +125,9 @@ try {
notification.

```ts
import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts";
import { promiseState } from "https://deno.land/x/async@$MODULE_VERSION/state.ts";
import { Notify } from "https://deno.land/x/async@$MODULE_VERSION/notify.ts";
import { assertEquals } from "@std/assert";
import { promiseState } from "@core/asyncutil/promise-state";
import { Notify } from "@core/asyncutil/notify";

const notify = new Notify();
const waiter1 = notify.notified();
Expand All @@ -158,14 +140,32 @@ assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "fulfilled");
```

### promiseState

`promiseState` is used to determine the state of the promise. Mainly for testing
purpose.

```typescript
import { promiseState } from "@core/asyncutil/promise-state";

const p1 = Promise.resolve("Resolved promise");
console.log(await promiseState(p1)); // fulfilled

const p2 = Promise.reject("Rejected promise").catch(() => undefined);
console.log(await promiseState(p2)); // rejected

const p3 = new Promise(() => undefined);
console.log(await promiseState(p3)); // pending
```

### Queue/Stack

`Queue` is a queue implementation that allows for adding and removing elements,
with optional waiting when popping elements from an empty queue.

```ts
import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts";
import { Queue } from "https://deno.land/x/async@$MODULE_VERSION/queue.ts";
import { assertEquals } from "@std/assert";
import { Queue } from "@core/asyncutil/queue";

const queue = new Queue<number>();
queue.push(1);
Expand All @@ -180,8 +180,8 @@ assertEquals(await queue.pop(), 3);
with optional waiting when popping elements from an empty stack.

```ts
import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts";
import { Stack } from "https://deno.land/x/async@$MODULE_VERSION/stack.ts";
import { assertEquals } from "@std/assert";
import { Stack } from "@core/asyncutil/stack";

const stack = new Stack<number>();
stack.push(1);
Expand All @@ -198,7 +198,7 @@ A semaphore that allows a limited number of concurrent executions of an
operation.

```ts
import { Semaphore } from "https://deno.land/x/async@$MODULE_VERSION/semaphore.ts";
import { Semaphore } from "@core/asyncutil/semaphore";

const sem = new Semaphore(5);
const worker = () => {
Expand All @@ -209,37 +209,30 @@ const worker = () => {
await Promise.all([...Array(10)].map(() => worker()));
```

### promiseState

`promiseState` is used to determine the state of the promise. Mainly for testing
purpose.

```typescript
import { promiseState } from "https://deno.land/x/async@$MODULE_VERSION/mod.ts";

const p1 = Promise.resolve("Resolved promise");
console.log(await promiseState(p1)); // fulfilled

const p2 = Promise.reject("Rejected promise").catch(() => undefined);
console.log(await promiseState(p2)); // rejected
### WaitGroup

const p3 = new Promise(() => undefined);
console.log(await promiseState(p3)); // pending
```
`WaitGroup` is a synchronization primitive that enables promises to coordinate
and synchronize their execution. It is particularly useful in scenarios where a
specific number of tasks must complete before the program can proceed.

### AsyncValue
```ts
import { delay } from "@std/async/delay";
import { WaitGroup } from "@core/asyncutil/wait-group";

`AsyncValue` is a class that wraps a value and allows it to be set
asynchronously.
const wg = new WaitGroup();

```ts
import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts";
import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts";
async function worker(id: number) {
wg.add(1);
console.log(`worker ${id} is waiting`);
await delay(100);
console.log(`worker ${id} is done`);
wg.done();
}

const v = new AsyncValue(0);
assertEquals(await v.get(), 0);
await v.set(1);
assertEquals(await v.get(), 1);
worker(1);
worker(2);
worker(3);
await wg.wait();
```

## License
Expand Down
8 changes: 3 additions & 5 deletions testutil.ts → async_value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@
* A class that wraps a value and allows it to be set asynchronously.
*
* ```ts
* import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts";
* import { AsyncValue } from "https://deno.land/x/async@$MODULE_VERSION/testutil.ts";
* import { assertEquals } from "@std/assert";
* import { AsyncValue } from "@core/asyncutil/async-value";
*
* const v = new AsyncValue(0);
* assertEquals(await v.get(), 0);
* await v.set(1);
* assertEquals(await v.get(), 1);
* ```
*
* @typeParam T - The type of the value.
*/
export class AsyncValue<T> {
#value: T;

/**
* Constructs a new AsyncValue with the given initial value.
*
* @param value - The initial value.
* @param value The initial value.
*/
constructor(value: T) {
this.#value = value;
Expand Down
4 changes: 2 additions & 2 deletions testutil_test.ts → async_value_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { assertEquals } from "https://deno.land/std@0.211.0/assert/mod.ts";
import { AsyncValue } from "./testutil.ts";
import { assertEquals } from "@std/assert";
import { AsyncValue } from "./async_value.ts";

Deno.test("AsyncValue", async (t) => {
await t.step(
Expand Down
13 changes: 7 additions & 6 deletions barrier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Notify } from "./notify.ts";
* unblock and continue executing.
*
* ```ts
* import { Barrier } from "https://deno.land/x/async@$MODULE_VERSION/barrier.ts";
* import { Barrier } from "@core/asyncutil/barrier";
*
* const barrier = new Barrier(3);
*
Expand All @@ -32,8 +32,8 @@ export class Barrier {
/**
* Creates a new `Barrier` that blocks until `size` threads have called `wait`.
*
* @param size - The number of threads that must reach the barrier before it unblocks.
* @throws Error if size is negative.
* @param size The number of threads that must reach the barrier before it unblocks.
* @throws Error if the size is negative.
*/
constructor(size: number) {
if (size < 0) {
Expand All @@ -46,15 +46,16 @@ export class Barrier {
* Wait for all threads to reach the barrier.
* Blocks until all threads reach the barrier.
*/
async wait(): Promise<void> {
async wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
signal?.throwIfAborted();
this.#rest -= 1;
if (this.#rest === 0) {
await Promise.all([
this.#notify.notified(),
this.#notify.notified({ signal }),
this.#notify.notifyAll(),
]);
} else {
await this.#notify.notified();
await this.#notify.notified({ signal });
}
}
}
Loading

0 comments on commit 0b3dd81

Please sign in to comment.