Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 156 additions & 34 deletions packages/dart/lib/src/utils/parse_live_list.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,23 @@ class ParseLiveList<T extends ParseObject> {
List<String>? preloadedColumns,
}) : _preloadedColumns = preloadedColumns ?? const <String>[] {
_debug = isDebugEnabled();
_debugLoggedInit = isDebugEnabled();
}

/// Creates a new [ParseLiveList] for the given [query].
///
/// [lazyLoading] enables lazy loading of full object data. When `true` and
/// [preloadedColumns] is provided, the initial query fetches only those columns,
/// and full objects are loaded on-demand when accessed via [getAt].
/// When [preloadedColumns] is empty or null, all fields are fetched regardless
/// of [lazyLoading] value. Default is `true`.
///
/// [preloadedColumns] specifies which fields to fetch in the initial query when
/// lazy loading is enabled. Order fields are automatically included to ensure
/// proper sorting. If null or empty, all fields are fetched.
///
/// [listenOnAllSubItems] and [listeningIncludes] control which nested objects
/// receive live query updates.
static Future<ParseLiveList<T>> create<T extends ParseObject>(
QueryBuilder<T> query, {
bool? listenOnAllSubItems,
Expand All @@ -26,7 +41,7 @@ class ParseLiveList<T extends ParseObject> {
)
: _toIncludeMap(listeningIncludes ?? <String>[]),
lazyLoading,
preloadedColumns: preloadedColumns ?? const <String>[],
preloadedColumns: preloadedColumns,
);

return parseLiveList._init().then((_) {
Expand All @@ -45,6 +60,8 @@ class ParseLiveList<T extends ParseObject> {
late StreamController<ParseLiveListEvent<T>> _eventStreamController;
int _nextID = 0;
late bool _debug;
// Separate from _debug to allow one-time initialization logging without affecting error logging
late bool _debugLoggedInit;

int get nextID => _nextID++;

Expand Down Expand Up @@ -134,12 +151,20 @@ class ParseLiveList<T extends ParseObject> {

Future<ParseResponse> _runQuery() async {
final QueryBuilder<T> query = QueryBuilder<T>.copy(_query);
if (_debug) {
print('ParseLiveList: lazyLoading is ${_lazyLoading ? 'on' : 'off'}');

// Log lazy loading mode only once during initialization to avoid log spam
if (_debugLoggedInit) {
print('ParseLiveList: Initialized with lazyLoading=${_lazyLoading ? 'on' : 'off'}, preloadedColumns=${_preloadedColumns.isEmpty ? 'none' : _preloadedColumns.join(", ")}');
_debugLoggedInit = false;
}
if (_lazyLoading) {

// Only restrict fields if lazy loading is enabled AND preloaded columns are specified
// This allows fetching minimal data upfront and loading full objects on-demand
if (_lazyLoading && _preloadedColumns.isNotEmpty) {
final List<String> keys = _preloadedColumns.toList();
if (_lazyLoading && query.limiters.containsKey('order')) {

// Automatically include order fields to ensure sorting works correctly
if (query.limiters.containsKey('order')) {
keys.addAll(
query.limiters['order'].toString().split(',').map((String string) {
if (string.startsWith('-')) {
Expand All @@ -149,10 +174,10 @@ class ParseLiveList<T extends ParseObject> {
}),
);
}
if (keys.isNotEmpty) {
query.keysToReturn(keys);
}

query.keysToReturn(keys);
}

return await query.query<T>();
}

Expand All @@ -161,13 +186,20 @@ class ParseLiveList<T extends ParseObject> {

final ParseResponse parseResponse = await _runQuery();
if (parseResponse.success) {
// Determine if fields were actually restricted in the query
// Only mark as not loaded if lazy loading AND we actually restricted fields
final bool fieldsRestricted =
_lazyLoading && _preloadedColumns.isNotEmpty;

_list =
parseResponse.results
?.map<ParseLiveListElement<T>>(
(dynamic element) => ParseLiveListElement<T>(
element,
updatedSubItems: _listeningIncludes,
loaded: !_lazyLoading,
// Mark as loaded if we fetched all fields (no restriction)
// Mark as not loaded only if fields were actually restricted
loaded: !fieldsRestricted,
),
)
.toList() ??
Expand Down Expand Up @@ -486,34 +518,114 @@ class ParseLiveList<T extends ParseObject> {
}
}

Stream<T> getAt(final int index) async* {
if (index < _list.length) {
if (!_list[index].loaded) {
final QueryBuilder<T> queryBuilder = QueryBuilder<T>.copy(_query)
..whereEqualTo(
keyVarObjectId,
_list[index].object.get<String>(keyVarObjectId),
)
..setLimit(1);
final ParseResponse response = await queryBuilder.query<T>();
if (_list.isEmpty) {
yield* _createStreamError<T>(
ParseError(message: 'ParseLiveList: _list is empty'),
);
return;
/// Returns a stream for the element at the given [index].
///
/// Returns the element's existing broadcast stream, which allows multiple
/// listeners without creating redundant network requests or stream instances.
///
/// When lazy loading is enabled and an element is not yet loaded, the first
/// access will trigger loading. This is useful for pagination scenarios.
/// Subsequent calls return the same stream without additional loads.
///
/// The returned stream is a broadcast stream from ParseLiveListElement,
/// preventing the N+1 query bug that occurred with async* generators.
Stream<T> getAt(final int index) {
if (index >= _list.length) {
// Return an empty stream for out-of-bounds indices
return const Stream.empty();
}

final element = _list[index];

// If not yet loaded (happens with lazy loading), trigger loading
// This will only happen once per element due to the loaded flag
if (!element.loaded) {
_loadElementAt(index);
}

// Return the element's broadcast stream
// Multiple subscriptions to this stream won't trigger multiple loads
return element.stream;
}

/// Asynchronously loads the full data for the element at [index].
///
/// Called when an element is accessed for the first time.
/// Errors are emitted to the element's stream so listeners can handle them.
Future<void> _loadElementAt(int index) async {
if (index >= _list.length) {
return;
}

final element = _list[index];

// Race condition protection: skip if element is already loaded or
// currently being loaded by another concurrent call
if (element.loaded || element._isLoading) {
return;
}

// Set loading flag to prevent concurrent load operations
element._isLoading = true;

try {
final QueryBuilder<T> queryBuilder = QueryBuilder<T>.copy(_query)
..whereEqualTo(
keyVarObjectId,
element.object.get<String>(keyVarObjectId),
)
..setLimit(1);

final ParseResponse response = await queryBuilder.query<T>();

// Check if list was modified during async operation
if (_list.isEmpty || index >= _list.length) {
if (_debug) {
print('ParseLiveList: List was modified during element load');
}
if (response.success) {
_list[index].object = response.results?.first;
} else {
ParseError? error = response.error;
if (error != null) yield* _createStreamError<T>(error);
return;
}

if (response.success &&
response.results != null &&
response.results!.isNotEmpty) {
// Verify we're still updating the same object (list may have been modified)
final currentElement = _list[index];
if (currentElement.object.objectId != element.object.objectId) {
if (_debug) {
print('ParseLiveList: Element at index $index changed during load');
}
return;
}
// Setting the object will mark it as loaded and emit it to the stream
_list[index].object = response.results!.first;
} else if (response.error != null) {
// Emit error to the element's stream so listeners can handle it
element.emitError(response.error!, StackTrace.current);
if (_debug) {
print(
'ParseLiveList: Error loading element at index $index: ${response.error}',
);
}
} else {
// Object not found (possibly deleted between initial query and load)
if (_debug) {
print(
'ParseLiveList: Element at index $index not found during load',
);
}
}
// just for testing
// await Future<void>.delayed(const Duration(seconds: 2));
yield _list[index].object;
yield* _list[index].stream;
} catch (e, stackTrace) {
// Emit exception to the element's stream
element.emitError(e, stackTrace);
if (_debug) {
print(
'ParseLiveList: Exception loading element at index $index: $e\n$stackTrace',
);
}
} finally {
// Clear loading flag to allow future retry attempts
element._isLoading = false;
}
}

Expand Down Expand Up @@ -650,7 +762,8 @@ class ParseLiveListElement<T extends ParseObject> {
this._object, {
bool loaded = false,
Map<String, dynamic>? updatedSubItems,
}) : _loaded = loaded {
}) : _loaded = loaded,
_isLoading = false {
_updatedSubItems = _toSubscriptionMap(
updatedSubItems ?? <String, dynamic>{},
);
Expand All @@ -663,6 +776,7 @@ class ParseLiveListElement<T extends ParseObject> {
final StreamController<T> _streamController = StreamController<T>.broadcast();
T _object;
bool _loaded = false;
bool _isLoading = false;
late Map<PathKey, dynamic> _updatedSubItems;
LiveQuery? _liveQuery;
final Future<void> _subscriptionQueue = Future<void>.value();
Expand Down Expand Up @@ -791,6 +905,14 @@ class ParseLiveListElement<T extends ParseObject> {

bool get loaded => _loaded;

/// Emits an error to the stream for listeners to handle.
/// Used when lazy loading fails to fetch the full object data.
void emitError(Object error, StackTrace stackTrace) {
if (!_streamController.isClosed) {
_streamController.addError(error, stackTrace);
}
}

void dispose() {
_unsubscribe(_updatedSubItems);
_streamController.close();
Expand Down
Loading
Loading