Skip to content
This repository was archived by the owner on May 13, 2023. It is now read-only.

Commit 8165b75

Browse files
authored
fix: stream filter other than eq is not properly applied. (#156)
* fix: add tests to make sure rest api request from stream has correct filter * fix: test properly makes request * stream filters are properly applied * proper filter is applied to realtime listener within stream * fix: failing test * can efficiently listen to multiple streams * adds test to make sure listening to the same stream twice is supported * update postgrest * fix docs of stream * use Behavior for the stream controller * adds test on stream
1 parent 9cec568 commit 8165b75

File tree

4 files changed

+86
-62
lines changed

4 files changed

+86
-62
lines changed

lib/src/supabase_query_builder.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
2929
/// [primaryKey] list of name of primary key column(s).
3030
///
3131
/// ```dart
32-
/// supabase.from('chats').stream(['my_primary_key']).execute().listen(_onChatsReceived);
32+
/// supabase.from('chats').stream(primaryKey: ['my_primary_key']).execute().listen(_onChatsReceived);
3333
/// ```
3434
///
3535
/// `eq`, `order`, `limit` filter are available to limit the data being queried.
3636
///
3737
/// ```dart
38-
/// supabase.from('chats:room_id=eq.123').stream(['my_primary_key']).order('created_at').limit(20).execute().listen(_onChatsReceived);
38+
/// supabase.from('chats:room_id=eq.123').stream(primaryKey: ['my_primary_key']).order('created_at').limit(20).execute().listen(_onChatsReceived);
3939
/// ```
4040
SupabaseStreamBuilder stream({required List<String> primaryKey}) {
4141
assert(primaryKey.isNotEmpty, 'Please specify primary key column(s).');

lib/src/supabase_stream_builder.dart

Lines changed: 26 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22

3+
import 'package:rxdart/rxdart.dart';
34
import 'package:supabase/supabase.dart';
45

56
enum _FilterType { eq, neq, lt, lte, gt, gte }
@@ -41,8 +42,6 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
4142

4243
RealtimeChannel? _channel;
4344

44-
PostgrestBuilder? _postgrestBuilder;
45-
4645
final String _schema;
4746

4847
final String _table;
@@ -51,7 +50,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
5150
final List<String> _uniqueColumns;
5251

5352
/// StreamController for `stream()` method.
54-
StreamController<SupabaseStreamEvent>? _streamController;
53+
BehaviorSubject<SupabaseStreamEvent>? _streamController;
5554

5655
/// Contains the combined data of postgrest and realtime to emit as stream.
5756
SupabaseStreamEvent _streamData = [];
@@ -84,7 +83,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
8483
/// Only one filter can be applied to `.stream()`.
8584
///
8685
/// ```dart
87-
/// supabase.from('users').stream(['id']).eq('name', 'Supabase');
86+
/// supabase.from('users').stream(primaryKey: ['id']).eq('name', 'Supabase');
8887
/// ```
8988
SupabaseStreamBuilder eq(String column, dynamic value) {
9089
assert(
@@ -104,7 +103,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
104103
/// Only one filter can be applied to `.stream()`.
105104
///
106105
/// ```dart
107-
/// supabase.from('users').stream(['id']).neq('name', 'Supabase');
106+
/// supabase.from('users').stream(primaryKey: ['id']).neq('name', 'Supabase');
108107
/// ```
109108
SupabaseStreamBuilder neq(String column, dynamic value) {
110109
assert(
@@ -124,7 +123,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
124123
/// Only one filter can be applied to `.stream()`.
125124
///
126125
/// ```dart
127-
/// supabase.from('users').stream(['id']).lt('likes', 100);
126+
/// supabase.from('users').stream(primaryKey: ['id']).lt('likes', 100);
128127
/// ```
129128
SupabaseStreamBuilder lt(String column, dynamic value) {
130129
assert(
@@ -144,7 +143,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
144143
/// Only one filter can be applied to `.stream()`.
145144
///
146145
/// ```dart
147-
/// supabase.from('users').stream(['id']).lte('likes', 100);
146+
/// supabase.from('users').stream(primaryKey: ['id']).lte('likes', 100);
148147
/// ```
149148
SupabaseStreamBuilder lte(String column, dynamic value) {
150149
assert(
@@ -164,7 +163,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
164163
/// Only one filter can be applied to `.stream()`.
165164
///
166165
/// ```dart
167-
/// supabase.from('users').stream(['id']).gt('likes', '100');
166+
/// supabase.from('users').stream(primaryKey: ['id']).gt('likes', '100');
168167
/// ```
169168
SupabaseStreamBuilder gt(String column, dynamic value) {
170169
assert(
@@ -184,7 +183,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
184183
/// Only one filter can be applied to `.stream()`.
185184
///
186185
/// ```dart
187-
/// supabase.from('users').stream(['id']).gte('likes', 100);
186+
/// supabase.from('users').stream(primaryKey: ['id']).gte('likes', 100);
188187
/// ```
189188
SupabaseStreamBuilder gte(String column, dynamic value) {
190189
assert(
@@ -204,7 +203,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
204203
/// When `ascending` value is true, the result will be in ascending order.
205204
///
206205
/// ```dart
207-
/// supabase.from('users').stream(['id']).order('username', ascending: false);
206+
/// supabase.from('users').stream(primaryKey: ['id']).order('username', ascending: false);
208207
/// ```
209208
SupabaseStreamBuilder order(String column, {bool ascending = false}) {
210209
_orderBy = _Order(column: column, ascending: ascending);
@@ -214,7 +213,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
214213
/// Limits the result with the specified `count`.
215214
///
216215
/// ```dart
217-
/// supabase.from('users').stream(['id']).limit(10);
216+
/// supabase.from('users').stream(primaryKey: ['id']).limit(10);
218217
/// ```
219218
SupabaseStreamBuilder limit(int count) {
220219
_limit = count;
@@ -223,13 +222,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
223222

224223
@Deprecated('Directly listen without execute instead. Deprecated in 1.0.0')
225224
Stream<SupabaseStreamEvent> execute() {
226-
_streamController = StreamController.broadcast(
227-
onCancel: () {
228-
_channel?.unsubscribe();
229-
_streamController?.close();
230-
},
231-
);
232-
_getStreamData();
225+
_setupStream();
233226
return _streamController!.stream;
234227
}
235228

@@ -240,37 +233,27 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
240233
void Function()? onDone,
241234
bool? cancelOnError,
242235
}) {
243-
if (_postgrestBuilder == null) {
244-
PostgrestFilterBuilder query = _queryBuilder.select();
245-
if (_streamFilter != null) {
246-
query = query.eq(_streamFilter!.column, _streamFilter!.value);
247-
}
248-
PostgrestTransformBuilder? transformQuery;
249-
if (_orderBy != null) {
250-
transformQuery =
251-
query.order(_orderBy!.column, ascending: _orderBy!.ascending);
252-
}
253-
if (_limit != null) {
254-
transformQuery = (transformQuery ?? query).limit(_limit!);
255-
}
256-
_postgrestBuilder = transformQuery ?? query;
257-
}
236+
_setupStream();
237+
return _streamController!.stream.listen(
238+
onData,
239+
onError: onError,
240+
onDone: onDone,
241+
cancelOnError: cancelOnError,
242+
);
243+
}
258244

259-
_streamController = StreamController.broadcast(
245+
/// Sets up the stream controller and calls the method to get data as necessary
246+
void _setupStream() {
247+
_streamController ??= BehaviorSubject(
260248
onListen: () {
261249
_getStreamData();
262250
},
263251
onCancel: () {
264252
_channel?.unsubscribe();
265253
_streamController?.close();
254+
_streamController = null;
266255
},
267256
);
268-
return _streamController!.stream.listen(
269-
onData,
270-
onError: onError,
271-
onDone: onDone,
272-
cancelOnError: cancelOnError,
273-
);
274257
}
275258

276259
Future<void> _getStreamData() async {
@@ -300,9 +283,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
300283
event: 'UPDATE',
301284
schema: _schema,
302285
table: _table,
303-
filter: _streamFilter != null
304-
? '${_streamFilter!.column}=eq.${_streamFilter!.value}'
305-
: null,
286+
filter: realtimeFilter,
306287
), (payload, [ref]) {
307288
final updatedIndex = _streamData.indexWhere(
308289
(element) => _isTargetRecord(record: element, payload: payload),
@@ -320,9 +301,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
320301
event: 'DELETE',
321302
schema: _schema,
322303
table: _table,
323-
filter: _streamFilter != null
324-
? '${_streamFilter!.column}=eq.${_streamFilter!.value}'
325-
: null,
304+
filter: realtimeFilter,
326305
), (payload, [ref]) {
327306
final deletedIndex = _streamData.indexWhere(
328307
(element) => _isTargetRecord(record: element, payload: payload),
@@ -368,7 +347,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
368347
}
369348

370349
try {
371-
final data = await _postgrestBuilder;
350+
final data = await (transformQuery ?? query);
372351
final rows = SupabaseStreamEvent.from(data as List);
373352
_streamData.addAll(rows);
374353
_addStream();

pubspec.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ dependencies:
1111
functions_client: ^1.0.0
1212
gotrue: ^1.0.0
1313
http: ^0.13.4
14-
postgrest: ^1.0.0
14+
postgrest: ^1.0.1
1515
realtime_client: ^1.0.0
1616
storage_client: ^1.0.0
17+
rxdart: ^0.27.5
1718

1819
dev_dependencies:
1920
lints: ^1.0.1

test/mock_test.dart

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ void main() {
2121
/// `testFilter` is used to test incoming realtime filter. The value should match the realtime filter set by the library.
2222
Future<void> handleRequests(
2323
HttpServer server, {
24-
String? testFilter,
24+
String? expectedFilter,
2525
}) async {
2626
await for (final HttpRequest request in server) {
2727
final headers = request.headers;
@@ -35,6 +35,11 @@ void main() {
3535
if (foundApiKey == customApiKey) {
3636
expect(headers.value('customfield'), 'customvalue');
3737
}
38+
39+
// Check that rest api contains the correct filter in the URL
40+
if (expectedFilter != null) {
41+
expect(url.contains(expectedFilter), isTrue);
42+
}
3843
}
3944
if (url == '/rest/v1/todos?select=task%2Cstatus') {
4045
final jsonString = jsonEncode([
@@ -87,6 +92,13 @@ void main() {
8792
..headers.contentType = ContentType.json
8893
..write(jsonString)
8994
..close();
95+
} else if (url.contains('rest')) {
96+
// Just return an empty string as dummy data if any other rest request
97+
request.response
98+
..statusCode = HttpStatus.ok
99+
..headers.contentType = ContentType.json
100+
..write('[]')
101+
..close();
90102
} else if (url.contains('realtime')) {
91103
webSocket = await WebSocketTransformer.upgrade(request);
92104
if (hasListener) {
@@ -107,8 +119,8 @@ void main() {
107119
['postgres_changes']
108120
.first['filter'];
109121

110-
if (testFilter != null) {
111-
expect(realtimeFilter, testFilter);
122+
if (expectedFilter != null) {
123+
expect(realtimeFilter, expectedFilter);
112124
}
113125

114126
final replyString = jsonEncode({
@@ -327,6 +339,23 @@ void main() {
327339

328340
stream.listen(expectAsync1((event) {}, count: 4));
329341
});
342+
test("can listen twice at the same time", () async {
343+
final stream = client.from('todos').stream(primaryKey: ['id']);
344+
stream.listen(expectAsync1((event) {}, count: 4));
345+
stream.listen(expectAsync1((event) {}, count: 4));
346+
347+
// All realtime events are done emitting, so should receive the currnet data
348+
});
349+
test("stream should emit the last emitted data when listened to",
350+
() async {
351+
final stream = client.from('todos').stream(primaryKey: ['id']);
352+
stream.listen(expectAsync1((event) {}, count: 4));
353+
354+
await Future.delayed(Duration(seconds: 3));
355+
356+
// All realtime events are done emitting, so should receive the currnet data
357+
stream.listen(expectAsync1((event) {}, count: 1));
358+
});
330359
test('emits data', () {
331360
final stream = client.from('todos').stream(primaryKey: ['id']);
332361
expect(
@@ -469,7 +498,7 @@ void main() {
469498

470499
group('realtime filter', () {
471500
test('can filter stream results with eq', () {
472-
handleRequests(mockServer, testFilter: 'status=eq.true');
501+
handleRequests(mockServer, expectedFilter: 'status=eq.true');
473502
final stream =
474503
client.from('todos').stream(primaryKey: ['id']).eq('status', true);
475504
expect(
@@ -486,24 +515,39 @@ void main() {
486515
);
487516
});
488517

518+
test('can filter stream results with neq', () {
519+
handleRequests(mockServer, expectedFilter: 'id=neq.2');
520+
final stream =
521+
client.from('todos').stream(primaryKey: ['id']).neq('id', 2);
522+
expect(stream, emits(isList));
523+
});
524+
489525
test('can filter stream results with gt', () {
490-
handleRequests(mockServer, testFilter: 'id=gt.2');
491-
client.from('todos').stream(primaryKey: ['id']).gt('id', 2);
526+
handleRequests(mockServer, expectedFilter: 'id=gt.2');
527+
final stream =
528+
client.from('todos').stream(primaryKey: ['id']).gt('id', 2);
529+
expect(stream, emits(isList));
492530
});
493531

494532
test('can filter stream results with gte', () {
495-
handleRequests(mockServer, testFilter: 'id=gte.2');
496-
client.from('todos').stream(primaryKey: ['id']).gte('id', 2);
533+
handleRequests(mockServer, expectedFilter: 'id=gte.2');
534+
final stream =
535+
client.from('todos').stream(primaryKey: ['id']).gte('id', 2);
536+
expect(stream, emits(isList));
497537
});
498538

499539
test('can filter stream results with lt', () {
500-
handleRequests(mockServer, testFilter: 'id=lt.2');
501-
client.from('todos').stream(primaryKey: ['id']).lt('id', 2);
540+
handleRequests(mockServer, expectedFilter: 'id=lt.2');
541+
final stream =
542+
client.from('todos').stream(primaryKey: ['id']).lt('id', 2);
543+
expect(stream, emits(isList));
502544
});
503545

504546
test('can filter stream results with lte', () {
505-
handleRequests(mockServer, testFilter: 'id=lte.2');
506-
client.from('todos').stream(primaryKey: ['id']).lte('id', 2);
547+
handleRequests(mockServer, expectedFilter: 'id=lte.2');
548+
final stream =
549+
client.from('todos').stream(primaryKey: ['id']).lte('id', 2);
550+
expect(stream, emits(isList));
507551
});
508552
});
509553
}

0 commit comments

Comments
 (0)