Skip to content

Commit

Permalink
feat(create): adds subscribe; renames waitUntil to until; reafactors …
Browse files Browse the repository at this point in the history
…until and wait

BREAKING CHANGE: waitUntil has been renamed to until; takes ms as a third parameter instead of as
second; most other create functions have been removed
  • Loading branch information
rafamel committed Oct 31, 2019
1 parent a05e28d commit aab4ea4
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/create/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './subscribe';
export * from './until';
export * from './wait';
30 changes: 30 additions & 0 deletions src/create/subscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { AbstractObservable } from '~/types';
import { until } from './until';

/**
* Subscribe to an observable and resolve/reject with its first value.
* It will reject it the observable completes before emitting any values.
*/
export function subscribe<T>(observable: AbstractObservable<T>): Promise<T> {
return new Promise((resolve, reject) => {
const subscription = observable.subscribe({
next: (value) => {
resolve(value);
unsubscribe();
},
error: (error) => {
reject(error);
unsubscribe();
},
complete: () => {
reject(Error(`Source completed without emitting any values`));
unsubscribe();
}
});
function unsubscribe(): void {
until(() => Boolean(subscription), true).then(() =>
subscription.unsubscribe()
);
}
});
}
26 changes: 26 additions & 0 deletions src/create/until.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Will not resolve until `test` returns `true`, running it every `ms`
* milliseconds. If `safe` is `true`, it will treat `test` throws and
* rejections as `false`, instead of rejecting itself.
*/
export function until(
test: () => boolean | Promise<boolean>,
safe?: boolean,
ms: number = 25
): Promise<void> {
return new Promise((resolve, reject) => {
const reset = (): any => setTimeout(trunk, ms);

trunk();
function trunk(): void {
try {
Promise.resolve(test()).then(
(value) => (value ? resolve() : reset()),
(reason) => (safe ? reset() : reject(reason))
);
} catch (err) {
safe ? reset() : reject(err);
}
}
});
}
5 changes: 4 additions & 1 deletion src/create/wait.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export default function wait(ms: number): Promise<void> {
/**
* Will wait for `ms` milliseconds before resolving with an empty response.
*/
export function wait(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './create';
export * from './utils';
export * from './types';
13 changes: 13 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export interface AbstractObservable<T> {
subscribe: (observer: AbstractObserver<T>) => AbstractSubscription;
}

export interface AbstractObserver<T> {
next: (value: T) => void;
error: (error: any) => void;
complete: () => void;
}

export interface AbstractSubscription {
unsubscribe: () => void;
}

0 comments on commit aab4ea4

Please sign in to comment.