-
Notifications
You must be signed in to change notification settings - Fork 275
Description
When building with BehaviorSubject, the need arose to be able to clear the subject to reset it to the same state it has after having been instantiated without a seed.
This can be helpful if the subject holds an error and we want to retry the operation that caused it. During the retry, any new listeners still immediately receive the error value, while we want them to have to wait for the first value after the retry.
Related: #233
A simple fix for this is a re-implementation of BehaviorSubject that supports .clear():
```dart
/// Clears the subject and removes the last value or error.
void clear() => _wrapper
  ..value = EMPTY
  ..isValue = false
  ..errorAndStackTrace = null;
```

<details>
<summary>Full Code</summary>

```dart
// ignore_for_file: implementation_imports
// ignore_for_file: avoid_equals_and_hash_code_on_mutable_classes

import 'dart:async';
import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/streams/value_stream.dart';
import 'package:rxdart/src/transformers/start_with.dart';
import 'package:rxdart/src/transformers/start_with_error.dart';
import 'package:rxdart/src/utils/empty.dart';
import 'package:rxdart/src/utils/error_and_stacktrace.dart';
import 'package:rxdart/src/utils/notification.dart';
import 'package:rxdart/subjects.dart';
/// A variant of [BehaviorSubject] that allows clearing the subject and removing
/// the last value or error and resets it to the state as if it was just created
/// without a seed value.
///
/// Other than that, this works exactly like [BehaviorSubject].
class ClearableBehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
  /// Constructs a [ClearableBehaviorSubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// See also [StreamController.broadcast]
  factory ClearableBehaviorSubject({
    void Function()? onListen,
    void Function()? onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );
    final wrapper = _Wrapper<T>();
    return ClearableBehaviorSubject<T>._(
      controller,
      Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
      wrapper,
    );
  }

  /// Constructs a [ClearableBehaviorSubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// [seedValue] becomes the current [value] and is emitted immediately.
  ///
  /// See also [StreamController.broadcast]
  factory ClearableBehaviorSubject.seeded(
    T seedValue, {
    void Function()? onListen,
    void Function()? onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );
    final wrapper = _Wrapper<T>.seeded(seedValue);
    return ClearableBehaviorSubject<T>._(
      controller,
      Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
      wrapper,
    );
  }

  ClearableBehaviorSubject._(
    super.controller,
    super.stream,
    this._wrapper,
  );

  /// Holds the most recent value and/or error; shared with the closure
  /// returned by [_deferStream], so mutations here affect later listeners.
  final _Wrapper<T> _wrapper;

  /// Builds the stream factory handed to [Rx.defer].
  ///
  /// Each time the returned function is invoked it inspects [wrapper]:
  ///
  /// * if the last event was an error, the controller's stream is prefixed
  ///   with that error;
  /// * if the last event was a value, the controller's stream is prefixed
  ///   with that value;
  /// * otherwise (never emitted, or cleared) the bare controller stream is
  ///   returned.
  ///
  /// [sync] is accepted for signature parity but is not read here.
  static Stream<T> Function() _deferStream<T>(
    _Wrapper<T> wrapper,
    StreamController<T> controller,
    bool sync,
  ) =>
      () {
        final errorAndStackTrace = wrapper.errorAndStackTrace;
        if (errorAndStackTrace != null && !wrapper.isValue) {
          return controller.stream.transform(
            StartWithErrorStreamTransformer(
              errorAndStackTrace.error,
              errorAndStackTrace.stackTrace,
            ),
          );
        }

        final value = wrapper.value;
        if (isNotEmpty(value) && wrapper.isValue) {
          return controller.stream
              .transform(StartWithStreamTransformer(value as T));
        }

        return controller.stream;
      };

  @override
  void onAdd(T event) => _wrapper.setValue(event);

  @override
  void onAddError(Object error, [StackTrace? stackTrace]) =>
      _wrapper.setError(error, stackTrace);

  /// Clears the subject and removes the last value or error.
  ///
  /// This only resets the cached state; no event is added to the underlying
  /// controller. Afterwards [hasValue] and [hasError] are `false` and
  /// [lastEventOrNull] is `null` until the next value or error is added.
  void clear() => _wrapper.clear();

  @override
  ValueStream<T> get stream => _Stream<T>(this);

  @override
  bool get hasValue => isNotEmpty(_wrapper.value);

  /// The last emitted value.
  ///
  /// Throws [ValueStreamError] if no value is currently held.
  @override
  T get value {
    final value = _wrapper.value;
    if (isNotEmpty(value)) {
      return value as T;
    }
    throw ValueStreamError.hasNoValue();
  }

  @override
  T? get valueOrNull => unbox(_wrapper.value);

  /// Set and emit the new value.
  set value(T newValue) => add(newValue);

  @override
  bool get hasError => _wrapper.errorAndStackTrace != null;

  @override
  Object? get errorOrNull => _wrapper.errorAndStackTrace?.error;

  /// The last emitted error.
  ///
  /// Throws [ValueStreamError] if no error is currently held.
  @override
  Object get error {
    final errorAndSt = _wrapper.errorAndStackTrace;
    if (errorAndSt != null) {
      return errorAndSt.error;
    }
    throw ValueStreamError.hasNoError();
  }

  @override
  StackTrace? get stackTrace => _wrapper.errorAndStackTrace?.stackTrace;

  @override
  StreamNotification<T>? get lastEventOrNull {
    // data event
    if (_wrapper.isValue) {
      return StreamNotification.data(_wrapper.value as T);
    }

    // error event
    final errorAndSt = _wrapper.errorAndStackTrace;
    if (errorAndSt != null) {
      return ErrorNotification(errorAndSt);
    }

    // no event
    return null;
  }
}
/// Mutable holder for the latest value and/or error seen by the subject.
///
/// [value] is `EMPTY` until a value has been set (and again after [clear]);
/// presumably the sentinel is used so that `null` can be a legitimate `T`
/// value. [isValue] records whether the most recent event was a value
/// (`true`) or an error / nothing (`false`).
class _Wrapper<T> {
  /// Non-seeded constructor
  _Wrapper();

  /// Creates a wrapper that already holds [v] as its latest value.
  _Wrapper.seeded(T v)
      : value = v,
        isValue = true;

  /// Whether the most recent event stored was a value rather than an error.
  bool isValue = false;

  /// The last value, or `EMPTY` when none is held.
  Object? value = EMPTY;

  /// The last error with its stack trace, or `null` when none is held.
  ErrorAndStackTrace? errorAndStackTrace;

  /// Stores [event] as the latest value and marks it as the last event.
  ///
  /// A previously stored error is left in place.
  void setValue(T event) {
    value = event;
    isValue = true;
  }

  /// Stores [error] and [stackTrace] as the latest error and marks the last
  /// event as not being a value.
  ///
  /// A previously stored value is left in place.
  void setError(Object error, StackTrace? stackTrace) {
    errorAndStackTrace = ErrorAndStackTrace(error, stackTrace);
    isValue = false;
  }

  /// Resets every field to the state produced by the non-seeded constructor.
  void clear() {
    value = EMPTY;
    isValue = false;
    errorAndStackTrace = null;
  }
}
/// Read-only [ValueStream] view over a [ClearableBehaviorSubject].
///
/// Every member forwards to the wrapped subject, so the view always reflects
/// the subject's current value/error state.
class _Stream<T> extends Stream<T> implements ValueStream<T> {
  _Stream(this._subject);

  final ClearableBehaviorSubject<T> _subject;

  @override
  bool get isBroadcast => true;

  // Override == and hashCode so that new streams returned by the same
  // subject are considered equal.
  // The subject returns a new stream each time it's queried,
  // but doesn't have to cache the result.
  @override
  int get hashCode => _subject.hashCode ^ 0x35323532;

  @override
  bool operator ==(Object other) {
    if (identical(this, other)) {
      return true;
    }
    return other is _Stream && identical(other._subject, _subject);
  }

  /// Subscribes to the wrapped subject with the given callbacks.
  @override
  StreamSubscription<T> listen(
    void Function(T event)? onData, {
    Function? onError,
    void Function()? onDone,
    bool? cancelOnError,
  }) =>
      _subject.listen(
        onData,
        onError: onError,
        onDone: onDone,
        cancelOnError: cancelOnError,
      );

  @override
  Object get error => _subject.error;

  @override
  Object? get errorOrNull => _subject.errorOrNull;

  @override
  bool get hasError => _subject.hasError;

  @override
  bool get hasValue => _subject.hasValue;

  @override
  StackTrace? get stackTrace => _subject.stackTrace;

  @override
  T get value => _subject.value;

  @override
  T? get valueOrNull => _subject.valueOrNull;

  @override
  StreamNotification<T>? get lastEventOrNull => _subject.lastEventOrNull;
}
```

</details>