diff --git a/package-lock.json b/package-lock.json index 153229c..6bc4a18 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,7 +28,7 @@ "prettier": "^3.0.0", "tsx": "^4.6.2", "turbo": "^1.11.1", - "typescript": "^5.1.0", + "typescript": "^5.6.2", "vitest": "^2.1.2", "vitest-websocket-mock": "^0.4.0", "websocket-polyfill": "^0.0.3", @@ -9820,10 +9820,11 @@ } }, "node_modules/typescript": { - "version": "5.4.3", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.4.3.tgz", - "integrity": "sha512-KrPd3PKaCLr78MalgiwJnA25Nm8HAmdwN3mYUYZgG/wizIo9EainNVQI9/yDavtVFRN2h3k8uf3GLHuhDMgEHg==", + "version": "5.6.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.6.2.tgz", + "integrity": "sha512-NW8ByodCSNCwZeghjN3o+JX5OFH0Ojg6sadjEKY4huZ52TqbJTJnDo5+Tw98lSy63NZvi4n+ez5m2u5d4PkZyw==", "devOptional": true, + "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" diff --git a/package.json b/package.json index 9256b8b..c0489a1 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,9 @@ "private": true, "license": "MIT", "type": "module", - "workspaces": ["packages/*"], + "workspaces": [ + "packages/*" + ], "scripts": { "lint": "turbo lint --filter=!@nostr-fetch/examples", "fix": "turbo fix --filter=!@nostr-fetch/examples", @@ -33,7 +35,7 @@ "prettier": "^3.0.0", "tsx": "^4.6.2", "turbo": "^1.11.1", - "typescript": "^5.1.0", + "typescript": "^5.6.2", "vitest": "^2.1.2", "vitest-websocket-mock": "^0.4.0", "websocket-polyfill": "^0.0.3", diff --git a/packages/nostr-fetch/src/fetcher.ts b/packages/nostr-fetch/src/fetcher.ts index 19036de..78f6b47 100644 --- a/packages/nostr-fetch/src/fetcher.ts +++ b/packages/nostr-fetch/src/fetcher.ts @@ -35,6 +35,7 @@ import { getKeysOfEvent, initDefaultRelayCapChecker, initSeenEvents, + makeBreakableSignal, } from "./fetcherHelper"; import { type FetchFilter, @@ -502,7 +503,29 @@ export class NostrFetcher { }; this.#debugLogger?.log("verbose", "finalOpts=%O", finalOpts); - return this.#allEventsIterBody(relayUrls, filter, timeRangeFilter, finalOpts); + return this.#allEventsIterWithCleanupOnBreak(relayUrls, filter, timeRangeFilter, finalOpts); + } + + async *#allEventsIterWithCleanupOnBreak( + relayUrls: string[], + filter: FetchFilter, + timeRangeFilter: FetchTimeRangeFilter, + options: Required>, + ): AsyncIterable> { + const [breakableSig, breakSignal] = makeBreakableSignal(options.abortSignal); + try { + yield* this.#allEventsIterBody(relayUrls, filter, timeRangeFilter, { + ...options, + abortSignal: breakableSig, + }); + } finally { + // this block will be executed when: + // - iteration is finished normally (whether throws or not) + // - iteration is terminated early by break/return in a for-await-of loop + // + // in second case, breakSignal() causes the "cleanup" (abortion of event fetching by backend). + breakSignal(); + } } async *#allEventsIterBody( @@ -1047,7 +1070,39 @@ export class NostrFetcher { skipVerification: filledOpts.skipVerification || filledOpts.reduceVerification, }; - return this.#fetchLatestEventPerKeyBody(keyName, keysAndRelays, otherFilter, limit, finalOpts); + return this.#fetchLatestEventPerKeyWithCleanupOnBreak( + keyName, + keysAndRelays, + otherFilter, + limit, + finalOpts, + ); + } + + async *#fetchLatestEventPerKeyWithCleanupOnBreak< + K extends FetchFilterKeyName, + SeenOn extends boolean = false, + >( + keyName: K, + keysAndRelays: KeysAndRelays, + otherFilter: FetchFilter, + limit: number, + options: Required>, + ): AsyncIterable> { + const [breakableSig, breakSignal] = makeBreakableSignal(options.abortSignal); + try { + yield* this.#fetchLatestEventPerKeyBody(keyName, keysAndRelays, otherFilter, limit, { + ...options, + abortSignal: breakableSig, + }); + } finally { + // this block will be executed when: + // - iteration is finished normally (whether throws or not) + // - iteration is terminated early by break/return in a for-await-of loop + // + // in second case, breakSignal() causes the "cleanup" (abortion of event fetching by backend). + breakSignal(); + } } async *#fetchLatestEventPerKeyBody( diff --git a/packages/nostr-fetch/src/fetcherHelper.ts b/packages/nostr-fetch/src/fetcherHelper.ts index 76c4bf0..eb914d3 100644 --- a/packages/nostr-fetch/src/fetcherHelper.ts +++ b/packages/nostr-fetch/src/fetcherHelper.ts @@ -243,6 +243,46 @@ export class KeyRelayMatrix { } } +/** + * Takes an `AbortSignal` and makes a new `AbortSignal` and a "breaker" function. + * + * The signal returned is aborted when: + * - the original signal is aborted, or + * - the "breaker" function is called. + * + * The "breaker" function is designed to be called in a `finally` block: + * ```ts + * const f = (origSignal: AbortSignal) => { + * const [signal, breakSignal] = makeBreakableSignal(origSignal); + * try { + * // ... + * } finally { + * breakSignal(); + * } + * } + */ +export const makeBreakableSignal = (signal?: AbortSignal): [AbortSignal, () => void] => { + const ac = new AbortController(); + + if (signal === undefined) { + return [ac.signal, () => ac.abort()]; + } + + if (AbortSignal.any !== undefined) { + const breakable = AbortSignal.any([signal, ac.signal]); + return [breakable, () => ac.abort()]; + } + + const onAbort = () => { + ac.abort(); + }; + signal.addEventListener("abort", onAbort, { once: true }); + ac.signal.addEventListener("abort", () => { + signal.removeEventListener("abort", onAbort); + }); + return [ac.signal, () => ac.abort()]; +}; + export interface RelayCapabilityChecker { relaySupportsNips(relayUrl: string, requiredNips: number[]): Promise; }