Skip to content
This repository was archived by the owner on May 29, 2023. It is now read-only.

fix: close all subscriptions, if the worker was terminated #3

Merged
merged 2 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions projects/workers/src/worker/classes/web-worker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,25 @@ describe('WebWorker', () => {
'Uncaught reason',
);
});

it('should close all subscriptions, if the worker was terminated', async () => {
const worker = WebWorker.fromFunction<void, string>(() => 'some data');

const subscriptions = [
worker.subscribe(),
worker.subscribe(),
worker.subscribe(),
];

worker.terminate();
expect(subscriptions.map(s => s.closed)).toEqual([true, true, true]);
});

it("shouldn't throw any errors, if the worker was terminated twice", async () => {
const worker = WebWorker.fromFunction<void, string>(() => 'some data');

worker.terminate();
worker.terminate();
expect(await worker.toPromise()).toBeUndefined();
});
});
45 changes: 30 additions & 15 deletions projects/workers/src/worker/classes/web-worker.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import {EMPTY, fromEvent, merge, Observable} from 'rxjs';
import {take, tap} from 'rxjs/operators';
import {EMPTY, fromEvent, merge, Observable, Subject} from 'rxjs';
import {take, takeUntil, tap} from 'rxjs/operators';
import {WORKER_BLANK_FN} from '../consts/worker-fn-template';
import {TypedMessageEvent} from '../types/typed-message-event';
import {WorkerFunction} from '../types/worker-function';

export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>> {
private readonly worker: Worker | undefined;
private readonly url: string;
private readonly destroy$: Subject<void>;

constructor(url: string, options?: WorkerOptions) {
let worker: Worker | undefined;
Expand All @@ -19,26 +20,29 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
}

super(subscriber => {
let eventStream$: Observable<TypedMessageEvent<R> | ErrorEvent> = EMPTY;

if (error) {
subscriber.error(error);
} else if (this.destroy$.isStopped) {
subscriber.complete();
} else if (worker) {
eventStream$ = merge(
fromEvent<TypedMessageEvent<R>>(worker, 'message').pipe(
tap(event => subscriber.next(event)),
),
fromEvent<ErrorEvent>(worker, 'error').pipe(
tap(event => subscriber.error(event)),
),
).pipe(takeUntil(this.destroy$));
}

const eventStream$ = worker
? merge(
fromEvent<TypedMessageEvent<R>>(worker, 'message').pipe(
tap(event => subscriber.next(event)),
),
fromEvent<ErrorEvent>(worker, 'error').pipe(
tap(event => subscriber.error(event)),
),
)
: EMPTY;

return eventStream$.subscribe();
eventStream$.subscribe().add(subscriber);
});

this.worker = worker;
this.url = url;
this.destroy$ = new Subject<void>();
}

static fromFunction<T, R>(
Expand All @@ -57,7 +61,11 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>

worker.postMessage(data);

return promise;
return promise.then(result => {
worker.terminate();

return result;
});
}

private static createFnUrl(fn: WorkerFunction): string {
Expand All @@ -69,11 +77,18 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
}

terminate() {
if (this.destroy$.isStopped) {
return;
}

if (this.worker) {
this.worker.terminate();
}

URL.revokeObjectURL(this.url);

this.destroy$.next();
this.destroy$.complete();
}

postMessage(value: T) {
Expand Down
3 changes: 2 additions & 1 deletion projects/workers/src/worker/pipes/worker.pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ export class WorkerPipe implements PipeTransform {
private observers = new WeakMap<WebWorker, Observable<any>>();

transform<T, R>(value: T, fn: WorkerFunction<T, R>): Observable<R> {
const worker = this.workers.get(fn) || WebWorker.fromFunction(fn);
const worker: WebWorker<T, R> =
this.workers.get(fn) || WebWorker.fromFunction(fn);

this.workers.set(fn, worker);
worker.postMessage(value);
Expand Down