Skip to content

Commit

Permalink
feat: added AsyncPusher
Browse files Browse the repository at this point in the history
- added fieldValueIsAvailableValidator
  • Loading branch information
dereekb committed May 12, 2022
1 parent db9a4d3 commit 8cb2052
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ jobs:
- run:
name: run dbx-core tests
command: npx nx test dbx-core
- run:
name: run dbx-analytics tests
command: npx nx test dbx-analytics
- run:
name: run dbx-web tests
command: npx nx test dbx-web
Expand Down
1 change: 0 additions & 1 deletion packages/dbx-analytics/jest.config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module.exports = {
displayName: 'dbx-analytics',

setupFilesAfterEnv: ['<rootDir>/src/test-setup.ts'],
globals: {
'ts-jest': {
Expand Down
1 change: 0 additions & 1 deletion packages/dbx-core/jest.config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module.exports = {
displayName: 'dbx-core',

setupFilesAfterEnv: ['<rootDir>/src/test-setup.ts'],
globals: {
'ts-jest': {
Expand Down
2 changes: 1 addition & 1 deletion packages/dbx-form/src/lib/formly/formly.context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export function ProvideFormlyContext(): Provider[] {
/**
* DbxForm Instance that registers a delegate and manages the state of that form/delegate.
*/
export class DbxFormlyContext<T> implements DbxForm<T> {
export class DbxFormlyContext<T = any> implements DbxForm<T> {

readonly lockSet = new LockSet();

Expand Down
22 changes: 22 additions & 0 deletions packages/dbx-form/src/lib/formly/template/available.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { FormlyFieldConfig } from '@ngx-formly/core/lib/core';
import { FieldValueIsAvailableValidatorConfig, fieldValueIsAvailableValidator } from '../../validator/available';
import { textField, TextFieldConfig } from '../field/value/text/text.field';

export interface TextAvailableFieldConfig extends TextFieldConfig, Omit<FieldValueIsAvailableValidatorConfig<string>, 'message'> {
isNotAvailableErrorMessage?: string;
}

export function textIsAvailableField(config: TextAvailableFieldConfig): FormlyFieldConfig {
const field = textField(config);

field.asyncValidators = {
validation: [{
expression: fieldValueIsAvailableValidator({
...config,
message: config?.isNotAvailableErrorMessage
}),
}]
};

return field;
}
1 change: 1 addition & 0 deletions packages/dbx-form/src/lib/formly/template/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './login';
export * from './available';
57 changes: 57 additions & 0 deletions packages/dbx-form/src/lib/validator/available.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { switchMap } from 'rxjs/operators';
import { firstValueFrom, map, Observable } from 'rxjs';
import { AbstractControl, AsyncValidatorFn } from "@angular/forms";
import { asyncPusherCache, tapLog } from '@dereekb/rxjs';

export const FIELD_VALUE_IS_AVAILABLE_VALIDATION_KEY = 'fieldValueIsAvailable';

export type FieldValueIsAvailableValidatorFunction<T> = (value: T) => Observable<boolean>;

export interface FieldValueIsAvailableValidatorConfig<T> {

/**
* How long to wait in between value changes.
*/
debounceTime?: number;

/**
* Returns an observable that checks whether or not the value is currently available.
*
* @param value
*/
readonly checkValueIsAvailable: FieldValueIsAvailableValidatorFunction<T>;

/**
* Custom message for this validator.
*/
message?: string;

}

/**
* Validator for validating all values within an object.
*
* This is useful for validating a control group where two or more values are expected to be the same, such as a password and a password verification field.
*
* @param config
* @returns
*/
export function fieldValueIsAvailableValidator<T>(config: FieldValueIsAvailableValidatorConfig<T>): AsyncValidatorFn {
const {
checkValueIsAvailable,
message = 'This value is not available.'
} = config;

const pusher = asyncPusherCache();
return (control: AbstractControl) => firstValueFrom(pusher(control.valueChanges)(control.value).pipe(
switchMap(() => checkValueIsAvailable(control.value)),
map((isAvailable) => {
if (isAvailable) {
return null;
} else {
return {
[FIELD_VALUE_IS_AVAILABLE_VALIDATION_KEY]: { message }
};
}
})));
}
2 changes: 2 additions & 0 deletions packages/dbx-form/src/lib/validator/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './boolean';
export * from './email';
export * from './field';
export * from './number';
export * from './available';
1 change: 1 addition & 0 deletions packages/rxjs/src/lib/rxjs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export * from './misc';
export * from './map';
export * from './number';
export * from './rxjs';
export * from './rxjs.async';
export * from './rxjs.map';
export * from './set';
export * from './use';
Expand Down
117 changes: 117 additions & 0 deletions packages/rxjs/src/lib/rxjs/rxjs.async.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { Subject } from 'rxjs';
import { SubscriptionObject } from '../subscription';
import { asyncPusher, AsyncPusher, asyncPusherCache } from './rxjs.async';

describe('async pusher', () => {

let pusher: AsyncPusher<number>;
let sub: SubscriptionObject;

beforeEach(() => {
sub = new SubscriptionObject();
});

afterEach(() => {
if (pusher) {
pusher.destroy();
}

sub.destroy();
});

describe('asyncPusher()', () => {

it('should create an AsyncPusher', () => {
pusher = asyncPusher();

expect(pusher).toBeDefined();
expect(typeof pusher).toBe('function');
expect(pusher.destroy).toBeDefined();
expect(pusher.watchForCleanup).toBeDefined();
});


describe('function', () => {

it('should return an observable that emits the value.', (done) => {
const pusher = asyncPusher<number>();

const obs = pusher(10);
sub.subscription = obs.subscribe((value) => {
expect(value).toBe(10);
done();
});

});

it('should return an observable that throttles values.', (done) => {

const pusher = asyncPusher<number>();

const obs = pusher(10);
pusher(20);
pusher(30);
pusher(40);

const expectedValue = 50;
pusher(expectedValue);

sub.subscription = obs.subscribe((value) => {
expect(value).toBe(expectedValue);
done();
});

});

describe('watchForCleanup()', () => {

it('should call destroy when the input observable completes.', (done) => {
const pusher = asyncPusher<number>();

const subjectToWatchForCleanup = new Subject();
pusher.watchForCleanup(subjectToWatchForCleanup);
subjectToWatchForCleanup.complete();

sub.subscription = pusher._subject.subscribe({
complete: () => {
done();
}
});
});

});

});

});

describe('asyncPusherCache()', () => {

it('should create a cache that contains the AsyncPusher', () => {
const cache = asyncPusherCache<number>();
pusher = cache();

expect(pusher).toBeDefined();
expect(typeof pusher).toBe('function');
expect(pusher.destroy).toBeDefined();
expect(pusher.watchForCleanup).toBeDefined();
});

it('should create a cache that contains the AsyncPusher and', (done) => {
const cache = asyncPusherCache<number>();

const subjectToWatchForCleanup = new Subject();
pusher = cache(subjectToWatchForCleanup);

subjectToWatchForCleanup.complete();

sub.subscription = pusher._subject.subscribe({
complete: () => {
done();
}
});
});

});

});
121 changes: 121 additions & 0 deletions packages/rxjs/src/lib/rxjs/rxjs.async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { CachedFactoryWithInput, cachedGetter, Destroyable } from '@dereekb/util';
import { throttleTime, distinctUntilChanged, BehaviorSubject, Observable, Subject } from 'rxjs';
import { SubscriptionObject } from '../subscription';
import { skipFirstMaybe } from './value';

/**
* Default amount of throttle in milliseconds used by AsyncPusher.
*/
export const DEFAULT_ASYNC_PUSHER_THROTTLE = 200;

/**
* Special function that when called pushes a value onto an internal subject, and returns an observable.
*
* This is useful for cases where a function may get called and subscribes to an observable each time, but we need to throttle that.
*/
export type AsyncPusher<T> = ((value: T) => Observable<T>) & Destroyable & {

/**
* Configures the pusher to watch this input observable for complete.
*
* @param obs
*/
watchForCleanup(obs: Observable<any>): void;

/**
* The internal subject.
*/
readonly _subject: Subject<T>;

};

export interface AsyncPusherConfig<T> {
/**
* Time to throttle each emission.
*/
throttle?: number;
/**
* Whether or not to filter on distinct values.
*/
distinct?: boolean;
/**
* Configuration function to build onto the internal observable.
*/
pipe?: (obs: Observable<T>) => Observable<T>;
/**
* (Optional) Observable to watch for cleaunup.
*/
cleanupObs?: Observable<any>;
}

/**
* Creates an AsyncPusher.
*
* @param config
* @returns
*/
export function asyncPusher<T>(config: AsyncPusherConfig<T> = {}): AsyncPusher<T> {
const { throttle, cleanupObs, distinct = true, pipe: pipeObs } = config;

const _subject = new BehaviorSubject<T>(undefined as any);
const _sub = new SubscriptionObject();

let obs: Observable<T> = _subject.pipe(
skipFirstMaybe(),
throttleTime(throttle ?? DEFAULT_ASYNC_PUSHER_THROTTLE, undefined, { leading: true, trailing: true })
) as Observable<T>;

if (distinct) {
obs = obs.pipe(distinctUntilChanged());
}

if (pipeObs) {
obs = pipeObs(obs);
}

const pusher: AsyncPusher<T> = ((value: T) => {
_subject.next(value);
return obs;
}) as AsyncPusher<T>;

pusher.destroy = () => {
_subject.complete();
_sub.destroy();
};

pusher.watchForCleanup = (obs: Observable<any>) => {
_sub.subscription = obs.subscribe({
complete: () => {
pusher.destroy();
}
});
};

(pusher as any)._subject = _subject;

if (cleanupObs) {
pusher.watchForCleanup(cleanupObs);
}

return pusher;
}

/**
* Creates a cache that returns an AsyncPusher.
*
* The CachedFactoryWithInput resunt can optionally be pass an observable to watch for the cleanup process.
*
* @param config
* @returns
*/
export function asyncPusherCache<T>(config?: AsyncPusherConfig<T>): CachedFactoryWithInput<AsyncPusher<T>, Observable<any>> {
return cachedGetter((cleanupObs?: Observable<any>) => {
const pusher = asyncPusher(config);

if (cleanupObs) {
pusher.watchForCleanup(cleanupObs);
}

return pusher;
});
}
4 changes: 2 additions & 2 deletions packages/rxjs/src/lib/rxjs/value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ export function filterMaybe<T>(): OperatorFunction<Maybe<T>, T> {
/**
* Skips all initial maybe values, and then returns all values after the first non-null/undefined value is returned.
*/
export function skipFirstMaybe<T>(): MonoTypeOperatorFunction<Maybe<T>> {
return skipWhile((x: Maybe<T>) => (x == null));
export function skipFirstMaybe<T>(): MonoTypeOperatorFunction<T> {
return skipWhile((x: T) => (x == null));
}

/**
Expand Down
Loading

0 comments on commit 8cb2052

Please sign in to comment.