Skip to content

Commit

Permalink
Make checks engine more declarative, model its state more strictly, a…
Browse files Browse the repository at this point in the history
…dd another test, make loop more typesafe
  • Loading branch information
Krever committed Mar 24, 2024
1 parent d4bd829 commit 695f73b
Show file tree
Hide file tree
Showing 17 changed files with 359 additions and 323 deletions.
19 changes: 10 additions & 9 deletions src/main/scala/workflow4s/wio/Interpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object Interpreter {
def onAndThen[Out1, StOut1](wio: WIO.AndThen[Err, Out1, Out, StIn, StOut1, StOut]): DispatchResult

def onPure(wio: WIO.Pure[Err, Out, StIn, StOut]): DispatchResult
def onDoWhile(wio: WIO.DoWhile[Err, Out, StIn, StOut]): DispatchResult
def onDoWhile[StOut1](wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut]): DispatchResult
def onFork(wio: WIO.Fork[Err, Out, StIn, StOut]): DispatchResult

def run: DispatchResult = {
Expand Down Expand Up @@ -213,9 +213,9 @@ object Interpreter {
)
}

def applyOnDoWhile(
wio: WIO.DoWhile[Err, Out, StIn, StOut],
wf: NextWfState[Err, Out, StOut],
def applyOnDoWhile[StOut1](
wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut],
wf: NextWfState[Err, Out, StOut1],
): NextWfState[Err, Out, StOut] = {
wf.fold[NextWfState[Err, Out, StOut]](
b => {
Expand All @@ -224,12 +224,13 @@ object Interpreter {
},
v =>
v.value match {
case Left(value) => v
case Left(err) => NewValue(Left(err))
case Right((state, value)) =>
if (wio.stopCondition(state, value)) v
else {
val newWIO = WIO.DoWhile(wio.loop, wio.stopCondition, wio.loop)
NewBehaviour(newWIO, v.value)
wio.stopCondition(state) match {
case Some(newState) => NewValue(Right((newState, value)))
case None =>
val newWIO = WIO.DoWhile(wio.loop, wio.stopCondition, wio.loop)
NewBehaviour(newWIO, v.value)
}
},
)
Expand Down
24 changes: 17 additions & 7 deletions src/main/scala/workflow4s/wio/WIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ sealed trait WIO[+Err, +Out, -StateIn, +StateOut] {
def transformInputState[NewStateIn](
f: NewStateIn => StateIn,
): WIO[Err, Out, NewStateIn, StateOut] = transformState(f, (_, x) => x)
def transformOutputState[NewStateOut, StIn1 <: StateIn](
f: (StIn1, StateOut) => NewStateOut,
): WIO[Err, Out, StIn1, NewStateOut] = transformState(identity, f)

def handleError[Err1, StIn1 <: StateIn, Out1 >: Out, StOut1 >: StateOut, ErrIn >: Err](
f: ErrIn => WIO[Err1, Out1, StIn1, StOut1],
Expand Down Expand Up @@ -133,11 +136,11 @@ object WIO {
extends WIO[Err, Out2, StIn, StOut2]

//TODO name for condition
case class DoWhile[Err, Out, StIn, StOut](
case class DoWhile[Err, Out, StIn, StOut, StOut2](
loop: WIO[Err, Out, StOut, StOut],
stopCondition: (StOut, Out) => Boolean,
stopCondition: StOut => Option[StOut2],
current: WIO[Err, Out, StIn, StOut],
) extends WIO[Err, Out, StIn, StOut]
) extends WIO[Err, Out, StIn, StOut2]

case class Fork[+Err, +Out, -StIn, +StOut](branches: Vector[Branch[Err, Out, StIn, StOut]]) extends WIO[Err, Out, StIn, StOut]

Expand Down Expand Up @@ -231,8 +234,12 @@ object WIO {

def pure[St]: PurePartiallyApplied[St] = new PurePartiallyApplied
class PurePartiallyApplied[StIn] {
def apply[O](value: O): WIO[Nothing, O, StIn, StIn] = WIO.Pure(s => Right(s -> value), None)
def make[O](f: StIn => O): WIO[Nothing, O, StIn, StIn] = WIO.Pure(s => Right(s -> f(s)), None)
def apply[O](value: O): WIO[Nothing, O, StIn, StIn] = WIO.Pure(s => Right(s -> value), None)
def state[StOut](value: StOut): WIO[Nothing, Unit, StIn, StOut] = WIO.Pure(s => Right(value -> ()), None)
def make[O](f: StIn => O): WIO[Nothing, O, StIn, StIn] = WIO.Pure(s => Right(s -> f(s)), None)
def makeState[StOut](f: StIn => StOut): WIO[Nothing, Unit, StIn, StOut] = WIO.Pure(s => Right(f(s) -> ()), None)
def makeError[Err](f: StIn => Option[Err])(implicit ct: ClassTag[Err]): WIO[Err, Unit, StIn, StIn] =
WIO.Pure(s => f(s).map(Left(_)).getOrElse(Right(s, ())), Some(ct))
}

def unit[St] = pure[St](())
Expand All @@ -242,8 +249,11 @@ object WIO {
def apply[Err](value: Err)(implicit ct: ClassTag[Err]): WIO[Err, Nothing, StIn, Nothing] = WIO.Pure(s => Left(value), None)
}

def doWhile[Err, Out, State](action: WIO[Err, Out, State, State])(stopCondition: (State, Out) => Boolean) = WIO.DoWhile(action, stopCondition, action)
def whileDo[Err, Out, State, T](condition: State => Option[T])(action: WIO[Err, Out, (State, T), State]) = ???
def repeat[Err, Out, State](action: WIO[Err, Out, State, State]) = RepeatBuilder(action)

case class RepeatBuilder[Err, Out, State](action: WIO[Err, Out, State, State]) {
def untilSome[StOut](f: State => Option[StOut]): WIO[Err, Out, State, StOut] = WIO.DoWhile(action, f, action)
}

def fork[State]: ForkBuilder[Nothing, Nothing, State, Nothing] = ForkBuilder(Vector())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object CurrentStateEvaluator {
): DispatchResult =
s"(${recurse(wio.base)}, on error: ${recurse(wio.handleError)}"

override def onDoWhile(wio: WIO.DoWhile[Err, Out, StIn, StOut]): DispatchResult = s"do-while; current = ${recurse(wio.current)}"
override def onDoWhile[StOut1](wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut]): DispatchResult = s"do-while; current = ${recurse(wio.current)}"

override def onFork(wio: WIO.Fork[Err, Out, StIn, StOut]): String = "fork"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object EventEvaluator {
})
}

override def onDoWhile(wio: WIO.DoWhile[Err, Out, StIn, StOut]): DispatchResult =
override def onDoWhile[StOut1](wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut]): DispatchResult =
recurse(wio.current, state).map(applyOnDoWhile(wio, _))

override def onHandleErrorWith[ErrIn, HandlerStateIn >: StIn, BaseOut >: Out](
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/workflow4s/wio/internal/ProceedEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ object ProceedEvaluator {
}))
}

override def onDoWhile(wio: WIO.DoWhile[Err, Out, StIn, StOut]): DispatchResult = {
recurse(wio.current, state).map(_.map((newWf: NextWfState[Err, Out, StOut]) => {
override def onDoWhile[StOut1](wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut]): DispatchResult = {
recurse(wio.current, state).map(_.map((newWf: NextWfState[Err, Out, StOut1]) => {
applyOnDoWhile(wio, newWf)
}))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object QueryEvaluator {
wio: WIO.HandleErrorWith[Err, BaseOut, StIn, StOut, ErrIn, HandlerStateIn, Out],
): DispatchResult = recurse(wio.base, state)

override def onDoWhile(wio: WIO.DoWhile[Err, Out, StIn, StOut]): DispatchResult = recurse(wio.current, state)
override def onDoWhile[StOut1](wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut]): DispatchResult = recurse(wio.current, state)

def recurse[E1, O1, StIn1, SOut1](wio: WIO[E1, O1, StIn1, SOut1], s: StIn1): QueryVisitor[E1, O1, StIn, SOut1, Resp, Req]#DispatchResult =
new QueryVisitor(wio, signalDef, req, s).run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ object SignalEvaluator {
applyHandleErrorWith(wio, casted, state) -> resp
}))

override def onDoWhile(wio: WIO.DoWhile[Err, Out, StIn, StOut]): DispatchResult =
override def onDoWhile[StOut1](wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut]): DispatchResult =
recurse(wio.current, state).map(_.map({ case (wf, resp) => applyOnDoWhile(wio, wf) -> resp }))

override def onFork(wio: WIO.Fork[Err, Out, StIn, StOut]): Option[IO[(NewWf, Resp)]] = ??? // TODO, proper error handling, should never happen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object WIOModelInterpreter {
def onNamed(wio: WIO.Named[Err, Out, StIn, StOut]): DispatchResult =
new ModelVisitor(wio.base, Metadata(wio.name.some, wio.description)).run

override def onDoWhile(wio: WIO.DoWhile[Err, Out, StIn, StOut]): FlatMapOut =
override def onDoWhile[StOut1](wio: WIO.DoWhile[Err, Out, StIn, StOut1, StOut]): FlatMapOut =
WIOModel.Loop(recurse(wio.loop), None)

override def onFork(wio: WIO.Fork[Err, Out, StIn, StOut]): FlatMapOut =
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/workflow4s/wio/simple/SimpleActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import workflow4s.wio.{ActiveWorkflow, Interpreter, JournalPersistance, SignalDe

class SimpleActor(private var wf: ActiveWorkflow, journal: JournalPersistance)(implicit IORuntime: IORuntime) extends StrictLogging {

def state: Either[Any, Any] = wf.value.map(_._1)

def handleSignal[Req, Resp](signalDef: SignalDef[Req, Resp])(req: Req): SimpleActor.SignalResponse[Resp] = {
logger.debug(s"Handling signal ${req}")
wf.handleSignal(signalDef)(req) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ object WithdrawalData {
def validated(fee: Fee) = Validated(txId, amount, recipient, fee)
}
case class Validated(txId: String, amount: BigDecimal, recipient: Iban, fee: Fee) extends WithdrawalData {
def checked(checksState: ChecksState) = Checked(txId, amount, recipient, fee, checksState)
def checked(checksState: ChecksState.Decided) = Checked(txId, amount, recipient, fee, checksState)
}
case class Checked(txId: String, amount: BigDecimal, recipient: Iban, fee: Fee, checkResults: ChecksState) extends WithdrawalData {
case class Checked(txId: String, amount: BigDecimal, recipient: Iban, fee: Fee, checkResults: ChecksState.Decided) extends WithdrawalData {
def netAmount = amount - fee.value
def executed(externalTxId: String) = Executed(txId, amount, recipient, fee, checkResults, externalTxId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,22 @@ class WithdrawalWorkflow(service: WithdrawalService, checksEngine: ChecksEngine)
}
.autoNamed()

private def runChecks: WIO[WithdrawalRejection.RejectedInChecks, Unit, WithdrawalData.Validated, WithdrawalData.Checked] =
(for {
state <- WIO.getState[WithdrawalData.Validated]
decision <- checksEngine
.runChecks(ChecksInput(state, List()))
.transformState[WithdrawalData.Validated, WithdrawalData.Checked](
_ => ChecksState.empty,
(validated, checkState) => validated.checked(checkState),
)
_ <- decision match {
case Decision.RejectedBySystem() => WIO.raise[WithdrawalData.Checked](WithdrawalRejection.RejectedInChecks())
case Decision.ApprovedBySystem() => WIO.unit[WithdrawalData.Checked]
case Decision.RejectedByOperator() => WIO.raise[WithdrawalData.Checked](WithdrawalRejection.RejectedInChecks())
case Decision.ApprovedByOperator() => WIO.unit[WithdrawalData.Checked]
}
} yield ()).autoNamed()
private def runChecks: WIO[WithdrawalRejection.RejectedInChecks, Unit, WithdrawalData.Validated, WithdrawalData.Checked] = {
val doRunChecks: WIO[Nothing, Unit, WithdrawalData.Validated, WithdrawalData.Checked] = checksEngine.runChecks
.transformInputState((x: WithdrawalData.Validated) => ChecksInput(x, List()))
.transformOutputState((validated, checkState) => validated.checked(checkState))

val actOnDecision = WIO
.pure[WithdrawalData.Checked]
.makeError(_.checkResults.decision match {
case Decision.RejectedBySystem() => Some(WithdrawalRejection.RejectedInChecks())
case Decision.ApprovedBySystem() => None
case Decision.RejectedByOperator() => Some(WithdrawalRejection.RejectedInChecks())
case Decision.ApprovedByOperator() => None
})

doRunChecks >>> actOnDecision
}

// TODO can fail with provider fatal failure, need retries, needs cancellation
private def execute: WIO[WithdrawalRejection.RejectedByExecutionEngine, Unit, WithdrawalData.Checked, WithdrawalData.Executed] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,75 +5,77 @@ import cats.syntax.all._
import workflow4s.wio.{SignalDef, WIO}

trait ChecksEngine {
def runChecks(input: ChecksInput): WIO[Nothing, Decision, ChecksState, ChecksState]
def runChecks: WIO[Nothing, Unit, ChecksInput, ChecksState.Decided]
}

object ChecksEngine extends ChecksEngine {

val reviewSignalDef: SignalDef[ReviewDecision, Unit] = SignalDef()

def runChecks(input: ChecksInput): WIO[Nothing, Decision, ChecksState, ChecksState] =
refreshChecksUntilAllComplete(input) >>> getDecision()
def runChecks: WIO[Nothing, Unit, ChecksInput, ChecksState.Decided] =
refreshChecksUntilAllComplete >>> getDecision()
// .checkpointed((s, decision) => ChecksEvent.CheckCompleted(s.results, decision))((s, e) => (s, e.decision))

private def getDecision(): WIO[Nothing, Decision, ChecksState, ChecksState] = {
private def getDecision(): WIO[Nothing, Unit, ChecksState.Executed, ChecksState.Decided] = {
WIO
.fork[ChecksState]
.fork[ChecksState.Executed]
.addBranch(requiresReviewBranch)
.addBranch(decidedBySystemBranch) // TODO if/else api
.done
}

private def refreshChecksUntilAllComplete(input: ChecksInput): WIO[Nothing, Unit, ChecksState, ChecksState] = {
val refreshChecks = (for {
state <- WIO.getState[ChecksState]
pending = (input.checks -- state.finished).values.toList
_ <- doRun(pending, input.data)
} yield ()).autoNamed()
private def refreshChecksUntilAllComplete: WIO[Nothing, Unit, ChecksInput, ChecksState.Executed] = {

WIO.doWhile(refreshChecks)((state, _) => {
val allFinished = state.finished.size == input.checks.size
allFinished
})
def initialize: WIO[Nothing, Unit, ChecksInput, ChecksState.Pending] =
WIO.pure[ChecksInput].makeState(ci => ChecksState.Pending(ci, Map()))

def isDone(checksState: ChecksState.Pending): Option[ChecksState.Executed] = checksState.asExecuted

initialize >>> WIO
.repeat(runPendingChecks)
.untilSome(isDone)
}

private def doRun[Data](checks: List[Check[Data]], data: Data): WIO[Nothing, Unit, ChecksState, ChecksState] =
private def runPendingChecks: WIO[Nothing, Unit, ChecksState.Pending, ChecksState.Pending] =
WIO
.runIO[ChecksState](_ =>
.runIO[ChecksState.Pending](state => {
val pending = state.pendingChecks
val checks = state.input.checks.view.filterKeys(pending.contains).values.toList
checks
.traverse(check =>
check
.run(data)
.run(state.input.data)
.handleError(_ => CheckResult.RequiresReview()) // TODO logging
.tupleLeft(check.key),
)
.map(results => ChecksEvent.ChecksRun(results.toMap)),
)
.map(results => ChecksEvent.ChecksRun(results.toMap))
})
.handleEvent((state, evt) => (state.addResults(evt.results), ()))
.autoNamed()

private def decidedBySystemBranch =
private val decidedBySystemBranch: WIO.Branch[Nothing, Unit, ChecksState.Executed, ChecksState.Decided] =
WIO
.branch[ChecksState]
.branch[ChecksState.Executed]
.when(!_.requiresReview)(
WIO.pure.make(st =>
if (st.isRejected) Decision.RejectedBySystem()
else Decision.ApprovedBySystem(),
),
WIO.pure.makeState(st => {
val decision =
if (st.isRejected) Decision.RejectedBySystem()
else Decision.ApprovedBySystem()
st.asDecided(decision)
}),
)

private def requiresReviewBranch =
WIO.branch[ChecksState].when(_.requiresReview)(handleReview)
private val requiresReviewBranch: WIO.Branch[Nothing, Unit, ChecksState.Executed, ChecksState.Decided] =
WIO.branch[ChecksState.Executed].when(_.requiresReview)(handleReview)

private def handleReview: WIO[Nothing, Decision, ChecksState, ChecksState] = WIO
.handleSignal[ChecksState](reviewSignalDef)({ case (_, sig) => IO(ChecksEvent.ReviewDecisionTaken(sig)) })
private def handleReview: WIO[Nothing, Unit, ChecksState.Executed, ChecksState.Decided] = WIO
.handleSignal[ChecksState.Executed](reviewSignalDef)({ case (_, sig) => IO(ChecksEvent.ReviewDecisionTaken(sig)) })
.handleEvent({ case (st, evt) =>
(
st,
evt.decision match {
case ReviewDecision.Approve => Decision.ApprovedByOperator()
case ReviewDecision.Reject => Decision.RejectedByOperator()
},
)
val decision = evt.decision match {
case ReviewDecision.Approve => Decision.ApprovedByOperator()
case ReviewDecision.Reject => Decision.RejectedByOperator()
}
(st.asDecided(decision), ())
})
.produceResponse((_, _) => ())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ object Decision {

sealed trait CheckResult
object CheckResult {
sealed trait Final extends CheckResult
sealed trait Finished extends CheckResult
sealed trait Final extends Finished
case class Pending() extends CheckResult
case class Approved() extends Final
case class Rejected() extends Final
case class Pending() extends CheckResult
case class RequiresReview() extends CheckResult
case class RequiresReview() extends Finished
}

case class CheckKey(value: String)
Expand All @@ -32,24 +33,33 @@ trait Check[Data] {
def run(data: Data): IO[CheckResult]
}

case class ChecksState(results: Map[CheckKey, CheckResult]) {
def finished: Set[CheckKey] = results.flatMap({ case (key, result) =>
result match {
case value: CheckResult.Final => Some(key)
case CheckResult.Pending() => None
case CheckResult.RequiresReview() => Some(key)
}
}).toSet

def addResults(newResults: Map[CheckKey, CheckResult]) = ChecksState(results ++ newResults)
sealed trait ChecksState {
def results: Map[CheckKey, CheckResult]

def isRejected = results.exists(_._2 == CheckResult.Rejected())
def requiresReview = !isRejected && results.exists(_._2 == CheckResult.RequiresReview())
def isApproved = !isRejected && !requiresReview
}

object ChecksState {
val empty = ChecksState(Map())

case class Pending(input: ChecksInput, results: Map[CheckKey, CheckResult]) extends ChecksState {
private def finishedChecks: Map[CheckKey, CheckResult.Finished] = results.collect({ case (key, result: CheckResult.Finished) => key -> result })
def pendingChecks: Set[CheckKey] = input.checks.keySet -- finishedChecks.keySet

def addResults(newResults: Map[CheckKey, CheckResult]) = ChecksState.Pending(input, results ++ newResults)

def asExecuted: Option[ChecksState.Executed] = {
val finished = finishedChecks
Option.when(finished.size == input.checks.size)(Executed(finished))
}
}
case class Executed(results: Map[CheckKey, CheckResult.Finished]) extends ChecksState {
def isRejected = results.exists(_._2 == CheckResult.Rejected())
def requiresReview = !isRejected && results.exists(_._2 == CheckResult.RequiresReview())
def isApproved = !isRejected && !requiresReview

def asDecided(decision: Decision) = ChecksState.Decided(results, decision)
}
case class Decided(results: Map[CheckKey, CheckResult.Finished], decision: Decision) extends ChecksState

}

trait ChecksInput {
Expand Down
Loading

0 comments on commit 695f73b

Please sign in to comment.