Skip to content

Commit

Permalink
feat: change option name: "abortSignal" -> "signal"
Browse files Browse the repository at this point in the history
  • Loading branch information
jiftechnify committed Oct 4, 2024
1 parent bc55b11 commit 3d5e8f8
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 33 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ const fetcher = NostrFetcher.withCustomPool(simplePoolAdapter(pool));
| [`@nostr-dev-kit/ndk`](https://github.com/nostr-dev-kit/ndk) | `NDK` | `@nostr-fetch/adapter-ndk` | `ndkAdapter` |
| [`rx-nostr`](https://github.com/penpenpng/rx-nostr) | `RxNostr` | `@nostr-fetch/adapter-rx-nostr` | `rxNostrAdapter` |

### Cancelling by AbortController
### Cancelling by AbortSignal

```ts
import { NostrFecher } from "nostr-fetch"
Expand All @@ -189,8 +189,8 @@ const evIter = fetcher.allEventsIterator(
relayUrls,
{/* filter */},
{/* time range */},
/* pass an `AbortSsignal` here to enable abortion! */
{ abortSignal: AbortSignal.timeout(1000) },
/* pass an `AbortSignal` here to enable cancellation! */
{ signal: AbortSignal.timeout(1000) },
);

for await (const ev of evIter) {
Expand Down
6 changes: 4 additions & 2 deletions packages/adapter-ndk/src/adapter.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { FetchTillEoseOptions } from "@nostr-fetch/kernel/fetcherBackend";
import { noopVerifier } from "@nostr-fetch/kernel/utils";
import { collectAsyncIterUntilThrow } from "@nostr-fetch/testutil/asyncIter";
import { setupMockRelayServer } from "@nostr-fetch/testutil/mockRelayServer";
import { NDKAdapter } from "./adapter";
Expand All @@ -12,7 +13,8 @@ describe("NDKAdapter", () => {
describe("fetchTillEose", () => {
// `skipVerification` has no effect.
const defaultOpts: FetchTillEoseOptions = {
abortSignal: undefined,
eventVerifier: noopVerifier,
signal: undefined,
abortSubBeforeEoseTimeoutMs: 5000,
connectTimeoutMs: 1000,
skipVerification: false,
Expand Down Expand Up @@ -101,7 +103,7 @@ describe("NDKAdapter", () => {
}, 500);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ abortSignal: ac.signal }));
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ signal: ac.signal }));
const evs = await collectAsyncIterUntilThrow(iter);
expect(evs.length).toBeLessThan(10);

Expand Down
6 changes: 4 additions & 2 deletions packages/adapter-nostr-relaypool/src/adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { collectAsyncIterUntilThrow } from "@nostr-fetch/testutil/asyncIter";
import { setupMockRelayServer } from "@nostr-fetch/testutil/mockRelayServer";
import { NRTPoolAdapter } from "./adapter";

import { noopVerifier } from "nostr-fetch";
import { RelayPool } from "nostr-relaypool";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { WS } from "vitest-websocket-mock";
Expand All @@ -12,7 +13,8 @@ describe.skip("NRTPoolAdapter", () => {
describe("fetchTillEose", () => {
// `skipVerification` has no effect.
const defaultOpts: FetchTillEoseOptions = {
abortSignal: undefined,
eventVerifier: noopVerifier,
signal: undefined,
abortSubBeforeEoseTimeoutMs: 5000,
connectTimeoutMs: 1000,
skipVerification: false,
Expand Down Expand Up @@ -114,7 +116,7 @@ describe.skip("NRTPoolAdapter", () => {
}, 500);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ abortSignal: ac.signal }));
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ signal: ac.signal }));
const evs = await collectAsyncIterUntilThrow(iter);
expect(evs.length).toBeLessThan(10);

Expand Down
6 changes: 4 additions & 2 deletions packages/adapter-nostr-tools-v2/src/adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { collectAsyncIterUntilThrow } from "@nostr-fetch/testutil/asyncIter";
import { setupMockRelayServer } from "@nostr-fetch/testutil/mockRelayServer";
import { SimplePoolAdapter } from "./adapter";

import { noopVerifier } from "@nostr-fetch/kernel/utils";
import { SimplePool, useWebSocketImplementation } from "nostr-tools/pool";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { WS } from "vitest-websocket-mock";
Expand All @@ -15,7 +16,8 @@ useWebSocketImplementation(WebSocket);
describe("SimplePoolAdapter", () => {
describe("fetchTillEose", () => {
const defaultOpts: FetchTillEoseOptions = {
abortSignal: undefined,
eventVerifier: noopVerifier,
signal: undefined,
abortSubBeforeEoseTimeoutMs: 5000,
connectTimeoutMs: 1000,
skipVerification: false,
Expand Down Expand Up @@ -121,7 +123,7 @@ describe("SimplePoolAdapter", () => {
}, 500);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ abortSignal: ac.signal }));
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ signal: ac.signal }));
const evs = await collectAsyncIterUntilThrow(iter);
expect(evs.length).toBeLessThan(10);

Expand Down
6 changes: 4 additions & 2 deletions packages/adapter-nostr-tools/src/adapter.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { FetchTillEoseOptions, NostrFetcherBackend } from "@nostr-fetch/kernel/fetcherBackend";
import { noopVerifier } from "@nostr-fetch/kernel/utils";
import { collectAsyncIterUntilThrow } from "@nostr-fetch/testutil/asyncIter";
import { setupMockRelayServer } from "@nostr-fetch/testutil/mockRelayServer";
import { SimplePoolExt } from "./adapter";
Expand All @@ -10,7 +11,8 @@ import { WS } from "vitest-websocket-mock";
describe("SimplePoolExt", () => {
describe("fetchTillEose", () => {
const defaultOpts: FetchTillEoseOptions = {
abortSignal: undefined,
eventVerifier: noopVerifier,
signal: undefined,
abortSubBeforeEoseTimeoutMs: 5000,
connectTimeoutMs: 1000,
skipVerification: false,
Expand Down Expand Up @@ -113,7 +115,7 @@ describe("SimplePoolExt", () => {
}, 500);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ abortSignal: ac.signal }));
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ signal: ac.signal }));
const evs = await collectAsyncIterUntilThrow(iter);
expect(evs.length).toBeLessThan(10);

Expand Down
6 changes: 4 additions & 2 deletions packages/adapter-rx-nostr/src/adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import { RxNostrAdapter } from "./adapter";

import { createRxNostr } from "rx-nostr";

import { noopVerifier } from "@nostr-fetch/kernel/utils";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { WS } from "vitest-websocket-mock";

// FIXME: make tests work
describe.skip("RxNostrAdapter", () => {
describe("fetchTillEose", () => {
const defaultOpts: FetchTillEoseOptions = {
abortSignal: undefined,
eventVerifier: noopVerifier,
signal: undefined,
abortSubBeforeEoseTimeoutMs: 5000,
connectTimeoutMs: 1000,
skipVerification: false,
Expand Down Expand Up @@ -118,7 +120,7 @@ describe.skip("RxNostrAdapter", () => {
}, 500);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ abortSignal: ac.signal }));
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ signal: ac.signal }));
const evs = await collectAsyncIterUntilThrow(iter);
expect(evs.length).toBeLessThan(10);

Expand Down
2 changes: 1 addition & 1 deletion packages/examples/src/abort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const main = async () => {
{
since: nHoursAgo(24),
},
{ abortSignal: AbortSignal.timeout(1000) }, // pass an `AbortSignal` to enable cancallation
{ signal: AbortSignal.timeout(1000) }, // pass an `AbortSignal` to enable cancallation
);

for await (const ev of evIter) {
Expand Down
4 changes: 2 additions & 2 deletions packages/kernel/src/adapterHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ export const setupSubscriptionAutoAbortion = (
resetTimer(); // initiate subscription auto abortion timer

// handle abortion by AbortController
if (options.abortSignal?.aborted) {
if (options.signal?.aborted) {
closeSub();
clearTimer();
tx.error(new FetchTillEoseAbortedSignal("subscription aborted by AbortController"));
}
options.abortSignal?.addEventListener("abort", () => {
options.signal?.addEventListener("abort", () => {
closeSub();
clearTimer();
tx.error(new FetchTillEoseAbortedSignal("subscription aborted by AbortController"));
Expand Down
2 changes: 1 addition & 1 deletion packages/kernel/src/fetcherBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export type FetchTillEoseOptions = {
skipFilterMatching: boolean;
connectTimeoutMs: number;
abortSubBeforeEoseTimeoutMs: number;
abortSignal: AbortSignal | undefined;
signal: AbortSignal | undefined;
};

/**
Expand Down
32 changes: 24 additions & 8 deletions packages/nostr-fetch/src/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ export type FetchOptions<SeenOn extends boolean = false> = {
*
* @default undefined
*/
signal?: AbortSignal | undefined;

/**
* The `AbortSignal` used to abort an event fetching.
*
* @default undefined
*
* @deprecated Set `signal` option instead.
*/
abortSignal?: AbortSignal | undefined;

/**
Expand All @@ -208,6 +217,7 @@ const defaultFetchOptions: Required<FetchOptions> = {
statsListener: undefined,
statsNotifIntervalMs: 1000,
connectTimeoutMs: 5000,
signal: undefined,
abortSignal: undefined,
abortSubBeforeEoseTimeoutMs: 10000,
limitPerReq: MAX_LIMIT_PER_REQ,
Expand Down Expand Up @@ -494,9 +504,11 @@ export class NostrFetcher {
...options,
} as Required<AllEventsIterOptions<SeenOn>>;

// use smaller limit if backpressure is enabled
const finalOpts: Required<AllEventsIterOptions<SeenOn>> = {
...filledOpts,
// if only `abortSignal` is specified, copy it to `signal`
signal: filledOpts.signal ?? filledOpts.abortSignal,
// use smaller limit if backpressure is enabled
limitPerReq: filledOpts.enableBackpressure
? Math.min(filledOpts.limitPerReq, MAX_LIMIT_PER_REQ_IN_BACKPRESSURE)
: filledOpts.limitPerReq,
Expand All @@ -512,11 +524,11 @@ export class NostrFetcher {
timeRangeFilter: FetchTimeRangeFilter,
options: Required<AllEventsIterOptions<SeenOn>>,
): AsyncIterable<NostrEventExt<SeenOn>> {
const [breakableSig, breakSignal] = makeBreakableSignal(options.abortSignal);
const [breakableSig, breakSignal] = makeBreakableSignal(options.signal);
try {
yield* this.#allEventsIterBody(relayUrls, filter, timeRangeFilter, {
...options,
abortSignal: breakableSig,
signal: breakableSig,
});
} finally {
// this block will be executed when:
Expand Down Expand Up @@ -646,7 +658,7 @@ export class NostrFetcher {
statsMngr?.setCurrentProgress(progTracker.calcTotalProgress());
}

if (options.abortSignal?.aborted) {
if (options.signal?.aborted) {
// termination contidion 2
logger?.log("info", "aborted");
statsMngr?.setRelayStatus(rurl, "aborted");
Expand Down Expand Up @@ -753,6 +765,8 @@ export class NostrFetcher {
const finalOpts = {
...defaultFetchLatestOptions,
...options,
// if only `abortSignal` is specified, copy it to `signal`
signal: options.signal ?? options.abortSignal,
} as Required<FetchLatestOptions<SeenOn>>;
this.#debugLogger?.log("verbose", "finalOpts=%O", finalOpts);

Expand Down Expand Up @@ -860,7 +874,7 @@ export class NostrFetcher {
statsMngr?.setRelayStatus(rurl, isAboutToAbort ? "aborted" : "completed");
break;
}
if (finalOpts.abortSignal?.aborted) {
if (finalOpts.signal?.aborted) {
// termination condition 3
logger?.log("info", "aborted");
statsMngr?.setRelayStatus(rurl, "aborted");
Expand Down Expand Up @@ -1068,6 +1082,8 @@ export class NostrFetcher {
...filledOpts,
// skip "full" verification if `reduceVerification` is enabled
skipVerification: filledOpts.skipVerification || filledOpts.reduceVerification,
// if only `abortSignal` is specified, copy it to `signal`
signal: filledOpts.signal ?? filledOpts.abortSignal,
};

return this.#fetchLatestEventPerKeyWithCleanupOnBreak(
Expand All @@ -1089,11 +1105,11 @@ export class NostrFetcher {
limit: number,
options: Required<FetchLatestOptions<SeenOn>>,
): AsyncIterable<NostrEventListWithKey<K, SeenOn>> {
const [breakableSig, breakSignal] = makeBreakableSignal(options.abortSignal);
const [breakableSig, breakSignal] = makeBreakableSignal(options.signal);
try {
yield* this.#fetchLatestEventPerKeyBody(keyName, keysAndRelays, otherFilter, limit, {
...options,
abortSignal: breakableSig,
signal: breakableSig,
});
} finally {
// this block will be executed when:
Expand Down Expand Up @@ -1237,7 +1253,7 @@ export class NostrFetcher {
resolveAllOnEarlyBreak();
break;
}
if (options.abortSignal?.aborted) {
if (options.signal?.aborted) {
// termination condition 3
logger?.log("info", "aborted");
statsMngr?.setRelayStatus(rurl, isAboutToAbort ? "aborted" : "completed");
Expand Down
4 changes: 2 additions & 2 deletions packages/nostr-fetch/src/fetcherBackend.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe("DefaultFetcherBackend", () => {
describe("fetchTillEose", () => {
const defaultOpts: FetchTillEoseOptions = {
eventVerifier: verifyEventSig,
abortSignal: undefined,
signal: undefined,
abortSubBeforeEoseTimeoutMs: 5000,
connectTimeoutMs: 1000,
skipVerification: false,
Expand Down Expand Up @@ -143,7 +143,7 @@ describe("DefaultFetcherBackend", () => {
}, 500);

await backend.ensureRelays([url], { connectTimeoutMs: 1000 });
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ abortSignal: ac.signal }));
const iter = backend.fetchTillEose(url, {}, optsWithDefault({ signal: ac.signal }));
const evs = await collectAsyncIterUntilThrow(iter);
expect(evs.length).toBeLessThan(10);

Expand Down
4 changes: 2 additions & 2 deletions packages/nostr-fetch/src/fetcherBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ export class DefaultFetcherBackend implements NostrFetcherBackend {
}

// handle abortion
if (options.abortSignal?.aborted) {
if (options.signal?.aborted) {
closeSub();
tx.error(
new FetchTillEoseAbortedSignal(
`subscription (id: ${sub.subId}) aborted by AbortController`,
),
);
}
options.abortSignal?.addEventListener("abort", () => {
options.signal?.addEventListener("abort", () => {
closeSub();
tx.error(
new FetchTillEoseAbortedSignal(
Expand Down
19 changes: 15 additions & 4 deletions packages/nostr-fetch/src/testutil/fakedFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,31 @@ class FakeFetcherBackend implements NostrFetcherBackend {

// auto abortion
let subAutoAbortTimer: ReturnType<typeof setTimeout> | undefined;
const resetAutoAbortTimer = () => {
const clearTimer = () => {
if (subAutoAbortTimer !== undefined) {
clearTimeout(subAutoAbortTimer);
subAutoAbortTimer = undefined;
}
};

const resetTimer = () => {
if (subAutoAbortTimer !== undefined) {
clearTimeout(subAutoAbortTimer);
subAutoAbortTimer = undefined;
}
subAutoAbortTimer = setTimeout(() => abortSub(), options.abortSubBeforeEoseTimeoutMs);
};
resetAutoAbortTimer(); // initiate subscription auto abortion timer
resetTimer(); // initiate subscription auto abortion timer

// handle abortion by AbortController
if (options.abortSignal?.aborted) {
if (options.signal?.aborted) {
abortSub();
clearTimer();
}
options.abortSignal?.addEventListener("abort", () => abortSub());
options.signal?.addEventListener("abort", () => {
abortSub();
clearTimer();
});

return iter;
}
Expand Down

0 comments on commit 3d5e8f8

Please sign in to comment.