@@ -34,6 +34,7 @@ struct BDMutexGuardInner<T: 'static + DBSynchronizedDocument<'static>> {
34
34
elements : HashSet < T :: Key > ,
35
35
change_flag : DBUuid ,
36
36
alive_job : Option < JoinHandle < ( ) > > ,
37
+ collection : Arc < T :: Collection > ,
37
38
}
38
39
39
40
impl < T : ' static + DBSynchronizedDocument < ' static > > BDMutexGuard < T > {
@@ -45,6 +46,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
45
46
node_id : & ArcStr ,
46
47
fields : Option < & T > ,
47
48
timeout : Option < u64 > ,
49
+ collection : & Arc < T :: Collection > ,
48
50
) -> Result < ( T , BDMutexGuard < T > ) , DBMutexError > {
49
51
let time_out = timeout. map ( |v| DBDateTime :: now ( ) . after_seconds ( v) ) ;
50
52
let mut checked_doc_exists = false ;
@@ -56,7 +58,8 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
56
58
}
57
59
58
60
// Prepare filter.
59
- let ( mut list, mutex) = Self :: acquire_list ( & [ key. clone ( ) ] , node_id, fields) . await ?;
61
+ let ( mut list, mutex) =
62
+ Self :: acquire_list ( & [ key. clone ( ) ] , node_id, fields, collection) . await ?;
60
63
61
64
let value = list. pop ( ) . unwrap ( ) ;
62
65
@@ -67,7 +70,6 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
67
70
// Check the document exists and exit if not.
68
71
// This prevents waiting until timeout when the document
69
72
// is not present in the DB.
70
- let collection = T :: collection ( ) ;
71
73
let exists_in_db = collection. exists_by_key ( key) . await ?;
72
74
73
75
if !exists_in_db {
@@ -94,6 +96,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
94
96
keys : & [ T :: Key ] ,
95
97
node_id : & ArcStr ,
96
98
fields : Option < & T > ,
99
+ collection : & Arc < T :: Collection > ,
97
100
) -> Result < ( Vec < Option < T > > , BDMutexGuard < T > ) , Box < dyn Error > > {
98
101
// Shortcut for empty sets.
99
102
if keys. is_empty ( ) {
@@ -105,12 +108,12 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
105
108
elements : HashSet :: new ( ) ,
106
109
change_flag : DBUuid :: new ( ) ,
107
110
alive_job : Some ( tokio:: spawn ( async { } ) ) ,
111
+ collection : collection. clone ( ) ,
108
112
} ) ) ,
109
113
} ,
110
114
) ) ;
111
115
}
112
116
113
- let collection = T :: collection ( ) ;
114
117
let collection_name = T :: Collection :: name ( ) ;
115
118
let mutex_path = DBDocumentField :: Mutex . path ( ) ;
116
119
@@ -183,6 +186,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
183
186
elements : result_ids,
184
187
change_flag,
185
188
alive_job : None ,
189
+ collection : collection. clone ( ) ,
186
190
} ) ) ,
187
191
} ;
188
192
@@ -224,8 +228,8 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
224
228
limits : Option < AqlLimit > ,
225
229
node_id : & ArcStr ,
226
230
fields : Option < & T > ,
231
+ collection : & Arc < T :: Collection > ,
227
232
) -> Result < ( Vec < T > , BDMutexGuard < T > ) , Box < dyn Error > > {
228
- let collection = T :: collection ( ) ;
229
233
let collection_name = T :: Collection :: name ( ) ;
230
234
let mutex_path = DBDocumentField :: Mutex . path ( ) ;
231
235
@@ -303,6 +307,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
303
307
elements : result_ids,
304
308
change_flag,
305
309
alive_job : None ,
310
+ collection : collection. clone ( ) ,
306
311
} ) ) ,
307
312
} ;
308
313
@@ -380,6 +385,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
380
385
elements : new_elements,
381
386
change_flag : DBUuid :: new ( ) ,
382
387
alive_job : Some ( tokio:: spawn ( async { } ) ) ,
388
+ collection : lock. collection . clone ( ) ,
383
389
} ) ) ,
384
390
}
385
391
} else {
@@ -389,6 +395,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
389
395
elements : new_elements,
390
396
change_flag : lock. change_flag . clone ( ) ,
391
397
alive_job : None ,
398
+ collection : lock. collection . clone ( ) ,
392
399
} ) ) ,
393
400
} ;
394
401
@@ -427,7 +434,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
427
434
return ;
428
435
}
429
436
430
- let collection = T :: collection ( ) ;
437
+ let collection = & lock . collection ;
431
438
let node_id = & lock. node_id ;
432
439
let now = DBDateTime :: now ( ) ;
433
440
let expiration = now. after_seconds ( MUTEX_EXPIRATION ) ;
@@ -519,7 +526,7 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
519
526
return ;
520
527
}
521
528
522
- let collection = T :: collection ( ) ;
529
+ let collection = & lock . collection ;
523
530
let node_id = & lock. node_id ;
524
531
let keys = & lock. elements ;
525
532
@@ -590,12 +597,11 @@ impl<T: 'static + DBSynchronizedDocument<'static>> BDMutexGuard<T> {
590
597
}
591
598
}
592
599
593
- pub async fn release_all_mutexes ( node_id : & str ) {
600
+ pub async fn release_all_mutexes ( node_id : & str , collection : & Arc < T :: Collection > ) {
594
601
// FOR i IN <collection>
595
602
// FILTER i.<mutex.node> == <node>
596
603
// UPDATE i WITH { <mutex>: null } IN <collection> OPTIONS { mergeObjects: true, keepNulls: false, ignoreErrors: true }
597
604
let mutex_path = DBDocumentField :: Mutex . path ( ) ;
598
- let collection = T :: collection ( ) ;
599
605
let collection_name = T :: Collection :: name ( ) ;
600
606
let mut aql = AqlBuilder :: new_for_in_collection ( AQL_DOCUMENT_ID , collection_name) ;
601
607
aql. filter_step (
0 commit comments