Skip to content

Commit

Permalink
fix: allow async-iter fetchers do clean up on early return/break
Browse files Browse the repository at this point in the history
  • Loading branch information
jiftechnify committed Oct 4, 2024
1 parent 70626e5 commit f79e6ef
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 8 deletions.
9 changes: 5 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"private": true,
"license": "MIT",
"type": "module",
"workspaces": ["packages/*"],
"workspaces": [
"packages/*"
],
"scripts": {
"lint": "turbo lint --filter=!@nostr-fetch/examples",
"fix": "turbo fix --filter=!@nostr-fetch/examples",
Expand Down Expand Up @@ -33,7 +35,7 @@
"prettier": "^3.0.0",
"tsx": "^4.6.2",
"turbo": "^1.11.1",
"typescript": "^5.1.0",
"typescript": "^5.6.2",
"vitest": "^2.1.2",
"vitest-websocket-mock": "^0.4.0",
"websocket-polyfill": "^0.0.3",
Expand Down
59 changes: 57 additions & 2 deletions packages/nostr-fetch/src/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
getKeysOfEvent,
initDefaultRelayCapChecker,
initSeenEvents,
makeBreakableSignal,
} from "./fetcherHelper";
import {
type FetchFilter,
Expand Down Expand Up @@ -502,7 +503,29 @@ export class NostrFetcher {
};
this.#debugLogger?.log("verbose", "finalOpts=%O", finalOpts);

return this.#allEventsIterBody(relayUrls, filter, timeRangeFilter, finalOpts);
return this.#allEventsIterWithCleanupOnBreak(relayUrls, filter, timeRangeFilter, finalOpts);
}

async *#allEventsIterWithCleanupOnBreak<SeenOn extends boolean>(
relayUrls: string[],
filter: FetchFilter,
timeRangeFilter: FetchTimeRangeFilter,
options: Required<AllEventsIterOptions<SeenOn>>,
): AsyncIterable<NostrEventExt<SeenOn>> {
const [breakableSig, breakSignal] = makeBreakableSignal(options.abortSignal);
try {
yield* this.#allEventsIterBody(relayUrls, filter, timeRangeFilter, {
...options,
abortSignal: breakableSig,
});
} finally {
// this block will be executed when:
// - iteration is finished normally (whether throws or not)
// - iteration is terminated early by break/return in a for-await-of loop
//
// in second case, breakSignal() causes the "cleanup" (abortion of event fetching by backend).
breakSignal();
}
}

async *#allEventsIterBody<SeenOn extends boolean>(
Expand Down Expand Up @@ -1047,7 +1070,39 @@ export class NostrFetcher {
skipVerification: filledOpts.skipVerification || filledOpts.reduceVerification,
};

return this.#fetchLatestEventPerKeyBody(keyName, keysAndRelays, otherFilter, limit, finalOpts);
return this.#fetchLatestEventPerKeyWithCleanupOnBreak(
keyName,
keysAndRelays,
otherFilter,
limit,
finalOpts,
);
}

async *#fetchLatestEventPerKeyWithCleanupOnBreak<
K extends FetchFilterKeyName,
SeenOn extends boolean = false,
>(
keyName: K,
keysAndRelays: KeysAndRelays<K>,
otherFilter: FetchFilter,
limit: number,
options: Required<FetchLatestOptions<SeenOn>>,
): AsyncIterable<NostrEventListWithKey<K, SeenOn>> {
const [breakableSig, breakSignal] = makeBreakableSignal(options.abortSignal);
try {
yield* this.#fetchLatestEventPerKeyBody(keyName, keysAndRelays, otherFilter, limit, {
...options,
abortSignal: breakableSig,
});
} finally {
// this block will be executed when:
// - iteration is finished normally (whether throws or not)
// - iteration is terminated early by break/return in a for-await-of loop
//
// in second case, breakSignal() causes the "cleanup" (abortion of event fetching by backend).
breakSignal();
}
}

async *#fetchLatestEventPerKeyBody<K extends FetchFilterKeyName, SeenOn extends boolean = false>(
Expand Down
40 changes: 40 additions & 0 deletions packages/nostr-fetch/src/fetcherHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,46 @@ export class KeyRelayMatrix<K extends string | number, V> {
}
}

/**
* Takes an `AbortSignal` and makes a new `AbortSignal` and a "breaker" function.
*
* The signal returned is aborted when:
* - the original signal is aborted, or
* - the "breaker" function is called.
*
* The "breaker" function is designed to be called in a `finally` block:
* ```ts
* const f = (origSignal: AbortSignal) => {
* const [signal, breakSignal] = makeBreakableSignal(origSignal);
* try {
* // ...
* } finally {
* breakSignal();
* }
* }
*/
export const makeBreakableSignal = (signal?: AbortSignal): [AbortSignal, () => void] => {
const ac = new AbortController();

if (signal === undefined) {
return [ac.signal, () => ac.abort()];
}

if (AbortSignal.any !== undefined) {
const breakable = AbortSignal.any([signal, ac.signal]);
return [breakable, () => ac.abort()];
}

const onAbort = () => {
ac.abort();
};
signal.addEventListener("abort", onAbort, { once: true });
ac.signal.addEventListener("abort", () => {
signal.removeEventListener("abort", onAbort);
});
return [ac.signal, () => ac.abort()];
};

export interface RelayCapabilityChecker {
relaySupportsNips(relayUrl: string, requiredNips: number[]): Promise<boolean>;
}
Expand Down

0 comments on commit f79e6ef

Please sign in to comment.