83
83
) -> Arc < SubscriptionFunc < X > > {
84
84
let _func = Arc :: new ( Mutex :: new ( func) ) ;
85
85
self . subscribe_fn ( move |x : Arc < X > | {
86
- let _func = _func. clone ( ) ;
87
- let func = & mut * _func. lock ( ) . unwrap ( ) ;
88
- ( func) ( x) ;
86
+ ( _func. lock ( ) . unwrap ( ) ) ( x) ;
89
87
} )
90
88
}
91
89
pub fn unsubscribe ( & mut self , s : Arc < SubscriptionFunc < X > > ) {
@@ -136,7 +134,7 @@ impl<X: Send + Sync + 'static> Observable<X, SubscriptionFunc<X>> for Publisher<
136
134
observer. as_ref ( ) . clone ( ) . close_stream ( ) ;
137
135
}
138
136
139
- for ( index, obs) in self . observers . clone ( ) . iter ( ) . enumerate ( ) {
137
+ for ( index, obs) in self . observers . iter ( ) . enumerate ( ) {
140
138
if obs. get_id ( ) == id {
141
139
// println!("delete_observer({});", observer);
142
140
self . observers . remove ( index) ;
@@ -145,34 +143,25 @@ impl<X: Send + Sync + 'static> Observable<X, SubscriptionFunc<X>> for Publisher<
145
143
}
146
144
}
147
145
fn notify_observers ( & mut self , val : Arc < X > ) {
148
- let _observers = self . observers . clone ( ) ;
149
- let observers = Arc :: new ( _observers) ;
146
+ let observers = self . observers . clone ( ) ;
150
147
let mut _do_sub = Arc :: new ( move || {
151
- let mut _observers = observers. clone ( ) ;
152
- let observers = Arc :: make_mut ( & mut _observers) ;
153
-
154
148
for ( _, observer) in observers. iter ( ) . enumerate ( ) {
155
149
{
156
150
observer. on_next ( val. clone ( ) ) ;
157
151
}
158
152
}
159
153
} ) ;
160
154
161
- let sub_handler_thread = & mut self . sub_handler ;
162
- let mut do_sub_thread_ob = _do_sub. clone ( ) ;
163
-
164
- match sub_handler_thread {
155
+ match & mut self . sub_handler {
165
156
Some ( ref mut sub_handler) => {
166
- let mut do_sub_thread_sub = _do_sub. clone ( ) ;
167
-
168
157
sub_handler. lock ( ) . unwrap ( ) . post ( RawFunc :: new ( move || {
169
- let sub = Arc :: make_mut ( & mut do_sub_thread_sub ) ;
158
+ let sub = Arc :: make_mut ( & mut _do_sub ) ;
170
159
171
160
( sub) ( ) ;
172
161
} ) ) ;
173
162
}
174
163
None => {
175
- let sub = Arc :: make_mut ( & mut do_sub_thread_ob ) ;
164
+ let sub = Arc :: make_mut ( & mut _do_sub ) ;
176
165
( sub) ( ) ;
177
166
}
178
167
} ;
@@ -195,15 +184,15 @@ async fn test_publisher_stream() {
195
184
let mut pub1 = Publisher :: new_with_handlers ( Some ( _h. clone ( ) ) ) ;
196
185
//*
197
186
let s = pub1. subscribe_as_stream ( Arc :: new ( SubscriptionFunc :: new ( move |x| {
198
- println ! ( "{:?}: {:?}" , "SS" , x) ;
187
+ println ! ( "test_publisher_stream {:?}: {:?}" , "SS" , x) ;
199
188
} ) ) ) ;
200
189
// */
201
190
{
202
191
let h = & mut _h. lock ( ) . unwrap ( ) ;
203
192
204
- println ! ( "hh2" ) ;
193
+ println ! ( "test_publisher_stream hh2" ) ;
205
194
h. start ( ) ;
206
- println ! ( "hh2 running" ) ;
195
+ println ! ( "test_publisher_stream hh2 running" ) ;
207
196
}
208
197
pub1. publish ( 1 ) ;
209
198
pub1. publish ( 2 ) ;
@@ -213,12 +202,12 @@ async fn test_publisher_stream() {
213
202
{
214
203
let mut result = s. clone ( ) ;
215
204
for n in 1 ..5 {
216
- println ! ( "{:?}: {:?}" , n, "Before" ) ;
205
+ println ! ( "test_publisher_stream {:?}: {:?}" , n, "Before" ) ;
217
206
let item = result. next ( ) . await ;
218
207
if let Some ( result) = item. clone ( ) {
219
208
( & mut got_list) . push ( result. clone ( ) ) ;
220
209
}
221
- println ! ( "{:?}: {:?}" , n, item) ;
210
+ println ! ( "test_publisher_stream {:?}: {:?}" , n, item) ;
222
211
}
223
212
// got_list = s.collect::<Vec<_>>().await;
224
213
}
@@ -252,6 +241,8 @@ async fn test_publisher_stream() {
252
241
} ) ) ;
253
242
}
254
243
244
+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 10 ) ) ;
245
+
255
246
got_list = s. clone ( ) . collect :: < Vec < _ > > ( ) . await ;
256
247
assert_eq ! (
257
248
vec![ Arc :: new( 5 ) , Arc :: new( 6 ) , Arc :: new( 7 ) , Arc :: new( 8 ) , ] ,
0 commit comments