Skip to content

Commit 89332af

Browse files
committed
use an observable of the stop method
1 parent 69edf32 commit 89332af

File tree

1 file changed

+15
-13
lines changed
  • x-pack/plugins/reporting_notifier/public

1 file changed

+15
-13
lines changed

x-pack/plugins/reporting_notifier/public/plugin.tsx

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*/
66

77
import * as Rx from 'rxjs';
8-
import { catchError, filter, map, mergeMap } from 'rxjs/operators';
8+
import { catchError, filter, map, mergeMap, takeUntil } from 'rxjs/operators';
99
import { i18n } from '@kbn/i18n';
1010
import {
1111
CoreSetup,
@@ -34,7 +34,6 @@ function handleError(
3434
notifications: NotificationsService,
3535
err: Error
3636
): Rx.Observable<JobStatusBuckets> {
37-
// show general toast, log the error and resume
3837
notifications.toasts.addDanger(
3938
getGeneralErrorToast(
4039
i18n.translate('xpack.reportingNotifier.pollingErrorMessage', {
@@ -48,7 +47,7 @@ function handleError(
4847
}
4948

5049
export class ReportingNotifierPublicPlugin implements Plugin<any, any> {
51-
private poller$: Rx.Observable<JobStatusBuckets> | null = null;
50+
private readonly stop$ = new Rx.ReplaySubject(1);
5251

5352
// FIXME: License checking: only active, non-expired licenses allowed
5453
// Depends on https://github.com/elastic/kibana/pull/44922
@@ -62,18 +61,21 @@ export class ReportingNotifierPublicPlugin implements Plugin<any, any> {
6261
const { http, notifications } = core;
6362
const streamHandler = new StreamHandler(http, notifications);
6463

65-
this.poller$ = Rx.timer(0, JOBS_REFRESH_INTERVAL).pipe(
66-
map(() => getStored()), // Read all pending job IDs from session storage
67-
filter(storedJobs => storedJobs.length > 0), // stop the pipeline here if there are none pending
68-
mergeMap(storedJobs => streamHandler.findChangedStatusJobs(storedJobs)), // look up the latest status of all pending jobs on the server
69-
mergeMap(({ completed, failed }) => streamHandler.showNotifications({ completed, failed })),
70-
catchError(err => handleError(notifications, err))
71-
);
72-
73-
this.poller$.subscribe();
64+
Rx.timer(0, JOBS_REFRESH_INTERVAL)
65+
.pipe(
66+
takeUntil(this.stop$), // stop the interval when stop method is called
67+
map(() => getStored()), // read all pending job IDs from session storage
68+
filter(storedJobs => storedJobs.length > 0), // stop the pipeline here if there are none pending
69+
mergeMap(storedJobs => streamHandler.findChangedStatusJobs(storedJobs)), // look up the latest status of all pending jobs on the server
70+
mergeMap(({ completed, failed }) => streamHandler.showNotifications({ completed, failed })),
71+
catchError(err => handleError(notifications, err))
72+
)
73+
.subscribe();
7474
}
7575

76-
public stop() {} // cancel poller$
76+
public stop() {
77+
this.stop$.next();
78+
}
7779
}
7880

7981
export type Setup = ReturnType<ReportingNotifierPublicPlugin['setup']>;

0 commit comments

Comments
 (0)