Skip to content

Commit

Permalink
Initial support for recovery from events
Browse files Browse the repository at this point in the history
  • Loading branch information
Krever committed Feb 12, 2024
1 parent 3429365 commit aa6de3e
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 28 deletions.
6 changes: 5 additions & 1 deletion src/main/scala/workflow4s/wio/ActiveWorkflow.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package workflow4s.wio

import workflow4s.wio.Interpreter.EventResponse

case class ActiveWorkflow[St, +Out](state: St, wio: WIO.Total[St], interpreter: Interpreter[St], value: Out) {

def handleSignal[Req, Resp](signalDef: SignalDef[Req, Resp])(req: Req): SignalResponse[St, Resp] =
interpreter.handleSignal[Req, Resp, Any](signalDef, req, wio, state)
interpreter.handleSignal[Req, Resp](signalDef, req, wio, state)
def handleQuery[Req, Resp](signalDef: SignalDef[Req, Resp])(req: Req): QueryResponse[Resp] =
interpreter.handleQuery[Req, Resp, Nothing, Any](signalDef, req, wio, state)
def handleEvent(event: Any): EventResponse[St] =
interpreter.handleEvent(event, wio, state)

}
71 changes: 56 additions & 15 deletions src/main/scala/workflow4s/wio/Interpreter.scala
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
package workflow4s.wio

import workflow4s.wio.Interpreter.EventResponse
import workflow4s.wio.WIO.HandleSignal

class Interpreter[St](journal: JournalPersistance) {

def handleSignal[Req, Resp, Out](signalDef: SignalDef[Req, Resp], req: Req, wio: WIO[Nothing, Out, St], state: St): SignalResponse[St, Resp] =
def handleSignal[Req, Resp](signalDef: SignalDef[Req, Resp], req: Req, wio: WIO[Nothing, Any, St], state: St): SignalResponse[St, Resp] =
wio match {
case x @ WIO.HandleSignal(_, _) =>
x.expects(signalDef) match {
case Some(handleSignalWio) => doHandleSignal(req, handleSignalWio, state)
case None => SignalResponse.UnexpectedSignal()
}
case WIO.Or(first, second) =>
def preserveOr(
resp: SignalResponse.Ok[St, Resp],
firstToStay: Boolean,
): SignalResponse.Ok[St, Resp] =
SignalResponse.Ok(
resp.value.map(newWf => newWf.copy(wio = if (firstToStay) WIO.Or(first, newWf.wio) else WIO.Or(newWf.wio, second))),
)
case or @ WIO.Or(first, second) =>
handleSignal(signalDef, req, first, state) match {
case x @ SignalResponse.Ok(_) => preserveOr(x, false)
case x @ SignalResponse.Ok(_) => preserveOr(or, x, false)
case SignalResponse.UnexpectedSignal() =>
handleSignal(signalDef, req, second, state) match {
case x @ SignalResponse.Ok(_) => preserveOr(x, true)
case x @ SignalResponse.Ok(_) => preserveOr(or, x, true)
case SignalResponse.UnexpectedSignal() => SignalResponse.UnexpectedSignal()
}
}
Expand All @@ -33,10 +27,9 @@ class Interpreter[St](journal: JournalPersistance) {
def handleQuery[Req, Resp, Err, Out](signalDef: SignalDef[Req, Resp], req: Req, wio: WIO[Err, Out, St], state: St): QueryResponse[Resp] =
wio match {
case x @ WIO.HandleQuery(_) =>
x.expects(signalDef) match {
case Some(handleQueryWio) => doHandleQuery(req, handleQueryWio, state)
case None => QueryResponse.UnexpectedQuery()
}
x.expects(signalDef)
.map(handler => doHandleQuery(req, handler, state))
.getOrElse(QueryResponse.UnexpectedQuery())
case WIO.Or(first, second) =>
handleQuery(signalDef, req, first, state) match {
case x @ QueryResponse.Ok(_) => x
Expand All @@ -45,6 +38,39 @@ class Interpreter[St](journal: JournalPersistance) {
case _ => QueryResponse.UnexpectedQuery()
}

def handleEvent(event: Any, wio: WIO.Total[St], state: St): EventResponse[St] = {
def go(wio: WIO.Total[St]): Option[ActiveWorkflow[St, Any]] = wio match {
case HandleSignal(_, evtHandler) =>
evtHandler
.expects(event)
.map(validatedEvent => {
val (newState, resp) = evtHandler.handle(state, validatedEvent)
ActiveWorkflow(newState, WIO.Noop(), this, resp)
})
case WIO.HandleQuery(queryHandler) => None
case or @ WIO.Or(first, second) =>
// TODO we could alert in case both branches expects the event
go(first)
.map(preserveOr1(or, _, false))
.orElse(go(second))
.map(preserveOr1(or, _, true))
case WIO.Noop() => None
}
go(wio).map(EventResponse.Ok(_)).getOrElse(EventResponse.UnexpectedEvent())
}

private def preserveOr[Resp](
or: WIO.Or[Nothing, Any, St],
resp: SignalResponse.Ok[St, Resp],
firstToStay: Boolean,
): SignalResponse.Ok[St, Resp] = SignalResponse.Ok(resp.value.map(preserveOr1(or, _, firstToStay)))

private def preserveOr1[Resp](
or: WIO.Or[Nothing, Any, St],
newWf: ActiveWorkflow[St, Resp],
firstToStay: Boolean,
): ActiveWorkflow[St, Resp] = newWf.copy(wio = if (firstToStay) WIO.Or(or.first, newWf.wio) else WIO.Or(newWf.wio, or.second))

private def doHandleSignal[Req, Resp, Evt](req: Req, handler: WIO.HandleSignal[Req, St, Evt, Resp], state: St): SignalResponse.Ok[St, Resp] = {
val io = for {
evt <- handler.sigHandler.handle(state, req)
Expand All @@ -59,3 +85,18 @@ class Interpreter[St](journal: JournalPersistance) {
}

}

object Interpreter {

sealed trait EventResponse[St] {
def asOk: Option[EventResponse.Ok[St]] = this match {
case x @ EventResponse.Ok(_) => Some(x)
case EventResponse.UnexpectedEvent() => None
}
}
object EventResponse {
case class Ok[St](newFlow: ActiveWorkflow[St, Any]) extends EventResponse[St]
case class UnexpectedEvent[St]() extends EventResponse[St]
}

}
12 changes: 8 additions & 4 deletions src/main/scala/workflow4s/wio/WIO.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package workflow4s.wio

import cats.effect.IO

import scala.annotation.unused
import scala.reflect.ClassTag

sealed trait WIO[+Err, +Out, State]

Expand All @@ -20,23 +22,25 @@ object WIO {
Some(this.asInstanceOf[HandleQuery[Req, St, Resp]]) // TODO
}

case class Or[Err, Out, State](first: WIO[Err, Out, State], second: WIO[Err, Out, State]) extends WIO[Err, Out, State]
case class Or[+Err, +Out, State](first: WIO[Err, Out, State], second: WIO[Err, Out, State]) extends WIO[Err, Out, State]

case class Noop[St]() extends WIO[Nothing, Unit, St]

case class SignalHandler[Sig, Evt, St](handle: (St, Sig) => IO[Evt])
case class EventHandler[Evt, St, Out](handle: (St, Evt) => (St, Out))(implicit val jw: JournalWrite[Evt])
case class EventHandler[Evt, St, Out](handle: (St, Evt) => (St, Out))(implicit val jw: JournalWrite[Evt], ct: ClassTag[Evt]){
def expects(any: Any): Option[Evt] = ct.unapply(any)
}
case class QueryHandler[Qr, St, Out](handle: (St, Qr) => Out)

def handleSignal[State] = new HandleSignalPartiallyApplied1[State]

class HandleSignalPartiallyApplied1[St] {
def apply[Sig, Evt: JournalWrite, Resp](@unused signalDef: SignalDef[Sig, Resp])(
def apply[Sig, Evt: JournalWrite: ClassTag, Resp](@unused signalDef: SignalDef[Sig, Resp])(
f: (St, Sig) => IO[Evt],
): HandleSignalPartiallyApplied2[Sig, St, Evt, Resp] = new HandleSignalPartiallyApplied2[Sig, St, Evt, Resp](f)
}

class HandleSignalPartiallyApplied2[Sig, St, Evt: JournalWrite, Resp](handleSignal: (St, Sig) => IO[Evt]) {
class HandleSignalPartiallyApplied2[Sig, St, Evt: JournalWrite : ClassTag, Resp](handleSignal: (St, Sig) => IO[Evt]) {
def handleEvent(f: (St, Evt) => (St, Resp)): WIO[Nothing, Resp, St] = HandleSignal(SignalHandler(handleSignal), EventHandler(f))
}

Expand Down
16 changes: 15 additions & 1 deletion src/main/scala/workflow4s/wio/simple/SimpleActor.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package workflow4s.wio.simple

import cats.effect.unsafe.IORuntime
import workflow4s.wio.Interpreter.EventResponse
import workflow4s.wio.{ActiveWorkflow, QueryResponse, SignalDef, SignalResponse}

class SimpleActor[State](var wf: ActiveWorkflow[State, Any])(implicit IORuntime: IORuntime) {
class SimpleActor[State](private var wf: ActiveWorkflow[State, Any])(implicit IORuntime: IORuntime) {

def handleSignal[Req, Resp](signalDef: SignalDef[Req, Resp])(req: Req): SimpleActor.SignalResponse[Resp] =
wf.handleSignal(signalDef)(req) match {
Expand All @@ -16,6 +17,13 @@ class SimpleActor[State](var wf: ActiveWorkflow[State, Any])(implicit IORuntime:
def handleQuery[Req, Resp](signalDef: SignalDef[Req, Resp])(req: Req): QueryResponse[Resp] =
wf.handleQuery(signalDef)(req)

def handleEvent(event: Any): SimpleActor.EventResponse = wf.handleEvent(event) match {
case EventResponse.Ok(newFlow) =>
wf = newFlow
SimpleActor.EventResponse.Ok
case EventResponse.UnexpectedEvent() => SimpleActor.EventResponse.UnexpectedEvent
}

}

object SimpleActor {
Expand All @@ -24,4 +32,10 @@ object SimpleActor {
case class Ok[Resp](result: Resp) extends SignalResponse[Resp]
case object UnexpectedSignal extends SignalResponse[Nothing]
}

sealed trait EventResponse
object EventResponse{
case object Ok extends EventResponse
case object UnexpectedEvent extends EventResponse
}
}
39 changes: 32 additions & 7 deletions src/test/scala/workflow4s/example/WithdrawalExampleTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,53 @@ package workflow4s.example
import cats.effect.unsafe.implicits.global
import org.scalatest.freespec.AnyFreeSpec
import workflow4s.example.WithdrawalSignal.CreateWithdrawal
import workflow4s.wio.simple.SimpleActor.EventResponse
import workflow4s.wio.simple.{InMemoryJournal, SimpleActor}
import workflow4s.wio.{ActiveWorkflow, Interpreter, QueryResponse}

class WithdrawalExampleTest extends AnyFreeSpec {

"Withdrawal Example" - {

"init" in {
val actor = new WithdrawalActor
assert(actor.getData() == WithdrawalData.Empty)
"init" in new Fixture {
assert(actor.queryData() == WithdrawalData.Empty)
actor.init(CreateWithdrawal(100))
assert(actor.getData() == WithdrawalData.Initiated(100))
assert(actor.queryData() == WithdrawalData.Initiated(100))

checkRecovery()
}
}

trait Fixture {
val journal = new InMemoryJournal
val actor = createActor(journal)

def checkRecovery() = {
val secondActor = createActor(journal)
assert(actor.queryData() == secondActor.queryData())
}
}

class WithdrawalActor
def createActor(journal: InMemoryJournal) = {
val actor = new WithdrawalActor(journal)
actor.recover()
actor
}

class WithdrawalActor(journal: InMemoryJournal)
extends SimpleActor[WithdrawalData](
ActiveWorkflow(WithdrawalData.Empty, WithdrawalExample.workflow, new Interpreter(new InMemoryJournal), ()),
ActiveWorkflow(WithdrawalData.Empty, WithdrawalExample.workflow, new Interpreter(journal), ()),
) {
def init(req: CreateWithdrawal): Unit = this.handleSignal(WithdrawalExample.createWithdrawalSignal)(req).extract

def getData(): WithdrawalData = this.handleQuery(WithdrawalExample.dataQuery)(()).extract
def queryData(): WithdrawalData = this.handleQuery(WithdrawalExample.dataQuery)(()).extract

def recover(): Unit = journal.getEvents.foreach(e =>
this.handleEvent(e) match {
case EventResponse.Ok => ()
case EventResponse.UnexpectedEvent => throw new IllegalArgumentException(s"Unexpected event :${e}")
},
)
}

implicit class SimpleSignalResponseOps[Resp](value: SimpleActor.SignalResponse[Resp]) {
Expand Down

0 comments on commit aa6de3e

Please sign in to comment.