@@ -67,9 +67,9 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
67
67
private static final Log log = LogFactory .getLog (ReactiveStateMachineExecutor .class );
68
68
private final StateMachine <S , E > stateMachine ;
69
69
private final StateMachine <S , E > relayStateMachine ;
70
- private final Map <Trigger <S , E >, Transition <S ,E >> triggerToTransitionMap ;
70
+ private final Map <Trigger <S , E >, Transition <S , E >> triggerToTransitionMap ;
71
71
private final List <Transition <S , E >> triggerlessTransitions ;
72
- private final Collection <Transition <S ,E >> transitions ;
72
+ private final Collection <Transition <S , E >> transitions ;
73
73
private final Transition <S , E > initialTransition ;
74
74
private final Message <E > initialEvent ;
75
75
private final TransitionComparator <S , E > transitionComparator ;
@@ -106,8 +106,7 @@ public ReactiveStateMachineExecutor(StateMachine<S, E> stateMachine, StateMachin
106
106
@ Override
107
107
protected void onInit () throws Exception {
108
108
triggerSink = triggerProcessor .sink ();
109
- triggerFlux = Flux .from (triggerProcessor )
110
- .flatMap (trigger -> handleTrigger (trigger ));
109
+ triggerFlux = Flux .from (triggerProcessor ).flatMap (trigger -> handleTrigger (trigger ));
111
110
}
112
111
113
112
@ Override
@@ -144,8 +143,7 @@ protected Mono<Void> doPreStopReactively() {
144
143
triggerDisposable = null ;
145
144
}
146
145
initialHandled .set (false );
147
- })
148
- ;
146
+ });
149
147
}
150
148
151
149
@ Override
@@ -158,7 +156,6 @@ public void queueTrigger(Trigger<S, E> trigger, Message<E> message) {
158
156
159
157
@ Override
160
158
public void queueDeferredEvent (Message <E > message ) {
161
- // TODO Auto-generated method stub
162
159
if (log .isDebugEnabled ()) {
163
160
log .debug ("Deferring message " + message );
164
161
}
@@ -296,9 +293,10 @@ private Mono<Void> handleTrigger(TriggerQueueItem queueItem) {
296
293
297
294
298
295
private Mono <Void > handleInitialTrans (Transition <S , E > tran , Message <E > queuedMessage ) {
299
- StateContext <S , E > stateContext = buildStateContext (queuedMessage , tran , relayStateMachine );
300
- tran .transit (stateContext );
301
- return stateMachineExecutorTransit .transit (tran , stateContext , queuedMessage );
296
+ return Mono .defer (() -> {
297
+ StateContext <S , E > stateContext = buildStateContext (queuedMessage , tran , relayStateMachine );
298
+ return tran .transit (stateContext ).then (stateMachineExecutorTransit .transit (tran , stateContext , queuedMessage ));
299
+ });
302
300
}
303
301
304
302
private Mono <Void > handleTriggerlessTransitions (StateContext <S , E > context , State <S , E > state ) {
@@ -317,36 +315,30 @@ private Mono<Boolean> handleTriggerTrans(List<Transition<S, E>> trans, Message<E
317
315
}
318
316
319
317
private Mono <Boolean > handleTriggerTrans (List <Transition <S , E >> trans , Message <E > queuedMessage , State <S , E > completion ) {
320
- return Mono .defer (() -> {
321
- Mono <Boolean > mono = Mono .just (false );
322
- boolean transit = false ;
323
- for (Transition <S , E > t : trans ) {
324
- if (t == null ) {
325
- continue ;
326
- }
318
+ return Flux .fromIterable (trans )
319
+ .filter (t -> {
327
320
State <S ,E > source = t .getSource ();
328
321
if (source == null ) {
329
- continue ;
322
+ return false ;
330
323
}
331
324
State <S ,E > currentState = stateMachine .getState ();
332
325
if (currentState == null ) {
333
- continue ;
326
+ return false ;
334
327
}
335
328
if (!StateMachineUtils .containsAtleastOne (source .getIds (), currentState .getIds ())) {
336
- continue ;
329
+ return false ;
337
330
}
338
-
339
- if ( transitionConflictPolicy != TransitionConflictPolicy . PARENT && completion != null && !source .getId ().equals (completion .getId ())) {
331
+ if ( transitionConflictPolicy != TransitionConflictPolicy . PARENT && completion != null
332
+ && !source .getId ().equals (completion .getId ())) {
340
333
if (source .isOrthogonal ()) {
341
- continue ;
342
- }
343
- else if (!StateMachineUtils .isSubstate (source , completion )) {
344
- continue ;
345
-
334
+ return false ;
335
+ } else if (!StateMachineUtils .isSubstate (source , completion )) {
336
+ return false ;
346
337
}
347
338
}
348
-
349
- // special handling of join
339
+ return true ;
340
+ })
341
+ .flatMap (t -> {
350
342
if (StateMachineUtils .isPseudoState (t .getTarget (), PseudoStateKind .JOIN )) {
351
343
if (joinSyncStates .isEmpty ()) {
352
344
List <List <State <S ,E >>> joins = ((JoinPseudoState <S , E >)t .getTarget ().getPseudoState ()).getJoins ();
@@ -358,55 +350,45 @@ else if (!StateMachineUtils.isSubstate(source, completion)) {
358
350
boolean removed = joinSyncStates .remove (t .getSource ());
359
351
boolean joincomplete = removed & joinSyncStates .isEmpty ();
360
352
if (joincomplete ) {
361
- for (Transition <S , E > tt : joinSyncTransitions ) {
362
- StateContext <S , E > stateContext = buildStateContext (queuedMessage , tt , relayStateMachine );
363
- tt .transit (stateContext );
364
- // TODO: REACTOR damn, this is not chained! we tests didn't fail?
365
- stateMachineExecutorTransit .transit (tt , stateContext , queuedMessage ).block ();
366
- }
367
- joinSyncTransitions .clear ();
368
- break ;
353
+ return Flux .fromIterable (joinSyncTransitions )
354
+ .flatMap (tt -> {
355
+ StateContext <S , E > stateContext = buildStateContext (queuedMessage , tt , relayStateMachine );
356
+ return tt .transit (stateContext ).then (stateMachineExecutorTransit .transit (t , stateContext , queuedMessage ));
357
+ })
358
+ .doFinally (s -> {
359
+ joinSyncTransitions .clear ();
360
+ })
361
+ .then (Mono .just (true ))
362
+ ;
369
363
} else {
370
- continue ;
364
+ return Mono . just ( false ) ;
371
365
}
366
+ } else {
367
+ StateContext <S , E > stateContext = buildStateContext (queuedMessage , t , relayStateMachine );
368
+ return Mono .just (stateContext )
369
+ .map (context -> interceptors .preTransition (stateContext ))
370
+ .then (t .transit (stateContext )
371
+ .flatMap (at -> {
372
+ if (at ) {
373
+ return stateMachineExecutorTransit .transit (t , stateContext , queuedMessage )
374
+ .thenReturn (true )
375
+ .doOnNext (a -> {
376
+ interceptors .postTransition (stateContext );
377
+ })
378
+ .onErrorResume (e -> {
379
+ interceptors .postTransition (stateContext );
380
+ return Mono .just (false );
381
+ });
382
+ } else {
383
+ return Mono .just (false );
384
+ }
385
+ })
386
+ )
387
+ .onErrorResume (e -> Mono .just (false ));
372
388
}
373
-
374
- StateContext <S , E > stateContext = buildStateContext (queuedMessage , t , relayStateMachine );
375
- try {
376
- stateContext = interceptors .preTransition (stateContext );
377
- } catch (Exception e ) {
378
- // currently expect that if exception is
379
- // thrown, this transition will not match.
380
- // i.e. security may throw AccessDeniedException
381
- log .info ("Interceptors threw exception" , e );
382
- stateContext = null ;
383
- }
384
- if (stateContext == null ) {
385
- break ;
386
- }
387
-
388
- try {
389
- transit = t .transit (stateContext );
390
- } catch (Exception e ) {
391
- log .warn ("Aborting as transition " + t , e );
392
- }
393
- if (transit ) {
394
- // if executor transit is raising exception, stop here
395
- final StateContext <S , E > st = stateContext ;
396
- mono = stateMachineExecutorTransit .transit (t , stateContext , queuedMessage )
397
- .thenReturn (true )
398
- .doOnNext (a -> {
399
- interceptors .postTransition (st );
400
- })
401
- .onErrorResume (e -> {
402
- interceptors .postTransition (st );
403
- return Mono .just (false );
404
- });
405
- break ;
406
- }
407
- }
408
- return mono ;
409
- });
389
+ })
390
+ .takeUntil (x -> x )
391
+ .last (false );
410
392
}
411
393
412
394
private StateContext <S , E > buildStateContext (Message <E > message , Transition <S ,E > transition , StateMachine <S , E > stateMachine ) {
0 commit comments