Skip to content

Commit

Permalink
Fixed issue with FSM timeout not actually timing out (#5)
Browse files Browse the repository at this point in the history
* Fixed issue with FSM timeout not actually timing out.

* Added a test for the cancelling of timeout upon a state transition

* Refactored the TimeoutFSMSuite tests and cleaned up implementation. Added test for override of timeouts

---------

Co-authored-by: Massimo Saliba <massimo@suprnation.com>
  • Loading branch information
Mitrug and Massimo Saliba authored Aug 6, 2024
1 parent 3f1df93 commit 354ec1c
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 92 deletions.
14 changes: 7 additions & 7 deletions src/main/scala/com/suprnation/actor/fsm/FSM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,17 @@ case class FSMBuilder[F[+_]: Parallel: Async: Temporal, S, D, Request, Response]

def processStateTransition(currentState: State[S, D, Request, Response]): F[Unit] =
(if (currentState.stateName != nextState.stateName || nextState.notifies) {
nextStateRef.set(Some(nextState)) >>
handleTransition(currentState.stateName, nextState.stateName) >>
nextStateRef.set(None)
} else Sync[F].unit) >>
nextStateRef.set(Some(nextState)) >>
handleTransition(currentState.stateName, nextState.stateName) >>
nextStateRef.set(None)
} else Sync[F].unit) >>
(if (config.debug) config.transition(currentState, nextState) else Sync[F].unit) >>
currentStateRef.set(nextState) >>
(currentState.timeout match {
case timeoutData @ Some((d: FiniteDuration, msg)) if d.length >= 0 =>
(nextState.timeout match {
case Some((d: FiniteDuration, msg)) if d.length >= 0 =>
scheduleTimeout(d -> msg)
case _ =>
stateTimeouts(currentState.stateName).fold(Sync[F].unit)(scheduleTimeout)
stateTimeouts(nextState.stateName).fold(Sync[F].unit)(scheduleTimeout)
})

stateFunctions.get(nextState.stateName) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ trait FSMStateSyntax {
def using(nextStateData: D): F[State[S, D, Request, Response]] =
sF.map(s => s.using(nextStateData))

def forMax(duration: Option[(FiniteDuration, Request)]): F[State[S, D, Request, Response]] =
sF.map(s => s.forMax(duration))

def withNotification(notifies: Boolean): F[State[S, D, Request, Response]] =
sF.map(_.withNotification(notifies))

Expand Down
202 changes: 117 additions & 85 deletions src/test/scala/com/suprnation/fsm/TimeoutFSMSuite.scala
Original file line number Diff line number Diff line change
@@ -1,156 +1,188 @@
package com.suprnation.fsm

import cats.effect.unsafe.implicits.global
import cats.effect.{IO, Ref}
import cats.effect.{Deferred, IO, Ref}
import cats.implicits.catsSyntaxOptionId
import com.suprnation.actor.Actor.Actor
import com.suprnation.actor.ActorSystem
import com.suprnation.actor.fsm.FSM
import com.suprnation.actor.fsm.FSM.Event
import com.suprnation.actor.fsm.{FSM, FSMConfig}
import com.suprnation.typelevel.actors.syntax.ActorSystemDebugOps
import com.suprnation.typelevel.fsm.syntax.Timeout
import com.suprnation.typelevel.fsm.syntax.FSMStateSyntaxOps
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration.{DurationInt, FiniteDuration}

sealed trait Replies
case object WakingUp extends Replies
case object GotNudged extends Replies
case object StateTimeoutSleep extends Replies
case object TransitionTimeoutSleep extends Replies

sealed trait TimeoutState
case object NoTimeout extends TimeoutState
case object DefaultTimeout extends TimeoutState
case object Awake extends TimeoutState
case object Nudged extends TimeoutState
case object Asleep extends TimeoutState

sealed trait TimeoutRequest
case class GotoNoTimeoutState(forceTimeout: Timeout[TimeoutRequest] = None) extends TimeoutRequest
case class GotoTimeoutState(forceTimeout: Timeout[TimeoutRequest] = None) extends TimeoutRequest
case class StateSleep() extends TimeoutRequest
case class TransitionSleep() extends TimeoutRequest
case class Nudge() extends TimeoutRequest
case class WakeUp(stayAwakeFor: Option[FiniteDuration] = None) extends TimeoutRequest


object TimeoutActor {
def timeoutActor(startWith: TimeoutState): IO[Actor[IO, TimeoutRequest]] =

def forMaxTimeoutActor(
startWith: TimeoutState,
defaultStateStayAwakeFor: FiniteDuration,
timeOutDef: Deferred[IO, Boolean]
): IO[Actor[IO, TimeoutRequest]] =

FSM[IO, TimeoutState, Int, TimeoutRequest, Any]
.when(NoTimeout) {
case (Event(GotoNoTimeoutState(None), _), sM) => sM.stay()
case (Event(GotoNoTimeoutState(fd), _), sM) => sM.forMax(fd)
case (Event(GotoTimeoutState(_), data), sM) => sM.stayAndReply(data)
.when(Awake, defaultStateStayAwakeFor, StateSleep()) {
case (Event(StateSleep(), _), sM) =>
timeOutDef.complete(true) *>
sM.goto(Asleep).replying(StateTimeoutSleep)

case (Event(TransitionSleep(), _), sM) =>
timeOutDef.complete(true) *>
sM.goto(Asleep).replying(TransitionTimeoutSleep)

case (Event(Nudge(), _), sM) => sM.goto(Nudged).replying(GotNudged)
}
.when(Nudged) {
case (_, sM) => sM.stayAndReply(GotNudged)
}
.when(Asleep) {
case (Event(WakeUp(stayAwakeFor), _), sM) =>
sM.goto(Awake)
.forMax(stayAwakeFor.map((_, TransitionSleep())))
.replying(WakingUp)

case (Event(Nudge(), _), sM) => sM.goto(Nudged).replying(GotNudged)
}
// .withConfig(FSMConfig.withConsoleInformation)
.withConfig(FSMConfig.withConsoleInformation)
.startWith(startWith, 0)
.initialize

}

class TimeoutFSMSuite extends AsyncFlatSpec with Matchers {
it should "should not timeout if timeout not set" in {

it should "timeout the Awake state using the 'forMax' and go back to sleep" in {
(for {
actorSystem <- ActorSystem[IO]("FSM Actor", (_: Any) => IO.unit).allocated.map(_._1)
buffer <- Ref[IO].of(Vector.empty[Any])
peanoNumber <- actorSystem.replyingActorOf[PeanoNumber, Int](PeanoNumbers.peanoNumbers)

peanoNumberActor <- actorSystem.actorOf[PeanoNumber](
AbsorbReplyActor(peanoNumber, buffer),
"peano-number-absorb-actor"
timeOutDef <- Deferred[IO, Boolean]
timeoutActor <- actorSystem.actorOf(
TimeoutActor.forMaxTimeoutActor(
Asleep,
3.seconds,
timeOutDef
)
)
_ <- peanoNumberActor ! Zero
_ <- peanoNumberActor ! Succ
_ <- peanoNumberActor ! Succ
_ <- peanoNumberActor ! Succ
_ <- actorSystem.waitForIdle()
messages <- buffer.get
} yield messages).unsafeToFuture().map { messages =>
messages.toList should be(List(0, 1, 2, 3))
}
}

it should "timeout based on default timeout state" in {
(for {
actorSystem <- ActorSystem[IO]("FSM Actor", (_: Any) => IO.unit).allocated.map(_._1)
buffer <- Ref[IO].of(Vector.empty[Any])
peanoNumber <- actorSystem.replyingActorOf[PeanoNumber, Int](PeanoNumbers.peanoNumbers)

peanoNumberActor <- actorSystem.actorOf[PeanoNumber](
AbsorbReplyActor(peanoNumber, buffer),
"peano-number-absorb-actor"
actor <- actorSystem.actorOf[TimeoutRequest](
AbsorbReplyActor(timeoutActor, buffer),
"actor"
)
_ <- peanoNumberActor ! Zero
_ <- peanoNumberActor ! Succ
_ <- peanoNumberActor ! Succ
_ <- peanoNumberActor ! Succ
_ <- actor ! WakeUp(stayAwakeFor = 2.seconds.some)

_ <- IO.race(IO.sleep(5.seconds), timeOutDef.get)
_ <- actorSystem.waitForIdle()
messages <- buffer.get
} yield messages).unsafeToFuture().map { messages =>
messages.toList should be(List(0, 1, 2, 3))
messages.toList should be(List(WakingUp, TransitionTimeoutSleep))
}
}

it should "should timeout if an override timeout is set (when initial was not set)" in {
it should "timeout the Awake state using the 'when' timeout and go back to sleep" in {
(for {
actorSystem <- ActorSystem[IO]("FSM Actor", (_: Any) => IO.unit).allocated.map(_._1)
buffer <- Ref[IO].of(Vector.empty[Any])
peanoNumber <- actorSystem.replyingActorOf[PeanoNumber, Int](PeanoNumbers.peanoNumbers)

peanoNumberActor <- actorSystem.actorOf[PeanoNumber](
AbsorbReplyActor(peanoNumber, buffer),
"peano-number-absorb-actor"
timeOutDef <- Deferred[IO, Boolean]
timeoutActor <- actorSystem.actorOf(
TimeoutActor.forMaxTimeoutActor(
Asleep,
3.seconds,
timeOutDef
)
)
_ <- peanoNumberActor ! Zero
_ <- peanoNumberActor ! Succ
_ <- peanoNumberActor ! Succ
_ <- peanoNumberActor ! Succ
_ <- actorSystem.waitForIdle()
messages <- buffer.get
} yield messages).unsafeToFuture().map { messages =>
messages.toList should be(List(0, 1, 2, 3))
}
}

it should "should timeout if an override timeout is set (when state has default)" in {
(for {
actorSystem <- ActorSystem[IO]("FSM Actor", (_: Any) => IO.unit).allocated.map(_._1)
buffer <- Ref[IO].of(Vector.empty[Any])
peanoNumber <- actorSystem.replyingActorOf[PeanoNumber, Int](PeanoNumbers.peanoNumbers)

peanoNumberActor <- actorSystem.actorOf[PeanoNumber](
AbsorbReplyActor(peanoNumber, buffer),
"peano-number-absorb-actor"
actor <- actorSystem.actorOf[TimeoutRequest](
AbsorbReplyActor(timeoutActor, buffer),
"actor"
)
_ <- peanoNumberActor ! Zero
_ <- (peanoNumberActor ! Succ).replicateA(10000)
_ <- actor ! WakeUp()

_ <- IO.race(IO.sleep(5.seconds), timeOutDef.get)
_ <- actorSystem.waitForIdle()
messages <- buffer.get
} yield messages).unsafeToFuture().map { messages =>
messages.toList should be(Range.inclusive(0, 10000).toList)
messages.toList should be(List(WakingUp, StateTimeoutSleep))
}
}

it should "cancel any timeout from the current state once we move to another state" in {
it should "override state default timeout with the 'forMax' one" in {
(for {
actorSystem <- ActorSystem[IO]("FSM Actor", (_: Any) => IO.unit).allocated.map(_._1)
buffer <- Ref[IO].of(Vector.empty[Any])
peanoNumber <- actorSystem.replyingActorOf[PeanoNumber, Int](PeanoNumbers.peanoNumbers)

peanoNumberActor <- actorSystem.actorOf[PeanoNumber](
AbsorbReplyActor(peanoNumber, buffer),
"peano-number-absorb-actor"
timeOutDef <- Deferred[IO, Boolean]
timeoutActor <- actorSystem.actorOf(
TimeoutActor.forMaxTimeoutActor(
Asleep,
2.seconds,
timeOutDef
)
)
_ <- peanoNumberActor ! Zero
_ <- (peanoNumberActor ! Succ).replicateA(10000)

actor <- actorSystem.actorOf[TimeoutRequest](
AbsorbReplyActor(timeoutActor, buffer),
"actor"
)
_ <- actor ! WakeUp(stayAwakeFor = 3.seconds.some)

_ <- IO.race(IO.sleep(5.seconds), timeOutDef.get)
_ <- actorSystem.waitForIdle()
messages <- buffer.get
} yield messages).unsafeToFuture().map { messages =>
messages.toList should be(Range.inclusive(0, 10000).toList)
messages.toList should be(List(WakingUp, TransitionTimeoutSleep))
}
}

it should "cancel any timeouts set once we move to another state" in {
it should "not timeout once we move to another state" in {
(for {
actorSystem <- ActorSystem[IO]("FSM Actor", (_: Any) => IO.unit).allocated.map(_._1)
buffer <- Ref[IO].of(Vector.empty[Any])
peanoNumber <- actorSystem.replyingActorOf[PeanoNumber, Int](PeanoNumbers.peanoNumbers)

peanoNumberActor <- actorSystem.actorOf[PeanoNumber](
AbsorbReplyActor(peanoNumber, buffer),
"peano-number-absorb-actor"
timeOutDef <- Deferred[IO, Boolean]
timeoutActor <- actorSystem.actorOf(
TimeoutActor.forMaxTimeoutActor(
Asleep,
2.seconds,
timeOutDef
)
)
_ <- peanoNumberActor ! Zero
_ <- (peanoNumberActor ! Succ).replicateA(10000)

actor <- actorSystem.actorOf[TimeoutRequest](
AbsorbReplyActor(timeoutActor, buffer),
"actor"
)
_ <- actor ! WakeUp(stayAwakeFor = 2.seconds.some)
_ <- actor ! Nudge()

//IO.sleep should win here as the actor's timeout should be cancelled
_ <- IO.race(IO.sleep(4.seconds), timeOutDef.get)
_ <- actorSystem.waitForIdle()
messages <- buffer.get
} yield messages).unsafeToFuture().map { messages =>
messages.toList should be(Range.inclusive(0, 10000).toList)
messages.toList should be(List(WakingUp, GotNudged))
}
}

}

0 comments on commit 354ec1c

Please sign in to comment.